source: branches/encryption/libtransmission/peer-mgr.c @ 2985

Last change on this file since 2985 was 2985, checked in by charles, 15 years ago

work on incoming connections, and better deciding of which pieces to request first.

  • Property svn:keywords set to Date Rev Author Id
File size: 19.6 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 2985 2007-09-07 20:55:38Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17
18#include "transmission.h"
19#include "handshake.h"
20#include "completion.h"
21#include "net.h"
22#include "peer-io.h"
23#include "peer-mgr.h"
24#include "peer-mgr-private.h"
25#include "peer-msgs.h"
26#include "ptrarray.h"
27#include "timer.h"
28#include "utils.h"
29
30#define MINUTES_TO_MSEC(N) ((N) * 60 * 1000)
31
32/* how frequently to change which peers are choked */
33#define RECHOKE_PERIOD_SECONDS (MINUTES_TO_MSEC(10))
34
35/* how many downloaders to unchoke per-torrent.
36 * http://wiki.theory.org/BitTorrentSpecification#Choking_and_Optimistic_Unchoking */
37#define NUM_DOWNLOADERS_TO_UNCHOKE 4
38
39/* across all torrents, how many peers maximum do we want connected? */
40#define MAX_CONNECTED_PEERS 80
41
42struct tr_block
43{
44    uint32_t block;
45    uint16_t scarcity;
46    uint8_t priority;
47    uint8_t requestCount;
48};
49
50typedef struct
51{
52    uint8_t hash[SHA_DIGEST_LENGTH];
53    tr_ptrArray * peers; /* tr_peer */
54    tr_timer_tag choke_tag;
55    tr_torrent * tor;
56
57    struct tr_block * blocks;
58    uint32_t blockCount;
59}
60Torrent;
61
62struct tr_peerMgr
63{
64    tr_handle * handle;
65    tr_ptrArray * torrents; /* Torrent */
66    int connectionCount;
67};
68
69/**
70***
71**/
72
73/**
74***
75**/
76
77static int
78torrentCompare( const void * va, const void * vb )
79{
80    const Torrent * a = (const Torrent*) va;
81    const Torrent * b = (const Torrent*) vb;
82    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
83}
84
85static int
86torrentCompareToHash( const void * va, const void * vb )
87{
88    const Torrent * a = (const Torrent*) va;
89    const uint8_t * b_hash = (const uint8_t*) vb;
90    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
91}
92
93static Torrent*
94getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
95{
96    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
97                                             hash,
98                                             torrentCompareToHash );
99}
100
101static int chokePulse( void * vtorrent );
102
103static int
104peerCompare( const void * va, const void * vb )
105{
106    const tr_peer * a = (const tr_peer *) va;
107    const tr_peer * b = (const tr_peer *) vb;
108    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
109}
110
111static int
112peerCompareToAddr( const void * va, const void * vb )
113{
114    const tr_peer * a = (const tr_peer *) va;
115    const struct in_addr * b = (const struct in_addr *) vb;
116    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
117}
118
119static tr_peer*
120getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
121{
122    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
123                                             in_addr,
124                                             peerCompareToAddr );
125}
126
127static tr_peer*
128getPeer( Torrent * torrent, const struct in_addr * in_addr )
129{
130    tr_peer * peer = getExistingPeer( torrent, in_addr );
131    if( peer == NULL )
132    {
133        peer = tr_new0( tr_peer, 1 );
134        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
135        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
136fprintf( stderr, "getPeer: torrent %p now has %d peers\n", torrent, tr_ptrArraySize(torrent->peers) );
137    }
138    return peer;
139}
140
141static void
142freePeer( tr_peer * peer )
143{
144    tr_peerMsgsFree( peer->msgs );
145    tr_bitfieldFree( peer->have );
146    tr_bitfieldFree( peer->blame );
147    tr_bitfieldFree( peer->banned );
148    tr_peerIoFree( peer->io );
149    tr_free( peer->client );
150    tr_free( peer );
151}
152
153static void
154freeTorrent( tr_peerMgr * manager, Torrent * t )
155{
156    int i, size;
157    tr_peer ** peers;
158
159    assert( manager != NULL );
160    assert( t != NULL );
161    assert( t->peers != NULL );
162
163    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
164    tr_timerFree( &t->choke_tag );
165    for( i=0; i<size; ++i )
166        freePeer( peers[i] );
167    tr_ptrArrayFree( t->peers );
168    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
169    tr_free( t );
170}
171
172/**
173***
174**/
175
176tr_peerMgr*
177tr_peerMgrNew( tr_handle * handle )
178{
179    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
180    m->handle = handle;
181    m->torrents = tr_ptrArrayNew( );
182    return m;
183}
184
185void
186tr_peerMgrFree( tr_peerMgr * manager )
187{
188    while( !tr_ptrArrayEmpty( manager->torrents ) )
189        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents,0) );
190    tr_ptrArrayFree( manager->torrents );
191    tr_free( manager );
192}
193
194/**
195***
196**/
197
198static void
199msgsCallbackFunc( void * source UNUSED, void * vevent, void * vt )
200{
201    Torrent * t = (Torrent *) vt;
202    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
203
204    switch( e->eventType )
205    {
206        case TR_PEERMSG_GOT_BITFIELD: {
207            const uint32_t begin = 0;
208            const uint32_t end = begin + t->blockCount;
209            uint32_t i;
210            for( i=begin; i<end; ++i ) {
211                if( !tr_bitfieldHas( e->bitfield, i ) )
212                    continue;
213                assert( t->blocks[i].block == i );
214                if( t->blocks[i].scarcity < UINT8_MAX )
215                    t->blocks[i].scarcity++;
216            }
217            break;
218        }
219
220        case TR_PEERMSG_GOT_HAVE: {
221            const uint32_t begin = tr_torPieceFirstBlock( t->tor, e->pieceIndex );
222            const uint32_t end = begin + tr_torPieceCountBlocks( t->tor, (int)e->pieceIndex );
223            uint32_t i;
224            for( i=begin; i<end; ++i ) {
225                assert( t->blocks[i].block == i );
226                if( t->blocks[i].scarcity < UINT8_MAX )
227                    t->blocks[i].scarcity++;
228            }
229            break;
230        }
231
232        case TR_PEERMSG_GOT_PEX:
233            /* FIXME */
234            break;
235
236        case TR_PEERMSG_GOT_ERROR:
237            /* FIXME */
238            break;
239
240        case TR_PEERMSG_BLOCKS_RUNNING_LOW:
241            /* FIXME */
242            break;
243
244        default:
245            assert(0);
246    }
247}
248
249static void
250myHandshakeDoneCB( tr_peerIo * io, int isConnected, void * vmanager )
251{
252    int ok = isConnected;
253    uint16_t port;
254    const struct in_addr * in_addr;
255    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
256    const uint8_t * hash = NULL;
257    Torrent * t;
258
259    assert( io != NULL );
260
261    in_addr = tr_peerIoGetAddress( io, &port );
262
263    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
264    {
265        tr_peerIoFree( io );
266        --manager->connectionCount;
267        return;
268    }
269
270    hash = tr_peerIoGetTorrentHash( io );
271    t = getExistingTorrent( manager, hash );
272
273    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake; isConnected is %d\n", t->tor->info.name, isConnected );
274
275    /* if we couldn't connect or were snubbed,
276     * the peer's probably not worth remembering. */
277    if( !ok ) {
278        tr_peer * peer = getExistingPeer( t, in_addr );
279        fprintf( stderr, "peer-mgr: torrent [%s] got a bad one, and you know what? fuck them.\n", t->tor->info.name );
280        if( peer ) {
281            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
282            freePeer( peer );
283        } else  {
284            tr_peerIoFree( io );
285        }
286        --manager->connectionCount;
287        return;
288    }
289
290#if 0
291    /* ONLY DO THIS TEST FOR INCOMING CONNECTIONS */
292    /* check for duplicates */
293    if( getExistingPeer( t, in_addr ) ) {
294        tr_dbg( "dropping a duplicate connection... dropping." );
295        tr_peerIoFree( io );
296        return;
297    }
298#endif
299
300    if( 1 ) {
301        tr_peer * peer = getPeer( t, in_addr );
302        peer->port = port;
303        peer->msgs = tr_peerMsgsNew( t->tor, peer );
304        peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
305        chokePulse( t );
306    }
307}
308
309void
310tr_peerMgrAddIncoming( tr_peerMgr      * manager,
311                       struct in_addr  * addr,
312                       int               socket )
313{
314    ++manager->connectionCount;
315
316fprintf( stderr, "peer-mgr: new INCOMING CONNECTION...\n" );
317    tr_handshakeAdd( tr_peerIoNewIncoming( manager->handle, addr, socket ),
318                     HANDSHAKE_ENCRYPTION_PREFERRED,
319                     myHandshakeDoneCB,
320                     manager );
321}
322
323static void
324maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
325{
326    if( tr_peerMgrIsAcceptingConnections( manager ) )
327    {
328        fprintf( stderr, "peer-mgr: torrent [%s] is handshaking with a new peer %08x:%04x\n",
329                 t->tor->info.name,
330                 (uint32_t) peer->in_addr.s_addr, peer->port );
331
332        ++manager->connectionCount;
333
334        peer->io = tr_peerIoNewOutgoing( manager->handle,
335                                         &peer->in_addr,
336                                         peer->port,
337                                         t->hash );
338
339        tr_handshakeAdd( peer->io, HANDSHAKE_ENCRYPTION_PREFERRED,
340                         myHandshakeDoneCB, manager );
341    }
342}
343
344void
345tr_peerMgrAddPex( tr_peerMgr     * manager,
346                  const uint8_t  * torrentHash,
347                  int              from,
348                  const tr_pex   * pex,
349                  int              pexCount )
350{
351    int i;
352    const tr_pex * walk = pex;
353    Torrent * t = getExistingTorrent( manager, torrentHash );
354    for( i=0; i<pexCount; ++i )
355    {
356        tr_peer * peer = getPeer( t, &walk->in_addr );
357        peer->port = walk->port;
358        peer->from = from;
359        maybeConnect( manager, t, peer );
360    }
361}
362
363void
364tr_peerMgrAddPeers( tr_peerMgr    * manager,
365                    const uint8_t * torrentHash,
366                    int             from,
367                    const uint8_t * peerCompact,
368                    int             peerCount )
369{
370    int i;
371    const uint8_t * walk = peerCompact;
372    Torrent * t = getExistingTorrent( manager, torrentHash );
373    for( i=0; i<peerCount; ++i )
374    {
375        tr_peer * peer;
376        struct in_addr addr;
377        uint16_t port;
378        memcpy( &addr, walk, 4 ); walk += 4;
379        memcpy( &port, walk, 2 ); walk += 2;
380        peer = getPeer( t, &addr );
381        peer->port = port;
382        peer->from = from;
383        maybeConnect( manager, t, peer );
384    }
385}
386
387/**
388***
389**/
390
391int
392tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager )
393{
394    return manager->connectionCount < MAX_CONNECTED_PEERS;
395}
396
397void
398tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
399                    const uint8_t  * torrentHash UNUSED,
400                    int              pieceIndex UNUSED,
401                    int              success UNUSED )
402{
403    assert( 0 );
404}
405
406int
407tr_pexCompare( const void * va, const void * vb )
408{
409    const tr_pex * a = (const tr_pex *) va;
410    const tr_pex * b = (const tr_pex *) vb;
411    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
412    if( i ) return i;
413    if( a->port < b->port ) return -1;
414    if( a->port > b->port ) return 1;
415    return 0;
416}
417
418int tr_pexCompare( const void * a, const void * b );
419
420
421int
422tr_peerMgrGetPeers( tr_peerMgr      * manager,
423                    const uint8_t   * torrentHash,
424                    tr_pex         ** setme_pex )
425{
426    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
427    int i, peerCount;
428    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
429    tr_pex * pex = tr_new( tr_pex, peerCount );
430    tr_pex * walk = pex;
431
432    for( i=0; i<peerCount; ++i, ++walk )
433    {
434        walk->in_addr = peers[i]->in_addr;
435        walk->port = peers[i]->port;
436        walk->flags = '\0'; /* FIXME */
437    }
438
439    assert( ( walk - pex ) == peerCount );
440    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
441    *setme_pex = pex;
442    return peerCount;
443}
444
445void
446tr_peerMgrStartTorrent( tr_peerMgr     * manager UNUSED,
447                        const uint8_t  * torrentHash UNUSED)
448{
449    //fprintf( stderr, "FIXME\n" );
450}
451
452void
453tr_peerMgrStopTorrent( tr_peerMgr     * manager UNUSED,
454                       const uint8_t  * torrentHash UNUSED )
455{
456    //fprintf( stderr, "FIXME\n" );
457}
458
459void
460tr_peerMgrAddTorrent( tr_peerMgr * manager,
461                      tr_torrent * tor )
462{
463    Torrent * t;
464    uint32_t i;
465
466    assert( tor != NULL );
467    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
468
469    t = tr_new0( Torrent, 1 );
470    t->tor = tor;
471    t->peers = tr_ptrArrayNew( );
472    t->choke_tag = tr_timerNew( manager->handle,
473                                chokePulse, t, NULL, 
474                                RECHOKE_PERIOD_SECONDS );
475    t->blockCount = tor->blockCount;
476    t->blocks = tr_new0( struct tr_block, t->blockCount );
477    for( i=0; i<t->blockCount; ++i ) {
478        t->blocks[i].block = i;
479        t->blocks[i].scarcity = tr_cpBlockIsComplete( t->tor->completion, i )
480                                                      ? UINT32_MAX : 0;
481    }
482    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
483    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
484}
485
486void
487tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
488                         const uint8_t  * torrentHash )
489{
490    Torrent * t = getExistingTorrent( manager, torrentHash );
491    if( t != NULL ) {
492        tr_peerMgrStopTorrent( manager, torrentHash );
493        freeTorrent( manager, t );
494    }
495}
496
497void
498tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
499                               const uint8_t    * torrentHash,
500                               int8_t           * tab,
501                               int                tabCount )
502{
503    int i;
504    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
505    const tr_torrent * tor = t->tor;
506    const float interval = tor->info.pieceCount / (float)tabCount;
507
508    for( i=0; i<tabCount; ++i )
509    {
510        const int piece = i * interval;
511
512        if( tor == NULL )
513            tab[i] = 0;
514        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
515            tab[i] = -1;
516        else {
517            int j, peerCount;
518            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
519            for( j=0; j<peerCount; ++j )
520                if( tr_bitfieldHas( peers[j]->have, i ) )
521                    ++tab[i];
522        }
523    }
524}
525
526
527void
528tr_peerMgrTorrentStats( const tr_peerMgr * manager,
529                        const uint8_t    * torrentHash,
530                        int              * setmePeersTotal,
531                        int              * setmePeersConnected,
532                        int              * setmePeersSendingToUs,
533                        int              * setmePeersGettingFromUs,
534                        int              * setmePeersFrom )
535{
536    int i, size;
537    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
538    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
539
540    *setmePeersTotal          = size;
541    *setmePeersConnected      = 0;
542    *setmePeersSendingToUs    = 0;
543    *setmePeersGettingFromUs  = 0;
544
545    for( i=0; i<size; ++i )
546    {
547        const tr_peer * peer = peers[i];
548
549        if( peer->io == NULL ) /* not connected */
550            continue;
551
552        ++*setmePeersConnected;
553
554        ++setmePeersFrom[peer->from];
555
556        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
557            ++*setmePeersGettingFromUs;
558
559        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
560            ++*setmePeersSendingToUs;
561    }
562}
563
564struct tr_peer_stat *
565tr_peerMgrPeerStats( const tr_peerMgr  * manager,
566                     const uint8_t     * torrentHash,
567                     int               * setmeCount UNUSED )
568{
569    int i, size;
570    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
571    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
572    tr_peer_stat * ret;
573
574    ret = tr_new0( tr_peer_stat, size );
575
576    for( i=0; i<size; ++i )
577    {
578        const tr_peer * peer = peers[i];
579        const int live = peer->io != NULL;
580        tr_peer_stat * stat = ret + i;
581
582        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
583        stat->port             = peer->port;
584        stat->from             = peer->from;
585        stat->client           = peer->client;
586        stat->progress         = peer->progress;
587        stat->isConnected      = live;
588        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
589        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
590        stat->isDownloading    = stat->uploadToRate > 0.01;
591        stat->isUploading      = stat->downloadFromRate > 0.01;
592    }
593
594    *setmeCount = size;
595    return ret;
596}
597
598void
599tr_peerMgrDisablePex( tr_peerMgr    * manager,
600                      const uint8_t * torrentHash,
601                      int             disable)
602{
603    Torrent * t = getExistingTorrent( manager, torrentHash );
604    tr_torrent * tor = t->tor;
605
606    if( ( tor->pexDisabled != disable ) && ! ( TR_FLAG_PRIVATE & tor->info.flags ) )
607    {
608        int i, size;
609        tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
610        for( i=0; i<size; ++i ) {
611            peers[i]->pexEnabled = disable ? 0 : 1;
612            peers[i]->lastPexTime = 0;
613        }
614
615        tor->pexDisabled = disable;
616    }
617}
618
619/**
620***
621**/
622
623typedef struct
624{
625    tr_peer * peer;
626    float rate;
627    int isInterested;
628}
629ChokeData;
630
631static int
632compareChokeByRate( const void * va, const void * vb )
633{
634    const ChokeData * a = ( const ChokeData * ) va;
635    const ChokeData * b = ( const ChokeData * ) vb;
636    if( a->rate > b->rate ) return -1;
637    if( a->rate < b->rate ) return 1;
638    return 0;
639}
640
641static int
642compareChokeByDownloader( const void * va, const void * vb )
643{
644    const ChokeData * a = ( const ChokeData * ) va;
645    const ChokeData * b = ( const ChokeData * ) vb;
646
647    /* primary key: interest */
648    if(  a->isInterested && !b->isInterested ) return -1;
649    if( !a->isInterested &&  b->isInterested ) return 1;
650
651    /* second key: rate */
652    return compareChokeByRate( va, vb );
653}
654
655static int
656chokePulse( void * vtorrent )
657{
658    Torrent * t = (Torrent *) vtorrent;
659    int i, size, unchoked;
660    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
661    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
662    float bestDownloaderRate;
663    ChokeData * data;
664
665fprintf( stderr, "rechoking torrent %p, with %d peers\n", t, size );
666
667    if( size < 1 )
668        return TRUE;
669
670    data = tr_new( ChokeData, size );
671    for( i=0; i<size; ++i ) {
672        data[i].peer = peers[i];
673        data[i].isInterested = peers[i]->peerIsInterested;
674        data[i].rate = done ? tr_peerIoGetRateToPeer( peers[i]->io )
675                            : tr_peerIoGetRateToClient( peers[i]->io );
676    }
677
678    /* find the best downloaders and unchoke them */
679    qsort( data, size, sizeof(ChokeData), compareChokeByDownloader );
680    bestDownloaderRate = data[0].rate;
681    for( i=unchoked=0; i<size && unchoked<NUM_DOWNLOADERS_TO_UNCHOKE; ++i ) {
682        if( data[i].peer->msgs != NULL ) {
683            tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
684            ++unchoked;
685        }
686    }
687    memmove( data, data+i, sizeof(ChokeData)*(size-i) );
688    size -= i;
689
690    /* of those remaining, unchoke those that are faster than the downloaders */
691    qsort( data, size, sizeof(ChokeData), compareChokeByRate );
692    for( i=0; i<size && data[i].rate >= bestDownloaderRate; ++i )
693        tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
694    memmove( data, data+i, sizeof(ChokeData)*(size-i) );
695    size -= i;
696
697    /* of those remaining, optimistically unchoke one; choke the rest */
698    if( size > 0 ) {
699        const int optimistic = tr_rand( size );
700        for( i=0; i<size; ++i )
701            tr_peerMsgsSetChoke( data[i].peer->msgs, i!=optimistic );
702    }
703
704    /* cleanup */
705    tr_free( data );
706    return TRUE;
707}
Note: See TracBrowser for help on using the repository browser.