source: trunk/libtransmission/peer-mgr.c @ 3125

Last change on this file since 3125 was 3125, checked in by charles, 15 years ago

give waldorf a different assertion to crash on.

  • Property svn:keywords set to Date Rev Author Id
File size: 29.0 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3125 2007-09-21 15:31:46Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32/* how frequently to change which peers are choked */
33#define RECHOKE_PERIOD_SECONDS (15 * 1000)
34
35#define REFILL_PERIOD_MSEC 1000
36
37/* how many peers to unchoke per-torrent. */
38/* FIXME: make this user-configurable? */
39#define NUM_UNCHOKED_PEERS_PER_TORRENT 8
40
41/**
42***
43**/
44
45typedef struct
46{
47    uint8_t hash[SHA_DIGEST_LENGTH];
48    tr_ptrArray * peers; /* tr_peer */
49    tr_timer * chokeTimer;
50    tr_timer * refillTimer;
51    tr_torrent * tor;
52    tr_bitfield * requested;
53
54    unsigned int isRunning : 1;
55
56    struct tr_peerMgr * manager;
57}
58Torrent;
59
60struct tr_peerMgr
61{
62    tr_handle * handle;
63    tr_ptrArray * torrents; /* Torrent */
64    int connectionCount;
65    tr_ptrArray * handshakes; /* in-process */
66};
67
68/**
69***
70**/
71
72static int
73handshakeCompareToAddr( const void * va, const void * vb )
74{
75    const tr_handshake * a = va;
76    const struct in_addr * b = vb;
77    return memcmp( tr_handshakeGetAddr( a, NULL ), b, sizeof( struct in_addr ) );
78}
79
80static int
81handshakeCompare( const void * a, const void * b )
82{
83    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
84}
85
86static tr_handshake*
87getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
88{
89    return tr_ptrArrayFindSorted( mgr->handshakes,
90                                  in_addr,
91                                  handshakeCompareToAddr );
92}
93
94/**
95***
96**/
97
98static int
99torrentCompare( const void * va, const void * vb )
100{
101    const Torrent * a = va;
102    const Torrent * b = vb;
103    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
104}
105
106static int
107torrentCompareToHash( const void * va, const void * vb )
108{
109    const Torrent * a = (const Torrent*) va;
110    const uint8_t * b_hash = (const uint8_t*) vb;
111    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
112}
113
114static Torrent*
115getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
116{
117    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
118                                             hash,
119                                             torrentCompareToHash );
120}
121
122static int chokePulse( void * vtorrent );
123
124static int
125peerCompare( const void * va, const void * vb )
126{
127    const tr_peer * a = (const tr_peer *) va;
128    const tr_peer * b = (const tr_peer *) vb;
129    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
130}
131
132static int
133peerCompareToAddr( const void * va, const void * vb )
134{
135    const tr_peer * a = (const tr_peer *) va;
136    const struct in_addr * b = (const struct in_addr *) vb;
137    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
138}
139
140static tr_peer*
141getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
142{
143    assert( torrent != NULL );
144    assert( torrent->peers != NULL );
145    assert( in_addr != NULL );
146
147    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
148                                             in_addr,
149                                             peerCompareToAddr );
150}
151
152static tr_peer*
153getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
154{
155    tr_peer * peer = getExistingPeer( torrent, in_addr );
156
157    if( isNew )
158        *isNew = peer == NULL;
159
160    if( peer == NULL )
161    {
162        peer = tr_new0( tr_peer, 1 );
163        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
164        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
165    }
166
167    return peer;
168}
169
170static void
171disconnectPeer( tr_peer * peer )
172{
173    assert( peer != NULL );
174
175    tr_peerIoFree( peer->io );
176    peer->io = NULL;
177
178    if( peer->msgs != NULL )
179    {
180        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
181        tr_peerMsgsFree( peer->msgs );
182        peer->msgs = NULL;
183    }
184
185    tr_bitfieldFree( peer->have );
186    peer->have = NULL;
187
188    tr_bitfieldFree( peer->blame );
189    peer->blame = NULL;
190
191    tr_bitfieldFree( peer->banned );
192    peer->banned = NULL;
193}
194
195static void
196freePeer( tr_peer * peer )
197{
198    disconnectPeer( peer );
199    tr_free( peer->client );
200    tr_free( peer );
201}
202
203static void
204freeTorrent( tr_peerMgr * manager, Torrent * t )
205{
206    int i, size;
207    tr_peer ** peers;
208    uint8_t hash[SHA_DIGEST_LENGTH];
209
210fprintf( stderr, "timer freeTorrent %p\n", t );
211
212    assert( manager != NULL );
213    assert( t != NULL );
214    assert( t->peers != NULL );
215    assert( getExistingTorrent( manager, t->hash ) != NULL );
216
217    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
218
219    tr_timerFree( &t->chokeTimer );
220    tr_timerFree( &t->refillTimer );
221
222    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
223    for( i=0; i<size; ++i )
224        freePeer( peers[i] );
225
226    tr_bitfieldFree( t->requested );
227    tr_ptrArrayFree( t->peers );
228    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
229    tr_free( t );
230
231    assert( getExistingTorrent( manager, hash ) == NULL );
232}
233
234/**
235***
236**/
237
238tr_peerMgr*
239tr_peerMgrNew( tr_handle * handle )
240{
241    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
242    m->handle = handle;
243    m->torrents = tr_ptrArrayNew( );
244    m->handshakes = tr_ptrArrayNew( );
245    return m;
246}
247
248void
249tr_peerMgrFree( tr_peerMgr * manager )
250{
251fprintf( stderr, "timer peerMgrFree\n" );
252
253    while( !tr_ptrArrayEmpty( manager->handshakes ) )
254        tr_handshakeAbort( (tr_handshake*)tr_ptrArrayNth( manager->handshakes, 0) );
255    tr_ptrArrayFree( manager->handshakes );
256
257    while( !tr_ptrArrayEmpty( manager->torrents ) )
258        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
259    tr_ptrArrayFree( manager->torrents );
260
261    tr_free( manager );
262}
263
264static tr_peer**
265getConnectedPeers( Torrent * t, int * setmeCount )
266{
267    int i, peerCount, connectionCount;
268    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
269    tr_peer **ret = tr_new( tr_peer*, peerCount );
270
271    for( i=connectionCount=0; i<peerCount; ++i )
272        if( peers[i]->msgs != NULL )
273            ret[connectionCount++] = peers[i];
274
275    *setmeCount = connectionCount;
276    return ret;
277}
278
279/***
280****  Refill
281***/
282
283struct tr_refill_piece
284{
285    tr_priority_t priority;
286    uint32_t piece;
287    uint32_t peerCount;
288};
289
290static int
291compareRefillPiece (const void * aIn, const void * bIn)
292{
293    const struct tr_refill_piece * a = aIn;
294    const struct tr_refill_piece * b = bIn;
295
296    /* if one piece has a higher priority, it goes first */
297    if (a->priority != b->priority)
298        return a->priority > b->priority ? -1 : 1;
299
300    /* otherwise if one has fewer peers, it goes first */
301    if (a->peerCount != b->peerCount)
302        return a->peerCount < b->peerCount ? -1 : 1;
303
304    /* otherwise go with the earlier piece */
305    return a->piece - b->piece;
306}
307
308static int
309isPieceInteresting( const tr_torrent  * tor,
310                    int                 piece )
311{
312    if( tor->info.pieces[piece].dnd ) /* we don't want it */
313        return 0;
314
315    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
316        return 0;
317
318    return 1;
319}
320
321static uint32_t*
322getPreferredPieces( Torrent     * t,
323                    uint32_t    * pieceCount )
324{
325    const tr_torrent * tor = t->tor;
326    const tr_info * inf = &tor->info;
327
328    int i;
329    uint32_t poolSize = 0;
330    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
331
332    int peerCount;
333    tr_peer** peers = getConnectedPeers( t, &peerCount );
334
335    for( i=0; i<inf->pieceCount; ++i )
336        if( isPieceInteresting( tor, i ) )
337            pool[poolSize++] = i;
338
339    /* sort the pool from most interesting to least... */
340    if( poolSize > 1 )
341    {
342        uint32_t j;
343        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
344
345        for( j=0; j<poolSize; ++j )
346        {
347            int k;
348            const int piece = pool[j];
349            struct tr_refill_piece * setme = p + j;
350
351            setme->piece = piece;
352            setme->priority = inf->pieces[piece].priority;
353            setme->peerCount = 0;
354
355            for( k=0; k<peerCount; ++k ) {
356                const tr_peer * peer = peers[k];
357                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
358                    ++setme->peerCount;
359            }
360        }
361
362        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
363
364        for( j=0; j<poolSize; ++j )
365            pool[j] = p[j].piece;
366
367        tr_free( p );
368    }
369
370#if 0
371fprintf (stderr, "new pool: ");
372for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
373fprintf (stderr, "\n");
374#endif
375    tr_free( peers );
376
377    *pieceCount = poolSize;
378    return pool;
379}
380
381static uint64_t*
382getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
383{
384    uint32_t i;
385    uint32_t pieceCount;
386    uint32_t * pieces;
387    uint64_t *req, *unreq, *ret, *walk;
388    int reqCount, unreqCount;
389    const tr_torrent * tor = t->tor;
390
391    pieces = getPreferredPieces( t, &pieceCount );
392/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
393
394    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
395    reqCount = 0;
396    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
397    unreqCount = 0;
398
399    for( i=0; i<pieceCount; ++i ) {
400        const uint32_t index = pieces[i];
401        const int begin = tr_torPieceFirstBlock( tor, index );
402        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
403        int block;
404        for( block=begin; block<end; ++block )
405            if( tr_cpBlockIsComplete( tor->completion, block ) )
406                continue;
407            else if( tr_bitfieldHas( t->requested, block ) )
408                req[reqCount++] = block;
409            else
410                unreq[unreqCount++] = block;
411    }
412
413/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
414    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
415    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
416    walk += unreqCount;
417    memcpy( walk, req, sizeof(uint64_t) * reqCount );
418    walk += reqCount;
419    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
420    *setmeCount = walk - ret;
421
422    tr_free( req );
423    tr_free( unreq );
424    tr_free( pieces );
425
426    return ret;
427}
428
429static int
430refillPulse( void * vtorrent )
431{
432    Torrent * t = vtorrent;
433    tr_torrent * tor = t->tor;
434    uint32_t i;
435    int peerCount;
436    tr_peer ** peers;
437    uint64_t blockCount;
438    uint64_t * blocks;
439
440    if( !t->isRunning )
441        return TRUE;
442    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
443        return TRUE;
444
445    blocks = getPreferredBlocks( t, &blockCount );
446    peers = getConnectedPeers( t, &peerCount );
447
448/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
449
450    for( i=0; peerCount && i<blockCount; ++i )
451    {
452        const int block = blocks[i];
453        const uint32_t index = tr_torBlockPiece( tor, block );
454        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
455        const uint32_t length = tr_torBlockCountBytes( tor, block );
456        int j;
457        assert( _tr_block( tor, index, begin ) == block );
458        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
459        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
460
461
462        /* find a peer who can ask for this block */
463        for( j=0; j<peerCount; )
464        {
465            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
466            switch( val )
467            {
468                case TR_ADDREQ_FULL: 
469                case TR_ADDREQ_CLIENT_CHOKED:
470                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
471                    break;
472
473                case TR_ADDREQ_MISSING: 
474                    ++j;
475                    break;
476
477                case TR_ADDREQ_OK:
478                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
479                    tr_bitfieldAdd( t->requested, block );
480                    j = peerCount;
481                    break;
482
483                default:
484                    assert( 0 && "unhandled value" );
485                    break;
486            }
487        }
488    }
489
490    /* cleanup */
491    tr_free( peers );
492    tr_free( blocks );
493
494    t->refillTimer = NULL;
495    return FALSE;
496}
497
498static void
499broadcastClientHave( Torrent * t, uint32_t index )
500{
501    int i, size;
502    tr_peer ** peers = getConnectedPeers( t, &size );
503    for( i=0; i<size; ++i )
504        tr_peerMsgsHave( peers[i]->msgs, index );
505    tr_free( peers );
506}
507
508static void
509broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
510{
511    int i, size;
512    tr_peer ** peers = getConnectedPeers( t, &size );
513    for( i=0; i<size; ++i )
514        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
515    tr_free( peers );
516}
517
518static void
519msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
520{
521    tr_peer * peer = vpeer;
522    Torrent * t = (Torrent *) vt;
523    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
524
525    switch( e->eventType )
526    {
527        case TR_PEERMSG_NEED_REQ:
528            if( t->refillTimer == NULL )
529                t->refillTimer = tr_timerNew( t->manager->handle,
530                                              refillPulse, t,
531                                              REFILL_PERIOD_MSEC );
532            break;
533
534        case TR_PEERMSG_CLIENT_HAVE:
535            broadcastClientHave( t, e->pieceIndex );
536            break;
537
538        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
539            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
540            const int peerIsSeed = e->progress >= 1.0;
541            if( clientIsSeed && peerIsSeed ) {
542                fprintf( stderr, "DISCONNECTING FROM PEER because both of us are seeds\n" );
543                peer->doDisconnect = 1;
544            }
545            break;
546        }
547
548        case TR_PEERMSG_CLIENT_BLOCK:
549            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
550            break;
551
552        case TR_PEERMSG_GOT_ERROR:
553            peer->doDisconnect = 1;
554            break;
555
556        default:
557            assert(0);
558    }
559}
560
561static void
562myHandshakeDoneCB( tr_handshake    * handshake,
563                   tr_peerIo       * io,
564                   int               isConnected,
565                   const uint8_t   * peer_id,
566                   int               peerSupportsEncryption,
567                   void            * vmanager )
568{
569    int ok = isConnected;
570    uint16_t port;
571    const struct in_addr * in_addr;
572    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
573    const uint8_t * hash = NULL;
574    Torrent * t;
575    tr_handshake * ours;
576
577    assert( io != NULL );
578    assert( isConnected==0 || isConnected==1 );
579    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
580
581    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
582                                    handshake,
583                                    handshakeCompare );
584    assert( ours != NULL );
585    assert( ours == handshake );
586
587    in_addr = tr_peerIoGetAddress( io, &port );
588
589    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
590    {
591        tr_peerIoFree( io );
592        --manager->connectionCount;
593        return;
594    }
595
596    hash = tr_peerIoGetTorrentHash( io );
597    t = getExistingTorrent( manager, hash );
598    if( !t || !t->isRunning )
599    {
600        tr_peerIoFree( io );
601        --manager->connectionCount;
602        return;
603    }
604
605    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake. Connected? %s.\n", t->tor->info.name, (isConnected?"yes":"no") );
606
607    /* if we couldn't connect or were snubbed,
608     * the peer's probably not worth remembering. */
609    if( !ok ) {
610        tr_peer * peer = getExistingPeer( t, in_addr );
611        tr_peerIoFree( io );
612        --manager->connectionCount;
613        if( peer ) {
614            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
615            freePeer( peer );
616        }
617        return;
618    }
619
620    if( 1 ) {
621        tr_peer * peer = getPeer( t, in_addr, NULL );
622        if( peer->msgs != NULL ) { /* we already have this peer */
623            tr_peerIoFree( io );
624            --manager->connectionCount;
625        } else {
626            peer->port = port;
627            peer->io = io;
628            peer->msgs = tr_peerMsgsNew( t->tor, peer );
629            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
630            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
631            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
632        }
633    }
634}
635
636static void
637initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
638{
639    tr_handshake * handshake = tr_handshakeNew( io,
640                                                manager->handle->encryptionMode,
641                                                myHandshakeDoneCB,
642                                                manager );
643    ++manager->connectionCount;
644
645    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
646}
647
648void
649tr_peerMgrAddIncoming( tr_peerMgr      * manager,
650                       struct in_addr  * addr,
651                       int               socket )
652{
653    if( getExistingHandshake( manager, addr ) == NULL )
654    {
655        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, socket );
656        initiateHandshake( manager, io );
657    }
658}
659
660static void
661maybeConnect( tr_peerMgr * manager, Torrent * t, tr_peer * peer )
662{
663    tr_peerIo * io;
664
665    assert( manager != NULL );
666    assert( t != NULL );
667    assert( peer != NULL );
668
669    if( peer->io != NULL ) { /* already connected */
670        fprintf( stderr, "not connecting because we already have an IO for that address\n" );
671        return;
672    }
673    if( !t->isRunning ) { /* torrent's not running */
674        fprintf( stderr, "OUTGOING connection not being made because t [%s] is not running\n", t->tor->info.name );
675        return;
676    }
677    if( getExistingHandshake( manager, &peer->in_addr ) != NULL ) { /* already have a handshake pending */
678        fprintf( stderr, "already have a handshake for that address\n" );
679        return;
680    }
681
682    io = tr_peerIoNewOutgoing( manager->handle,
683                               &peer->in_addr,
684                               peer->port,
685                               t->hash );
686    initiateHandshake( manager, io );
687}
688
689void
690tr_peerMgrAddPex( tr_peerMgr     * manager,
691                  const uint8_t  * torrentHash,
692                  int              from,
693                  const tr_pex   * pex,
694                  int              pexCount )
695{
696    Torrent * t = getExistingTorrent( manager, torrentHash );
697    const tr_pex * end = pex + pexCount;
698    while( pex != end )
699    {
700        int isNew;
701        tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
702        if( isNew ) {
703            peer->port = pex->port;
704            peer->from = from;
705            maybeConnect( manager, t, peer );
706        }
707        ++pex;
708    }
709}
710
711void
712tr_peerMgrAddPeers( tr_peerMgr    * manager,
713                    const uint8_t * torrentHash,
714                    int             from,
715                    const uint8_t * peerCompact,
716                    int             peerCount )
717{
718    int i;
719    const uint8_t * walk = peerCompact;
720    Torrent * t = getExistingTorrent( manager, torrentHash );
721    for( i=0; t!=NULL && i<peerCount; ++i )
722    {
723        int isNew;
724        tr_peer * peer;
725        struct in_addr addr;
726        uint16_t port;
727        memcpy( &addr, walk, 4 ); walk += 4;
728        memcpy( &port, walk, 2 ); walk += 2;
729        peer = getPeer( t, &addr, &isNew );
730        if( isNew ) {
731            peer->port = port;
732            peer->from = from;
733            maybeConnect( manager, t, peer );
734        }
735    }
736}
737
738/**
739***
740**/
741
742int
743tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
744{
745    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
746}
747
748void
749tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
750                    const uint8_t  * torrentHash UNUSED,
751                    int              pieceIndex UNUSED,
752                    int              success UNUSED )
753{
754    fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );
755}
756
757int
758tr_pexCompare( const void * va, const void * vb )
759{
760    const tr_pex * a = (const tr_pex *) va;
761    const tr_pex * b = (const tr_pex *) vb;
762    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
763    if( i ) return i;
764    if( a->port < b->port ) return -1;
765    if( a->port > b->port ) return 1;
766    return 0;
767}
768
769int tr_pexCompare( const void * a, const void * b );
770
771
772int
773tr_peerMgrGetPeers( tr_peerMgr      * manager,
774                    const uint8_t   * torrentHash,
775                    tr_pex         ** setme_pex )
776{
777    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
778    int i, peerCount;
779    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
780    tr_pex * pex = tr_new( tr_pex, peerCount );
781    tr_pex * walk = pex;
782
783    for( i=0; i<peerCount; ++i, ++walk )
784    {
785        const tr_peer * peer = peers[i];
786
787        walk->in_addr = peer->in_addr;
788
789        walk->port = peer->port;
790
791        walk->flags = 0;
792        if( peer->peerSupportsEncryption ) walk->flags |= 1;
793        if( peer->progress >= 1.0 )        walk->flags |= 2;
794    }
795
796    assert( ( walk - pex ) == peerCount );
797    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
798    *setme_pex = pex;
799    return peerCount;
800}
801
802void
803tr_peerMgrStartTorrent( tr_peerMgr     * manager,
804                        const uint8_t  * torrentHash )
805{
806    int i, peerCount;
807    Torrent * t = getExistingTorrent( manager, torrentHash );
808
809    t->isRunning = 1;
810
811    peerCount = tr_ptrArraySize( t->peers );
812    for( i=0; i<peerCount; ++i )
813        maybeConnect( manager, t, tr_ptrArrayNth( t->peers, i ) );
814}
815
816void
817tr_peerMgrStopTorrent( tr_peerMgr     * manager,
818                       const uint8_t  * torrentHash)
819{
820    int i, peerCount;
821    Torrent * t = getExistingTorrent( manager, torrentHash );
822
823    t->isRunning = 0;
824
825    peerCount = tr_ptrArraySize( t->peers );
826    for( i=0; i<peerCount; ++i )
827        disconnectPeer( tr_ptrArrayNth( t->peers, i ) );
828}
829
830void
831tr_peerMgrAddTorrent( tr_peerMgr * manager,
832                      tr_torrent * tor )
833{
834    Torrent * t;
835
836    assert( tor != NULL );
837    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
838
839    t = tr_new0( Torrent, 1 );
840    t->manager = manager;
841    t->tor = tor;
842    t->peers = tr_ptrArrayNew( );
843    t->requested = tr_bitfieldNew( tor->blockCount );
844    t->chokeTimer = tr_timerNew( manager->handle, chokePulse, t, RECHOKE_PERIOD_SECONDS );
845
846    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
847    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
848}
849
850void
851tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
852                         const uint8_t  * torrentHash )
853{
854    Torrent * t = getExistingTorrent( manager, torrentHash );
855    assert( t != NULL );
856    tr_peerMgrStopTorrent( manager, torrentHash );
857    freeTorrent( manager, t );
858}
859
860void
861tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
862                               const uint8_t    * torrentHash,
863                               int8_t           * tab,
864                               int                tabCount )
865{
866    int i;
867    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
868    const tr_torrent * tor = t->tor;
869    const float interval = tor->info.pieceCount / (float)tabCount;
870
871    memset( tab, 0, tabCount );
872
873    for( i=0; i<tabCount; ++i )
874    {
875        const int piece = i * interval;
876
877        if( tor == NULL )
878            tab[i] = 0;
879        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
880            tab[i] = -1;
881        else {
882            int j, peerCount;
883            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
884            for( j=0; j<peerCount; ++j )
885                if( tr_bitfieldHas( peers[j]->have, i ) )
886                    ++tab[i];
887        }
888    }
889}
890
891
892void
893tr_peerMgrTorrentStats( const tr_peerMgr * manager,
894                        const uint8_t    * torrentHash,
895                        int              * setmePeersTotal,
896                        int              * setmePeersConnected,
897                        int              * setmePeersSendingToUs,
898                        int              * setmePeersGettingFromUs,
899                        int              * setmePeersFrom )
900{
901    int i, size;
902    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
903    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
904
905    *setmePeersTotal          = size;
906    *setmePeersConnected      = 0;
907    *setmePeersSendingToUs    = 0;
908    *setmePeersGettingFromUs  = 0;
909
910    for( i=0; i<TR_PEER_FROM__MAX; ++i )
911        setmePeersFrom[i] = 0;
912
913    for( i=0; i<size; ++i )
914    {
915        const tr_peer * peer = peers[i];
916
917        if( peer->io == NULL ) /* not connected */
918            continue;
919
920        ++*setmePeersConnected;
921
922        ++setmePeersFrom[peer->from];
923
924        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
925            ++*setmePeersGettingFromUs;
926
927        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
928            ++*setmePeersSendingToUs;
929    }
930}
931
932struct tr_peer_stat *
933tr_peerMgrPeerStats( const tr_peerMgr  * manager,
934                     const uint8_t     * torrentHash,
935                     int               * setmeCount UNUSED )
936{
937    int i, size;
938    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
939    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
940    tr_peer_stat * ret;
941
942    ret = tr_new0( tr_peer_stat, size );
943
944    for( i=0; i<size; ++i )
945    {
946        const tr_peer * peer = peers[i];
947        const int live = peer->io != NULL;
948        tr_peer_stat * stat = ret + i;
949
950        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
951        stat->port             = peer->port;
952        stat->from             = peer->from;
953        stat->client           = peer->client;
954        stat->progress         = peer->progress;
955        stat->isConnected      = live ? 1 : 0;
956        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
957        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
958        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
959        stat->isDownloading    = stat->uploadToRate > 0.01;
960        stat->isUploading      = stat->downloadFromRate > 0.01;
961    }
962
963    *setmeCount = size;
964    return ret;
965}
966
967/**
968***
969**/
970
971typedef struct
972{
973    tr_peer * peer;
974    float rate;
975    int randomKey;
976    int preferred;
977    int doUnchoke;
978}
979ChokeData;
980
981static int
982compareChoke( const void * va, const void * vb )
983{
984    const ChokeData * a = ( const ChokeData * ) va;
985    const ChokeData * b = ( const ChokeData * ) vb;
986
987    if( a->preferred != b->preferred )
988        return a->preferred ? -1 : 1;
989
990    if( a->preferred )
991    {
992        if( a->rate > b->rate ) return -1;
993        if( a->rate < b->rate ) return 1;
994        return 0;
995    }
996    else
997    {
998        return a->randomKey - b->randomKey;
999    }
1000}
1001
1002static int
1003clientIsSnubbedBy( const tr_peer * peer )
1004{
1005    assert( peer != NULL );
1006
1007    return peer->peerSentDataAt < (time(NULL) - 30);
1008}
1009
1010static void
1011rechokeLeech( Torrent * t )
1012{
1013    int i, size, unchoked=0;
1014    tr_peer ** peers = getConnectedPeers( t, &size );
1015    ChokeData * choke = tr_new0( ChokeData, size );
1016
1017    /* sort the peers by preference and rate */
1018    for( i=0; i<size; ++i ) {
1019        tr_peer * peer = peers[i];
1020        ChokeData * node = &choke[i];
1021        node->peer = peer;
1022        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1023        node->randomKey = tr_rand( INT_MAX );
1024        node->rate = tr_peerIoGetRateToClient( peer->io );
1025    }
1026    qsort( choke, size, sizeof(ChokeData), compareChoke );
1027
1028    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1029        choke[i].doUnchoke = 1;
1030        ++unchoked;
1031    }
1032
1033    for( ; i<size; ++i ) {
1034        choke[i].doUnchoke = 1;
1035        ++unchoked;
1036        if( choke[i].peer->peerIsInterested )
1037            break;
1038    }
1039
1040    for( i=0; i<size; ++i )
1041        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1042
1043    /* cleanup */
1044    tr_free( choke );
1045    tr_free( peers );
1046}
1047
1048static void
1049rechokeSeed( Torrent * t )
1050{
1051    int i, size;
1052    tr_peer ** peers = getConnectedPeers( t, &size );
1053
1054    /* FIXME */
1055    for( i=0; i<size; ++i )
1056        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1057
1058    tr_free( peers );
1059}
1060
1061static int
1062chokePulse( void * vtorrent )
1063{
1064    Torrent * t = vtorrent;
1065    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1066    if( done )
1067        rechokeLeech( vtorrent );
1068    else
1069        rechokeSeed( vtorrent );
1070    return TRUE;
1071}
Note: See TracBrowser for help on using the repository browser.