source: branches/encryption/libtransmission/peer-mgr.c @ 2988

Last change on this file since 2988 was 2988, checked in by charles, 15 years ago

move the torrent recheck into its own thread. we're getting close to getting rid of all the torrent threads =)

  • Property svn:keywords set to Date Rev Author Id
File size: 24.4 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 2988 2007-09-09 00:10:52Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17
18#include "transmission.h"
19#include "handshake.h"
20#include "completion.h"
21#include "net.h"
22#include "peer-io.h"
23#include "peer-mgr.h"
24#include "peer-mgr-private.h"
25#include "peer-msgs.h"
26#include "ptrarray.h"
27#include "timer.h"
28#include "utils.h"
29
30#define MINUTES_TO_MSEC(N) ((N) * 60 * 1000)
31
32/* how frequently to change which peers are choked */
33#define RECHOKE_PERIOD_SECONDS (MINUTES_TO_MSEC(10))
34
35/* how many downloaders to unchoke per-torrent.
36 * http://wiki.theory.org/BitTorrentSpecification#Choking_and_Optimistic_Unchoking */
37#define NUM_DOWNLOADERS_TO_UNCHOKE 4
38
39/* across all torrents, how many peers maximum do we want connected? */
40#define MAX_CONNECTED_PEERS 80
41
42/**
43***
44**/
45
46struct tr_block
47{
48    unsigned int have          : 1;
49    unsigned int dnd           : 1;
50    unsigned int low_priority  : 1;
51    unsigned int high_priority : 1;
52    uint8_t requestCount;
53    uint8_t scarcity;
54    uint32_t block;
55};
56
57#define MAX_SCARCITY UINT8_MAX
58#define MAX_REQ_COUNT UINT8_MAX
59
60static void
61incrementReqCount( struct tr_block * block )
62{
63    assert( block != NULL );
64
65    if( block->requestCount < MAX_REQ_COUNT )
66        block->requestCount++;
67}
68
69static void
70incrementScarcity( struct tr_block * block )
71{
72    assert( block != NULL );
73
74    if( block->scarcity < MAX_SCARCITY )
75        block->scarcity++;
76}
77
78static int
79compareBlockByIndex( const void * va, const void * vb )
80{
81    const struct tr_block * a = (const struct tr_block *) va;
82    const struct tr_block * b = (const struct tr_block *) vb;
83    return tr_compareUint32( a->block, b->block );
84}
85
86static int
87compareBlockByInterest( const void * va, const void * vb )
88{
89    const struct tr_block * a = (const struct tr_block *) va;
90    const struct tr_block * b = (const struct tr_block *) vb;
91    int i;
92
93    if( a->dnd != b->dnd )
94        return a->dnd ? 1 : -1;
95
96    if( a->have != b->have )
97        return a->have ? 1 : -1;
98
99    if(( i = tr_compareUint8( a->requestCount, b->requestCount )))
100        return i;
101
102    if( a->high_priority != b->high_priority )
103        return a->high_priority ? -1 : 1;
104
105    if( a->low_priority != b->low_priority )
106        return a->low_priority ? 1 : -1;
107
108    if(( i = tr_compareUint16( a->scarcity, b->scarcity )))
109        return i;
110
111    if(( i = tr_compareUint32( a->block, b->block )))
112        return i;
113
114    return 0;
115}
116
117/**
118***
119**/
120
121typedef struct
122{
123    uint8_t hash[SHA_DIGEST_LENGTH];
124    tr_ptrArray * peers; /* tr_peer */
125    tr_timer_tag choke_tag;
126    tr_timer_tag refill_tag;
127    tr_torrent * tor;
128
129    struct tr_block * blocks;
130    uint32_t blockCount;
131
132    struct tr_peerMgr * manager;
133}
134Torrent;
135
136struct tr_peerMgr
137{
138    tr_handle * handle;
139    tr_ptrArray * torrents; /* Torrent */
140    int connectionCount;
141};
142
143/**
144***
145**/
146
147static int
148torrentCompare( const void * va, const void * vb )
149{
150    const Torrent * a = (const Torrent*) va;
151    const Torrent * b = (const Torrent*) vb;
152    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
153}
154
155static int
156torrentCompareToHash( const void * va, const void * vb )
157{
158    const Torrent * a = (const Torrent*) va;
159    const uint8_t * b_hash = (const uint8_t*) vb;
160    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
161}
162
163static Torrent*
164getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
165{
166    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
167                                             hash,
168                                             torrentCompareToHash );
169}
170
171static int chokePulse( void * vtorrent );
172
173static int
174peerCompare( const void * va, const void * vb )
175{
176    const tr_peer * a = (const tr_peer *) va;
177    const tr_peer * b = (const tr_peer *) vb;
178    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
179}
180
181static int
182peerCompareToAddr( const void * va, const void * vb )
183{
184    const tr_peer * a = (const tr_peer *) va;
185    const struct in_addr * b = (const struct in_addr *) vb;
186    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
187}
188
189static tr_peer*
190getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
191{
192    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
193                                             in_addr,
194                                             peerCompareToAddr );
195}
196
197static tr_peer*
198getPeer( Torrent * torrent, const struct in_addr * in_addr )
199{
200    tr_peer * peer = getExistingPeer( torrent, in_addr );
201    if( peer == NULL )
202    {
203        peer = tr_new0( tr_peer, 1 );
204        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
205        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
206fprintf( stderr, "getPeer: torrent %p now has %d peers\n", torrent, tr_ptrArraySize(torrent->peers) );
207    }
208    return peer;
209}
210
211static void
212freePeer( tr_peer * peer )
213{
214    tr_peerMsgsFree( peer->msgs );
215    tr_bitfieldFree( peer->have );
216    tr_bitfieldFree( peer->blame );
217    tr_bitfieldFree( peer->banned );
218    tr_peerIoFree( peer->io );
219    tr_free( peer->client );
220    tr_free( peer );
221}
222
223static void
224freeTorrent( tr_peerMgr * manager, Torrent * t )
225{
226    int i, size;
227    tr_peer ** peers;
228
229    assert( manager != NULL );
230    assert( t != NULL );
231    assert( t->peers != NULL );
232
233    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
234    tr_timerFree( &t->choke_tag );
235    tr_timerFree( &t->refill_tag );
236    for( i=0; i<size; ++i )
237        freePeer( peers[i] );
238    tr_ptrArrayFree( t->peers );
239    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
240    tr_free( t->blocks );
241    tr_free( t );
242}
243
244/**
245***
246**/
247
248tr_peerMgr*
249tr_peerMgrNew( tr_handle * handle )
250{
251    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
252    m->handle = handle;
253    m->torrents = tr_ptrArrayNew( );
254    return m;
255}
256
257void
258tr_peerMgrFree( tr_peerMgr * manager )
259{
260    while( !tr_ptrArrayEmpty( manager->torrents ) )
261        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents,0) );
262    tr_ptrArrayFree( manager->torrents );
263    tr_free( manager );
264}
265
266/**
267***
268**/
269
270static tr_peer**
271getConnectedPeers( Torrent * t, int * setmeCount )
272{
273    int i, peerCount, connectionCount;
274    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
275    tr_peer **ret = tr_new( tr_peer*, peerCount );
276
277    for( i=connectionCount=0; i<peerCount; ++i )
278        if( peers[i]->msgs != NULL )
279            ret[connectionCount++] = peers[i];
280
281    *setmeCount = connectionCount;
282    return ret;
283}
284
285static int
286refillPulse( void * vtorrent )
287{
288    uint32_t i;
289    int size;
290    Torrent * t = (Torrent *) vtorrent;
291    tr_peer ** peers = getConnectedPeers( t, &size );
292fprintf( stderr, "in refill pulse for [%s]... sorting blocks by interest...", t->tor->info.name );
293
294    /* sort the blocks by interest */
295    qsort( t->blocks, t->blockCount, sizeof(struct tr_block), compareBlockByInterest );
296fprintf( stderr, " .done.\n" );
297
298    /* walk through all the most interesting blocks */
299    for( i=0; i<t->blockCount; ++i )
300    {
301        const uint32_t b = t->blocks[i].block;
302        const uint32_t index = tr_torBlockPiece( t->tor, b );
303        const uint32_t begin = ( b * t->tor->blockSize )-( index * t->tor->info.pieceSize );
304        const uint32_t length = tr_torBlockCountBytes( t->tor, (int)b );
305        int j;
306
307        if( t->blocks[i].have || t->blocks[i].dnd )
308            continue;
309        if( !size ) { /* all peers full */
310            fprintf( stderr, "all peers full...\n" );
311            break;
312        }
313
314        /* find a peer who can ask for this block */
315        for( j=0; j<size; )
316        {
317            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
318//fprintf( stderr, " block %"PRIu64", peer %"PRIu64, (uint64_t)i,  (uint64_t)j );
319            if( val == TR_ADDREQ_FULL ) {
320fprintf( stderr, "peer %d of %d is full\n", (int)j, size );
321                peers[j] = peers[--size];
322            }
323            else if( val == TR_ADDREQ_MISSING ) {
324//fprintf( stderr, "peer doesn't have it\n" );
325                ++j;
326            }
327            else if( val == TR_ADDREQ_OK ) {
328fprintf( stderr, "peer %d took the request for block %d\n", j, i );
329                incrementReqCount( &t->blocks[i] );
330                j = size;
331            }
332        }
333    }
334
335    /* put the blocks back by index */
336    qsort( t->blocks, t->blockCount, sizeof(struct tr_block), compareBlockByIndex );
337
338    /* cleanup */
339    tr_free( peers );
340
341    /* let the timer expire */
342    t->refill_tag = NULL;
343    return FALSE;
344}
345
346static void
347ensureRefillTag( Torrent * t )
348{
349    if( t->refill_tag == NULL )
350        t->refill_tag = tr_timerNew( t->manager->handle,
351                                     refillPulse, t, NULL, 5000 );
352}
353
354static void
355msgsCallbackFunc( void * source UNUSED, void * vevent, void * vt )
356{
357    Torrent * t = (Torrent *) vt;
358    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
359
360    switch( e->eventType )
361    {
362        case TR_PEERMSG_GOT_BITFIELD: {
363            const uint32_t begin = 0;
364            const uint32_t end = begin + t->blockCount;
365            uint32_t i;
366            for( i=begin; i<end; ++i ) {
367                if( !tr_bitfieldHas( e->bitfield, i ) )
368                    continue;
369                assert( t->blocks[i].block == i );
370                incrementScarcity( &t->blocks[i] );
371            }
372            break;
373        }
374
375        case TR_PEERMSG_GOT_HAVE: {
376            const uint32_t begin = tr_torPieceFirstBlock( t->tor, e->pieceIndex );
377            const uint32_t end = begin + tr_torPieceCountBlocks( t->tor, (int)e->pieceIndex );
378            uint32_t i;
379            for( i=begin; i<end; ++i ) {
380                assert( t->blocks[i].block == i );
381                incrementScarcity( &t->blocks[i] );
382            }
383            break;
384        }
385
386        case TR_PEERMSG_GOT_BLOCK: {
387            uint32_t i = e->blockIndex;
388            assert( t->blocks[i].block == i );
389            t->blocks[i].have = 1;
390            break;
391        }
392
393        case TR_PEERMSG_GOT_PEX:
394            /* FIXME */
395            break;
396
397        case TR_PEERMSG_GOT_ERROR:
398            /* FIXME */
399            break;
400
401        case TR_PEERMSG_BLOCKS_RUNNING_LOW:
402            ensureRefillTag( t );
403            break;
404
405        default:
406            assert(0);
407    }
408}
409
410static void
411myHandshakeDoneCB( tr_peerIo * io, int isConnected, void * vmanager )
412{
413    int ok = isConnected;
414    uint16_t port;
415    const struct in_addr * in_addr;
416    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
417    const uint8_t * hash = NULL;
418    Torrent * t;
419
420    assert( io != NULL );
421
422    in_addr = tr_peerIoGetAddress( io, &port );
423
424    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
425    {
426        tr_peerIoFree( io );
427        --manager->connectionCount;
428        return;
429    }
430
431    hash = tr_peerIoGetTorrentHash( io );
432    t = getExistingTorrent( manager, hash );
433    if( t == NULL )
434    {
435        tr_peerIoFree( io );
436        --manager->connectionCount;
437        return;
438    }
439
440    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake; isConnected is %d\n", t->tor->info.name, isConnected );
441
442    /* if we couldn't connect or were snubbed,
443     * the peer's probably not worth remembering. */
444    if( !ok ) {
445        tr_peer * peer = getExistingPeer( t, in_addr );
446        fprintf( stderr, "peer-mgr: torrent [%s] got a bad one, and you know what? fuck them.\n", t->tor->info.name );
447        if( peer ) {
448            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
449            freePeer( peer );
450        } else  {
451            tr_peerIoFree( io );
452        }
453        --manager->connectionCount;
454        return;
455    }
456
457#if 0
458    /* ONLY DO THIS TEST FOR INCOMING CONNECTIONS */
459    /* check for duplicates */
460    if( getExistingPeer( t, in_addr ) ) {
461        tr_dbg( "dropping a duplicate connection... dropping." );
462        tr_peerIoFree( io );
463        return;
464    }
465#endif
466
467    if( 1 ) {
468        tr_peer * peer = getPeer( t, in_addr );
469        peer->port = port;
470        peer->io = io;
471        peer->msgs = tr_peerMsgsNew( t->tor, peer );
472        peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
473        chokePulse( t );
474    }
475}
476
477void
478tr_peerMgrAddIncoming( tr_peerMgr      * manager,
479                       struct in_addr  * addr,
480                       int               socket )
481{
482    ++manager->connectionCount;
483
484fprintf( stderr, "peer-mgr: new INCOMING CONNECTION...\n" );
485    tr_handshakeAdd( tr_peerIoNewIncoming( manager->handle, addr, socket ),
486                     HANDSHAKE_ENCRYPTION_PREFERRED,
487                     myHandshakeDoneCB,
488                     manager );
489}
490
491static void
492maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
493{
494    if( tr_peerMgrIsAcceptingConnections( manager ) )
495    {
496        fprintf( stderr, "peer-mgr: torrent [%s] is handshaking with a new peer %08x:%04x\n",
497                 t->tor->info.name,
498                 (uint32_t) peer->in_addr.s_addr, peer->port );
499
500        ++manager->connectionCount;
501
502        peer->io = tr_peerIoNewOutgoing( manager->handle,
503                                         &peer->in_addr,
504                                         peer->port,
505                                         t->hash );
506
507        tr_handshakeAdd( peer->io, HANDSHAKE_ENCRYPTION_PREFERRED,
508                         myHandshakeDoneCB, manager );
509    }
510}
511
512void
513tr_peerMgrAddPex( tr_peerMgr     * manager,
514                  const uint8_t  * torrentHash,
515                  int              from,
516                  const tr_pex   * pex,
517                  int              pexCount )
518{
519    int i;
520    const tr_pex * walk = pex;
521    Torrent * t = getExistingTorrent( manager, torrentHash );
522    for( i=0; i<pexCount; ++i )
523    {
524        tr_peer * peer = getPeer( t, &walk->in_addr );
525        peer->port = walk->port;
526        peer->from = from;
527        maybeConnect( manager, t, peer );
528    }
529}
530
531void
532tr_peerMgrAddPeers( tr_peerMgr    * manager,
533                    const uint8_t * torrentHash,
534                    int             from,
535                    const uint8_t * peerCompact,
536                    int             peerCount )
537{
538    int i;
539    const uint8_t * walk = peerCompact;
540    Torrent * t = getExistingTorrent( manager, torrentHash );
541    for( i=0; i<peerCount; ++i )
542    {
543        tr_peer * peer;
544        struct in_addr addr;
545        uint16_t port;
546        memcpy( &addr, walk, 4 ); walk += 4;
547        memcpy( &port, walk, 2 ); walk += 2;
548        peer = getPeer( t, &addr );
549        peer->port = port;
550        peer->from = from;
551        maybeConnect( manager, t, peer );
552    }
553}
554
555/**
556***
557**/
558
559int
560tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager )
561{
562    return manager->connectionCount < MAX_CONNECTED_PEERS;
563}
564
565void
566tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
567                    const uint8_t  * torrentHash UNUSED,
568                    int              pieceIndex UNUSED,
569                    int              success UNUSED )
570{
571    assert( 0 );
572}
573
574int
575tr_pexCompare( const void * va, const void * vb )
576{
577    const tr_pex * a = (const tr_pex *) va;
578    const tr_pex * b = (const tr_pex *) vb;
579    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
580    if( i ) return i;
581    if( a->port < b->port ) return -1;
582    if( a->port > b->port ) return 1;
583    return 0;
584}
585
586int tr_pexCompare( const void * a, const void * b );
587
588
589int
590tr_peerMgrGetPeers( tr_peerMgr      * manager,
591                    const uint8_t   * torrentHash,
592                    tr_pex         ** setme_pex )
593{
594    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
595    int i, peerCount;
596    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
597    tr_pex * pex = tr_new( tr_pex, peerCount );
598    tr_pex * walk = pex;
599
600    for( i=0; i<peerCount; ++i, ++walk )
601    {
602        walk->in_addr = peers[i]->in_addr;
603        walk->port = peers[i]->port;
604        walk->flags = '\0'; /* FIXME */
605    }
606
607    assert( ( walk - pex ) == peerCount );
608    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
609    *setme_pex = pex;
610    return peerCount;
611}
612
613void
614tr_peerMgrStartTorrent( tr_peerMgr     * manager UNUSED,
615                        const uint8_t  * torrentHash UNUSED)
616{
617    //fprintf( stderr, "FIXME\n" );
618}
619
620void
621tr_peerMgrStopTorrent( tr_peerMgr     * manager UNUSED,
622                       const uint8_t  * torrentHash UNUSED )
623{
624    //fprintf( stderr, "FIXME\n" );
625}
626
627void
628tr_peerMgrAddTorrent( tr_peerMgr * manager,
629                      tr_torrent * tor )
630{
631    Torrent * t;
632    uint32_t i;
633
634    assert( tor != NULL );
635    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
636
637    t = tr_new0( Torrent, 1 );
638    t->manager = manager;
639    t->tor = tor;
640    t->peers = tr_ptrArrayNew( );
641    t->choke_tag = tr_timerNew( manager->handle,
642                                chokePulse, t, NULL, 
643                                RECHOKE_PERIOD_SECONDS );
644
645    t->blockCount = tor->blockCount;
646    t->blocks = tr_new( struct tr_block, t->blockCount );
647    for( i=0; i<t->blockCount; ++i ) {
648        const int index = tr_torBlockPiece( tor, i );
649        t->blocks[i].have = tr_cpBlockIsComplete( t->tor->completion, i ) ? 1 : 0;
650if( tr_cpBlockIsComplete( t->tor->completion, i ) ) fprintf( stderr, "have block %d\n", (int)i );
651        t->blocks[i].dnd = tor->info.pieces[index].dnd;
652        t->blocks[i].low_priority = tor->info.pieces[index].priority == TR_PRI_LOW;
653        t->blocks[i].high_priority = tor->info.pieces[index].priority == TR_PRI_HIGH;
654        t->blocks[i].requestCount = 0;
655        t->blocks[i].scarcity = 0;
656        t->blocks[i].block = i;
657    }
658
659    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
660    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
661}
662
663void
664tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
665                         const uint8_t  * torrentHash )
666{
667    Torrent * t = getExistingTorrent( manager, torrentHash );
668    if( t != NULL ) {
669        tr_peerMgrStopTorrent( manager, torrentHash );
670        freeTorrent( manager, t );
671    }
672}
673
674void
675tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
676                               const uint8_t    * torrentHash,
677                               int8_t           * tab,
678                               int                tabCount )
679{
680    int i;
681    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
682    const tr_torrent * tor = t->tor;
683    const float interval = tor->info.pieceCount / (float)tabCount;
684
685    for( i=0; i<tabCount; ++i )
686    {
687        const int piece = i * interval;
688
689        if( tor == NULL )
690            tab[i] = 0;
691        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
692            tab[i] = -1;
693        else {
694            int j, peerCount;
695            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
696            for( j=0; j<peerCount; ++j )
697                if( tr_bitfieldHas( peers[j]->have, i ) )
698                    ++tab[i];
699        }
700    }
701}
702
703
704void
705tr_peerMgrTorrentStats( const tr_peerMgr * manager,
706                        const uint8_t    * torrentHash,
707                        int              * setmePeersTotal,
708                        int              * setmePeersConnected,
709                        int              * setmePeersSendingToUs,
710                        int              * setmePeersGettingFromUs,
711                        int              * setmePeersFrom )
712{
713    int i, size;
714    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
715    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
716
717    *setmePeersTotal          = size;
718    *setmePeersConnected      = 0;
719    *setmePeersSendingToUs    = 0;
720    *setmePeersGettingFromUs  = 0;
721
722    for( i=0; i<size; ++i )
723    {
724        const tr_peer * peer = peers[i];
725
726        if( peer->io == NULL ) /* not connected */
727            continue;
728
729        ++*setmePeersConnected;
730
731        ++setmePeersFrom[peer->from];
732
733        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
734            ++*setmePeersGettingFromUs;
735
736        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
737            ++*setmePeersSendingToUs;
738    }
739}
740
741struct tr_peer_stat *
742tr_peerMgrPeerStats( const tr_peerMgr  * manager,
743                     const uint8_t     * torrentHash,
744                     int               * setmeCount UNUSED )
745{
746    int i, size;
747    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
748    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
749    tr_peer_stat * ret;
750
751    ret = tr_new0( tr_peer_stat, size );
752
753    for( i=0; i<size; ++i )
754    {
755        const tr_peer * peer = peers[i];
756        const int live = peer->io != NULL;
757        tr_peer_stat * stat = ret + i;
758
759        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
760        stat->port             = peer->port;
761        stat->from             = peer->from;
762        stat->client           = peer->client;
763        stat->progress         = peer->progress;
764        stat->isConnected      = live;
765        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
766        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
767        stat->isDownloading    = stat->uploadToRate > 0.01;
768        stat->isUploading      = stat->downloadFromRate > 0.01;
769    }
770
771    *setmeCount = size;
772    return ret;
773}
774
775void
776tr_peerMgrDisablePex( tr_peerMgr    * manager,
777                      const uint8_t * torrentHash,
778                      int             disable)
779{
780    Torrent * t = getExistingTorrent( manager, torrentHash );
781    tr_torrent * tor = t->tor;
782
783    if( ( tor->pexDisabled != disable ) && ! ( TR_FLAG_PRIVATE & tor->info.flags ) )
784    {
785        int i, size;
786        tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
787        for( i=0; i<size; ++i ) {
788            peers[i]->pexEnabled = disable ? 0 : 1;
789            peers[i]->lastPexTime = 0;
790        }
791
792        tor->pexDisabled = disable;
793    }
794}
795
796/**
797***
798**/
799
800typedef struct
801{
802    tr_peer * peer;
803    float rate;
804    int isInterested;
805}
806ChokeData;
807
808static int
809compareChokeByRate( const void * va, const void * vb )
810{
811    const ChokeData * a = ( const ChokeData * ) va;
812    const ChokeData * b = ( const ChokeData * ) vb;
813    if( a->rate > b->rate ) return -1;
814    if( a->rate < b->rate ) return 1;
815    return 0;
816}
817
818static int
819compareChokeByDownloader( const void * va, const void * vb )
820{
821    const ChokeData * a = ( const ChokeData * ) va;
822    const ChokeData * b = ( const ChokeData * ) vb;
823
824    /* primary key: interest */
825    if(  a->isInterested && !b->isInterested ) return -1;
826    if( !a->isInterested &&  b->isInterested ) return 1;
827
828    /* second key: rate */
829    return compareChokeByRate( va, vb );
830}
831
832static int
833chokePulse( void * vtorrent )
834{
835    Torrent * t = (Torrent *) vtorrent;
836    int i, size, unchoked;
837    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
838    float bestDownloaderRate;
839    ChokeData * data;
840    tr_peer ** peers = getConnectedPeers( t, &size );
841
842fprintf( stderr, "rechoking torrent %p, with %d peers\n", t, size );
843
844    if( size < 1 )
845        return TRUE;
846
847    data = tr_new( ChokeData, size );
848    for( i=0; i<size; ++i ) {
849        data[i].peer = peers[i];
850        data[i].isInterested = peers[i]->peerIsInterested;
851        data[i].rate = done ? tr_peerIoGetRateToPeer( peers[i]->io )
852                            : tr_peerIoGetRateToClient( peers[i]->io );
853    }
854
855    /* find the best downloaders and unchoke them */
856    qsort( data, size, sizeof(ChokeData), compareChokeByDownloader );
857    bestDownloaderRate = data[0].rate;
858    for( i=unchoked=0; i<size && unchoked<NUM_DOWNLOADERS_TO_UNCHOKE; ++i ) {
859        tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
860        ++unchoked;
861    }
862    memmove( data, data+i, sizeof(ChokeData)*(size-i) );
863    size -= i;
864
865    /* of those remaining, unchoke those that are faster than the downloaders */
866    qsort( data, size, sizeof(ChokeData), compareChokeByRate );
867    for( i=0; i<size && data[i].rate >= bestDownloaderRate; ++i )
868        tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
869    memmove( data, data+i, sizeof(ChokeData)*(size-i) );
870    size -= i;
871
872    /* of those remaining, optimistically unchoke one; choke the rest */
873    if( size > 0 ) {
874        const int optimistic = tr_rand( size );
875        for( i=0; i<size; ++i )
876            tr_peerMsgsSetChoke( data[i].peer->msgs, i!=optimistic );
877    }
878
879    /* cleanup */
880    tr_free( data );
881    tr_free( peers );
882    return TRUE;
883}
Note: See TracBrowser for help on using the repository browser.