source: branches/encryption/libtransmission/peer-mgr.c @ 3095

Last change on this file since 3095 was 3095, checked in by charles, 15 years ago

fix a couple of bugs I introduced yesterday.

  • Property svn:keywords set to Date Rev Author Id
File size: 27.9 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3095 2007-09-17 13:09:48Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32/* how frequently to change which peers are choked */
33#define RECHOKE_PERIOD_SECONDS (15 * 1000)
34
35#define REFILL_PERIOD_MSEC 1000
36
37/* how many peers to unchoke per-torrent. */
38/* FIXME: make this user-configurable? */
39#define NUM_UNCHOKED_PEERS_PER_TORRENT 8
40
41/**
42***
43**/
44
45typedef struct
46{
47    uint8_t hash[SHA_DIGEST_LENGTH];
48    tr_ptrArray * peers; /* tr_peer */
49    tr_timer * chokeTimer;
50    tr_timer * refillTimer;
51    tr_torrent * tor;
52    tr_bitfield * requested;
53
54    unsigned int isRunning : 1;
55
56    struct tr_peerMgr * manager;
57}
58Torrent;
59
60struct tr_peerMgr
61{
62    tr_handle * handle;
63    tr_ptrArray * torrents; /* Torrent */
64    int connectionCount;
65    tr_ptrArray * handshakes; /* in-process */
66};
67
68/**
69***
70**/
71
72static int
73torrentCompare( const void * va, const void * vb )
74{
75    const Torrent * a = (const Torrent*) va;
76    const Torrent * b = (const Torrent*) vb;
77    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
78}
79
80static int
81torrentCompareToHash( const void * va, const void * vb )
82{
83    const Torrent * a = (const Torrent*) va;
84    const uint8_t * b_hash = (const uint8_t*) vb;
85    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
86}
87
88static Torrent*
89getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
90{
91    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
92                                             hash,
93                                             torrentCompareToHash );
94}
95
96static int chokePulse( void * vtorrent );
97
98static int
99peerCompare( const void * va, const void * vb )
100{
101    const tr_peer * a = (const tr_peer *) va;
102    const tr_peer * b = (const tr_peer *) vb;
103    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
104}
105
106static int
107peerCompareToAddr( const void * va, const void * vb )
108{
109    const tr_peer * a = (const tr_peer *) va;
110    const struct in_addr * b = (const struct in_addr *) vb;
111    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
112}
113
114static tr_peer*
115getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
116{
117    assert( torrent != NULL );
118    assert( torrent->peers != NULL );
119    assert( in_addr != NULL );
120
121    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
122                                             in_addr,
123                                             peerCompareToAddr );
124}
125
126static tr_peer*
127getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
128{
129    tr_peer * peer = getExistingPeer( torrent, in_addr );
130
131    if( isNew )
132        *isNew = peer == NULL;
133
134    if( peer == NULL )
135    {
136        peer = tr_new0( tr_peer, 1 );
137        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
138        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
139fprintf( stderr, "getPeer: torrent %p now has %d peers\n", torrent, tr_ptrArraySize(torrent->peers) );
140    }
141
142    return peer;
143}
144
145static void
146disconnectPeer( tr_peer * peer )
147{
148    assert( peer != NULL );
149
150    tr_peerIoFree( peer->io );
151    peer->io = NULL;
152
153    if( peer->msgs != NULL )
154    {
155fprintf( stderr, "PUB unsub peer %p from msgs %p\n", peer, peer->msgs );
156        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
157        tr_peerMsgsFree( peer->msgs );
158        peer->msgs = NULL;
159    }
160
161    tr_bitfieldFree( peer->have );
162    peer->have = NULL;
163
164    tr_bitfieldFree( peer->blame );
165    peer->blame = NULL;
166
167    tr_bitfieldFree( peer->banned );
168    peer->banned = NULL;
169}
170
171static void
172freePeer( tr_peer * peer )
173{
174    disconnectPeer( peer );
175    tr_free( peer->client );
176    tr_free( peer );
177}
178
179static void
180freeTorrent( tr_peerMgr * manager, Torrent * t )
181{
182    int i, size;
183    tr_peer ** peers;
184    uint8_t hash[SHA_DIGEST_LENGTH];
185
186fprintf( stderr, "timer freeTorrent %p\n", t );
187
188    assert( manager != NULL );
189    assert( t != NULL );
190    assert( t->peers != NULL );
191    assert( getExistingTorrent( manager, t->hash ) != NULL );
192
193    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
194
195    tr_timerFree( &t->chokeTimer );
196    tr_timerFree( &t->refillTimer );
197
198    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
199    for( i=0; i<size; ++i )
200        freePeer( peers[i] );
201
202    tr_bitfieldFree( t->requested );
203    tr_ptrArrayFree( t->peers );
204    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
205    tr_free( t );
206
207    assert( getExistingTorrent( manager, hash ) == NULL );
208}
209
210/**
211***
212**/
213
214tr_peerMgr*
215tr_peerMgrNew( tr_handle * handle )
216{
217    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
218    m->handle = handle;
219    m->torrents = tr_ptrArrayNew( );
220    m->handshakes = tr_ptrArrayNew( );
221    return m;
222}
223
224void
225tr_peerMgrFree( tr_peerMgr * manager )
226{
227    int i, n;
228fprintf( stderr, "timer peerMgrFree\n" );
229
230    while( !tr_ptrArrayEmpty( manager->torrents ) )
231        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
232    tr_ptrArrayFree( manager->torrents );
233
234    for( i=0, n=tr_ptrArraySize(manager->handshakes); i<n; ++i )
235        tr_handshakeAbort( (tr_handshake*) tr_ptrArrayNth( manager->handshakes, i) );
236    tr_ptrArrayFree( manager->handshakes );
237
238    tr_free( manager );
239}
240
241static tr_peer**
242getConnectedPeers( Torrent * t, int * setmeCount )
243{
244    int i, peerCount, connectionCount;
245    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
246    tr_peer **ret = tr_new( tr_peer*, peerCount );
247
248    for( i=connectionCount=0; i<peerCount; ++i )
249        if( peers[i]->msgs != NULL )
250            ret[connectionCount++] = peers[i];
251
252    *setmeCount = connectionCount;
253    return ret;
254}
255
256/***
257****  Refill
258***/
259
260struct tr_refill_piece
261{
262    tr_priority_t priority;
263    uint32_t piece;
264    uint32_t peerCount;
265};
266
267static int
268compareRefillPiece (const void * aIn, const void * bIn)
269{
270    const struct tr_refill_piece * a = aIn;
271    const struct tr_refill_piece * b = bIn;
272
273    /* if one piece has a higher priority, it goes first */
274    if (a->priority != b->priority)
275        return a->priority > b->priority ? -1 : 1;
276
277    /* otherwise if one has fewer peers, it goes first */
278    if (a->peerCount != b->peerCount)
279        return a->peerCount < b->peerCount ? -1 : 1;
280
281    /* otherwise go with the earlier piece */
282    return a->piece - b->piece;
283}
284
285static int
286isPieceInteresting( const tr_torrent  * tor,
287                    int                 piece )
288{
289    if( tor->info.pieces[piece].dnd ) /* we don't want it */
290        return 0;
291
292    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
293        return 0;
294
295    return 1;
296}
297
298static uint32_t*
299getPreferredPieces( Torrent     * t,
300                    uint32_t    * pieceCount )
301{
302    const tr_torrent * tor = t->tor;
303    const tr_info * inf = &tor->info;
304
305    int i;
306    uint32_t poolSize = 0;
307    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
308
309    int peerCount;
310    tr_peer** peers = getConnectedPeers( t, &peerCount );
311
312    for( i=0; i<inf->pieceCount; ++i )
313        if( isPieceInteresting( tor, i ) )
314            pool[poolSize++] = i;
315
316    /* sort the pool from most interesting to least... */
317    if( poolSize > 1 )
318    {
319        uint32_t j;
320        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
321
322        for( j=0; j<poolSize; ++j )
323        {
324            int k;
325            const int piece = pool[j];
326            struct tr_refill_piece * setme = p + j;
327
328            setme->piece = piece;
329            setme->priority = inf->pieces[piece].priority;
330            setme->peerCount = 0;
331
332            for( k=0; k<peerCount; ++k ) {
333                const tr_peer * peer = peers[k];
334                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
335                    ++setme->peerCount;
336            }
337        }
338
339        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
340
341        for( j=0; j<poolSize; ++j )
342            pool[j] = p[j].piece;
343
344        tr_free( p );
345    }
346
347#if 0
348fprintf (stderr, "new pool: ");
349for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
350fprintf (stderr, "\n");
351#endif
352    tr_free( peers );
353
354    *pieceCount = poolSize;
355    return pool;
356}
357
358static uint64_t*
359getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
360{
361    uint32_t i;
362    uint32_t pieceCount;
363    uint32_t * pieces;
364    uint64_t *req, *unreq, *ret, *walk;
365    int reqCount, unreqCount;
366    const tr_torrent * tor = t->tor;
367
368    pieces = getPreferredPieces( t, &pieceCount );
369fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );
370
371    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
372    reqCount = 0;
373    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
374    unreqCount = 0;
375
376    for( i=0; i<pieceCount; ++i ) {
377        const uint32_t index = pieces[i];
378        const int begin = tr_torPieceFirstBlock( tor, index );
379        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
380        int block;
381        for( block=begin; block<end; ++block )
382            if( tr_cpBlockIsComplete( tor->completion, block ) )
383                continue;
384            else if( tr_bitfieldHas( t->requested, block ) )
385                req[reqCount++] = block;
386            else
387                unreq[unreqCount++] = block;
388    }
389
390fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );
391    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
392    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
393    walk += unreqCount;
394    memcpy( walk, req, sizeof(uint64_t) * reqCount );
395    walk += reqCount;
396    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
397    *setmeCount = walk - ret;
398
399    tr_free( req );
400    tr_free( unreq );
401
402    return ret;
403}
404
405static int
406refillPulse( void * vtorrent )
407{
408    Torrent * t = vtorrent;
409    tr_torrent * tor = t->tor;
410    uint32_t i;
411    int peerCount;
412    tr_peer ** peers;
413    uint64_t blockCount;
414    uint64_t * blocks;
415
416    if( !t->isRunning )
417        return TRUE;
418    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
419        return TRUE;
420
421    blocks = getPreferredBlocks( t, &blockCount );
422    peers = getConnectedPeers( t, &peerCount );
423
424fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );
425
426    for( i=0; peerCount && i<blockCount; ++i )
427    {
428        const int block = blocks[i];
429        const uint32_t index = tr_torBlockPiece( tor, block );
430        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
431        const uint32_t length = tr_torBlockCountBytes( tor, block );
432        int j;
433        assert( _tr_block( tor, index, begin ) == block );
434        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
435        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
436
437
438        /* find a peer who can ask for this block */
439        for( j=0; j<peerCount; )
440        {
441            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
442            switch( val )
443            {
444                case TR_ADDREQ_FULL: 
445                case TR_ADDREQ_CLIENT_CHOKED:
446                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
447                    break;
448
449                case TR_ADDREQ_MISSING: 
450                    ++j;
451                    break;
452
453                case TR_ADDREQ_OK:
454                    fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );
455                    tr_bitfieldAdd( t->requested, block );
456                    j = peerCount;
457                    break;
458
459                default:
460                    assert( 0 && "unhandled value" );
461                    break;
462            }
463        }
464    }
465
466    /* cleanup */
467    tr_free( peers );
468    tr_free( blocks );
469
470    t->refillTimer = NULL;
471    return FALSE;
472}
473
474static void
475broadcastClientHave( Torrent * t, uint32_t index )
476{
477    int i, size;
478    tr_peer ** peers = getConnectedPeers( t, &size );
479    for( i=0; i<size; ++i )
480        tr_peerMsgsHave( peers[i]->msgs, index );
481    tr_free( peers );
482}
483
484static void
485broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
486{
487    int i, size;
488    tr_peer ** peers = getConnectedPeers( t, &size );
489    for( i=0; i<size; ++i )
490        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
491    tr_free( peers );
492}
493
494static void
495msgsCallbackFunc( void * source UNUSED, void * vevent, void * vt )
496{
497    Torrent * t = (Torrent *) vt;
498    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
499
500    switch( e->eventType )
501    {
502        case TR_PEERMSG_NEED_REQ:
503            if( t->refillTimer == NULL )
504                t->refillTimer = tr_timerNew( t->manager->handle,
505                                              refillPulse, t,
506                                              REFILL_PERIOD_MSEC );
507            break;
508
509        case TR_PEERMSG_CLIENT_HAVE:
510            broadcastClientHave( t, e->pieceIndex );
511            break;
512
513        case TR_PEERMSG_CLIENT_BLOCK:
514            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
515            break;
516
517        case TR_PEERMSG_GOT_ERROR:
518            /* FIXME */
519            break;
520
521        default:
522            assert(0);
523    }
524}
525
526static void
527myHandshakeDoneCB( tr_handshake    * handshake,
528                   tr_peerIo       * io,
529                   int               isConnected,
530                   const uint8_t   * peer_id,
531                   int               peerSupportsEncryption,
532                   void            * vmanager )
533{
534    int ok = isConnected;
535    uint16_t port;
536    const struct in_addr * in_addr;
537    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
538    const uint8_t * hash = NULL;
539    Torrent * t;
540    tr_handshake * ours;
541
542    assert( io != NULL );
543    assert( isConnected==0 || isConnected==1 );
544    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
545
546    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
547                                    handshake,
548                                    tr_comparePointers );
549    assert( handshake == ours );
550
551    in_addr = tr_peerIoGetAddress( io, &port );
552
553    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
554    {
555        tr_peerIoFree( io );
556        --manager->connectionCount;
557        return;
558    }
559
560    hash = tr_peerIoGetTorrentHash( io );
561    t = getExistingTorrent( manager, hash );
562    if( !t || !t->isRunning )
563    {
564        tr_peerIoFree( io );
565        --manager->connectionCount;
566        return;
567    }
568
569    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake; isConnected is %d\n", t->tor->info.name, isConnected );
570
571    /* if we couldn't connect or were snubbed,
572     * the peer's probably not worth remembering. */
573    if( !ok ) {
574        tr_peer * peer = getExistingPeer( t, in_addr );
575        fprintf( stderr, "peer-mgr: torrent [%s] got a bad one, and you know what? fuck them.\n", t->tor->info.name );
576        tr_peerIoFree( io );
577        --manager->connectionCount;
578        if( peer ) {
579            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
580            freePeer( peer );
581        }
582        return;
583    }
584
585    if( 1 ) {
586        tr_peer * peer = getPeer( t, in_addr, NULL );
587        if( peer->msgs != NULL ) { /* we already have this peer */
588            tr_peerIoFree( io );
589            --manager->connectionCount;
590        } else {
591            peer->port = port;
592            peer->io = io;
593            peer->msgs = tr_peerMsgsNew( t->tor, peer );
594            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
595            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
596            fprintf( stderr, "PUB sub peer %p to msgs %p\n", peer, peer->msgs );
597            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
598        }
599    }
600}
601
602static void
603initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
604{
605    tr_handshake * handshake = tr_handshakeNew( io,
606                                                HANDSHAKE_ENCRYPTION_PREFERRED,
607                                                myHandshakeDoneCB,
608                                                manager );
609    ++manager->connectionCount;
610
611    tr_ptrArrayInsertSorted( manager->handshakes, handshake, tr_comparePointers );
612}
613
614void
615tr_peerMgrAddIncoming( tr_peerMgr      * manager,
616                       struct in_addr  * addr,
617                       int               socket )
618{
619    tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, socket );
620    initiateHandshake( manager, io );
621}
622
623static void
624maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
625{
626    tr_peerIo * io;
627
628    assert( manager != NULL );
629    assert( t != NULL );
630    assert( peer != NULL );
631
632    if( peer->io != NULL ) { /* already connected */
633        fprintf( stderr, "not connecting because we already have an IO for that address\n" );
634        return;
635    }
636    if( !t->isRunning ) { /* torrent's not running */
637        fprintf( stderr, "OUTGOING connection not being made because t [%s] is not running\n", t->tor->info.name );
638        return;
639    }
640
641    io = tr_peerIoNewOutgoing( manager->handle,
642                               &peer->in_addr,
643                               peer->port,
644                               t->hash );
645    initiateHandshake( manager, io );
646}
647
648void
649tr_peerMgrAddPex( tr_peerMgr     * manager,
650                  const uint8_t  * torrentHash,
651                  int              from,
652                  const tr_pex   * pex,
653                  int              pexCount )
654{
655    Torrent * t = getExistingTorrent( manager, torrentHash );
656    const tr_pex * end = pex + pexCount;
657    while( pex != end )
658    {
659        int isNew;
660        tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
661        if( isNew ) {
662            peer->port = pex->port;
663            peer->from = from;
664            maybeConnect( manager, t, peer );
665        }
666        ++pex;
667    }
668}
669
670void
671tr_peerMgrAddPeers( tr_peerMgr    * manager,
672                    const uint8_t * torrentHash,
673                    int             from,
674                    const uint8_t * peerCompact,
675                    int             peerCount )
676{
677    int i;
678    const uint8_t * walk = peerCompact;
679    Torrent * t = getExistingTorrent( manager, torrentHash );
680    for( i=0; t!=NULL && i<peerCount; ++i )
681    {
682        int isNew;
683        tr_peer * peer;
684        struct in_addr addr;
685        uint16_t port;
686        memcpy( &addr, walk, 4 ); walk += 4;
687        memcpy( &port, walk, 2 ); walk += 2;
688        peer = getPeer( t, &addr, &isNew );
689        if( isNew ) {
690            peer->port = port;
691            peer->from = from;
692            maybeConnect( manager, t, peer );
693        }
694    }
695}
696
697/**
698***
699**/
700
701int
702tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
703{
704    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
705}
706
707void
708tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
709                    const uint8_t  * torrentHash UNUSED,
710                    int              pieceIndex UNUSED,
711                    int              success UNUSED )
712{
713    fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );
714}
715
716int
717tr_pexCompare( const void * va, const void * vb )
718{
719    const tr_pex * a = (const tr_pex *) va;
720    const tr_pex * b = (const tr_pex *) vb;
721    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
722    if( i ) return i;
723    if( a->port < b->port ) return -1;
724    if( a->port > b->port ) return 1;
725    return 0;
726}
727
728int tr_pexCompare( const void * a, const void * b );
729
730
731int
732tr_peerMgrGetPeers( tr_peerMgr      * manager,
733                    const uint8_t   * torrentHash,
734                    tr_pex         ** setme_pex )
735{
736    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
737    int i, peerCount;
738    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
739    tr_pex * pex = tr_new( tr_pex, peerCount );
740    tr_pex * walk = pex;
741
742    for( i=0; i<peerCount; ++i, ++walk )
743    {
744        const tr_peer * peer = peers[i];
745
746        walk->in_addr = peer->in_addr;
747
748        walk->port = peer->port;
749
750        walk->flags = 0;
751        if( peer->peerSupportsEncryption ) walk->flags |= 1;
752        if( peer->progress >= 1.0 )        walk->flags |= 2;
753    }
754
755    assert( ( walk - pex ) == peerCount );
756    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
757    *setme_pex = pex;
758    return peerCount;
759}
760
761void
762tr_peerMgrStartTorrent( tr_peerMgr     * manager,
763                        const uint8_t  * torrentHash )
764{
765    int i, peerCount;
766    Torrent * t = getExistingTorrent( manager, torrentHash );
767
768    t->isRunning = 1;
769
770    peerCount = tr_ptrArraySize( t->peers );
771    for( i=0; i<peerCount; ++i )
772        maybeConnect( manager, t, tr_ptrArrayNth( t->peers, i ) );
773}
774
775void
776tr_peerMgrStopTorrent( tr_peerMgr     * manager,
777                       const uint8_t  * torrentHash)
778{
779    int i, peerCount;
780    Torrent * t = getExistingTorrent( manager, torrentHash );
781
782    t->isRunning = 0;
783
784    peerCount = tr_ptrArraySize( t->peers );
785    for( i=0; i<peerCount; ++i )
786        disconnectPeer( tr_ptrArrayNth( t->peers, i ) );
787}
788
789void
790tr_peerMgrAddTorrent( tr_peerMgr * manager,
791                      tr_torrent * tor )
792{
793    Torrent * t;
794
795    assert( tor != NULL );
796    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
797
798    t = tr_new0( Torrent, 1 );
799    t->manager = manager;
800    t->tor = tor;
801    t->peers = tr_ptrArrayNew( );
802    t->requested = tr_bitfieldNew( tor->blockCount );
803    t->chokeTimer = tr_timerNew( manager->handle, chokePulse, t, RECHOKE_PERIOD_SECONDS );
804
805    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
806    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
807}
808
809void
810tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
811                         const uint8_t  * torrentHash )
812{
813    Torrent * t = getExistingTorrent( manager, torrentHash );
814    assert( t != NULL );
815    tr_peerMgrStopTorrent( manager, torrentHash );
816    freeTorrent( manager, t );
817}
818
819void
820tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
821                               const uint8_t    * torrentHash,
822                               int8_t           * tab,
823                               int                tabCount )
824{
825    int i;
826    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
827    const tr_torrent * tor = t->tor;
828    const float interval = tor->info.pieceCount / (float)tabCount;
829
830    memset( tab, 0, tabCount );
831
832    for( i=0; i<tabCount; ++i )
833    {
834        const int piece = i * interval;
835
836        if( tor == NULL )
837            tab[i] = 0;
838        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
839            tab[i] = -1;
840        else {
841            int j, peerCount;
842            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
843            for( j=0; j<peerCount; ++j )
844                if( tr_bitfieldHas( peers[j]->have, i ) )
845                    ++tab[i];
846        }
847    }
848}
849
850
851void
852tr_peerMgrTorrentStats( const tr_peerMgr * manager,
853                        const uint8_t    * torrentHash,
854                        int              * setmePeersTotal,
855                        int              * setmePeersConnected,
856                        int              * setmePeersSendingToUs,
857                        int              * setmePeersGettingFromUs,
858                        int              * setmePeersFrom )
859{
860    int i, size;
861    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
862    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
863
864    *setmePeersTotal          = size;
865    *setmePeersConnected      = 0;
866    *setmePeersSendingToUs    = 0;
867    *setmePeersGettingFromUs  = 0;
868
869    for( i=0; i<size; ++i )
870    {
871        const tr_peer * peer = peers[i];
872
873        if( peer->io == NULL ) /* not connected */
874            continue;
875
876        ++*setmePeersConnected;
877
878        ++setmePeersFrom[peer->from];
879
880        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
881            ++*setmePeersGettingFromUs;
882
883        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
884            ++*setmePeersSendingToUs;
885    }
886}
887
888struct tr_peer_stat *
889tr_peerMgrPeerStats( const tr_peerMgr  * manager,
890                     const uint8_t     * torrentHash,
891                     int               * setmeCount UNUSED )
892{
893    int i, size;
894    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
895    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
896    tr_peer_stat * ret;
897
898    ret = tr_new0( tr_peer_stat, size );
899
900    for( i=0; i<size; ++i )
901    {
902        const tr_peer * peer = peers[i];
903        const int live = peer->io != NULL;
904        tr_peer_stat * stat = ret + i;
905
906        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
907        stat->port             = peer->port;
908        stat->from             = peer->from;
909        stat->client           = peer->client;
910        stat->progress         = peer->progress;
911        stat->isConnected      = live ? 1 : 0;
912        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
913        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
914        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
915        stat->isDownloading    = stat->uploadToRate > 0.01;
916        stat->isUploading      = stat->downloadFromRate > 0.01;
917    }
918
919    *setmeCount = size;
920    return ret;
921}
922
923/**
924***
925**/
926
927typedef struct
928{
929    tr_peer * peer;
930    float rate;
931    int randomKey;
932    int preferred;
933    int doUnchoke;
934}
935ChokeData;
936
937static int
938compareChoke( const void * va, const void * vb )
939{
940    const ChokeData * a = ( const ChokeData * ) va;
941    const ChokeData * b = ( const ChokeData * ) vb;
942
943    if( a->preferred != b->preferred )
944        return a->preferred ? -1 : 1;
945
946    if( a->preferred )
947    {
948        if( a->rate > b->rate ) return -1;
949        if( a->rate < b->rate ) return 1;
950        return 0;
951    }
952    else
953    {
954        return a->randomKey - b->randomKey;
955    }
956}
957
958static int
959clientIsSnubbedBy( const tr_peer * peer )
960{
961    assert( peer != NULL );
962
963    return peer->peerSentDataAt < (time(NULL) - 30);
964}
965
966static void
967rechokeLeech( Torrent * t )
968{
969    int i, size, unchoked=0;
970    tr_peer ** peers = getConnectedPeers( t, &size );
971    ChokeData * choke = tr_new0( ChokeData, size );
972
973    /* sort the peers by preference and rate */
974    for( i=0; i<size; ++i ) {
975        tr_peer * peer = peers[i];
976        ChokeData * node = &choke[i];
977        node->peer = peer;
978        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
979        node->randomKey = tr_rand( INT_MAX );
980        node->rate = tr_peerIoGetRateToClient( peer->io );
981    }
982    qsort( choke, size, sizeof(ChokeData), compareChoke );
983
984    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
985        choke[i].doUnchoke = 1;
986        ++unchoked;
987    }
988
989    for( ; i<size; ++i ) {
990        choke[i].doUnchoke = 1;
991        ++unchoked;
992        if( choke[i].peer->peerIsInterested )
993            break;
994    }
995
996    for( i=0; i<size; ++i )
997        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
998
999    /* cleanup */
1000    tr_free( choke );
1001    tr_free( peers );
1002}
1003
1004static void
1005rechokeSeed( Torrent * t )
1006{
1007    int i, size;
1008    tr_peer ** peers = getConnectedPeers( t, &size );
1009
1010    /* FIXME */
1011    for( i=0; i<size; ++i )
1012        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1013
1014    tr_free( peers );
1015}
1016
1017static int
1018chokePulse( void * vtorrent )
1019{
1020    Torrent * t = vtorrent;
1021    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1022    if( done )
1023        rechokeLeech( vtorrent );
1024    else
1025        rechokeSeed( vtorrent );
1026    return TRUE;
1027}
Note: See TracBrowser for help on using the repository browser.