source: branches/encryption/libtransmission/peer-mgr.c @ 3098

Last change on this file since 3098 was 3098, checked in by charles, 15 years ago

fix a memory leak and delete some dead code.

  • Property svn:keywords set to Date Rev Author Id
File size: 28.0 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3098 2007-09-18 16:56:53Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32/* how frequently to change which peers are choked */
33#define RECHOKE_PERIOD_SECONDS (15 * 1000)
34
35#define REFILL_PERIOD_MSEC 1000
36
37/* how many peers to unchoke per-torrent. */
38/* FIXME: make this user-configurable? */
39#define NUM_UNCHOKED_PEERS_PER_TORRENT 8
40
41/**
42***
43**/
44
45typedef struct
46{
47    uint8_t hash[SHA_DIGEST_LENGTH];
48    tr_ptrArray * peers; /* tr_peer */
49    tr_timer * chokeTimer;
50    tr_timer * refillTimer;
51    tr_torrent * tor;
52    tr_bitfield * requested;
53
54    unsigned int isRunning : 1;
55
56    struct tr_peerMgr * manager;
57}
58Torrent;
59
60struct tr_peerMgr
61{
62    tr_handle * handle;
63    tr_ptrArray * torrents; /* Torrent */
64    int connectionCount;
65    tr_ptrArray * handshakes; /* in-process */
66};
67
68/**
69***
70**/
71
72static int
73torrentCompare( const void * va, const void * vb )
74{
75    const Torrent * a = (const Torrent*) va;
76    const Torrent * b = (const Torrent*) vb;
77    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
78}
79
80static int
81torrentCompareToHash( const void * va, const void * vb )
82{
83    const Torrent * a = (const Torrent*) va;
84    const uint8_t * b_hash = (const uint8_t*) vb;
85    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
86}
87
88static Torrent*
89getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
90{
91    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
92                                             hash,
93                                             torrentCompareToHash );
94}
95
96static int chokePulse( void * vtorrent );
97
98static int
99peerCompare( const void * va, const void * vb )
100{
101    const tr_peer * a = (const tr_peer *) va;
102    const tr_peer * b = (const tr_peer *) vb;
103    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
104}
105
106static int
107peerCompareToAddr( const void * va, const void * vb )
108{
109    const tr_peer * a = (const tr_peer *) va;
110    const struct in_addr * b = (const struct in_addr *) vb;
111    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
112}
113
114static tr_peer*
115getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
116{
117    assert( torrent != NULL );
118    assert( torrent->peers != NULL );
119    assert( in_addr != NULL );
120
121    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
122                                             in_addr,
123                                             peerCompareToAddr );
124}
125
126static tr_peer*
127getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
128{
129    tr_peer * peer = getExistingPeer( torrent, in_addr );
130
131    if( isNew )
132        *isNew = peer == NULL;
133
134    if( peer == NULL )
135    {
136        peer = tr_new0( tr_peer, 1 );
137        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
138        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
139fprintf( stderr, "getPeer: torrent %p now has %d peers\n", torrent, tr_ptrArraySize(torrent->peers) );
140    }
141
142    return peer;
143}
144
145static void
146disconnectPeer( tr_peer * peer )
147{
148    assert( peer != NULL );
149
150    tr_peerIoFree( peer->io );
151    peer->io = NULL;
152
153    if( peer->msgs != NULL )
154    {
155fprintf( stderr, "PUB unsub peer %p from msgs %p\n", peer, peer->msgs );
156        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
157        tr_peerMsgsFree( peer->msgs );
158        peer->msgs = NULL;
159    }
160
161    tr_bitfieldFree( peer->have );
162    peer->have = NULL;
163
164    tr_bitfieldFree( peer->blame );
165    peer->blame = NULL;
166
167    tr_bitfieldFree( peer->banned );
168    peer->banned = NULL;
169}
170
171static void
172freePeer( tr_peer * peer )
173{
174    disconnectPeer( peer );
175    tr_free( peer->client );
176    tr_free( peer );
177}
178
179static void
180freeTorrent( tr_peerMgr * manager, Torrent * t )
181{
182    int i, size;
183    tr_peer ** peers;
184    uint8_t hash[SHA_DIGEST_LENGTH];
185
186fprintf( stderr, "timer freeTorrent %p\n", t );
187
188    assert( manager != NULL );
189    assert( t != NULL );
190    assert( t->peers != NULL );
191    assert( getExistingTorrent( manager, t->hash ) != NULL );
192
193    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
194
195    tr_timerFree( &t->chokeTimer );
196    tr_timerFree( &t->refillTimer );
197
198    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
199    for( i=0; i<size; ++i )
200        freePeer( peers[i] );
201
202    tr_bitfieldFree( t->requested );
203    tr_ptrArrayFree( t->peers );
204    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
205    tr_free( t );
206
207    assert( getExistingTorrent( manager, hash ) == NULL );
208}
209
210/**
211***
212**/
213
214tr_peerMgr*
215tr_peerMgrNew( tr_handle * handle )
216{
217    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
218    m->handle = handle;
219    m->torrents = tr_ptrArrayNew( );
220    m->handshakes = tr_ptrArrayNew( );
221    return m;
222}
223
224void
225tr_peerMgrFree( tr_peerMgr * manager )
226{
227    int i, n;
228fprintf( stderr, "timer peerMgrFree\n" );
229
230    while( !tr_ptrArrayEmpty( manager->torrents ) )
231        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
232    tr_ptrArrayFree( manager->torrents );
233
234    for( i=0, n=tr_ptrArraySize(manager->handshakes); i<n; ++i )
235        tr_handshakeAbort( (tr_handshake*) tr_ptrArrayNth( manager->handshakes, i) );
236    tr_ptrArrayFree( manager->handshakes );
237
238    tr_free( manager );
239}
240
241static tr_peer**
242getConnectedPeers( Torrent * t, int * setmeCount )
243{
244    int i, peerCount, connectionCount;
245    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
246    tr_peer **ret = tr_new( tr_peer*, peerCount );
247
248    for( i=connectionCount=0; i<peerCount; ++i )
249        if( peers[i]->msgs != NULL )
250            ret[connectionCount++] = peers[i];
251
252    *setmeCount = connectionCount;
253    return ret;
254}
255
256/***
257****  Refill
258***/
259
260struct tr_refill_piece
261{
262    tr_priority_t priority;
263    uint32_t piece;
264    uint32_t peerCount;
265};
266
267static int
268compareRefillPiece (const void * aIn, const void * bIn)
269{
270    const struct tr_refill_piece * a = aIn;
271    const struct tr_refill_piece * b = bIn;
272
273    /* if one piece has a higher priority, it goes first */
274    if (a->priority != b->priority)
275        return a->priority > b->priority ? -1 : 1;
276
277    /* otherwise if one has fewer peers, it goes first */
278    if (a->peerCount != b->peerCount)
279        return a->peerCount < b->peerCount ? -1 : 1;
280
281    /* otherwise go with the earlier piece */
282    return a->piece - b->piece;
283}
284
285static int
286isPieceInteresting( const tr_torrent  * tor,
287                    int                 piece )
288{
289    if( tor->info.pieces[piece].dnd ) /* we don't want it */
290        return 0;
291
292    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
293        return 0;
294
295    return 1;
296}
297
298static uint32_t*
299getPreferredPieces( Torrent     * t,
300                    uint32_t    * pieceCount )
301{
302    const tr_torrent * tor = t->tor;
303    const tr_info * inf = &tor->info;
304
305    int i;
306    uint32_t poolSize = 0;
307    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
308
309    int peerCount;
310    tr_peer** peers = getConnectedPeers( t, &peerCount );
311
312    for( i=0; i<inf->pieceCount; ++i )
313        if( isPieceInteresting( tor, i ) )
314            pool[poolSize++] = i;
315
316    /* sort the pool from most interesting to least... */
317    if( poolSize > 1 )
318    {
319        uint32_t j;
320        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
321
322        for( j=0; j<poolSize; ++j )
323        {
324            int k;
325            const int piece = pool[j];
326            struct tr_refill_piece * setme = p + j;
327
328            setme->piece = piece;
329            setme->priority = inf->pieces[piece].priority;
330            setme->peerCount = 0;
331
332            for( k=0; k<peerCount; ++k ) {
333                const tr_peer * peer = peers[k];
334                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
335                    ++setme->peerCount;
336            }
337        }
338
339        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
340
341        for( j=0; j<poolSize; ++j )
342            pool[j] = p[j].piece;
343
344        tr_free( p );
345    }
346
347#if 0
348fprintf (stderr, "new pool: ");
349for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
350fprintf (stderr, "\n");
351#endif
352    tr_free( peers );
353
354    *pieceCount = poolSize;
355    return pool;
356}
357
358static uint64_t*
359getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
360{
361    uint32_t i;
362    uint32_t pieceCount;
363    uint32_t * pieces;
364    uint64_t *req, *unreq, *ret, *walk;
365    int reqCount, unreqCount;
366    const tr_torrent * tor = t->tor;
367
368    pieces = getPreferredPieces( t, &pieceCount );
369fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );
370
371    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
372    reqCount = 0;
373    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
374    unreqCount = 0;
375
376    for( i=0; i<pieceCount; ++i ) {
377        const uint32_t index = pieces[i];
378        const int begin = tr_torPieceFirstBlock( tor, index );
379        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
380        int block;
381        for( block=begin; block<end; ++block )
382            if( tr_cpBlockIsComplete( tor->completion, block ) )
383                continue;
384            else if( tr_bitfieldHas( t->requested, block ) )
385                req[reqCount++] = block;
386            else
387                unreq[unreqCount++] = block;
388    }
389
390fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );
391    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
392    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
393    walk += unreqCount;
394    memcpy( walk, req, sizeof(uint64_t) * reqCount );
395    walk += reqCount;
396    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
397    *setmeCount = walk - ret;
398
399    tr_free( req );
400    tr_free( unreq );
401    tr_free( pieces );
402
403    return ret;
404}
405
406static int
407refillPulse( void * vtorrent )
408{
409    Torrent * t = vtorrent;
410    tr_torrent * tor = t->tor;
411    uint32_t i;
412    int peerCount;
413    tr_peer ** peers;
414    uint64_t blockCount;
415    uint64_t * blocks;
416
417    if( !t->isRunning )
418        return TRUE;
419    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
420        return TRUE;
421
422    blocks = getPreferredBlocks( t, &blockCount );
423    peers = getConnectedPeers( t, &peerCount );
424
425fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );
426
427    for( i=0; peerCount && i<blockCount; ++i )
428    {
429        const int block = blocks[i];
430        const uint32_t index = tr_torBlockPiece( tor, block );
431        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
432        const uint32_t length = tr_torBlockCountBytes( tor, block );
433        int j;
434        assert( _tr_block( tor, index, begin ) == block );
435        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
436        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
437
438
439        /* find a peer who can ask for this block */
440        for( j=0; j<peerCount; )
441        {
442            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
443            switch( val )
444            {
445                case TR_ADDREQ_FULL: 
446                case TR_ADDREQ_CLIENT_CHOKED:
447                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
448                    break;
449
450                case TR_ADDREQ_MISSING: 
451                    ++j;
452                    break;
453
454                case TR_ADDREQ_OK:
455                    fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );
456                    tr_bitfieldAdd( t->requested, block );
457                    j = peerCount;
458                    break;
459
460                default:
461                    assert( 0 && "unhandled value" );
462                    break;
463            }
464        }
465    }
466
467    /* cleanup */
468    tr_free( peers );
469    tr_free( blocks );
470
471    t->refillTimer = NULL;
472    return FALSE;
473}
474
475static void
476broadcastClientHave( Torrent * t, uint32_t index )
477{
478    int i, size;
479    tr_peer ** peers = getConnectedPeers( t, &size );
480    for( i=0; i<size; ++i )
481        tr_peerMsgsHave( peers[i]->msgs, index );
482    tr_free( peers );
483}
484
485static void
486broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
487{
488    int i, size;
489    tr_peer ** peers = getConnectedPeers( t, &size );
490    for( i=0; i<size; ++i )
491        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
492    tr_free( peers );
493}
494
495static void
496msgsCallbackFunc( void * source UNUSED, void * vevent, void * vt )
497{
498    Torrent * t = (Torrent *) vt;
499    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
500
501    switch( e->eventType )
502    {
503        case TR_PEERMSG_NEED_REQ:
504            if( t->refillTimer == NULL )
505                t->refillTimer = tr_timerNew( t->manager->handle,
506                                              refillPulse, t,
507                                              REFILL_PERIOD_MSEC );
508            break;
509
510        case TR_PEERMSG_CLIENT_HAVE:
511            broadcastClientHave( t, e->pieceIndex );
512            break;
513
514        case TR_PEERMSG_CLIENT_BLOCK:
515            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
516            break;
517
518        case TR_PEERMSG_GOT_ERROR:
519            /* FIXME */
520            break;
521
522        default:
523            assert(0);
524    }
525}
526
527static void
528myHandshakeDoneCB( tr_handshake    * handshake,
529                   tr_peerIo       * io,
530                   int               isConnected,
531                   const uint8_t   * peer_id,
532                   int               peerSupportsEncryption,
533                   void            * vmanager )
534{
535    int ok = isConnected;
536    uint16_t port;
537    const struct in_addr * in_addr;
538    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
539    const uint8_t * hash = NULL;
540    Torrent * t;
541    tr_handshake * ours;
542
543    assert( io != NULL );
544    assert( isConnected==0 || isConnected==1 );
545    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
546
547    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
548                                    handshake,
549                                    tr_comparePointers );
550    assert( handshake == ours );
551
552    in_addr = tr_peerIoGetAddress( io, &port );
553
554    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
555    {
556        tr_peerIoFree( io );
557        --manager->connectionCount;
558        return;
559    }
560
561    hash = tr_peerIoGetTorrentHash( io );
562    t = getExistingTorrent( manager, hash );
563    if( !t || !t->isRunning )
564    {
565        tr_peerIoFree( io );
566        --manager->connectionCount;
567        return;
568    }
569
570    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake; isConnected is %d\n", t->tor->info.name, isConnected );
571
572    /* if we couldn't connect or were snubbed,
573     * the peer's probably not worth remembering. */
574    if( !ok ) {
575        tr_peer * peer = getExistingPeer( t, in_addr );
576        fprintf( stderr, "peer-mgr: torrent [%s] got a bad one, and you know what? fuck them.\n", t->tor->info.name );
577        tr_peerIoFree( io );
578        --manager->connectionCount;
579        if( peer ) {
580            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
581            freePeer( peer );
582        }
583        return;
584    }
585
586    if( 1 ) {
587        tr_peer * peer = getPeer( t, in_addr, NULL );
588        if( peer->msgs != NULL ) { /* we already have this peer */
589            tr_peerIoFree( io );
590            --manager->connectionCount;
591        } else {
592            peer->port = port;
593            peer->io = io;
594            peer->msgs = tr_peerMsgsNew( t->tor, peer );
595            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
596            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
597            fprintf( stderr, "PUB sub peer %p to msgs %p\n", peer, peer->msgs );
598            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
599        }
600    }
601}
602
603static void
604initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
605{
606    tr_handshake * handshake = tr_handshakeNew( io,
607                                                HANDSHAKE_ENCRYPTION_PREFERRED,
608                                                myHandshakeDoneCB,
609                                                manager );
610    ++manager->connectionCount;
611
612    tr_ptrArrayInsertSorted( manager->handshakes, handshake, tr_comparePointers );
613}
614
615void
616tr_peerMgrAddIncoming( tr_peerMgr      * manager,
617                       struct in_addr  * addr,
618                       int               socket )
619{
620    tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, socket );
621    initiateHandshake( manager, io );
622}
623
624static void
625maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
626{
627    tr_peerIo * io;
628
629    assert( manager != NULL );
630    assert( t != NULL );
631    assert( peer != NULL );
632
633    if( peer->io != NULL ) { /* already connected */
634        fprintf( stderr, "not connecting because we already have an IO for that address\n" );
635        return;
636    }
637    if( !t->isRunning ) { /* torrent's not running */
638        fprintf( stderr, "OUTGOING connection not being made because t [%s] is not running\n", t->tor->info.name );
639        return;
640    }
641
642    io = tr_peerIoNewOutgoing( manager->handle,
643                               &peer->in_addr,
644                               peer->port,
645                               t->hash );
646    initiateHandshake( manager, io );
647}
648
649void
650tr_peerMgrAddPex( tr_peerMgr     * manager,
651                  const uint8_t  * torrentHash,
652                  int              from,
653                  const tr_pex   * pex,
654                  int              pexCount )
655{
656    Torrent * t = getExistingTorrent( manager, torrentHash );
657    const tr_pex * end = pex + pexCount;
658    while( pex != end )
659    {
660        int isNew;
661        tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
662        if( isNew ) {
663            peer->port = pex->port;
664            peer->from = from;
665            maybeConnect( manager, t, peer );
666        }
667        ++pex;
668    }
669}
670
671void
672tr_peerMgrAddPeers( tr_peerMgr    * manager,
673                    const uint8_t * torrentHash,
674                    int             from,
675                    const uint8_t * peerCompact,
676                    int             peerCount )
677{
678    int i;
679    const uint8_t * walk = peerCompact;
680    Torrent * t = getExistingTorrent( manager, torrentHash );
681    for( i=0; t!=NULL && i<peerCount; ++i )
682    {
683        int isNew;
684        tr_peer * peer;
685        struct in_addr addr;
686        uint16_t port;
687        memcpy( &addr, walk, 4 ); walk += 4;
688        memcpy( &port, walk, 2 ); walk += 2;
689        peer = getPeer( t, &addr, &isNew );
690        if( isNew ) {
691            peer->port = port;
692            peer->from = from;
693            maybeConnect( manager, t, peer );
694        }
695    }
696}
697
698/**
699***
700**/
701
702int
703tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
704{
705    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
706}
707
708void
709tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
710                    const uint8_t  * torrentHash UNUSED,
711                    int              pieceIndex UNUSED,
712                    int              success UNUSED )
713{
714    fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );
715}
716
717int
718tr_pexCompare( const void * va, const void * vb )
719{
720    const tr_pex * a = (const tr_pex *) va;
721    const tr_pex * b = (const tr_pex *) vb;
722    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
723    if( i ) return i;
724    if( a->port < b->port ) return -1;
725    if( a->port > b->port ) return 1;
726    return 0;
727}
728
729int tr_pexCompare( const void * a, const void * b );
730
731
732int
733tr_peerMgrGetPeers( tr_peerMgr      * manager,
734                    const uint8_t   * torrentHash,
735                    tr_pex         ** setme_pex )
736{
737    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
738    int i, peerCount;
739    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
740    tr_pex * pex = tr_new( tr_pex, peerCount );
741    tr_pex * walk = pex;
742
743    for( i=0; i<peerCount; ++i, ++walk )
744    {
745        const tr_peer * peer = peers[i];
746
747        walk->in_addr = peer->in_addr;
748
749        walk->port = peer->port;
750
751        walk->flags = 0;
752        if( peer->peerSupportsEncryption ) walk->flags |= 1;
753        if( peer->progress >= 1.0 )        walk->flags |= 2;
754    }
755
756    assert( ( walk - pex ) == peerCount );
757    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
758    *setme_pex = pex;
759    return peerCount;
760}
761
762void
763tr_peerMgrStartTorrent( tr_peerMgr     * manager,
764                        const uint8_t  * torrentHash )
765{
766    int i, peerCount;
767    Torrent * t = getExistingTorrent( manager, torrentHash );
768
769    t->isRunning = 1;
770
771    peerCount = tr_ptrArraySize( t->peers );
772    for( i=0; i<peerCount; ++i )
773        maybeConnect( manager, t, tr_ptrArrayNth( t->peers, i ) );
774}
775
776void
777tr_peerMgrStopTorrent( tr_peerMgr     * manager,
778                       const uint8_t  * torrentHash)
779{
780    int i, peerCount;
781    Torrent * t = getExistingTorrent( manager, torrentHash );
782
783    t->isRunning = 0;
784
785    peerCount = tr_ptrArraySize( t->peers );
786    for( i=0; i<peerCount; ++i )
787        disconnectPeer( tr_ptrArrayNth( t->peers, i ) );
788}
789
790void
791tr_peerMgrAddTorrent( tr_peerMgr * manager,
792                      tr_torrent * tor )
793{
794    Torrent * t;
795
796    assert( tor != NULL );
797    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
798
799    t = tr_new0( Torrent, 1 );
800    t->manager = manager;
801    t->tor = tor;
802    t->peers = tr_ptrArrayNew( );
803    t->requested = tr_bitfieldNew( tor->blockCount );
804    t->chokeTimer = tr_timerNew( manager->handle, chokePulse, t, RECHOKE_PERIOD_SECONDS );
805
806    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
807    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
808}
809
810void
811tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
812                         const uint8_t  * torrentHash )
813{
814    Torrent * t = getExistingTorrent( manager, torrentHash );
815    assert( t != NULL );
816    tr_peerMgrStopTorrent( manager, torrentHash );
817    freeTorrent( manager, t );
818}
819
820void
821tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
822                               const uint8_t    * torrentHash,
823                               int8_t           * tab,
824                               int                tabCount )
825{
826    int i;
827    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
828    const tr_torrent * tor = t->tor;
829    const float interval = tor->info.pieceCount / (float)tabCount;
830
831    memset( tab, 0, tabCount );
832
833    for( i=0; i<tabCount; ++i )
834    {
835        const int piece = i * interval;
836
837        if( tor == NULL )
838            tab[i] = 0;
839        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
840            tab[i] = -1;
841        else {
842            int j, peerCount;
843            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
844            for( j=0; j<peerCount; ++j )
845                if( tr_bitfieldHas( peers[j]->have, i ) )
846                    ++tab[i];
847        }
848    }
849}
850
851
852void
853tr_peerMgrTorrentStats( const tr_peerMgr * manager,
854                        const uint8_t    * torrentHash,
855                        int              * setmePeersTotal,
856                        int              * setmePeersConnected,
857                        int              * setmePeersSendingToUs,
858                        int              * setmePeersGettingFromUs,
859                        int              * setmePeersFrom )
860{
861    int i, size;
862    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
863    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
864
865    *setmePeersTotal          = size;
866    *setmePeersConnected      = 0;
867    *setmePeersSendingToUs    = 0;
868    *setmePeersGettingFromUs  = 0;
869
870    for( i=0; i<size; ++i )
871    {
872        const tr_peer * peer = peers[i];
873
874        if( peer->io == NULL ) /* not connected */
875            continue;
876
877        ++*setmePeersConnected;
878
879        ++setmePeersFrom[peer->from];
880
881        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
882            ++*setmePeersGettingFromUs;
883
884        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
885            ++*setmePeersSendingToUs;
886    }
887}
888
889struct tr_peer_stat *
890tr_peerMgrPeerStats( const tr_peerMgr  * manager,
891                     const uint8_t     * torrentHash,
892                     int               * setmeCount UNUSED )
893{
894    int i, size;
895    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
896    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
897    tr_peer_stat * ret;
898
899    ret = tr_new0( tr_peer_stat, size );
900
901    for( i=0; i<size; ++i )
902    {
903        const tr_peer * peer = peers[i];
904        const int live = peer->io != NULL;
905        tr_peer_stat * stat = ret + i;
906
907        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
908        stat->port             = peer->port;
909        stat->from             = peer->from;
910        stat->client           = peer->client;
911        stat->progress         = peer->progress;
912        stat->isConnected      = live ? 1 : 0;
913        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
914        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
915        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
916        stat->isDownloading    = stat->uploadToRate > 0.01;
917        stat->isUploading      = stat->downloadFromRate > 0.01;
918    }
919
920    *setmeCount = size;
921    return ret;
922}
923
924/**
925***
926**/
927
928typedef struct
929{
930    tr_peer * peer;
931    float rate;
932    int randomKey;
933    int preferred;
934    int doUnchoke;
935}
936ChokeData;
937
938static int
939compareChoke( const void * va, const void * vb )
940{
941    const ChokeData * a = ( const ChokeData * ) va;
942    const ChokeData * b = ( const ChokeData * ) vb;
943
944    if( a->preferred != b->preferred )
945        return a->preferred ? -1 : 1;
946
947    if( a->preferred )
948    {
949        if( a->rate > b->rate ) return -1;
950        if( a->rate < b->rate ) return 1;
951        return 0;
952    }
953    else
954    {
955        return a->randomKey - b->randomKey;
956    }
957}
958
959static int
960clientIsSnubbedBy( const tr_peer * peer )
961{
962    assert( peer != NULL );
963
964    return peer->peerSentDataAt < (time(NULL) - 30);
965}
966
967static void
968rechokeLeech( Torrent * t )
969{
970    int i, size, unchoked=0;
971    tr_peer ** peers = getConnectedPeers( t, &size );
972    ChokeData * choke = tr_new0( ChokeData, size );
973
974    /* sort the peers by preference and rate */
975    for( i=0; i<size; ++i ) {
976        tr_peer * peer = peers[i];
977        ChokeData * node = &choke[i];
978        node->peer = peer;
979        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
980        node->randomKey = tr_rand( INT_MAX );
981        node->rate = tr_peerIoGetRateToClient( peer->io );
982    }
983    qsort( choke, size, sizeof(ChokeData), compareChoke );
984
985    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
986        choke[i].doUnchoke = 1;
987        ++unchoked;
988    }
989
990    for( ; i<size; ++i ) {
991        choke[i].doUnchoke = 1;
992        ++unchoked;
993        if( choke[i].peer->peerIsInterested )
994            break;
995    }
996
997    for( i=0; i<size; ++i )
998        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
999
1000    /* cleanup */
1001    tr_free( choke );
1002    tr_free( peers );
1003}
1004
1005static void
1006rechokeSeed( Torrent * t )
1007{
1008    int i, size;
1009    tr_peer ** peers = getConnectedPeers( t, &size );
1010
1011    /* FIXME */
1012    for( i=0; i<size; ++i )
1013        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1014
1015    tr_free( peers );
1016}
1017
1018static int
1019chokePulse( void * vtorrent )
1020{
1021    Torrent * t = vtorrent;
1022    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1023    if( done )
1024        rechokeLeech( vtorrent );
1025    else
1026        rechokeSeed( vtorrent );
1027    return TRUE;
1028}
Note: See TracBrowser for help on using the repository browser.