source: branches/encryption/libtransmission/peer-mgr.c @ 3094

Last change on this file since 3094 was 3094, checked in by charles, 15 years ago

more work on the download speed plateau bug

  • Property svn:keywords set to Date Rev Author Id
File size: 27.9 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3094 2007-09-17 05:20:59Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32/* how frequently to change which peers are choked */
33#define RECHOKE_PERIOD_SECONDS (15 * 1000)
34
35#define REFILL_PERIOD_MSEC 1000
36
37/* how many peers to unchoke per-torrent. */
38/* FIXME: make this user-configurable? */
39#define NUM_UNCHOKED_PEERS_PER_TORRENT 8
40
41/**
42***
43**/
44
45typedef struct
46{
47    uint8_t hash[SHA_DIGEST_LENGTH];
48    tr_ptrArray * peers; /* tr_peer */
49    tr_timer * chokeTimer;
50    tr_timer * refillTimer;
51    tr_torrent * tor;
52    tr_bitfield * requested;
53
54    unsigned int isRunning : 1;
55
56    struct tr_peerMgr * manager;
57}
58Torrent;
59
60struct tr_peerMgr
61{
62    tr_handle * handle;
63    tr_ptrArray * torrents; /* Torrent */
64    int connectionCount;
65    tr_ptrArray * handshakes; /* in-process */
66};
67
68/**
69***
70**/
71
72static int
73torrentCompare( const void * va, const void * vb )
74{
75    const Torrent * a = (const Torrent*) va;
76    const Torrent * b = (const Torrent*) vb;
77    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
78}
79
80static int
81torrentCompareToHash( const void * va, const void * vb )
82{
83    const Torrent * a = (const Torrent*) va;
84    const uint8_t * b_hash = (const uint8_t*) vb;
85    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
86}
87
88static Torrent*
89getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
90{
91    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
92                                             hash,
93                                             torrentCompareToHash );
94}
95
96static int chokePulse( void * vtorrent );
97
98static int
99peerCompare( const void * va, const void * vb )
100{
101    const tr_peer * a = (const tr_peer *) va;
102    const tr_peer * b = (const tr_peer *) vb;
103    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
104}
105
106static int
107peerCompareToAddr( const void * va, const void * vb )
108{
109    const tr_peer * a = (const tr_peer *) va;
110    const struct in_addr * b = (const struct in_addr *) vb;
111    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
112}
113
114static tr_peer*
115getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
116{
117    assert( torrent != NULL );
118    assert( torrent->peers != NULL );
119    assert( in_addr != NULL );
120
121    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
122                                             in_addr,
123                                             peerCompareToAddr );
124}
125
126static tr_peer*
127getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
128{
129    tr_peer * peer = getExistingPeer( torrent, in_addr );
130
131    if( isNew )
132        *isNew = peer == NULL;
133
134    if( peer == NULL )
135    {
136        peer = tr_new0( tr_peer, 1 );
137        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
138        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
139fprintf( stderr, "getPeer: torrent %p now has %d peers\n", torrent, tr_ptrArraySize(torrent->peers) );
140    }
141
142    return peer;
143}
144
145static void
146disconnectPeer( tr_peer * peer )
147{
148    assert( peer != NULL );
149
150    tr_peerIoFree( peer->io );
151    peer->io = NULL;
152
153    if( peer->msgs != NULL )
154    {
155fprintf( stderr, "PUB unsub peer %p from msgs %p\n", peer, peer->msgs );
156        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
157        tr_peerMsgsFree( peer->msgs );
158        peer->msgs = NULL;
159    }
160
161    tr_bitfieldFree( peer->have );
162    peer->have = NULL;
163
164    tr_bitfieldFree( peer->blame );
165    peer->blame = NULL;
166
167    tr_bitfieldFree( peer->banned );
168    peer->banned = NULL;
169}
170
171static void
172freePeer( tr_peer * peer )
173{
174    disconnectPeer( peer );
175    tr_free( peer->client );
176    tr_free( peer );
177}
178
179static void
180freeTorrent( tr_peerMgr * manager, Torrent * t )
181{
182    int i, size;
183    tr_peer ** peers;
184    uint8_t hash[SHA_DIGEST_LENGTH];
185
186fprintf( stderr, "timer freeTorrent %p\n", t );
187
188    assert( manager != NULL );
189    assert( t != NULL );
190    assert( t->peers != NULL );
191    assert( getExistingTorrent( manager, t->hash ) != NULL );
192
193    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
194
195    tr_timerFree( &t->chokeTimer );
196    tr_timerFree( &t->refillTimer );
197
198    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
199    for( i=0; i<size; ++i )
200        freePeer( peers[i] );
201
202    tr_bitfieldFree( t->requested );
203    tr_ptrArrayFree( t->peers );
204    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
205    tr_free( t );
206
207    assert( getExistingTorrent( manager, hash ) == NULL );
208}
209
210/**
211***
212**/
213
214tr_peerMgr*
215tr_peerMgrNew( tr_handle * handle )
216{
217    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
218    m->handle = handle;
219    m->torrents = tr_ptrArrayNew( );
220    m->handshakes = tr_ptrArrayNew( );
221    return m;
222}
223
224void
225tr_peerMgrFree( tr_peerMgr * manager )
226{
227    int i, n;
228fprintf( stderr, "timer peerMgrFree\n" );
229
230    while( !tr_ptrArrayEmpty( manager->torrents ) )
231        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
232    tr_ptrArrayFree( manager->torrents );
233
234    for( i=0, n=tr_ptrArraySize(manager->handshakes); i<n; ++i )
235        tr_handshakeAbort( (tr_handshake*) tr_ptrArrayNth( manager->handshakes, i) );
236    tr_ptrArrayFree( manager->handshakes );
237
238    tr_free( manager );
239}
240
241static tr_peer**
242getConnectedPeers( Torrent * t, int * setmeCount )
243{
244    int i, peerCount, connectionCount;
245    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
246    tr_peer **ret = tr_new( tr_peer*, peerCount );
247
248    for( i=connectionCount=0; i<peerCount; ++i )
249        if( peers[i]->msgs != NULL )
250            ret[connectionCount++] = peers[i];
251
252    *setmeCount = connectionCount;
253    return ret;
254}
255
256/***
257****  Refill
258***/
259
260struct tr_refill_piece
261{
262    tr_priority_t priority;
263    uint32_t piece;
264    uint32_t peerCount;
265};
266
267static int
268compareRefillPiece (const void * aIn, const void * bIn)
269{
270    const struct tr_refill_piece * a = aIn;
271    const struct tr_refill_piece * b = bIn;
272
273    /* if one piece has a higher priority, it goes first */
274    if (a->priority != b->priority)
275        return a->priority > b->priority ? -1 : 1;
276
277    /* otherwise if one has fewer peers, it goes first */
278    if (a->peerCount != b->peerCount)
279        return a->peerCount < b->peerCount ? -1 : 1;
280
281    /* otherwise go with the earlier piece */
282    return a->piece - b->piece;
283}
284
285static int
286isPieceInteresting( const tr_torrent  * tor,
287                    int                 piece )
288{
289    if( tor->info.pieces[piece].dnd ) /* we don't want it */
290        return 0;
291
292    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
293        return 0;
294
295    return 1;
296}
297
298static uint32_t*
299getPreferredPieces( Torrent     * t,
300                    uint32_t    * pieceCount )
301{
302    const tr_torrent * tor = t->tor;
303    const tr_info * inf = &tor->info;
304
305    int i;
306    uint32_t poolSize = 0;
307    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
308
309    int peerCount;
310    tr_peer** peers = getConnectedPeers( t, &peerCount );
311
312    for( i=0; i<inf->pieceCount; ++i )
313        if( isPieceInteresting( tor, i ) )
314            pool[poolSize++] = i;
315
316    /* sort the pool from most interesting to least... */
317    if( poolSize > 1 )
318    {
319        uint32_t j;
320        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
321
322        for( j=0; j<poolSize; ++j )
323        {
324            int k;
325            const int piece = pool[j];
326            struct tr_refill_piece * setme = p + j;
327
328            setme->piece = piece;
329            setme->priority = inf->pieces[piece].priority;
330            setme->peerCount = 0;
331
332            for( k=0; k<peerCount; ++k ) {
333                const tr_peer * peer = peers[k];
334                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
335                    ++setme->peerCount;
336            }
337        }
338
339        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
340
341        for( j=0; j<poolSize; ++j )
342            pool[j] = p[j].piece;
343
344        tr_free( p );
345    }
346
347#if 0
348fprintf (stderr, "new pool: ");
349for (i=0; i<15 && i<poolSize; ++i ) fprintf (stderr, "%d, ", pool[i] );
350fprintf (stderr, "\n");
351#endif
352    tr_free( peers );
353
354    *pieceCount = poolSize;
355    return pool;
356}
357
358static uint64_t*
359getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
360{
361    uint32_t i;
362    uint32_t pieceCount;
363    uint32_t * pieces;
364    uint64_t *req, *unreq, *ret;
365    int reqCount, unreqCount;
366    const tr_torrent * tor = t->tor;
367
368    pieces = getPreferredPieces( t, &pieceCount );
369fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );
370
371    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
372    reqCount = 0;
373    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
374    unreqCount = 0;
375
376    for( i=0; i<pieceCount; ++i ) {
377        const uint32_t index = pieces[i];
378        const int begin = tr_torPieceFirstBlock( tor, index );
379        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
380        int block;
381        for( block=begin; block<end; ++block )
382            if( tr_cpBlockIsComplete( tor->completion, block ) )
383                continue;
384            else if( tr_bitfieldHas( t->requested, block ) )
385                req[reqCount++] = block;
386            else
387                unreq[unreqCount++] = block;
388    }
389
390fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d\n", tor->info.name, (int)reqCount );
391fprintf( stderr, "REFILL refillPulse for {%s} unreqCount is %d\n", tor->info.name, (int)unreqCount );
392    ret = tr_new( uint64_t, unreqCount + reqCount );
393    memcpy( ret, unreq, sizeof(uint64_t) * unreqCount );
394    memcpy( ret, req, sizeof(uint64_t) * reqCount );
395    *setmeCount = unreqCount + reqCount;
396
397    tr_free( req );
398    tr_free( unreq );
399
400    return ret;
401}
402
403static int
404refillPulse( void * vtorrent )
405{
406    Torrent * t = vtorrent;
407    tr_torrent * tor = t->tor;
408    uint32_t i;
409    int peerCount;
410    tr_peer ** peers;
411    uint64_t blockCount;
412    uint64_t * blocks;
413
414    if( !t->isRunning )
415        return TRUE;
416    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
417        return TRUE;
418
419    blocks = getPreferredBlocks( t, &blockCount );
420    peers = getConnectedPeers( t, &peerCount );
421
422fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );
423
424    for( i=0; peerCount && i<blockCount; ++i )
425    {
426        const int block = blocks[i];
427        const uint32_t index = tr_torBlockPiece( tor, block );
428        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
429        const uint32_t length = tr_torBlockCountBytes( tor, block );
430        int j;
431        assert( _tr_block( tor, index, begin ) == block );
432        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
433        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
434
435
436        /* find a peer who can ask for this block */
437        for( j=0; j<peerCount; )
438        {
439            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
440            switch( val )
441            {
442                case TR_ADDREQ_FULL: 
443                case TR_ADDREQ_CLIENT_CHOKED:
444                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
445                    break;
446
447                case TR_ADDREQ_MISSING: 
448                    ++j;
449                    break;
450
451                case TR_ADDREQ_OK:
452                    fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );
453                    tr_bitfieldAdd( t->requested, block );
454                    j = peerCount;
455                    break;
456
457                default:
458                    assert( 0 && "unhandled value" );
459                    break;
460            }
461        }
462    }
463
464    /* cleanup */
465    tr_free( peers );
466    tr_free( blocks );
467
468    t->refillTimer = NULL;
469    return FALSE;
470}
471
472static void
473broadcastClientHave( Torrent * t, uint32_t index )
474{
475    int i, size;
476    tr_peer ** peers = getConnectedPeers( t, &size );
477    for( i=0; i<size; ++i )
478        tr_peerMsgsHave( peers[i]->msgs, index );
479    tr_free( peers );
480}
481
482static void
483broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
484{
485    int i, size;
486    tr_peer ** peers = getConnectedPeers( t, &size );
487    for( i=0; i<size; ++i )
488        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
489    tr_free( peers );
490}
491
492static void
493msgsCallbackFunc( void * source UNUSED, void * vevent, void * vt )
494{
495    Torrent * t = (Torrent *) vt;
496    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
497
498    switch( e->eventType )
499    {
500        case TR_PEERMSG_NEED_REQ:
501            if( t->refillTimer == NULL )
502                t->refillTimer = tr_timerNew( t->manager->handle,
503                                              refillPulse, t,
504                                              REFILL_PERIOD_MSEC );
505            break;
506
507        case TR_PEERMSG_CLIENT_HAVE:
508            broadcastClientHave( t, e->pieceIndex );
509            break;
510
511        case TR_PEERMSG_CLIENT_BLOCK:
512            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
513            break;
514
515        case TR_PEERMSG_GOT_ERROR:
516            /* FIXME */
517            break;
518
519        default:
520            assert(0);
521    }
522}
523
524static void
525myHandshakeDoneCB( tr_handshake    * handshake,
526                   tr_peerIo       * io,
527                   int               isConnected,
528                   const uint8_t   * peer_id,
529                   int               peerSupportsEncryption,
530                   void            * vmanager )
531{
532    int ok = isConnected;
533    uint16_t port;
534    const struct in_addr * in_addr;
535    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
536    const uint8_t * hash = NULL;
537    Torrent * t;
538    tr_handshake * ours;
539
540    assert( io != NULL );
541    assert( isConnected==0 || isConnected==1 );
542    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
543
544    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
545                                    handshake,
546                                    tr_comparePointers );
547    assert( handshake == ours );
548
549    in_addr = tr_peerIoGetAddress( io, &port );
550
551    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
552    {
553        tr_peerIoFree( io );
554        --manager->connectionCount;
555        return;
556    }
557
558    hash = tr_peerIoGetTorrentHash( io );
559    t = getExistingTorrent( manager, hash );
560    if( !t || !t->isRunning )
561    {
562        tr_peerIoFree( io );
563        --manager->connectionCount;
564        return;
565    }
566
567    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake; isConnected is %d\n", t->tor->info.name, isConnected );
568
569    /* if we couldn't connect or were snubbed,
570     * the peer's probably not worth remembering. */
571    if( !ok ) {
572        tr_peer * peer = getExistingPeer( t, in_addr );
573        fprintf( stderr, "peer-mgr: torrent [%s] got a bad one, and you know what? fuck them.\n", t->tor->info.name );
574        tr_peerIoFree( io );
575        --manager->connectionCount;
576        if( peer ) {
577            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
578            freePeer( peer );
579        }
580        return;
581    }
582
583    if( 1 ) {
584        tr_peer * peer = getPeer( t, in_addr, NULL );
585        if( peer->msgs != NULL ) { /* we already have this peer */
586            tr_peerIoFree( io );
587            --manager->connectionCount;
588        } else {
589            peer->port = port;
590            peer->io = io;
591            peer->msgs = tr_peerMsgsNew( t->tor, peer );
592            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
593            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
594            fprintf( stderr, "PUB sub peer %p to msgs %p\n", peer, peer->msgs );
595            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
596        }
597    }
598}
599
600static void
601initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
602{
603    tr_handshake * handshake = tr_handshakeNew( io,
604                                                HANDSHAKE_ENCRYPTION_PREFERRED,
605                                                myHandshakeDoneCB,
606                                                manager );
607    ++manager->connectionCount;
608
609    tr_ptrArrayInsertSorted( manager->handshakes, handshake, tr_comparePointers );
610}
611
612void
613tr_peerMgrAddIncoming( tr_peerMgr      * manager,
614                       struct in_addr  * addr,
615                       int               socket )
616{
617    tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, socket );
618    initiateHandshake( manager, io );
619}
620
621static void
622maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
623{
624    tr_peerIo * io;
625
626    assert( manager != NULL );
627    assert( t != NULL );
628    assert( peer != NULL );
629
630    if( peer->io != NULL ) { /* already connected */
631        fprintf( stderr, "not connecting because we already have an IO for that address\n" );
632        return;
633    }
634    if( !t->isRunning ) { /* torrent's not running */
635        fprintf( stderr, "OUTGOING connection not being made because t [%s] is not running\n", t->tor->info.name );
636        return;
637    }
638
639    io = tr_peerIoNewOutgoing( manager->handle,
640                               &peer->in_addr,
641                               peer->port,
642                               t->hash );
643    initiateHandshake( manager, io );
644}
645
646void
647tr_peerMgrAddPex( tr_peerMgr     * manager,
648                  const uint8_t  * torrentHash,
649                  int              from,
650                  const tr_pex   * pex,
651                  int              pexCount )
652{
653    int i;
654    const tr_pex * walk = pex;
655    Torrent * t = getExistingTorrent( manager, torrentHash );
656    for( i=0; i<pexCount; ++i )
657    {
658        int isNew;
659        tr_peer * peer = getPeer( t, &walk->in_addr, &isNew );
660        if( isNew ) {
661            peer->port = walk->port;
662            peer->from = from;
663            maybeConnect( manager, t, peer );
664        }
665    }
666}
667
668void
669tr_peerMgrAddPeers( tr_peerMgr    * manager,
670                    const uint8_t * torrentHash,
671                    int             from,
672                    const uint8_t * peerCompact,
673                    int             peerCount )
674{
675    int i;
676    const uint8_t * walk = peerCompact;
677    Torrent * t = getExistingTorrent( manager, torrentHash );
678    for( i=0; t!=NULL && i<peerCount; ++i )
679    {
680        int isNew;
681        tr_peer * peer;
682        struct in_addr addr;
683        uint16_t port;
684        memcpy( &addr, walk, 4 ); walk += 4;
685        memcpy( &port, walk, 2 ); walk += 2;
686        peer = getPeer( t, &addr, &isNew );
687        if( isNew ) {
688            peer->port = port;
689            peer->from = from;
690            maybeConnect( manager, t, peer );
691        }
692    }
693}
694
695/**
696***
697**/
698
699int
700tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
701{
702    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
703}
704
705void
706tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
707                    const uint8_t  * torrentHash UNUSED,
708                    int              pieceIndex UNUSED,
709                    int              success UNUSED )
710{
711    fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );
712}
713
714int
715tr_pexCompare( const void * va, const void * vb )
716{
717    const tr_pex * a = (const tr_pex *) va;
718    const tr_pex * b = (const tr_pex *) vb;
719    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
720    if( i ) return i;
721    if( a->port < b->port ) return -1;
722    if( a->port > b->port ) return 1;
723    return 0;
724}
725
726int tr_pexCompare( const void * a, const void * b );
727
728
729int
730tr_peerMgrGetPeers( tr_peerMgr      * manager,
731                    const uint8_t   * torrentHash,
732                    tr_pex         ** setme_pex )
733{
734    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
735    int i, peerCount;
736    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
737    tr_pex * pex = tr_new( tr_pex, peerCount );
738    tr_pex * walk = pex;
739
740    for( i=0; i<peerCount; ++i, ++walk )
741    {
742        const tr_peer * peer = peers[i];
743
744        walk->in_addr = peer->in_addr;
745
746        walk->port = peer->port;
747
748        walk->flags = 0;
749        if( peer->peerSupportsEncryption ) walk->flags |= 1;
750        if( peer->progress >= 1.0 )        walk->flags |= 2;
751    }
752
753    assert( ( walk - pex ) == peerCount );
754    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
755    *setme_pex = pex;
756    return peerCount;
757}
758
759void
760tr_peerMgrStartTorrent( tr_peerMgr     * manager,
761                        const uint8_t  * torrentHash )
762{
763    int i, peerCount;
764    Torrent * t = getExistingTorrent( manager, torrentHash );
765
766    t->isRunning = 1;
767
768    peerCount = tr_ptrArraySize( t->peers );
769    for( i=0; i<peerCount; ++i )
770        maybeConnect( manager, t, tr_ptrArrayNth( t->peers, i ) );
771}
772
773void
774tr_peerMgrStopTorrent( tr_peerMgr     * manager,
775                       const uint8_t  * torrentHash)
776{
777    int i, peerCount;
778    Torrent * t = getExistingTorrent( manager, torrentHash );
779
780    t->isRunning = 0;
781
782    peerCount = tr_ptrArraySize( t->peers );
783    for( i=0; i<peerCount; ++i )
784        disconnectPeer( tr_ptrArrayNth( t->peers, i ) );
785}
786
787void
788tr_peerMgrAddTorrent( tr_peerMgr * manager,
789                      tr_torrent * tor )
790{
791    Torrent * t;
792
793    assert( tor != NULL );
794    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
795
796    t = tr_new0( Torrent, 1 );
797    t->manager = manager;
798    t->tor = tor;
799    t->peers = tr_ptrArrayNew( );
800    t->requested = tr_bitfieldNew( tor->blockCount );
801    t->chokeTimer = tr_timerNew( manager->handle, chokePulse, t, RECHOKE_PERIOD_SECONDS );
802
803    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
804    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
805}
806
807void
808tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
809                         const uint8_t  * torrentHash )
810{
811    Torrent * t = getExistingTorrent( manager, torrentHash );
812    assert( t != NULL );
813    tr_peerMgrStopTorrent( manager, torrentHash );
814    freeTorrent( manager, t );
815}
816
817void
818tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
819                               const uint8_t    * torrentHash,
820                               int8_t           * tab,
821                               int                tabCount )
822{
823    int i;
824    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
825    const tr_torrent * tor = t->tor;
826    const float interval = tor->info.pieceCount / (float)tabCount;
827
828    memset( tab, 0, tabCount );
829
830    for( i=0; i<tabCount; ++i )
831    {
832        const int piece = i * interval;
833
834        if( tor == NULL )
835            tab[i] = 0;
836        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
837            tab[i] = -1;
838        else {
839            int j, peerCount;
840            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
841            for( j=0; j<peerCount; ++j )
842                if( tr_bitfieldHas( peers[j]->have, i ) )
843                    ++tab[i];
844        }
845    }
846}
847
848
849void
850tr_peerMgrTorrentStats( const tr_peerMgr * manager,
851                        const uint8_t    * torrentHash,
852                        int              * setmePeersTotal,
853                        int              * setmePeersConnected,
854                        int              * setmePeersSendingToUs,
855                        int              * setmePeersGettingFromUs,
856                        int              * setmePeersFrom )
857{
858    int i, size;
859    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
860    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
861
862    *setmePeersTotal          = size;
863    *setmePeersConnected      = 0;
864    *setmePeersSendingToUs    = 0;
865    *setmePeersGettingFromUs  = 0;
866
867    for( i=0; i<size; ++i )
868    {
869        const tr_peer * peer = peers[i];
870
871        if( peer->io == NULL ) /* not connected */
872            continue;
873
874        ++*setmePeersConnected;
875
876        ++setmePeersFrom[peer->from];
877
878        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
879            ++*setmePeersGettingFromUs;
880
881        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
882            ++*setmePeersSendingToUs;
883    }
884}
885
886struct tr_peer_stat *
887tr_peerMgrPeerStats( const tr_peerMgr  * manager,
888                     const uint8_t     * torrentHash,
889                     int               * setmeCount UNUSED )
890{
891    int i, size;
892    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
893    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
894    tr_peer_stat * ret;
895
896    ret = tr_new0( tr_peer_stat, size );
897
898    for( i=0; i<size; ++i )
899    {
900        const tr_peer * peer = peers[i];
901        const int live = peer->io != NULL;
902        tr_peer_stat * stat = ret + i;
903
904        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
905        stat->port             = peer->port;
906        stat->from             = peer->from;
907        stat->client           = peer->client;
908        stat->progress         = peer->progress;
909        stat->isConnected      = live ? 1 : 0;
910        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
911        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
912        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
913        stat->isDownloading    = stat->uploadToRate > 0.01;
914        stat->isUploading      = stat->downloadFromRate > 0.01;
915    }
916
917    *setmeCount = size;
918    return ret;
919}
920
921/**
922***
923**/
924
925typedef struct
926{
927    tr_peer * peer;
928    float rate;
929    int randomKey;
930    int preferred;
931    int doUnchoke;
932}
933ChokeData;
934
935static int
936compareChoke( const void * va, const void * vb )
937{
938    const ChokeData * a = ( const ChokeData * ) va;
939    const ChokeData * b = ( const ChokeData * ) vb;
940
941    if( a->preferred != b->preferred )
942        return a->preferred ? -1 : 1;
943
944    if( a->preferred )
945    {
946        if( a->rate > b->rate ) return -1;
947        if( a->rate < b->rate ) return 1;
948        return 0;
949    }
950    else
951    {
952        return a->randomKey - b->randomKey;
953    }
954}
955
956static int
957clientIsSnubbedBy( const tr_peer * peer )
958{
959    assert( peer != NULL );
960
961    return peer->peerSentDataAt < (time(NULL) - 30);
962}
963
964static void
965rechokeLeech( Torrent * t )
966{
967    int i, size, unchoked=0;
968    tr_peer ** peers = getConnectedPeers( t, &size );
969    ChokeData * choke = tr_new0( ChokeData, size );
970
971    /* sort the peers by preference and rate */
972    for( i=0; i<size; ++i ) {
973        tr_peer * peer = peers[i];
974        ChokeData * node = &choke[i];
975        node->peer = peer;
976        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
977        node->randomKey = tr_rand( INT_MAX );
978        node->rate = tr_peerIoGetRateToClient( peer->io );
979    }
980    qsort( choke, size, sizeof(ChokeData), compareChoke );
981
982    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
983        choke[i].doUnchoke = 1;
984        ++unchoked;
985    }
986
987    for( ; i<size; ++i ) {
988        choke[i].doUnchoke = 1;
989        ++unchoked;
990        if( choke[i].peer->peerIsInterested )
991            break;
992    }
993
994    for( i=0; i<size; ++i )
995        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
996
997    /* cleanup */
998    tr_free( choke );
999    tr_free( peers );
1000}
1001
1002static void
1003rechokeSeed( Torrent * t )
1004{
1005    int i, size;
1006    tr_peer ** peers = getConnectedPeers( t, &size );
1007
1008    /* FIXME */
1009    for( i=0; i<size; ++i )
1010        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1011
1012    tr_free( peers );
1013}
1014
1015static int
1016chokePulse( void * vtorrent )
1017{
1018    Torrent * t = vtorrent;
1019    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1020    if( done )
1021        rechokeLeech( vtorrent );
1022    else
1023        rechokeSeed( vtorrent );
1024    return TRUE;
1025}
Note: See TracBrowser for help on using the repository browser.