source: branches/encryption/libtransmission/peer-mgr.c @ 2984

Last change on this file since 2984 was 2984, checked in by charles, 15 years ago

periodic checkin. nothing to see here.

  • Property svn:keywords set to Date Rev Author Id
File size: 19.2 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 2984 2007-09-07 17:05:56Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17
18#include "transmission.h"
19#include "handshake.h"
20#include "completion.h"
21#include "net.h"
22#include "peer-io.h"
23#include "peer-mgr.h"
24#include "peer-mgr-private.h"
25#include "peer-msgs.h"
26#include "ptrarray.h"
27#include "timer.h"
28#include "utils.h"
29
30#define MINUTES_TO_MSEC(N) ((N) * 60 * 1000)
31
32/* how frequently to change which peers are choked */
33#define RECHOKE_PERIOD_SECONDS (MINUTES_TO_MSEC(10))
34
35/* how many downloaders to unchoke per-torrent.
36 * http://wiki.theory.org/BitTorrentSpecification#Choking_and_Optimistic_Unchoking */
37#define NUM_DOWNLOADERS_TO_UNCHOKE 4
38
39/* across all torrents, how many peers maximum do we want connected? */
40#define MAX_CONNECTED_PEERS 80
41
42struct tr_block
43{
44    uint32_t block;
45    uint16_t scarcity;
46    uint8_t priority;
47    uint8_t requestCount;
48};
49
50typedef struct
51{
52    uint8_t hash[SHA_DIGEST_LENGTH];
53    tr_ptrArray * peers; /* tr_peer */
54    tr_timer_tag choke_tag;
55    tr_torrent * tor;
56
57    struct tr_block * blocks;
58    uint32_t blockCount;
59}
60Torrent;
61
62struct tr_peerMgr
63{
64    tr_handle * handle;
65    tr_ptrArray * torrents; /* Torrent */
66    int connectionCount;
67};
68
69/**
70***
71**/
72
73/**
74***
75**/
76
77static int
78torrentCompare( const void * va, const void * vb )
79{
80    const Torrent * a = (const Torrent*) va;
81    const Torrent * b = (const Torrent*) vb;
82    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
83}
84
85static int
86torrentCompareToHash( const void * va, const void * vb )
87{
88    const Torrent * a = (const Torrent*) va;
89    const uint8_t * b_hash = (const uint8_t*) vb;
90    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
91}
92
93static Torrent*
94getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
95{
96    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
97                                             hash,
98                                             torrentCompareToHash );
99}
100
101static int chokePulse( void * vtorrent );
102
103static int
104peerCompare( const void * va, const void * vb )
105{
106    const tr_peer * a = (const tr_peer *) va;
107    const tr_peer * b = (const tr_peer *) vb;
108    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
109}
110
111static int
112peerCompareToAddr( const void * va, const void * vb )
113{
114    const tr_peer * a = (const tr_peer *) va;
115    const struct in_addr * b = (const struct in_addr *) vb;
116    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
117}
118
119static tr_peer*
120getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
121{
122    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
123                                             in_addr,
124                                             peerCompareToAddr );
125}
126
127static tr_peer*
128getPeer( Torrent * torrent, const struct in_addr * in_addr )
129{
130    tr_peer * peer = getExistingPeer( torrent, in_addr );
131    if( peer == NULL )
132    {
133        peer = tr_new0( tr_peer, 1 );
134        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
135        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
136fprintf( stderr, "getPeer: torrent %p now has %d peers\n", torrent, tr_ptrArraySize(torrent->peers) );
137    }
138    return peer;
139}
140
141static void
142freePeer( tr_peer * peer )
143{
144    tr_peerMsgsFree( peer->msgs );
145    tr_bitfieldFree( peer->have );
146    tr_bitfieldFree( peer->blame );
147    tr_bitfieldFree( peer->banned );
148    tr_peerIoFree( peer->io );
149    tr_free( peer->client );
150    tr_free( peer );
151}
152
153static void
154freeTorrent( tr_peerMgr * manager, Torrent * t )
155{
156    int i, size;
157    tr_peer ** peers;
158
159    assert( manager != NULL );
160    assert( t != NULL );
161    assert( t->peers != NULL );
162
163    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
164    tr_timerFree( &t->choke_tag );
165    for( i=0; i<size; ++i )
166        freePeer( peers[i] );
167    tr_ptrArrayFree( t->peers );
168    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
169    tr_free( t );
170}
171
172/**
173***
174**/
175
176tr_peerMgr*
177tr_peerMgrNew( tr_handle * handle )
178{
179    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
180    m->handle = handle;
181    m->torrents = tr_ptrArrayNew( );
182    return m;
183}
184
185void
186tr_peerMgrFree( tr_peerMgr * manager )
187{
188    while( !tr_ptrArrayEmpty( manager->torrents ) )
189        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents,0) );
190    tr_ptrArrayFree( manager->torrents );
191    tr_free( manager );
192}
193
194/**
195***
196**/
197
198static void
199msgsCallbackFunc( void * source UNUSED, void * vevent, void * vt )
200{
201    Torrent * t = (Torrent *) vt;
202    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
203
204    switch( e->eventType )
205    {
206        case TR_PEERMSG_GOT_BITFIELD: {
207            const uint32_t begin = 0;
208            const uint32_t end = begin + t->blockCount;
209            uint32_t i;
210            for( i=begin; i<end; ++i ) {
211                if( !tr_bitfieldHas( e->bitfield, i ) )
212                    continue;
213                assert( t->blocks[i].block == i );
214                if( t->blocks[i].scarcity < UINT8_MAX )
215                    t->blocks[i].scarcity++;
216            }
217            break;
218        }
219
220        case TR_PEERMSG_GOT_HAVE: {
221            const uint32_t begin = tr_torPieceFirstBlock( t->tor, e->pieceIndex );
222            const uint32_t end = begin + (uint32_t)tr_torPieceCountBlocks( t->tor, (int)e->pieceIndex );
223            uint32_t i;
224            for( i=begin; i<end; ++i ) {
225                assert( t->blocks[i].block == i );
226                if( t->blocks[i].scarcity < UINT8_MAX )
227                    t->blocks[i].scarcity++;
228            }
229            break;
230        }
231
232        case TR_PEERMSG_GOT_PEX:
233            /* FIXME */
234            break;
235
236        case TR_PEERMSG_GOT_ERROR:
237            /* FIXME */
238            break;
239
240        case TR_PEERMSG_BLOCKS_RUNNING_LOW:
241            /* FIXME */
242            break;
243
244        default:
245            assert(0);
246    }
247}
248
249static void
250myHandshakeDoneCB( tr_peerIo * io, int isConnected, void * vmanager )
251{
252    int ok = isConnected;
253    const uint8_t * hash = tr_peerIoGetTorrentHash( io );
254    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
255    Torrent * t = getExistingTorrent( manager, hash );
256    uint16_t port;
257    const struct in_addr * in_addr;
258
259    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake; isConnected is %d\n", t->tor->info.name, isConnected );
260
261    assert( io != NULL );
262
263    in_addr = tr_peerIoGetAddress( io, &port );
264
265    /* if we couldn't connect or were snubbed,
266     * the peer's probably not worth remembering. */
267    if( !ok ) {
268        tr_peer * peer = getExistingPeer( t, in_addr );
269        fprintf( stderr, "peer-mgr: torrent [%s] got a bad one, and you know what? fuck them.\n", t->tor->info.name );
270        if( peer ) {
271            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
272            freePeer( peer );
273        } else  {
274            tr_peerIoFree( io );
275        }
276        --manager->connectionCount;
277        return;
278    }
279
280#if 0
281    /* ONLY DO THIS TEST FOR INCOMING CONNECTIONS */
282    /* check for duplicates */
283    if( getExistingPeer( t, in_addr ) ) {
284        tr_dbg( "dropping a duplicate connection... dropping." );
285        tr_peerIoFree( io );
286        return;
287    }
288#endif
289
290    if( 1 ) {
291        tr_peer * peer = getPeer( t, in_addr );
292        peer->port = port;
293        peer->msgs = tr_peerMsgsNew( t->tor, peer );
294        peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
295        chokePulse( t );
296    }
297}
298
299void
300tr_peerMgrAddIncoming( tr_peerMgr      * manager,
301                       struct in_addr  * addr,
302                       int               socket )
303{
304    ++manager->connectionCount;
305
306    tr_handshakeAdd( tr_peerIoNewIncoming( manager->handle, addr, socket ),
307                     HANDSHAKE_ENCRYPTION_PREFERRED,
308                     myHandshakeDoneCB,
309                     manager );
310}
311
312static void
313maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
314{
315    if( tr_peerMgrIsAcceptingConnections( manager ) )
316    {
317        fprintf( stderr, "peer-mgr: torrent [%s] is handshaking with a new peer %08x:%04x\n",
318                 t->tor->info.name,
319                 (uint32_t) peer->in_addr.s_addr, peer->port );
320
321        peer->io = tr_peerIoNewOutgoing( manager->handle, &peer->in_addr, peer->port, t->hash );
322
323        tr_handshakeAdd( peer->io, HANDSHAKE_ENCRYPTION_PREFERRED,
324                         myHandshakeDoneCB, manager );
325    }
326}
327
328void
329tr_peerMgrAddPex( tr_peerMgr     * manager,
330                  const uint8_t  * torrentHash,
331                  int              from,
332                  const tr_pex   * pex,
333                  int              pexCount )
334{
335    int i;
336    const tr_pex * walk = pex;
337    Torrent * t = getExistingTorrent( manager, torrentHash );
338    for( i=0; i<pexCount; ++i )
339    {
340        tr_peer * peer = getPeer( t, &walk->in_addr );
341        peer->port = walk->port;
342        peer->from = from;
343        maybeConnect( manager, t, peer );
344    }
345}
346
347void
348tr_peerMgrAddPeers( tr_peerMgr    * manager,
349                    const uint8_t * torrentHash,
350                    int             from,
351                    const uint8_t * peerCompact,
352                    int             peerCount )
353{
354    int i;
355    const uint8_t * walk = peerCompact;
356    Torrent * t = getExistingTorrent( manager, torrentHash );
357    for( i=0; i<peerCount; ++i )
358    {
359        tr_peer * peer;
360        struct in_addr addr;
361        uint16_t port;
362        memcpy( &addr, walk, 4 ); walk += 4;
363        memcpy( &port, walk, 2 ); walk += 2;
364        peer = getPeer( t, &addr );
365        peer->port = port;
366        peer->from = from;
367        maybeConnect( manager, t, peer );
368    }
369}
370
371/**
372***
373**/
374
375int
376tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager )
377{
378    return manager->connectionCount < MAX_CONNECTED_PEERS;
379}
380
381void
382tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
383                    const uint8_t  * torrentHash UNUSED,
384                    int              pieceIndex UNUSED,
385                    int              success UNUSED )
386{
387    assert( 0 );
388}
389
390int
391tr_pexCompare( const void * va, const void * vb )
392{
393    const tr_pex * a = (const tr_pex *) va;
394    const tr_pex * b = (const tr_pex *) vb;
395    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
396    if( i ) return i;
397    if( a->port < b->port ) return -1;
398    if( a->port > b->port ) return 1;
399    return 0;
400}
401
402int tr_pexCompare( const void * a, const void * b );
403
404
405int
406tr_peerMgrGetPeers( tr_peerMgr      * manager,
407                    const uint8_t   * torrentHash,
408                    tr_pex         ** setme_pex )
409{
410    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
411    int i, peerCount;
412    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
413    tr_pex * pex = tr_new( tr_pex, peerCount );
414    tr_pex * walk = pex;
415
416    for( i=0; i<peerCount; ++i, ++walk )
417    {
418        walk->in_addr = peers[i]->in_addr;
419        walk->port = peers[i]->port;
420        walk->flags = '\0'; /* FIXME */
421    }
422
423    assert( ( walk - pex ) == peerCount );
424    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
425    *setme_pex = pex;
426    return peerCount;
427}
428
429void
430tr_peerMgrStartTorrent( tr_peerMgr     * manager UNUSED,
431                        const uint8_t  * torrentHash UNUSED)
432{
433    //fprintf( stderr, "FIXME\n" );
434}
435
436void
437tr_peerMgrStopTorrent( tr_peerMgr     * manager UNUSED,
438                       const uint8_t  * torrentHash UNUSED )
439{
440    //fprintf( stderr, "FIXME\n" );
441}
442
443void
444tr_peerMgrAddTorrent( tr_peerMgr * manager,
445                      tr_torrent * tor )
446{
447    Torrent * t;
448    uint32_t i;
449
450    assert( tor != NULL );
451    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
452
453    t = tr_new0( Torrent, 1 );
454    t->tor = tor;
455    t->peers = tr_ptrArrayNew( );
456    t->choke_tag = tr_timerNew( manager->handle,
457                                chokePulse, t, NULL, 
458                                RECHOKE_PERIOD_SECONDS );
459    t->blockCount = tor->blockCount;
460    t->blocks = tr_new0( struct tr_block, t->blockCount );
461    for( i=0; i<t->blockCount; ++i ) {
462        t->blocks[i].block = i;
463        t->blocks[i].scarcity = tr_cpPieceIsComplete( t->tor->completion, i )
464                                                    ? UINT32_MAX : 0;
465    }
466    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
467    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
468}
469
470void
471tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
472                         const uint8_t  * torrentHash )
473{
474    Torrent * t = getExistingTorrent( manager, torrentHash );
475    if( t != NULL ) {
476        tr_peerMgrStopTorrent( manager, torrentHash );
477        freeTorrent( manager, t );
478    }
479}
480
481void
482tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
483                               const uint8_t    * torrentHash,
484                               int8_t           * tab,
485                               int                tabCount )
486{
487    int i;
488    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
489    const tr_torrent * tor = t->tor;
490    const float interval = tor->info.pieceCount / (float)tabCount;
491
492    for( i=0; i<tabCount; ++i )
493    {
494        const int piece = i * interval;
495
496        if( tor == NULL )
497            tab[i] = 0;
498        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
499            tab[i] = -1;
500        else {
501            int j, peerCount;
502            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
503            for( j=0; j<peerCount; ++j )
504                if( tr_bitfieldHas( peers[j]->have, i ) )
505                    ++tab[i];
506        }
507    }
508}
509
510
511void
512tr_peerMgrTorrentStats( const tr_peerMgr * manager,
513                        const uint8_t    * torrentHash,
514                        int              * setmePeersTotal,
515                        int              * setmePeersConnected,
516                        int              * setmePeersSendingToUs,
517                        int              * setmePeersGettingFromUs,
518                        int              * setmePeersFrom )
519{
520    int i, size;
521    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
522    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
523
524    *setmePeersTotal          = size;
525    *setmePeersConnected      = 0;
526    *setmePeersSendingToUs    = 0;
527    *setmePeersGettingFromUs  = 0;
528
529    for( i=0; i<size; ++i )
530    {
531        const tr_peer * peer = peers[i];
532
533        if( peer->io == NULL ) /* not connected */
534            continue;
535
536        ++*setmePeersConnected;
537
538        ++setmePeersFrom[peer->from];
539
540        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
541            ++*setmePeersGettingFromUs;
542
543        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
544            ++*setmePeersSendingToUs;
545    }
546}
547
548struct tr_peer_stat *
549tr_peerMgrPeerStats( const tr_peerMgr  * manager,
550                     const uint8_t     * torrentHash,
551                     int               * setmeCount UNUSED )
552{
553    int i, size;
554    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
555    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
556    tr_peer_stat * ret;
557
558    ret = tr_new0( tr_peer_stat, size );
559
560    for( i=0; i<size; ++i )
561    {
562        const tr_peer * peer = peers[i];
563        const int live = peer->io != NULL;
564        tr_peer_stat * stat = ret + i;
565
566        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
567        stat->port             = peer->port;
568        stat->from             = peer->from;
569        stat->client           = peer->client;
570        stat->progress         = peer->progress;
571        stat->isConnected      = live;
572        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
573        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
574        stat->isDownloading    = stat->uploadToRate > 0.01;
575        stat->isUploading      = stat->downloadFromRate > 0.01;
576    }
577
578    *setmeCount = size;
579    return ret;
580}
581
582void
583tr_peerMgrDisablePex( tr_peerMgr    * manager,
584                      const uint8_t * torrentHash,
585                      int             disable)
586{
587    Torrent * t = getExistingTorrent( manager, torrentHash );
588    tr_torrent * tor = t->tor;
589
590    if( ( tor->pexDisabled != disable ) && ! ( TR_FLAG_PRIVATE & tor->info.flags ) )
591    {
592        int i, size;
593        tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
594        for( i=0; i<size; ++i ) {
595            peers[i]->pexEnabled = disable ? 0 : 1;
596            peers[i]->lastPexTime = 0;
597        }
598
599        tor->pexDisabled = disable;
600    }
601}
602
603/**
604***
605**/
606
607typedef struct
608{
609    tr_peer * peer;
610    float rate;
611    int isInterested;
612}
613ChokeData;
614
615static int
616compareChokeByRate( const void * va, const void * vb )
617{
618    const ChokeData * a = ( const ChokeData * ) va;
619    const ChokeData * b = ( const ChokeData * ) vb;
620    if( a->rate > b->rate ) return -1;
621    if( a->rate < b->rate ) return 1;
622    return 0;
623}
624
625static int
626compareChokeByDownloader( const void * va, const void * vb )
627{
628    const ChokeData * a = ( const ChokeData * ) va;
629    const ChokeData * b = ( const ChokeData * ) vb;
630
631    /* primary key: interest */
632    if(  a->isInterested && !b->isInterested ) return -1;
633    if( !a->isInterested &&  b->isInterested ) return 1;
634
635    /* second key: rate */
636    return compareChokeByRate( va, vb );
637}
638
639static int
640chokePulse( void * vtorrent )
641{
642    Torrent * t = (Torrent *) vtorrent;
643    int i, size, unchoked;
644    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
645    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
646    float bestDownloaderRate;
647    ChokeData * data;
648
649fprintf( stderr, "rechoking torrent %p, with %d peers\n", t, size );
650
651    if( size < 1 )
652        return TRUE;
653
654    data = tr_new( ChokeData, size );
655    for( i=0; i<size; ++i ) {
656        data[i].peer = peers[i];
657        data[i].isInterested = peers[i]->peerIsInterested;
658        data[i].rate = done ? tr_peerIoGetRateToPeer( peers[i]->io )
659                            : tr_peerIoGetRateToClient( peers[i]->io );
660    }
661
662    /* find the best downloaders and unchoke them */
663    qsort( data, size, sizeof(ChokeData), compareChokeByDownloader );
664    bestDownloaderRate = data[0].rate;
665    for( i=unchoked=0; i<size && unchoked<NUM_DOWNLOADERS_TO_UNCHOKE; ++i ) {
666        if( data[i].peer->msgs != NULL ) {
667            tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
668            ++unchoked;
669        }
670    }
671    memmove( data, data+i, sizeof(ChokeData)*(size-i) );
672    size -= i;
673
674    /* of those remaining, unchoke those that are faster than the downloaders */
675    qsort( data, size, sizeof(ChokeData), compareChokeByRate );
676    for( i=0; i<size && data[i].rate >= bestDownloaderRate; ++i )
677        tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
678    memmove( data, data+i, sizeof(ChokeData)*(size-i) );
679    size -= i;
680
681    /* of those remaining, optimistically unchoke one; choke the rest */
682    if( size > 0 ) {
683        const int optimistic = tr_rand( size );
684        for( i=0; i<size; ++i )
685            tr_peerMsgsSetChoke( data[i].peer->msgs, i!=optimistic );
686    }
687
688    /* cleanup */
689    tr_free( data );
690    return TRUE;
691}
Note: See TracBrowser for help on using the repository browser.