source: branches/encryption/libtransmission/peer-mgr.c @ 3076

Last change on this file since 3076 was 3076, checked in by charles, 15 years ago
  • better pex handling. we now support uTorrent's added.f flags for the first time ever...
  • fix bugs in tr_set_compare
  • if the torrent has `pexDisabled' set, don't give or recieve pex messages.
  • Property svn:keywords set to Date Rev Author Id
File size: 27.8 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3076 2007-09-15 20:36:46Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17
18#include "transmission.h"
19#include "clients.h"
20#include "completion.h"
21#include "handshake.h"
22#include "net.h"
23#include "peer-io.h"
24#include "peer-mgr.h"
25#include "peer-mgr-private.h"
26#include "peer-msgs.h"
27#include "ptrarray.h"
28#include "trevent.h"
29#include "utils.h"
30
31/* how frequently to change which peers are choked */
32#define RECHOKE_PERIOD_SECONDS (15 * 1000)
33
34#define REFILL_PERIOD_MSEC 1000
35
36/* how many peers to unchoke per-torrent. */
37/* FIXME: make this user-configurable? */
38#define NUM_UNCHOKED_PEERS_PER_TORRENT 8
39
40/**
41***
42**/
43
44struct tr_block
45{
46    unsigned int have          : 1;
47    unsigned int dnd           : 1;
48    unsigned int low_priority  : 1;
49    unsigned int high_priority : 1;
50    uint8_t requestCount;
51    uint8_t scarcity;
52    uint32_t block;
53};
54
55#define MAX_SCARCITY UINT8_MAX
56#define MAX_REQ_COUNT UINT8_MAX
57
58static void
59incrementReqCount( struct tr_block * block )
60{
61    assert( block != NULL );
62
63    if( block->requestCount < MAX_REQ_COUNT )
64        block->requestCount++;
65}
66
67static void
68incrementScarcity( struct tr_block * block )
69{
70    assert( block != NULL );
71
72    if( block->scarcity < MAX_SCARCITY )
73        block->scarcity++;
74}
75
76static int
77compareBlockByIndex( const void * va, const void * vb )
78{
79    const struct tr_block * a = (const struct tr_block *) va;
80    const struct tr_block * b = (const struct tr_block *) vb;
81    return tr_compareUint32( a->block, b->block );
82}
83
84static int
85compareBlockByInterest( const void * va, const void * vb )
86{
87    const struct tr_block * a = (const struct tr_block *) va;
88    const struct tr_block * b = (const struct tr_block *) vb;
89    int i;
90
91    if( a->dnd != b->dnd )
92        return a->dnd ? 1 : -1;
93
94    if( a->have != b->have )
95        return a->have ? 1 : -1;
96
97    if(( i = tr_compareUint8( a->requestCount, b->requestCount )))
98        return i;
99
100    if( a->high_priority != b->high_priority )
101        return a->high_priority ? -1 : 1;
102
103    if( a->low_priority != b->low_priority )
104        return a->low_priority ? 1 : -1;
105
106    if(( i = tr_compareUint16( a->scarcity, b->scarcity )))
107        return i;
108
109    if(( i = tr_compareUint32( a->block, b->block )))
110        return i;
111
112    return 0;
113}
114
115/**
116***
117**/
118
119typedef struct
120{
121    uint8_t hash[SHA_DIGEST_LENGTH];
122    tr_ptrArray * peers; /* tr_peer */
123    tr_timer * chokeTimer;
124    tr_timer * refillTimer;
125    tr_torrent * tor;
126
127    struct tr_block * blocks;
128    uint32_t blockCount;
129
130    unsigned int isRunning : 1;
131
132    struct tr_peerMgr * manager;
133}
134Torrent;
135
136struct tr_peerMgr
137{
138    tr_handle * handle;
139    tr_ptrArray * torrents; /* Torrent */
140    int connectionCount;
141    tr_ptrArray * handshakes; /* in-process */
142};
143
144/**
145***
146**/
147
148static int
149torrentCompare( const void * va, const void * vb )
150{
151    const Torrent * a = (const Torrent*) va;
152    const Torrent * b = (const Torrent*) vb;
153    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
154}
155
156static int
157torrentCompareToHash( const void * va, const void * vb )
158{
159    const Torrent * a = (const Torrent*) va;
160    const uint8_t * b_hash = (const uint8_t*) vb;
161    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
162}
163
164static Torrent*
165getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
166{
167    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
168                                             hash,
169                                             torrentCompareToHash );
170}
171
172static int chokePulse( void * vtorrent );
173
174static int
175peerCompare( const void * va, const void * vb )
176{
177    const tr_peer * a = (const tr_peer *) va;
178    const tr_peer * b = (const tr_peer *) vb;
179    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
180}
181
182static int
183peerCompareToAddr( const void * va, const void * vb )
184{
185    const tr_peer * a = (const tr_peer *) va;
186    const struct in_addr * b = (const struct in_addr *) vb;
187    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
188}
189
190static tr_peer*
191getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
192{
193    assert( torrent != NULL );
194    assert( torrent->peers != NULL );
195    assert( in_addr != NULL );
196
197    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
198                                             in_addr,
199                                             peerCompareToAddr );
200}
201
202static tr_peer*
203getPeer( Torrent * torrent, const struct in_addr * in_addr )
204{
205    tr_peer * peer = getExistingPeer( torrent, in_addr );
206
207    if( peer == NULL )
208    {
209        peer = tr_new0( tr_peer, 1 );
210        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
211        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
212fprintf( stderr, "getPeer: torrent %p now has %d peers\n", torrent, tr_ptrArraySize(torrent->peers) );
213    }
214    return peer;
215}
216
217static void
218disconnectPeer( tr_peer * peer )
219{
220    assert( peer != NULL );
221
222    tr_peerIoFree( peer->io );
223    peer->io = NULL;
224
225    if( peer->msgs != NULL )
226    {
227fprintf( stderr, "PUB unsub peer %p from msgs %p\n", peer, peer->msgs );
228        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
229        tr_peerMsgsFree( peer->msgs );
230        peer->msgs = NULL;
231    }
232
233    tr_bitfieldFree( peer->have );
234    peer->have = NULL;
235
236    tr_bitfieldFree( peer->blame );
237    peer->blame = NULL;
238
239    tr_bitfieldFree( peer->banned );
240    peer->banned = NULL;
241}
242
243static void
244freePeer( tr_peer * peer )
245{
246    disconnectPeer( peer );
247    tr_free( peer->client );
248    tr_free( peer );
249}
250
251static void
252freeTorrent( tr_peerMgr * manager, Torrent * t )
253{
254    int i, size;
255    tr_peer ** peers;
256    uint8_t hash[SHA_DIGEST_LENGTH];
257
258fprintf( stderr, "timer freeTorrent %p\n", t );
259
260    assert( manager != NULL );
261    assert( t != NULL );
262    assert( t->peers != NULL );
263    assert( getExistingTorrent( manager, t->hash ) != NULL );
264
265    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
266
267    tr_timerFree( &t->chokeTimer );
268    tr_timerFree( &t->refillTimer );
269
270    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
271    for( i=0; i<size; ++i )
272        freePeer( peers[i] );
273
274    tr_ptrArrayFree( t->peers );
275    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
276    tr_free( t->blocks );
277    tr_free( t );
278
279    assert( getExistingTorrent( manager, hash ) == NULL );
280}
281
282/**
283***
284**/
285
286tr_peerMgr*
287tr_peerMgrNew( tr_handle * handle )
288{
289    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
290    m->handle = handle;
291    m->torrents = tr_ptrArrayNew( );
292    m->handshakes = tr_ptrArrayNew( );
293    return m;
294}
295
296void
297tr_peerMgrFree( tr_peerMgr * manager )
298{
299    int i, n;
300fprintf( stderr, "timer peerMgrFree\n" );
301
302    while( !tr_ptrArrayEmpty( manager->torrents ) )
303        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
304    tr_ptrArrayFree( manager->torrents );
305
306    for( i=0, n=tr_ptrArraySize(manager->handshakes); i<n; ++i )
307        tr_handshakeAbort( (tr_handshake*) tr_ptrArrayNth( manager->handshakes, i) );
308    tr_ptrArrayFree( manager->handshakes );
309
310    tr_free( manager );
311}
312
313/**
314***
315**/
316
317static tr_peer**
318getConnectedPeers( Torrent * t, int * setmeCount )
319{
320    int i, peerCount, connectionCount;
321    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
322    tr_peer **ret = tr_new( tr_peer*, peerCount );
323
324    for( i=connectionCount=0; i<peerCount; ++i )
325        if( peers[i]->msgs != NULL )
326            ret[connectionCount++] = peers[i];
327
328    *setmeCount = connectionCount;
329    return ret;
330}
331
332static void
333populateBlockArray( Torrent * t )
334{
335    uint32_t i;
336    struct tr_block * b;
337    const tr_torrent * tor = t->tor;
338
339    t->blockCount = tor->blockCount;
340    t->blocks = b = tr_new( struct tr_block, t->blockCount );
341
342    for( i=0; i<t->blockCount; ++i, ++b )
343    {
344        const int index = tr_torBlockPiece( tor, i );
345        b->have = tr_cpBlockIsComplete( tor->completion, i ) ? 1 : 0;
346        b->dnd = tor->info.pieces[index].dnd ? 1 : 0;
347        b->low_priority = tor->info.pieces[index].priority == TR_PRI_LOW;
348        b->high_priority = tor->info.pieces[index].priority == TR_PRI_HIGH;
349        b->requestCount = 0;
350        b->scarcity = 0;
351        b->block = i;
352    }
353}
354
355static int
356refillPulse( void * vtorrent )
357{
358    uint32_t i;
359    int size;
360    Torrent * t = (Torrent *) vtorrent;
361    tr_peer ** peers;
362    const int isSeeding = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
363    const int wantToRefill = t->isRunning && !isSeeding;
364
365    if( !wantToRefill && t->blocks!=NULL ) /* torrent has stopped or switched to seeding */
366    {
367        tr_free( t->blocks );
368        t->blocks = NULL;
369        t->blockCount = 0;
370    }
371    else if( wantToRefill && t->blocks==NULL ) /* torrent has started or switched to leeching */
372    {
373        populateBlockArray( t );
374    }
375
376    peers = getConnectedPeers( t, &size );
377    if( wantToRefill && size>0 )
378    {
379        /* sort the blocks by interest */
380        fprintf( stderr, "sorting [%s] blocks by interest...", t->tor->info.name );
381        qsort( t->blocks, t->blockCount, sizeof(struct tr_block), compareBlockByInterest );
382        fprintf( stderr, "done\n" );
383
384        /* walk through all the most interesting blocks */
385        for( i=0; i<t->blockCount; ++i )
386        {
387            const uint32_t b = t->blocks[i].block;
388            const uint32_t index = tr_torBlockPiece( t->tor, b );
389            const uint32_t begin = ( b * t->tor->blockSize )-( index * t->tor->info.pieceSize );
390            const uint32_t length = tr_torBlockCountBytes( t->tor, (int)b );
391            int j;
392
393            if( t->blocks[i].have || t->blocks[i].dnd )
394                continue;
395
396            if( !size ) /* all peers full */
397                break;
398
399            /* find a peer who can ask for this block */
400            for( j=0; j<size; )
401            {
402                const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
403                switch( val )
404                {
405                    case TR_ADDREQ_FULL: 
406                    case TR_ADDREQ_CLIENT_CHOKED: 
407                        peers[j] = peers[--size];
408                        break;
409
410                    case TR_ADDREQ_MISSING: 
411                        ++j;
412                        break;
413
414                    case TR_ADDREQ_OK:
415                        fprintf( stderr, "peer %p took the request for block %d\n", peers[j]->msgs, b );
416                        incrementReqCount( &t->blocks[i] );
417                        j = size;
418                        break;
419                }
420            }
421        }
422
423        /* put the blocks back the way we found them */
424        qsort( t->blocks, t->blockCount, sizeof(struct tr_block), compareBlockByIndex );
425    }
426
427    /* cleanup */
428    tr_free( peers );
429
430    return TRUE;
431}
432
433static void
434broadcastClientHave( Torrent * t, uint32_t index )
435{
436    int i, size;
437    tr_peer ** peers = getConnectedPeers( t, &size );
438    for( i=0; i<size; ++i )
439        tr_peerMsgsHave( peers[i]->msgs, index );
440    tr_free( peers );
441}
442
443static void
444broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
445{
446    int i, size;
447    tr_peer ** peers = getConnectedPeers( t, &size );
448    for( i=0; i<size; ++i )
449        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
450    tr_free( peers );
451}
452
453static void
454msgsCallbackFunc( void * source UNUSED, void * vevent, void * vt )
455{
456    Torrent * t = (Torrent *) vt;
457    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
458
459    switch( e->eventType )
460    {
461        case TR_PEERMSG_PEER_BITFIELD: {
462            if( t->blocks!=NULL ) {
463                int i;
464                for( i=0; i<t->tor->info.pieceCount; ++i ) {
465                    const uint32_t begin = tr_torPieceFirstBlock( t->tor, i );
466                    const uint32_t end = begin + tr_torPieceCountBlocks( t->tor, i );
467                    uint32_t j;
468                    for( j=begin; t->blocks!=NULL && j<end; ++j ) {
469                        assert( t->blocks[j].block == j );
470                        incrementScarcity( &t->blocks[j] );
471                    }
472                }
473            }
474            break;
475        }
476
477        case TR_PEERMSG_CLIENT_HAVE:
478            broadcastClientHave( t, e->pieceIndex );
479            break;
480
481        case TR_PEERMSG_PEER_HAVE: {
482            if( t->blocks != NULL ) {
483                uint32_t i;
484                const uint32_t begin = tr_torPieceFirstBlock( t->tor, e->pieceIndex );
485                const uint32_t end = begin + tr_torPieceCountBlocks( t->tor, (int)e->pieceIndex );
486                for( i=begin; i<end; ++i ) {
487                    assert( t->blocks[i].block == i );
488                    incrementScarcity( &t->blocks[i] );
489                }
490            }
491            break;
492        }
493
494        case TR_PEERMSG_CLIENT_BLOCK: {
495            if( t->blocks != NULL ) {
496                const uint32_t i = _tr_block( t->tor, e->pieceIndex, e->offset );
497                assert( t->blocks[i].block == i );
498                t->blocks[i].have = 1;
499            }
500            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
501            break;
502        }
503
504        case TR_PEERMSG_GOT_PEX:
505            /* FIXME */
506            break;
507
508        case TR_PEERMSG_GOT_ERROR:
509            /* FIXME */
510            break;
511
512        default:
513            assert(0);
514    }
515}
516
517static void
518myHandshakeDoneCB( tr_handshake    * handshake,
519                   tr_peerIo       * io,
520                   int               isConnected,
521                   const uint8_t   * peer_id,
522                   int               peerSupportsEncryption,
523                   void            * vmanager )
524{
525    int ok = isConnected;
526    uint16_t port;
527    const struct in_addr * in_addr;
528    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
529    const uint8_t * hash = NULL;
530    Torrent * t;
531    tr_handshake * ours;
532
533    assert( io != NULL );
534    assert( isConnected==0 || isConnected==1 );
535    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
536
537    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
538                                    handshake,
539                                    tr_comparePointers );
540    assert( handshake == ours );
541
542    in_addr = tr_peerIoGetAddress( io, &port );
543
544    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
545    {
546        tr_peerIoFree( io );
547        --manager->connectionCount;
548        return;
549    }
550
551    hash = tr_peerIoGetTorrentHash( io );
552    t = getExistingTorrent( manager, hash );
553    if( !t || !t->isRunning )
554    {
555        tr_peerIoFree( io );
556        --manager->connectionCount;
557        return;
558    }
559
560    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake; isConnected is %d\n", t->tor->info.name, isConnected );
561
562    /* if we couldn't connect or were snubbed,
563     * the peer's probably not worth remembering. */
564    if( !ok ) {
565        tr_peer * peer = getExistingPeer( t, in_addr );
566        fprintf( stderr, "peer-mgr: torrent [%s] got a bad one, and you know what? fuck them.\n", t->tor->info.name );
567        tr_peerIoFree( io );
568        --manager->connectionCount;
569        if( peer ) {
570            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
571            freePeer( peer );
572        }
573        return;
574    }
575
576    if( 1 ) {
577        tr_peer * peer = getPeer( t, in_addr );
578        if( peer->msgs != NULL ) { /* we alerady have this peer */
579            tr_peerIoFree( io );
580            --manager->connectionCount;
581        } else {
582            peer->port = port;
583            peer->io = io;
584            peer->msgs = tr_peerMsgsNew( t->tor, peer );
585            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
586            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
587            fprintf( stderr, "PUB sub peer %p to msgs %p\n", peer, peer->msgs );
588            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
589        }
590    }
591}
592
593static void
594initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
595{
596    tr_handshake * handshake = tr_handshakeNew( io,
597                                                HANDSHAKE_ENCRYPTION_PREFERRED,
598                                                myHandshakeDoneCB,
599                                                manager );
600    ++manager->connectionCount;
601
602    tr_ptrArrayInsertSorted( manager->handshakes, handshake, tr_comparePointers );
603}
604
605void
606tr_peerMgrAddIncoming( tr_peerMgr      * manager,
607                       struct in_addr  * addr,
608                       int               socket )
609{
610    tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, socket );
611    initiateHandshake( manager, io );
612}
613
614static void
615maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
616{
617    tr_peerIo * io;
618
619    assert( manager != NULL );
620    assert( t != NULL );
621    assert( peer != NULL );
622
623    if( peer->io != NULL ) { /* already connected */
624        fprintf( stderr, "not connecting because we already have an IO for that address\n" );
625        return;
626    }
627    if( !t->isRunning ) { /* torrent's not running */
628        fprintf( stderr, "OUTGOING connection not being made because t [%s] is not running\n", t->tor->info.name );
629        return;
630    }
631
632    io = tr_peerIoNewOutgoing( manager->handle,
633                               &peer->in_addr,
634                               peer->port,
635                               t->hash );
636    initiateHandshake( manager, io );
637}
638
639void
640tr_peerMgrAddPex( tr_peerMgr     * manager,
641                  const uint8_t  * torrentHash,
642                  int              from,
643                  const tr_pex   * pex,
644                  int              pexCount )
645{
646    int i;
647    const tr_pex * walk = pex;
648    Torrent * t = getExistingTorrent( manager, torrentHash );
649    for( i=0; i<pexCount; ++i )
650    {
651        tr_peer * peer = getPeer( t, &walk->in_addr );
652        peer->port = walk->port;
653        peer->from = from;
654        maybeConnect( manager, t, peer );
655    }
656}
657
658void
659tr_peerMgrAddPeers( tr_peerMgr    * manager,
660                    const uint8_t * torrentHash,
661                    int             from,
662                    const uint8_t * peerCompact,
663                    int             peerCount )
664{
665    int i;
666    const uint8_t * walk = peerCompact;
667    Torrent * t = getExistingTorrent( manager, torrentHash );
668    for( i=0; t!=NULL && i<peerCount; ++i )
669    {
670        tr_peer * peer;
671        struct in_addr addr;
672        uint16_t port;
673        memcpy( &addr, walk, 4 ); walk += 4;
674        memcpy( &port, walk, 2 ); walk += 2;
675        peer = getPeer( t, &addr );
676        peer->port = port;
677        peer->from = from;
678        maybeConnect( manager, t, peer );
679    }
680}
681
682/**
683***
684**/
685
686int
687tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
688{
689    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
690}
691
692void
693tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
694                    const uint8_t  * torrentHash UNUSED,
695                    int              pieceIndex UNUSED,
696                    int              success UNUSED )
697{
698    fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );
699}
700
701int
702tr_pexCompare( const void * va, const void * vb )
703{
704    const tr_pex * a = (const tr_pex *) va;
705    const tr_pex * b = (const tr_pex *) vb;
706    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
707    if( i ) return i;
708    if( a->port < b->port ) return -1;
709    if( a->port > b->port ) return 1;
710    return 0;
711}
712
713int tr_pexCompare( const void * a, const void * b );
714
715
716int
717tr_peerMgrGetPeers( tr_peerMgr      * manager,
718                    const uint8_t   * torrentHash,
719                    tr_pex         ** setme_pex )
720{
721    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
722    int i, peerCount;
723    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
724    tr_pex * pex = tr_new( tr_pex, peerCount );
725    tr_pex * walk = pex;
726
727    for( i=0; i<peerCount; ++i, ++walk )
728    {
729        const tr_peer * peer = peers[i];
730
731        walk->in_addr = peer->in_addr;
732
733        walk->port = peer->port;
734
735        walk->flags = 0;
736        if( peer->peerSupportsEncryption ) walk->flags |= 1;
737        if( peer->progress >= 1.0 )        walk->flags |= 2;
738    }
739
740    assert( ( walk - pex ) == peerCount );
741    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
742    *setme_pex = pex;
743    return peerCount;
744}
745
746void
747tr_peerMgrStartTorrent( tr_peerMgr     * manager,
748                        const uint8_t  * torrentHash )
749{
750    int i, peerCount;
751    Torrent * t = getExistingTorrent( manager, torrentHash );
752
753    t->isRunning = 1;
754
755    peerCount = tr_ptrArraySize( t->peers );
756    for( i=0; i<peerCount; ++i )
757        maybeConnect( manager, t, tr_ptrArrayNth( t->peers, i ) );
758}
759
760void
761tr_peerMgrStopTorrent( tr_peerMgr     * manager,
762                       const uint8_t  * torrentHash)
763{
764    int i, peerCount;
765    Torrent * t = getExistingTorrent( manager, torrentHash );
766
767    t->isRunning = 0;
768
769    peerCount = tr_ptrArraySize( t->peers );
770    for( i=0; i<peerCount; ++i )
771        disconnectPeer( tr_ptrArrayNth( t->peers, i ) );
772}
773
774void
775tr_peerMgrUpdateCompletion( tr_peerMgr     * manager,
776                            const uint8_t  * torrentHash )
777{
778    uint32_t i;
779    Torrent * t = getExistingTorrent( manager, torrentHash );
780
781    for( i=0; t->blocks!=NULL && i<t->blockCount; ++i ) {
782        assert( t->blocks[i].block == i );
783        t->blocks[i].have = tr_cpBlockIsComplete( t->tor->completion, i ) ? 1 : 0;
784    }
785}
786
787void
788tr_peerMgrAddTorrent( tr_peerMgr * manager,
789                      tr_torrent * tor )
790{
791    Torrent * t;
792
793    assert( tor != NULL );
794    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
795
796    t = tr_new0( Torrent, 1 );
797    t->manager = manager;
798    t->tor = tor;
799    t->peers = tr_ptrArrayNew( );
800    t->chokeTimer = tr_timerNew( manager->handle, chokePulse, t, RECHOKE_PERIOD_SECONDS );
801    t->refillTimer = tr_timerNew( t->manager->handle, refillPulse, t, REFILL_PERIOD_MSEC );
802
803    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
804    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
805}
806
807void
808tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
809                         const uint8_t  * torrentHash )
810{
811    Torrent * t = getExistingTorrent( manager, torrentHash );
812    assert( t != NULL );
813    tr_peerMgrStopTorrent( manager, torrentHash );
814    freeTorrent( manager, t );
815}
816
817void
818tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
819                               const uint8_t    * torrentHash,
820                               int8_t           * tab,
821                               int                tabCount )
822{
823    int i;
824    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
825    const tr_torrent * tor = t->tor;
826    const float interval = tor->info.pieceCount / (float)tabCount;
827
828    memset( tab, 0, tabCount );
829
830    for( i=0; i<tabCount; ++i )
831    {
832        const int piece = i * interval;
833
834        if( tor == NULL )
835            tab[i] = 0;
836        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
837            tab[i] = -1;
838        else {
839            int j, peerCount;
840            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
841            for( j=0; j<peerCount; ++j )
842                if( tr_bitfieldHas( peers[j]->have, i ) )
843                    ++tab[i];
844        }
845    }
846}
847
848
849void
850tr_peerMgrTorrentStats( const tr_peerMgr * manager,
851                        const uint8_t    * torrentHash,
852                        int              * setmePeersTotal,
853                        int              * setmePeersConnected,
854                        int              * setmePeersSendingToUs,
855                        int              * setmePeersGettingFromUs,
856                        int              * setmePeersFrom )
857{
858    int i, size;
859    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
860    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
861
862    *setmePeersTotal          = size;
863    *setmePeersConnected      = 0;
864    *setmePeersSendingToUs    = 0;
865    *setmePeersGettingFromUs  = 0;
866
867    for( i=0; i<size; ++i )
868    {
869        const tr_peer * peer = peers[i];
870
871        if( peer->io == NULL ) /* not connected */
872            continue;
873
874        ++*setmePeersConnected;
875
876        ++setmePeersFrom[peer->from];
877
878        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
879            ++*setmePeersGettingFromUs;
880
881        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
882            ++*setmePeersSendingToUs;
883    }
884}
885
886struct tr_peer_stat *
887tr_peerMgrPeerStats( const tr_peerMgr  * manager,
888                     const uint8_t     * torrentHash,
889                     int               * setmeCount UNUSED )
890{
891    int i, size;
892    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
893    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
894    tr_peer_stat * ret;
895
896    ret = tr_new0( tr_peer_stat, size );
897
898    for( i=0; i<size; ++i )
899    {
900        const tr_peer * peer = peers[i];
901        const int live = peer->io != NULL;
902        tr_peer_stat * stat = ret + i;
903
904        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
905        stat->port             = peer->port;
906        stat->from             = peer->from;
907        stat->client           = peer->client;
908        stat->progress         = peer->progress;
909        stat->isConnected      = live ? 1 : 0;
910        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
911        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
912        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
913        stat->isDownloading    = stat->uploadToRate > 0.01;
914        stat->isUploading      = stat->downloadFromRate > 0.01;
915    }
916
917    *setmeCount = size;
918    return ret;
919}
920
921/**
922***
923**/
924
925typedef struct
926{
927    tr_peer * peer;
928    float rate;
929    int randomKey;
930    int preferred;
931    int doUnchoke;
932}
933ChokeData;
934
935static int
936compareChoke( const void * va, const void * vb )
937{
938    const ChokeData * a = ( const ChokeData * ) va;
939    const ChokeData * b = ( const ChokeData * ) vb;
940
941    if( a->preferred != b->preferred )
942        return a->preferred ? -1 : 1;
943
944    if( a->preferred )
945    {
946        if( a->rate > b->rate ) return -1;
947        if( a->rate < b->rate ) return 1;
948        return 0;
949    }
950    else
951    {
952        return a->randomKey - b->randomKey;
953    }
954}
955
956static int
957clientIsSnubbedBy( const tr_peer * peer )
958{
959    assert( peer != NULL );
960
961    return peer->peerSentDataAt < (time(NULL) - 30);
962}
963
964static void
965rechokeLeech( Torrent * t )
966{
967    int i, size, unchoked=0;
968    tr_peer ** peers = getConnectedPeers( t, &size );
969    ChokeData * choke = tr_new0( ChokeData, size );
970
971    /* sort the peers by preference and rate */
972    for( i=0; i<size; ++i ) {
973        tr_peer * peer = peers[i];
974        ChokeData * node = &choke[i];
975        node->peer = peer;
976        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
977        node->randomKey = tr_rand( INT_MAX );
978        node->rate = tr_peerIoGetRateToClient( peer->io );
979    }
980    qsort( choke, size, sizeof(ChokeData), compareChoke );
981
982    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
983        choke[i].doUnchoke = 1;
984        ++unchoked;
985    }
986
987    for( ; i<size; ++i ) {
988        choke[i].doUnchoke = 1;
989        ++unchoked;
990        if( choke[i].peer->peerIsInterested )
991            break;
992    }
993
994    for( i=0; i<size; ++i )
995        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
996
997    /* cleanup */
998    tr_free( choke );
999    tr_free( peers );
1000}
1001
1002static void
1003rechokeSeed( Torrent * t )
1004{
1005    int i, size;
1006    tr_peer ** peers = getConnectedPeers( t, &size );
1007
1008    /* FIXME */
1009    for( i=0; i<size; ++i )
1010        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1011
1012    tr_free( peers );
1013}
1014
1015static int
1016chokePulse( void * vtorrent )
1017{
1018    Torrent * t = vtorrent;
1019    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1020    if( done )
1021        rechokeLeech( vtorrent );
1022    else
1023        rechokeSeed( vtorrent );
1024    return TRUE;
1025}
Note: See TracBrowser for help on using the repository browser.