source: branches/encryption/libtransmission/peer-mgr.c @ 2978

Last change on this file since 2978 was 2978, checked in by charles, 15 years ago

still way too buggy, but I can now seed at full speed using < 2% of the CPU. =)

  • Property svn:keywords set to Date Rev Author Id
File size: 17.2 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 2978 2007-09-06 00:33:33Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17
18#include "transmission.h"
19#include "handshake.h"
20#include "completion.h"
21#include "net.h"
22#include "peer-io.h"
23#include "peer-mgr.h"
24#include "peer-mgr-private.h"
25#include "peer-msgs.h"
26#include "ptrarray.h"
27#include "timer.h"
28#include "utils.h"
29
30#define MINUTES_TO_MSEC(N) ((N) * 60 * 1000)
31
32/* how frequently to change which peers are choked */
33#define RECHOKE_PERIOD_SECONDS (MINUTES_TO_MSEC(10))
34
35/* how many downloaders to unchoke per-torrent.
36 * http://wiki.theory.org/BitTorrentSpecification#Choking_and_Optimistic_Unchoking */
37#define NUM_DOWNLOADERS_TO_UNCHOKE 4
38
39/* across all torrents, how many peers maximum do we want connected? */
40#define MAX_CONNECTED_PEERS 80
41
42typedef struct
43{
44    uint8_t hash[SHA_DIGEST_LENGTH];
45    tr_ptrArray * peers; /* tr_peer */
46    tr_timer_tag choke_tag;
47    tr_torrent * tor;
48}
49Torrent;
50
51struct tr_peerMgr
52{
53    tr_handle * handle;
54    tr_ptrArray * torrents; /* Torrent */
55    int connectionCount;
56};
57
58/**
59***
60**/
61
62static int
63torrentCompare( const void * va, const void * vb )
64{
65    const Torrent * a = (const Torrent*) va;
66    const Torrent * b = (const Torrent*) vb;
67    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
68}
69
70static int
71torrentCompareToHash( const void * va, const void * vb )
72{
73    const Torrent * a = (const Torrent*) va;
74    const uint8_t * b_hash = (const uint8_t*) vb;
75    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
76}
77
78static Torrent*
79getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
80{
81    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
82                                             hash,
83                                             torrentCompareToHash );
84}
85
86static int chokePulse( void * vtorrent );
87
88static int
89peerCompare( const void * va, const void * vb )
90{
91    const tr_peer * a = (const tr_peer *) va;
92    const tr_peer * b = (const tr_peer *) vb;
93    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
94}
95
96static int
97peerCompareToAddr( const void * va, const void * vb )
98{
99    const tr_peer * a = (const tr_peer *) va;
100    const struct in_addr * b = (const struct in_addr *) vb;
101    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
102}
103
104static tr_peer*
105getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
106{
107    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
108                                             in_addr,
109                                             peerCompareToAddr );
110}
111
112static tr_peer*
113getPeer( Torrent * torrent, const struct in_addr * in_addr )
114{
115    tr_peer * peer = getExistingPeer( torrent, in_addr );
116    if( peer == NULL )
117    {
118        peer = tr_new0( tr_peer, 1 );
119        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
120        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
121fprintf( stderr, "getPeer: torrent %p now has %d peers\n", torrent, tr_ptrArraySize(torrent->peers) );
122    }
123    return peer;
124}
125
126static void
127freePeer( tr_peer * peer )
128{
129    tr_peerMsgsFree( peer->msgs );
130    tr_bitfieldFree( peer->have );
131    tr_bitfieldFree( peer->blame );
132    tr_bitfieldFree( peer->banned );
133    tr_peerIoFree( peer->io );
134    tr_free( peer->client );
135    tr_free( peer );
136}
137
138static void
139freeTorrent( tr_peerMgr * manager, Torrent * t )
140{
141    int i, size;
142    tr_peer ** peers;
143
144    assert( manager != NULL );
145    assert( t != NULL );
146    assert( t->peers != NULL );
147
148    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
149    tr_timerFree( &t->choke_tag );
150    for( i=0; i<size; ++i )
151        freePeer( peers[i] );
152    tr_ptrArrayFree( t->peers );
153    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
154    tr_free( t );
155}
156
157/**
158***
159**/
160
161tr_peerMgr*
162tr_peerMgrNew( tr_handle * handle )
163{
164    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
165    m->handle = handle;
166    m->torrents = tr_ptrArrayNew( );
167    return m;
168}
169
170void
171tr_peerMgrFree( tr_peerMgr * manager )
172{
173    while( !tr_ptrArrayEmpty( manager->torrents ) )
174        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents,0) );
175    tr_ptrArrayFree( manager->torrents );
176    tr_free( manager );
177}
178
179/**
180***
181**/
182
183static void
184myHandshakeDoneCB( tr_peerIo * io, int isConnected, void * vmanager )
185{
186    int ok = isConnected;
187    const uint8_t * hash = tr_peerIoGetTorrentHash( io );
188    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
189    Torrent * t = getExistingTorrent( manager, hash );
190    uint16_t port;
191    const struct in_addr * in_addr;
192
193    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake; isConnected is %d\n", t->tor->info.name, isConnected );
194
195    assert( io != NULL );
196
197    in_addr = tr_peerIoGetAddress( io, &port );
198
199    /* if we couldn't connect or were snubbed,
200     * the peer's probably not worth remembering. */
201    if( !ok ) {
202        tr_peer * peer = getExistingPeer( t, in_addr );
203        fprintf( stderr, "peer-mgr: torrent [%s] got a bad one, and you know what? fuck them.\n", t->tor->info.name );
204        if( peer ) {
205            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
206            freePeer( peer );
207        } else  {
208            tr_peerIoFree( io );
209        }
210        --manager->connectionCount;
211        return;
212    }
213
214#if 0
215    /* ONLY DO THIS TEST FOR INCOMING CONNECTIONS */
216    /* check for duplicates */
217    if( getExistingPeer( t, in_addr ) ) {
218        tr_dbg( "dropping a duplicate connection... dropping." );
219        tr_peerIoFree( io );
220        return;
221    }
222#endif
223
224    if( 1 ) {
225        tr_peer * peer = getPeer( t, in_addr );
226        peer->port = port;
227        peer->msgs = tr_peerMsgsNew( t->tor, peer );
228        chokePulse( t );
229    }
230}
231
232void
233tr_peerMgrAddIncoming( tr_peerMgr      * manager,
234                       struct in_addr  * addr,
235                       int               socket )
236{
237    ++manager->connectionCount;
238
239    tr_handshakeAdd( tr_peerIoNewIncoming( manager->handle, addr, socket ),
240                     HANDSHAKE_ENCRYPTION_PREFERRED,
241                     myHandshakeDoneCB,
242                     manager );
243}
244
245static void
246maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
247{
248    if( tr_peerMgrIsAcceptingConnections( manager ) )
249    {
250        fprintf( stderr, "peer-mgr: torrent [%s] is handshaking with a new peer %08x:%04x\n",
251                 t->tor->info.name,
252                 (uint32_t) peer->in_addr.s_addr, peer->port );
253
254        peer->io = tr_peerIoNewOutgoing( manager->handle, &peer->in_addr, peer->port, t->hash );
255
256        tr_handshakeAdd( peer->io, HANDSHAKE_ENCRYPTION_PREFERRED,
257                         myHandshakeDoneCB, manager );
258    }
259}
260
261void
262tr_peerMgrAddPex( tr_peerMgr     * manager,
263                  const uint8_t  * torrentHash,
264                  int              from,
265                  const tr_pex   * pex,
266                  int              pexCount )
267{
268    int i;
269    const tr_pex * walk = pex;
270    Torrent * t = getExistingTorrent( manager, torrentHash );
271    for( i=0; i<pexCount; ++i )
272    {
273        tr_peer * peer = getPeer( t, &walk->in_addr );
274        peer->port = walk->port;
275        peer->from = from;
276        maybeConnect( manager, t, peer );
277    }
278}
279
280void
281tr_peerMgrAddPeers( tr_peerMgr    * manager,
282                    const uint8_t * torrentHash,
283                    int             from,
284                    const uint8_t * peerCompact,
285                    int             peerCount )
286{
287    int i;
288    const uint8_t * walk = peerCompact;
289    Torrent * t = getExistingTorrent( manager, torrentHash );
290    for( i=0; i<peerCount; ++i )
291    {
292        tr_peer * peer;
293        struct in_addr addr;
294        uint16_t port;
295        memcpy( &addr, walk, 4 ); walk += 4;
296        memcpy( &port, walk, 2 ); walk += 2;
297        peer = getPeer( t, &addr );
298        peer->port = port;
299        peer->from = from;
300        maybeConnect( manager, t, peer );
301    }
302}
303
304/**
305***
306**/
307
308int
309tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager )
310{
311    return manager->connectionCount < MAX_CONNECTED_PEERS;
312}
313
314void
315tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
316                    const uint8_t  * torrentHash UNUSED,
317                    int              pieceIndex UNUSED,
318                    int              success UNUSED )
319{
320    assert( 0 );
321}
322
323int
324tr_pexCompare( const void * va, const void * vb )
325{
326    const tr_pex * a = (const tr_pex *) va;
327    const tr_pex * b = (const tr_pex *) vb;
328    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
329    if( i ) return i;
330    if( a->port < b->port ) return -1;
331    if( a->port > b->port ) return 1;
332    return 0;
333}
334
335int tr_pexCompare( const void * a, const void * b );
336
337
338int
339tr_peerMgrGetPeers( tr_peerMgr      * manager,
340                    const uint8_t   * torrentHash,
341                    tr_pex         ** setme_pex )
342{
343    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
344    int i, peerCount;
345    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
346    tr_pex * pex = tr_new( tr_pex, peerCount );
347    tr_pex * walk = pex;
348
349    for( i=0; i<peerCount; ++i, ++walk )
350    {
351        walk->in_addr = peers[i]->in_addr;
352        walk->port = peers[i]->port;
353        walk->flags = '\0'; /* FIXME */
354    }
355
356    assert( ( walk - pex ) == peerCount );
357    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
358    *setme_pex = pex;
359    return peerCount;
360}
361
362void
363tr_peerMgrStartTorrent( tr_peerMgr     * manager UNUSED,
364                        const uint8_t  * torrentHash UNUSED)
365{
366    //fprintf( stderr, "FIXME\n" );
367}
368
369void
370tr_peerMgrStopTorrent( tr_peerMgr     * manager UNUSED,
371                       const uint8_t  * torrentHash UNUSED )
372{
373    //fprintf( stderr, "FIXME\n" );
374}
375
376void
377tr_peerMgrAddTorrent( tr_peerMgr * manager,
378                      tr_torrent * tor )
379{
380    Torrent * t;
381
382    assert( tor != NULL );
383    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
384
385    t = tr_new0( Torrent, 1 );
386    t->tor = tor;
387    t->peers = tr_ptrArrayNew( );
388    t->choke_tag = tr_timerNew( manager->handle,
389                                chokePulse, t, NULL, 
390                                RECHOKE_PERIOD_SECONDS );
391    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
392    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
393}
394
395void
396tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
397                         const uint8_t  * torrentHash )
398{
399    Torrent * t = getExistingTorrent( manager, torrentHash );
400    if( t != NULL ) {
401        tr_peerMgrStopTorrent( manager, torrentHash );
402        freeTorrent( manager, t );
403    }
404}
405
406void
407tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
408                               const uint8_t    * torrentHash,
409                               int8_t           * tab,
410                               int                tabCount )
411{
412    int i;
413    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
414    const tr_torrent * tor = t->tor;
415    const float interval = tor->info.pieceCount / (float)tabCount;
416
417    for( i=0; i<tabCount; ++i )
418    {
419        const int piece = i * interval;
420
421        if( tor == NULL )
422            tab[i] = 0;
423        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
424            tab[i] = -1;
425        else {
426            int j, peerCount;
427            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
428            for( j=0; j<peerCount; ++j )
429                if( tr_bitfieldHas( peers[j]->have, i ) )
430                    ++tab[i];
431        }
432    }
433}
434
435
436void
437tr_peerMgrTorrentStats( const tr_peerMgr * manager,
438                        const uint8_t    * torrentHash,
439                        int              * setmePeersTotal,
440                        int              * setmePeersConnected,
441                        int              * setmePeersSendingToUs,
442                        int              * setmePeersGettingFromUs,
443                        int              * setmePeersFrom )
444{
445    int i, size;
446    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
447    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
448
449    *setmePeersTotal          = size;
450    *setmePeersConnected      = 0;
451    *setmePeersSendingToUs    = 0;
452    *setmePeersGettingFromUs  = 0;
453
454    for( i=0; i<size; ++i )
455    {
456        const tr_peer * peer = peers[i];
457
458        if( peer->io == NULL ) /* not connected */
459            continue;
460
461        ++*setmePeersConnected;
462
463        ++setmePeersFrom[peer->from];
464
465        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
466            ++*setmePeersGettingFromUs;
467
468        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
469            ++*setmePeersSendingToUs;
470    }
471}
472
473struct tr_peer_stat *
474tr_peerMgrPeerStats( const tr_peerMgr  * manager,
475                     const uint8_t     * torrentHash,
476                     int               * setmeCount UNUSED )
477{
478    int i, size;
479    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
480    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
481    tr_peer_stat * ret;
482
483    ret = tr_new0( tr_peer_stat, size );
484
485    for( i=0; i<size; ++i )
486    {
487        const tr_peer * peer = peers[i];
488        const int live = peer->io != NULL;
489        tr_peer_stat * stat = ret + i;
490
491        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
492        stat->port             = peer->port;
493        stat->from             = peer->from;
494        stat->client           = peer->client;
495        stat->progress         = peer->progress;
496        stat->isConnected      = live;
497        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
498        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
499        stat->isDownloading    = stat->uploadToRate > 0.01;
500        stat->isUploading      = stat->downloadFromRate > 0.01;
501    }
502
503    *setmeCount = size;
504    return ret;
505}
506
507void
508tr_peerMgrDisablePex( tr_peerMgr    * manager,
509                      const uint8_t * torrentHash,
510                      int             disable)
511{
512    Torrent * t = getExistingTorrent( manager, torrentHash );
513    tr_torrent * tor = t->tor;
514
515    if( ( tor->pexDisabled != disable ) && ! ( TR_FLAG_PRIVATE & tor->info.flags ) )
516    {
517        int i, size;
518        tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
519        for( i=0; i<size; ++i ) {
520            peers[i]->pexEnabled = disable ? 0 : 1;
521            peers[i]->lastPexTime = 0;
522        }
523
524        tor->pexDisabled = disable;
525    }
526}
527
528/**
529***
530**/
531
532typedef struct
533{
534    tr_peer * peer;
535    float rate;
536    int isInterested;
537}
538ChokeData;
539
540static int
541compareChokeByRate( const void * va, const void * vb )
542{
543    const ChokeData * a = ( const ChokeData * ) va;
544    const ChokeData * b = ( const ChokeData * ) vb;
545    if( a->rate > b->rate ) return -1;
546    if( a->rate < b->rate ) return 1;
547    return 0;
548}
549
550static int
551compareChokeByDownloader( const void * va, const void * vb )
552{
553    const ChokeData * a = ( const ChokeData * ) va;
554    const ChokeData * b = ( const ChokeData * ) vb;
555
556    /* primary key: interest */
557    if(  a->isInterested && !b->isInterested ) return -1;
558    if( !a->isInterested &&  b->isInterested ) return 1;
559
560    /* second key: rate */
561    return compareChokeByRate( va, vb );
562}
563
564static int
565chokePulse( void * vtorrent )
566{
567    Torrent * t = (Torrent *) vtorrent;
568    int i, size, unchoked;
569    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
570    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
571    float bestDownloaderRate;
572    ChokeData * data;
573
574fprintf( stderr, "rechoking torrent %p, with %d peers\n", t, size );
575
576    if( size < 1 )
577        return TRUE;
578
579    data = tr_new( ChokeData, size );
580    for( i=0; i<size; ++i ) {
581        data[i].peer = peers[i];
582        data[i].isInterested = peers[i]->peerIsInterested;
583        data[i].rate = done ? tr_peerIoGetRateToPeer( peers[i]->io )
584                            : tr_peerIoGetRateToClient( peers[i]->io );
585    }
586
587    /* find the best downloaders and unchoke them */
588    qsort( data, size, sizeof(ChokeData), compareChokeByDownloader );
589    bestDownloaderRate = data[0].rate;
590    for( i=unchoked=0; i<size && unchoked<NUM_DOWNLOADERS_TO_UNCHOKE; ++i ) {
591        if( data[i].peer->msgs != NULL ) {
592            tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
593            ++unchoked;
594        }
595    }
596    memmove( data, data+i, sizeof(ChokeData)*(size-i) );
597    size -= i;
598
599    /* of those remaining, unchoke those that are faster than the downloaders */
600    qsort( data, size, sizeof(ChokeData), compareChokeByRate );
601    for( i=0; i<size && data[i].rate >= bestDownloaderRate; ++i )
602        tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
603    memmove( data, data+i, sizeof(ChokeData)*(size-i) );
604    size -= i;
605
606    /* of those remaining, optimistically unchoke one; choke the rest */
607    if( size > 0 ) {
608        const int optimistic = tr_rand( size );
609        for( i=0; i<size; ++i )
610            tr_peerMsgsSetChoke( data[i].peer->msgs, i!=optimistic );
611    }
612
613    /* cleanup */
614    tr_free( data );
615    return TRUE;
616}
Note: See TracBrowser for help on using the repository browser.