source: trunk/libtransmission/peer-mgr.c @ 3123

Last change on this file since 3123 was 3123, checked in by charles, 15 years ago

fix stat bug reported by BentMyWookie?

  • Property svn:keywords set to Date Rev Author Id
File size: 29.1 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3123 2007-09-21 15:13:23Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32/* how frequently to change which peers are choked */
33#define RECHOKE_PERIOD_SECONDS (15 * 1000)
34
35#define REFILL_PERIOD_MSEC 1000
36
37/* how many peers to unchoke per-torrent. */
38/* FIXME: make this user-configurable? */
39#define NUM_UNCHOKED_PEERS_PER_TORRENT 8
40
41/**
42***
43**/
44
45typedef struct
46{
47    uint8_t hash[SHA_DIGEST_LENGTH];
48    tr_ptrArray * peers; /* tr_peer */
49    tr_timer * chokeTimer;
50    tr_timer * refillTimer;
51    tr_torrent * tor;
52    tr_bitfield * requested;
53
54    unsigned int isRunning : 1;
55
56    struct tr_peerMgr * manager;
57}
58Torrent;
59
60struct tr_peerMgr
61{
62    tr_handle * handle;
63    tr_ptrArray * torrents; /* Torrent */
64    int connectionCount;
65    tr_ptrArray * handshakes; /* in-process */
66};
67
68/**
69***
70**/
71
72static int
73handshakeCompareToAddr( const void * va, const void * vb )
74{
75    const tr_handshake * a = va;
76    const struct in_addr * b = vb;
77    return memcmp( tr_handshakeGetAddr( a, NULL ), b, sizeof( struct in_addr ) );
78}
79
80static int
81handshakeCompare( const void * va, const void * vb )
82{
83    const tr_handshake * a = va;
84    const tr_handshake * b = vb;
85    return memcmp( tr_handshakeGetAddr( a, NULL ),
86                   tr_handshakeGetAddr( b, NULL ),
87                   sizeof( struct in_addr ) );
88}
89
90static tr_handshake*
91getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
92{
93    return tr_ptrArrayFindSorted( mgr->handshakes,
94                                  in_addr,
95                                  handshakeCompareToAddr );
96}
97
98/**
99***
100**/
101
102static int
103torrentCompare( const void * va, const void * vb )
104{
105    const Torrent * a = va;
106    const Torrent * b = vb;
107    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
108}
109
110static int
111torrentCompareToHash( const void * va, const void * vb )
112{
113    const Torrent * a = (const Torrent*) va;
114    const uint8_t * b_hash = (const uint8_t*) vb;
115    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
116}
117
118static Torrent*
119getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
120{
121    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
122                                             hash,
123                                             torrentCompareToHash );
124}
125
126static int chokePulse( void * vtorrent );
127
128static int
129peerCompare( const void * va, const void * vb )
130{
131    const tr_peer * a = (const tr_peer *) va;
132    const tr_peer * b = (const tr_peer *) vb;
133    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
134}
135
136static int
137peerCompareToAddr( const void * va, const void * vb )
138{
139    const tr_peer * a = (const tr_peer *) va;
140    const struct in_addr * b = (const struct in_addr *) vb;
141    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
142}
143
144static tr_peer*
145getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
146{
147    assert( torrent != NULL );
148    assert( torrent->peers != NULL );
149    assert( in_addr != NULL );
150
151    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
152                                             in_addr,
153                                             peerCompareToAddr );
154}
155
156static tr_peer*
157getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
158{
159    tr_peer * peer = getExistingPeer( torrent, in_addr );
160
161    if( isNew )
162        *isNew = peer == NULL;
163
164    if( peer == NULL )
165    {
166        peer = tr_new0( tr_peer, 1 );
167        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
168        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
169    }
170
171    return peer;
172}
173
174static void
175disconnectPeer( tr_peer * peer )
176{
177    assert( peer != NULL );
178
179    tr_peerIoFree( peer->io );
180    peer->io = NULL;
181
182    if( peer->msgs != NULL )
183    {
184        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
185        tr_peerMsgsFree( peer->msgs );
186        peer->msgs = NULL;
187    }
188
189    tr_bitfieldFree( peer->have );
190    peer->have = NULL;
191
192    tr_bitfieldFree( peer->blame );
193    peer->blame = NULL;
194
195    tr_bitfieldFree( peer->banned );
196    peer->banned = NULL;
197}
198
199static void
200freePeer( tr_peer * peer )
201{
202    disconnectPeer( peer );
203    tr_free( peer->client );
204    tr_free( peer );
205}
206
207static void
208freeTorrent( tr_peerMgr * manager, Torrent * t )
209{
210    int i, size;
211    tr_peer ** peers;
212    uint8_t hash[SHA_DIGEST_LENGTH];
213
214fprintf( stderr, "timer freeTorrent %p\n", t );
215
216    assert( manager != NULL );
217    assert( t != NULL );
218    assert( t->peers != NULL );
219    assert( getExistingTorrent( manager, t->hash ) != NULL );
220
221    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
222
223    tr_timerFree( &t->chokeTimer );
224    tr_timerFree( &t->refillTimer );
225
226    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
227    for( i=0; i<size; ++i )
228        freePeer( peers[i] );
229
230    tr_bitfieldFree( t->requested );
231    tr_ptrArrayFree( t->peers );
232    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
233    tr_free( t );
234
235    assert( getExistingTorrent( manager, hash ) == NULL );
236}
237
238/**
239***
240**/
241
242tr_peerMgr*
243tr_peerMgrNew( tr_handle * handle )
244{
245    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
246    m->handle = handle;
247    m->torrents = tr_ptrArrayNew( );
248    m->handshakes = tr_ptrArrayNew( );
249    return m;
250}
251
252void
253tr_peerMgrFree( tr_peerMgr * manager )
254{
255fprintf( stderr, "timer peerMgrFree\n" );
256
257    while( !tr_ptrArrayEmpty( manager->handshakes ) )
258        tr_handshakeAbort( (tr_handshake*)tr_ptrArrayNth( manager->handshakes, 0) );
259    tr_ptrArrayFree( manager->handshakes );
260
261    while( !tr_ptrArrayEmpty( manager->torrents ) )
262        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
263    tr_ptrArrayFree( manager->torrents );
264
265    tr_free( manager );
266}
267
268static tr_peer**
269getConnectedPeers( Torrent * t, int * setmeCount )
270{
271    int i, peerCount, connectionCount;
272    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
273    tr_peer **ret = tr_new( tr_peer*, peerCount );
274
275    for( i=connectionCount=0; i<peerCount; ++i )
276        if( peers[i]->msgs != NULL )
277            ret[connectionCount++] = peers[i];
278
279    *setmeCount = connectionCount;
280    return ret;
281}
282
283/***
284****  Refill
285***/
286
287struct tr_refill_piece
288{
289    tr_priority_t priority;
290    uint32_t piece;
291    uint32_t peerCount;
292};
293
294static int
295compareRefillPiece (const void * aIn, const void * bIn)
296{
297    const struct tr_refill_piece * a = aIn;
298    const struct tr_refill_piece * b = bIn;
299
300    /* if one piece has a higher priority, it goes first */
301    if (a->priority != b->priority)
302        return a->priority > b->priority ? -1 : 1;
303
304    /* otherwise if one has fewer peers, it goes first */
305    if (a->peerCount != b->peerCount)
306        return a->peerCount < b->peerCount ? -1 : 1;
307
308    /* otherwise go with the earlier piece */
309    return a->piece - b->piece;
310}
311
312static int
313isPieceInteresting( const tr_torrent  * tor,
314                    int                 piece )
315{
316    if( tor->info.pieces[piece].dnd ) /* we don't want it */
317        return 0;
318
319    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
320        return 0;
321
322    return 1;
323}
324
325static uint32_t*
326getPreferredPieces( Torrent     * t,
327                    uint32_t    * pieceCount )
328{
329    const tr_torrent * tor = t->tor;
330    const tr_info * inf = &tor->info;
331
332    int i;
333    uint32_t poolSize = 0;
334    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
335
336    int peerCount;
337    tr_peer** peers = getConnectedPeers( t, &peerCount );
338
339    for( i=0; i<inf->pieceCount; ++i )
340        if( isPieceInteresting( tor, i ) )
341            pool[poolSize++] = i;
342
343    /* sort the pool from most interesting to least... */
344    if( poolSize > 1 )
345    {
346        uint32_t j;
347        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
348
349        for( j=0; j<poolSize; ++j )
350        {
351            int k;
352            const int piece = pool[j];
353            struct tr_refill_piece * setme = p + j;
354
355            setme->piece = piece;
356            setme->priority = inf->pieces[piece].priority;
357            setme->peerCount = 0;
358
359            for( k=0; k<peerCount; ++k ) {
360                const tr_peer * peer = peers[k];
361                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
362                    ++setme->peerCount;
363            }
364        }
365
366        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
367
368        for( j=0; j<poolSize; ++j )
369            pool[j] = p[j].piece;
370
371        tr_free( p );
372    }
373
374#if 0
375fprintf (stderr, "new pool: ");
376for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
377fprintf (stderr, "\n");
378#endif
379    tr_free( peers );
380
381    *pieceCount = poolSize;
382    return pool;
383}
384
385static uint64_t*
386getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
387{
388    uint32_t i;
389    uint32_t pieceCount;
390    uint32_t * pieces;
391    uint64_t *req, *unreq, *ret, *walk;
392    int reqCount, unreqCount;
393    const tr_torrent * tor = t->tor;
394
395    pieces = getPreferredPieces( t, &pieceCount );
396/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
397
398    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
399    reqCount = 0;
400    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
401    unreqCount = 0;
402
403    for( i=0; i<pieceCount; ++i ) {
404        const uint32_t index = pieces[i];
405        const int begin = tr_torPieceFirstBlock( tor, index );
406        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
407        int block;
408        for( block=begin; block<end; ++block )
409            if( tr_cpBlockIsComplete( tor->completion, block ) )
410                continue;
411            else if( tr_bitfieldHas( t->requested, block ) )
412                req[reqCount++] = block;
413            else
414                unreq[unreqCount++] = block;
415    }
416
417/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
418    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
419    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
420    walk += unreqCount;
421    memcpy( walk, req, sizeof(uint64_t) * reqCount );
422    walk += reqCount;
423    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
424    *setmeCount = walk - ret;
425
426    tr_free( req );
427    tr_free( unreq );
428    tr_free( pieces );
429
430    return ret;
431}
432
433static int
434refillPulse( void * vtorrent )
435{
436    Torrent * t = vtorrent;
437    tr_torrent * tor = t->tor;
438    uint32_t i;
439    int peerCount;
440    tr_peer ** peers;
441    uint64_t blockCount;
442    uint64_t * blocks;
443
444    if( !t->isRunning )
445        return TRUE;
446    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
447        return TRUE;
448
449    blocks = getPreferredBlocks( t, &blockCount );
450    peers = getConnectedPeers( t, &peerCount );
451
452/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
453
454    for( i=0; peerCount && i<blockCount; ++i )
455    {
456        const int block = blocks[i];
457        const uint32_t index = tr_torBlockPiece( tor, block );
458        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
459        const uint32_t length = tr_torBlockCountBytes( tor, block );
460        int j;
461        assert( _tr_block( tor, index, begin ) == block );
462        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
463        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
464
465
466        /* find a peer who can ask for this block */
467        for( j=0; j<peerCount; )
468        {
469            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
470            switch( val )
471            {
472                case TR_ADDREQ_FULL: 
473                case TR_ADDREQ_CLIENT_CHOKED:
474                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
475                    break;
476
477                case TR_ADDREQ_MISSING: 
478                    ++j;
479                    break;
480
481                case TR_ADDREQ_OK:
482                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
483                    tr_bitfieldAdd( t->requested, block );
484                    j = peerCount;
485                    break;
486
487                default:
488                    assert( 0 && "unhandled value" );
489                    break;
490            }
491        }
492    }
493
494    /* cleanup */
495    tr_free( peers );
496    tr_free( blocks );
497
498    t->refillTimer = NULL;
499    return FALSE;
500}
501
502static void
503broadcastClientHave( Torrent * t, uint32_t index )
504{
505    int i, size;
506    tr_peer ** peers = getConnectedPeers( t, &size );
507    for( i=0; i<size; ++i )
508        tr_peerMsgsHave( peers[i]->msgs, index );
509    tr_free( peers );
510}
511
512static void
513broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
514{
515    int i, size;
516    tr_peer ** peers = getConnectedPeers( t, &size );
517    for( i=0; i<size; ++i )
518        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
519    tr_free( peers );
520}
521
522static void
523msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
524{
525    tr_peer * peer = vpeer;
526    Torrent * t = (Torrent *) vt;
527    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
528
529    switch( e->eventType )
530    {
531        case TR_PEERMSG_NEED_REQ:
532            if( t->refillTimer == NULL )
533                t->refillTimer = tr_timerNew( t->manager->handle,
534                                              refillPulse, t,
535                                              REFILL_PERIOD_MSEC );
536            break;
537
538        case TR_PEERMSG_CLIENT_HAVE:
539            broadcastClientHave( t, e->pieceIndex );
540            break;
541
542        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
543            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
544            const int peerIsSeed = e->progress >= 1.0;
545            if( clientIsSeed && peerIsSeed ) {
546                fprintf( stderr, "DISCONNECTING FROM PEER because both of us are seeds\n" );
547                peer->doDisconnect = 1;
548            }
549            break;
550        }
551
552        case TR_PEERMSG_CLIENT_BLOCK:
553            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
554            break;
555
556        case TR_PEERMSG_GOT_ERROR:
557            peer->doDisconnect = 1;
558            break;
559
560        default:
561            assert(0);
562    }
563}
564
565static void
566myHandshakeDoneCB( tr_handshake    * handshake,
567                   tr_peerIo       * io,
568                   int               isConnected,
569                   const uint8_t   * peer_id,
570                   int               peerSupportsEncryption,
571                   void            * vmanager )
572{
573    int ok = isConnected;
574    uint16_t port;
575    const struct in_addr * in_addr;
576    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
577    const uint8_t * hash = NULL;
578    Torrent * t;
579    tr_handshake * ours;
580
581    assert( io != NULL );
582    assert( isConnected==0 || isConnected==1 );
583    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
584
585    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
586                                    handshake,
587                                    handshakeCompare );
588    assert( handshake == ours );
589
590    in_addr = tr_peerIoGetAddress( io, &port );
591
592    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
593    {
594        tr_peerIoFree( io );
595        --manager->connectionCount;
596        return;
597    }
598
599    hash = tr_peerIoGetTorrentHash( io );
600    t = getExistingTorrent( manager, hash );
601    if( !t || !t->isRunning )
602    {
603        tr_peerIoFree( io );
604        --manager->connectionCount;
605        return;
606    }
607
608    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake. Connected? %s.\n", t->tor->info.name, (isConnected?"yes":"no") );
609
610    /* if we couldn't connect or were snubbed,
611     * the peer's probably not worth remembering. */
612    if( !ok ) {
613        tr_peer * peer = getExistingPeer( t, in_addr );
614        tr_peerIoFree( io );
615        --manager->connectionCount;
616        if( peer ) {
617            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
618            freePeer( peer );
619        }
620        return;
621    }
622
623    if( 1 ) {
624        tr_peer * peer = getPeer( t, in_addr, NULL );
625        if( peer->msgs != NULL ) { /* we already have this peer */
626            tr_peerIoFree( io );
627            --manager->connectionCount;
628        } else {
629            peer->port = port;
630            peer->io = io;
631            peer->msgs = tr_peerMsgsNew( t->tor, peer );
632            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
633            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
634            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
635        }
636    }
637}
638
639static void
640initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
641{
642    tr_handshake * handshake = tr_handshakeNew( io,
643                                                manager->handle->encryptionMode,
644                                                myHandshakeDoneCB,
645                                                manager );
646    ++manager->connectionCount;
647
648    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
649}
650
651void
652tr_peerMgrAddIncoming( tr_peerMgr      * manager,
653                       struct in_addr  * addr,
654                       int               socket )
655{
656    if( getExistingHandshake( manager, addr ) == NULL )
657    {
658        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, socket );
659        initiateHandshake( manager, io );
660    }
661}
662
663static void
664maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
665{
666    tr_peerIo * io;
667
668    assert( manager != NULL );
669    assert( t != NULL );
670    assert( peer != NULL );
671
672    if( peer->io != NULL ) { /* already connected */
673        fprintf( stderr, "not connecting because we already have an IO for that address\n" );
674        return;
675    }
676    if( !t->isRunning ) { /* torrent's not running */
677        fprintf( stderr, "OUTGOING connection not being made because t [%s] is not running\n", t->tor->info.name );
678        return;
679    }
680    if( getExistingHandshake( manager, &peer->in_addr ) != NULL ) { /* already have a handshake pending */
681        fprintf( stderr, "already have a handshake for that address\n" );
682        return;
683    }
684
685    io = tr_peerIoNewOutgoing( manager->handle,
686                               &peer->in_addr,
687                               peer->port,
688                               t->hash );
689    initiateHandshake( manager, io );
690}
691
692void
693tr_peerMgrAddPex( tr_peerMgr     * manager,
694                  const uint8_t  * torrentHash,
695                  int              from,
696                  const tr_pex   * pex,
697                  int              pexCount )
698{
699    Torrent * t = getExistingTorrent( manager, torrentHash );
700    const tr_pex * end = pex + pexCount;
701    while( pex != end )
702    {
703        int isNew;
704        tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
705        if( isNew ) {
706            peer->port = pex->port;
707            peer->from = from;
708            maybeConnect( manager, t, peer );
709        }
710        ++pex;
711    }
712}
713
714void
715tr_peerMgrAddPeers( tr_peerMgr    * manager,
716                    const uint8_t * torrentHash,
717                    int             from,
718                    const uint8_t * peerCompact,
719                    int             peerCount )
720{
721    int i;
722    const uint8_t * walk = peerCompact;
723    Torrent * t = getExistingTorrent( manager, torrentHash );
724    for( i=0; t!=NULL && i<peerCount; ++i )
725    {
726        int isNew;
727        tr_peer * peer;
728        struct in_addr addr;
729        uint16_t port;
730        memcpy( &addr, walk, 4 ); walk += 4;
731        memcpy( &port, walk, 2 ); walk += 2;
732        peer = getPeer( t, &addr, &isNew );
733        if( isNew ) {
734            peer->port = port;
735            peer->from = from;
736            maybeConnect( manager, t, peer );
737        }
738    }
739}
740
741/**
742***
743**/
744
745int
746tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
747{
748    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
749}
750
751void
752tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
753                    const uint8_t  * torrentHash UNUSED,
754                    int              pieceIndex UNUSED,
755                    int              success UNUSED )
756{
757    fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );
758}
759
760int
761tr_pexCompare( const void * va, const void * vb )
762{
763    const tr_pex * a = (const tr_pex *) va;
764    const tr_pex * b = (const tr_pex *) vb;
765    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
766    if( i ) return i;
767    if( a->port < b->port ) return -1;
768    if( a->port > b->port ) return 1;
769    return 0;
770}
771
772int tr_pexCompare( const void * a, const void * b );
773
774
775int
776tr_peerMgrGetPeers( tr_peerMgr      * manager,
777                    const uint8_t   * torrentHash,
778                    tr_pex         ** setme_pex )
779{
780    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
781    int i, peerCount;
782    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
783    tr_pex * pex = tr_new( tr_pex, peerCount );
784    tr_pex * walk = pex;
785
786    for( i=0; i<peerCount; ++i, ++walk )
787    {
788        const tr_peer * peer = peers[i];
789
790        walk->in_addr = peer->in_addr;
791
792        walk->port = peer->port;
793
794        walk->flags = 0;
795        if( peer->peerSupportsEncryption ) walk->flags |= 1;
796        if( peer->progress >= 1.0 )        walk->flags |= 2;
797    }
798
799    assert( ( walk - pex ) == peerCount );
800    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
801    *setme_pex = pex;
802    return peerCount;
803}
804
805void
806tr_peerMgrStartTorrent( tr_peerMgr     * manager,
807                        const uint8_t  * torrentHash )
808{
809    int i, peerCount;
810    Torrent * t = getExistingTorrent( manager, torrentHash );
811
812    t->isRunning = 1;
813
814    peerCount = tr_ptrArraySize( t->peers );
815    for( i=0; i<peerCount; ++i )
816        maybeConnect( manager, t, tr_ptrArrayNth( t->peers, i ) );
817}
818
819void
820tr_peerMgrStopTorrent( tr_peerMgr     * manager,
821                       const uint8_t  * torrentHash)
822{
823    int i, peerCount;
824    Torrent * t = getExistingTorrent( manager, torrentHash );
825
826    t->isRunning = 0;
827
828    peerCount = tr_ptrArraySize( t->peers );
829    for( i=0; i<peerCount; ++i )
830        disconnectPeer( tr_ptrArrayNth( t->peers, i ) );
831}
832
833void
834tr_peerMgrAddTorrent( tr_peerMgr * manager,
835                      tr_torrent * tor )
836{
837    Torrent * t;
838
839    assert( tor != NULL );
840    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
841
842    t = tr_new0( Torrent, 1 );
843    t->manager = manager;
844    t->tor = tor;
845    t->peers = tr_ptrArrayNew( );
846    t->requested = tr_bitfieldNew( tor->blockCount );
847    t->chokeTimer = tr_timerNew( manager->handle, chokePulse, t, RECHOKE_PERIOD_SECONDS );
848
849    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
850    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
851}
852
853void
854tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
855                         const uint8_t  * torrentHash )
856{
857    Torrent * t = getExistingTorrent( manager, torrentHash );
858    assert( t != NULL );
859    tr_peerMgrStopTorrent( manager, torrentHash );
860    freeTorrent( manager, t );
861}
862
863void
864tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
865                               const uint8_t    * torrentHash,
866                               int8_t           * tab,
867                               int                tabCount )
868{
869    int i;
870    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
871    const tr_torrent * tor = t->tor;
872    const float interval = tor->info.pieceCount / (float)tabCount;
873
874    memset( tab, 0, tabCount );
875
876    for( i=0; i<tabCount; ++i )
877    {
878        const int piece = i * interval;
879
880        if( tor == NULL )
881            tab[i] = 0;
882        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
883            tab[i] = -1;
884        else {
885            int j, peerCount;
886            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
887            for( j=0; j<peerCount; ++j )
888                if( tr_bitfieldHas( peers[j]->have, i ) )
889                    ++tab[i];
890        }
891    }
892}
893
894
895void
896tr_peerMgrTorrentStats( const tr_peerMgr * manager,
897                        const uint8_t    * torrentHash,
898                        int              * setmePeersTotal,
899                        int              * setmePeersConnected,
900                        int              * setmePeersSendingToUs,
901                        int              * setmePeersGettingFromUs,
902                        int              * setmePeersFrom )
903{
904    int i, size;
905    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
906    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
907
908    *setmePeersTotal          = size;
909    *setmePeersConnected      = 0;
910    *setmePeersSendingToUs    = 0;
911    *setmePeersGettingFromUs  = 0;
912
913    for( i=0; i<TR_PEER_FROM__MAX; ++i )
914        setmePeersFrom[i] = 0;
915
916    for( i=0; i<size; ++i )
917    {
918        const tr_peer * peer = peers[i];
919
920        if( peer->io == NULL ) /* not connected */
921            continue;
922
923        ++*setmePeersConnected;
924
925        ++setmePeersFrom[peer->from];
926
927        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
928            ++*setmePeersGettingFromUs;
929
930        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
931            ++*setmePeersSendingToUs;
932    }
933}
934
935struct tr_peer_stat *
936tr_peerMgrPeerStats( const tr_peerMgr  * manager,
937                     const uint8_t     * torrentHash,
938                     int               * setmeCount UNUSED )
939{
940    int i, size;
941    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
942    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
943    tr_peer_stat * ret;
944
945    ret = tr_new0( tr_peer_stat, size );
946
947    for( i=0; i<size; ++i )
948    {
949        const tr_peer * peer = peers[i];
950        const int live = peer->io != NULL;
951        tr_peer_stat * stat = ret + i;
952
953        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
954        stat->port             = peer->port;
955        stat->from             = peer->from;
956        stat->client           = peer->client;
957        stat->progress         = peer->progress;
958        stat->isConnected      = live ? 1 : 0;
959        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
960        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
961        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
962        stat->isDownloading    = stat->uploadToRate > 0.01;
963        stat->isUploading      = stat->downloadFromRate > 0.01;
964    }
965
966    *setmeCount = size;
967    return ret;
968}
969
970/**
971***
972**/
973
974typedef struct
975{
976    tr_peer * peer;
977    float rate;
978    int randomKey;
979    int preferred;
980    int doUnchoke;
981}
982ChokeData;
983
984static int
985compareChoke( const void * va, const void * vb )
986{
987    const ChokeData * a = ( const ChokeData * ) va;
988    const ChokeData * b = ( const ChokeData * ) vb;
989
990    if( a->preferred != b->preferred )
991        return a->preferred ? -1 : 1;
992
993    if( a->preferred )
994    {
995        if( a->rate > b->rate ) return -1;
996        if( a->rate < b->rate ) return 1;
997        return 0;
998    }
999    else
1000    {
1001        return a->randomKey - b->randomKey;
1002    }
1003}
1004
1005static int
1006clientIsSnubbedBy( const tr_peer * peer )
1007{
1008    assert( peer != NULL );
1009
1010    return peer->peerSentDataAt < (time(NULL) - 30);
1011}
1012
1013static void
1014rechokeLeech( Torrent * t )
1015{
1016    int i, size, unchoked=0;
1017    tr_peer ** peers = getConnectedPeers( t, &size );
1018    ChokeData * choke = tr_new0( ChokeData, size );
1019
1020    /* sort the peers by preference and rate */
1021    for( i=0; i<size; ++i ) {
1022        tr_peer * peer = peers[i];
1023        ChokeData * node = &choke[i];
1024        node->peer = peer;
1025        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1026        node->randomKey = tr_rand( INT_MAX );
1027        node->rate = tr_peerIoGetRateToClient( peer->io );
1028    }
1029    qsort( choke, size, sizeof(ChokeData), compareChoke );
1030
1031    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1032        choke[i].doUnchoke = 1;
1033        ++unchoked;
1034    }
1035
1036    for( ; i<size; ++i ) {
1037        choke[i].doUnchoke = 1;
1038        ++unchoked;
1039        if( choke[i].peer->peerIsInterested )
1040            break;
1041    }
1042
1043    for( i=0; i<size; ++i )
1044        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1045
1046    /* cleanup */
1047    tr_free( choke );
1048    tr_free( peers );
1049}
1050
1051static void
1052rechokeSeed( Torrent * t )
1053{
1054    int i, size;
1055    tr_peer ** peers = getConnectedPeers( t, &size );
1056
1057    /* FIXME */
1058    for( i=0; i<size; ++i )
1059        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1060
1061    tr_free( peers );
1062}
1063
1064static int
1065chokePulse( void * vtorrent )
1066{
1067    Torrent * t = vtorrent;
1068    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1069    if( done )
1070        rechokeLeech( vtorrent );
1071    else
1072        rechokeSeed( vtorrent );
1073    return TRUE;
1074}
Note: See TracBrowser for help on using the repository browser.