source: trunk/libtransmission/peer-mgr.c @ 3147

Last change on this file since 3147 was 3147, checked in by charles, 15 years ago
  • when a torrent's done downloading, have it switch to seeding mode.
  • tweak the gtk+ client's torrent inspector's display of transfer rates
  • Property svn:keywords set to Date Rev Author Id
File size: 34.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3147 2007-09-23 13:53:44Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32enum
33{
34    /* how frequently to change which peers are choked */
35    RECHOKE_PERIOD_MSEC = (15 * 1000),
36
37    /* how frequently to decide which peers live and die */
38    RECONNECT_PERIOD_MSEC = (15 * 1000),
39
40    /* how frequently to refill peers' request lists */
41    REFILL_PERIOD_MSEC = 1000,
42
43    /* how many peers to unchoke per-torrent. */
44    /* FIXME: make this user-configurable? */
45    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
46
47    /* don't change a peer's choke status more often than this */
48    MIN_CHOKE_PERIOD_SEC = 10,
49
50    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
51    SOON_MSEC = 1000,
52
53    /* following the BT spec, we consider ourselves `snubbed' if
54     * we're we don't get piece data from a peer in this long */
55    SNUBBED_SEC = 60,
56
57    /* if our connection count for a torrent is <= N% of what we wanted,
58     * start relaxing the rules that decide when to disconnect a peer */
59    RELAX_RULES_PERCENTAGE = 25,
60
61    /* if we're not relaxing the rules, disconnect a peer that hasn't
62     * given us anything (or taken, if we're seeding) in this long */
63    MIN_TRANSFER_IDLE = 90000,
64
65    /* even if we're relaxing the rules, disconnect a peer that hasn't
66     * given us anything (or taken, if we're seeding) in this long */
67    MAX_TRANSFER_IDLE = 240000,
68
69    /* this is arbitrary and, hopefully, temporary until we come up
70     * with a better idea for managing the connection limits */
71    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
72
73    /* if we hang up on a peer for being worthless, don't try to
74     * reconnect to it for this long. */
75    MIN_HANGUP_PERIOD_SEC = 120
76};
77
78/**
79***
80**/
81
82typedef struct
83{
84    uint8_t hash[SHA_DIGEST_LENGTH];
85    tr_ptrArray * peers; /* tr_peer */
86    tr_timer * reconnectTimer;
87    tr_timer * reconnectSoonTimer;
88    tr_timer * rechokeTimer;
89    tr_timer * rechokeSoonTimer;
90    tr_timer * refillTimer;
91    tr_torrent * tor;
92    tr_bitfield * requested;
93
94    unsigned int isRunning : 1;
95
96    struct tr_peerMgr * manager;
97}
98Torrent;
99
100struct tr_peerMgr
101{
102    tr_handle * handle;
103    tr_ptrArray * torrents; /* Torrent */
104    int connectionCount;
105    tr_ptrArray * handshakes; /* in-process */
106};
107
108/**
109***
110**/
111
112static int
113handshakeCompareToAddr( const void * va, const void * vb )
114{
115    const tr_handshake * a = va;
116    const struct in_addr * b = vb;
117    return memcmp( tr_handshakeGetAddr( a, NULL ), b, sizeof( struct in_addr ) );
118}
119
120static int
121handshakeCompare( const void * a, const void * b )
122{
123    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
124}
125
126static tr_handshake*
127getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
128{
129    return tr_ptrArrayFindSorted( mgr->handshakes,
130                                  in_addr,
131                                  handshakeCompareToAddr );
132}
133
134/**
135***
136**/
137
138static int
139torrentCompare( const void * va, const void * vb )
140{
141    const Torrent * a = va;
142    const Torrent * b = vb;
143    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
144}
145
146static int
147torrentCompareToHash( const void * va, const void * vb )
148{
149    const Torrent * a = (const Torrent*) va;
150    const uint8_t * b_hash = (const uint8_t*) vb;
151    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
152}
153
154static Torrent*
155getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
156{
157    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
158                                             hash,
159                                             torrentCompareToHash );
160}
161
162static int
163peerCompare( const void * va, const void * vb )
164{
165    const tr_peer * a = (const tr_peer *) va;
166    const tr_peer * b = (const tr_peer *) vb;
167    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
168}
169
170static int
171peerCompareToAddr( const void * va, const void * vb )
172{
173    const tr_peer * a = (const tr_peer *) va;
174    const struct in_addr * b = (const struct in_addr *) vb;
175    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
176}
177
178static tr_peer*
179getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
180{
181    assert( torrent != NULL );
182    assert( torrent->peers != NULL );
183    assert( in_addr != NULL );
184
185    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
186                                             in_addr,
187                                             peerCompareToAddr );
188}
189
190static tr_peer*
191getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
192{
193    tr_peer * peer = getExistingPeer( torrent, in_addr );
194
195    if( isNew )
196        *isNew = peer == NULL;
197
198    if( peer == NULL )
199    {
200        peer = tr_new0( tr_peer, 1 );
201        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
202        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
203    }
204
205    return peer;
206}
207
208static void
209disconnectPeer( tr_peer * peer )
210{
211    assert( peer != NULL );
212
213    tr_peerIoFree( peer->io );
214    peer->io = NULL;
215
216    if( peer->msgs != NULL )
217    {
218        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
219        tr_peerMsgsFree( peer->msgs );
220        peer->msgs = NULL;
221    }
222
223    tr_bitfieldFree( peer->have );
224    peer->have = NULL;
225
226    tr_bitfieldFree( peer->blame );
227    peer->blame = NULL;
228
229    tr_bitfieldFree( peer->banned );
230    peer->banned = NULL;
231}
232
233static void
234freePeer( tr_peer * peer )
235{
236    disconnectPeer( peer );
237    tr_free( peer->client );
238    tr_free( peer );
239}
240
241static void
242freeTorrent( tr_peerMgr * manager, Torrent * t )
243{
244    int i, size;
245    tr_peer ** peers;
246    uint8_t hash[SHA_DIGEST_LENGTH];
247
248    assert( manager != NULL );
249    assert( t != NULL );
250    assert( t->peers != NULL );
251    assert( getExistingTorrent( manager, t->hash ) != NULL );
252
253    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
254
255    tr_timerFree( &t->reconnectTimer );
256    tr_timerFree( &t->reconnectSoonTimer );
257    tr_timerFree( &t->rechokeTimer );
258    tr_timerFree( &t->rechokeSoonTimer );
259    tr_timerFree( &t->refillTimer );
260
261    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
262    for( i=0; i<size; ++i )
263        freePeer( peers[i] );
264
265    tr_bitfieldFree( t->requested );
266    tr_ptrArrayFree( t->peers );
267    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
268    tr_free( t );
269
270    assert( getExistingTorrent( manager, hash ) == NULL );
271}
272
273/**
274***
275**/
276
277tr_peerMgr*
278tr_peerMgrNew( tr_handle * handle )
279{
280    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
281    m->handle = handle;
282    m->torrents = tr_ptrArrayNew( );
283    m->handshakes = tr_ptrArrayNew( );
284    return m;
285}
286
287void
288tr_peerMgrFree( tr_peerMgr * manager )
289{
290    while( !tr_ptrArrayEmpty( manager->handshakes ) )
291        tr_handshakeAbort( (tr_handshake*)tr_ptrArrayNth( manager->handshakes, 0) );
292    tr_ptrArrayFree( manager->handshakes );
293
294    while( !tr_ptrArrayEmpty( manager->torrents ) )
295        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
296    tr_ptrArrayFree( manager->torrents );
297
298    tr_free( manager );
299}
300
301static tr_peer**
302getConnectedPeers( Torrent * t, int * setmeCount )
303{
304    int i, peerCount, connectionCount;
305    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
306    tr_peer **ret = tr_new( tr_peer*, peerCount );
307
308    for( i=connectionCount=0; i<peerCount; ++i )
309        if( peers[i]->msgs != NULL )
310            ret[connectionCount++] = peers[i];
311
312    *setmeCount = connectionCount;
313    return ret;
314}
315
316/***
317****  Refill
318***/
319
320struct tr_refill_piece
321{
322    tr_priority_t priority;
323    uint32_t piece;
324    uint32_t peerCount;
325};
326
327static int
328compareRefillPiece (const void * aIn, const void * bIn)
329{
330    const struct tr_refill_piece * a = aIn;
331    const struct tr_refill_piece * b = bIn;
332
333    /* if one piece has a higher priority, it goes first */
334    if (a->priority != b->priority)
335        return a->priority > b->priority ? -1 : 1;
336
337    /* otherwise if one has fewer peers, it goes first */
338    if (a->peerCount != b->peerCount)
339        return a->peerCount < b->peerCount ? -1 : 1;
340
341    /* otherwise go with the earlier piece */
342    return a->piece - b->piece;
343}
344
345static int
346isPieceInteresting( const tr_torrent  * tor,
347                    int                 piece )
348{
349    if( tor->info.pieces[piece].dnd ) /* we don't want it */
350        return 0;
351
352    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
353        return 0;
354
355    return 1;
356}
357
358static uint32_t*
359getPreferredPieces( Torrent     * t,
360                    uint32_t    * pieceCount )
361{
362    const tr_torrent * tor = t->tor;
363    const tr_info * inf = &tor->info;
364
365    int i;
366    uint32_t poolSize = 0;
367    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
368
369    int peerCount;
370    tr_peer** peers = getConnectedPeers( t, &peerCount );
371
372    for( i=0; i<inf->pieceCount; ++i )
373        if( isPieceInteresting( tor, i ) )
374            pool[poolSize++] = i;
375
376    /* sort the pool from most interesting to least... */
377    if( poolSize > 1 )
378    {
379        uint32_t j;
380        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
381
382        for( j=0; j<poolSize; ++j )
383        {
384            int k;
385            const int piece = pool[j];
386            struct tr_refill_piece * setme = p + j;
387
388            setme->piece = piece;
389            setme->priority = inf->pieces[piece].priority;
390            setme->peerCount = 0;
391
392            for( k=0; k<peerCount; ++k ) {
393                const tr_peer * peer = peers[k];
394                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
395                    ++setme->peerCount;
396            }
397        }
398
399        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
400
401        for( j=0; j<poolSize; ++j )
402            pool[j] = p[j].piece;
403
404        tr_free( p );
405    }
406
407#if 0
408fprintf (stderr, "new pool: ");
409for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
410fprintf (stderr, "\n");
411#endif
412    tr_free( peers );
413
414    *pieceCount = poolSize;
415    return pool;
416}
417
418static uint64_t*
419getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
420{
421    uint32_t i;
422    uint32_t pieceCount;
423    uint32_t * pieces;
424    uint64_t *req, *unreq, *ret, *walk;
425    int reqCount, unreqCount;
426    const tr_torrent * tor = t->tor;
427
428    pieces = getPreferredPieces( t, &pieceCount );
429/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
430
431    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
432    reqCount = 0;
433    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
434    unreqCount = 0;
435
436    for( i=0; i<pieceCount; ++i ) {
437        const uint32_t index = pieces[i];
438        const int begin = tr_torPieceFirstBlock( tor, index );
439        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
440        int block;
441        for( block=begin; block<end; ++block )
442            if( tr_cpBlockIsComplete( tor->completion, block ) )
443                continue;
444            else if( tr_bitfieldHas( t->requested, block ) )
445                req[reqCount++] = block;
446            else
447                unreq[unreqCount++] = block;
448    }
449
450/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
451    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
452    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
453    walk += unreqCount;
454    memcpy( walk, req, sizeof(uint64_t) * reqCount );
455    walk += reqCount;
456    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
457    *setmeCount = walk - ret;
458
459    tr_free( req );
460    tr_free( unreq );
461    tr_free( pieces );
462
463    return ret;
464}
465
466static int
467refillPulse( void * vtorrent )
468{
469    Torrent * t = vtorrent;
470    tr_torrent * tor = t->tor;
471    uint32_t i;
472    int peerCount;
473    tr_peer ** peers;
474    uint64_t blockCount;
475    uint64_t * blocks;
476
477    if( !t->isRunning )
478        return TRUE;
479    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
480        return TRUE;
481
482    blocks = getPreferredBlocks( t, &blockCount );
483    peers = getConnectedPeers( t, &peerCount );
484
485/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
486
487    for( i=0; peerCount && i<blockCount; ++i )
488    {
489        const int block = blocks[i];
490        const uint32_t index = tr_torBlockPiece( tor, block );
491        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
492        const uint32_t length = tr_torBlockCountBytes( tor, block );
493        int j;
494        assert( _tr_block( tor, index, begin ) == block );
495        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
496        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
497
498
499        /* find a peer who can ask for this block */
500        for( j=0; j<peerCount; )
501        {
502            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
503            switch( val )
504            {
505                case TR_ADDREQ_FULL: 
506                case TR_ADDREQ_CLIENT_CHOKED:
507                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
508                    break;
509
510                case TR_ADDREQ_MISSING: 
511                    ++j;
512                    break;
513
514                case TR_ADDREQ_OK:
515                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
516                    tr_bitfieldAdd( t->requested, block );
517                    j = peerCount;
518                    break;
519
520                default:
521                    assert( 0 && "unhandled value" );
522                    break;
523            }
524        }
525    }
526
527    /* cleanup */
528    tr_free( peers );
529    tr_free( blocks );
530
531    t->refillTimer = NULL;
532    return FALSE;
533}
534
535static void
536broadcastClientHave( Torrent * t, uint32_t index )
537{
538    int i, size;
539    tr_peer ** peers = getConnectedPeers( t, &size );
540    for( i=0; i<size; ++i )
541        tr_peerMsgsHave( peers[i]->msgs, index );
542    tr_free( peers );
543}
544
545static void
546broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
547{
548    int i, size;
549    tr_peer ** peers = getConnectedPeers( t, &size );
550    for( i=0; i<size; ++i )
551        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
552    tr_free( peers );
553}
554
555/**
556***
557**/
558
559static int reconnectPulse( void * vtorrent );
560
561static void
562reconnectNow( Torrent * t )
563{
564    reconnectPulse( t );
565    tr_timerFree( &t->reconnectTimer );
566    t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
567}
568
569static int
570reconnectSoonCB( void * vt )
571{
572    Torrent * t = vt;
573    reconnectNow( t );
574    t->reconnectSoonTimer = NULL;
575    return FALSE;
576}
577
578static void
579reconnectSoon( Torrent * t )
580{
581    if( t->reconnectSoonTimer == NULL )
582        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
583                                             reconnectSoonCB, t, SOON_MSEC );
584}
585
586/**
587***
588**/
589
590static int rechokePulse( void * vtorrent );
591
592static void
593rechokeNow( Torrent * t )
594{
595    rechokePulse( t );
596    tr_timerFree( &t->rechokeTimer );
597    t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
598}
599
600static int
601rechokeSoonCB( void * vt )
602{
603    Torrent * t = vt;
604    rechokeNow( t );
605    t->rechokeSoonTimer = NULL;
606    return FALSE;
607}
608
609static void
610rechokeSoon( Torrent * t )
611{
612    if( t->rechokeSoonTimer == NULL )
613        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
614                                           rechokeSoonCB, t, SOON_MSEC );
615}
616
617static void
618msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
619{
620    tr_peer * peer = vpeer;
621    Torrent * t = (Torrent *) vt;
622    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
623
624    switch( e->eventType )
625    {
626        case TR_PEERMSG_NEED_REQ:
627            if( t->refillTimer == NULL )
628                t->refillTimer = tr_timerNew( t->manager->handle,
629                                              refillPulse, t,
630                                              REFILL_PERIOD_MSEC );
631            break;
632
633        case TR_PEERMSG_CLIENT_HAVE:
634            broadcastClientHave( t, e->pieceIndex );
635            tr_torrentRecheckCompleteness( t->tor );
636            break;
637
638        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
639            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
640            const int peerIsSeed = e->progress >= 1.0;
641            if( clientIsSeed && peerIsSeed )
642                peer->doDisconnect = 1;
643            break;
644        }
645
646        case TR_PEERMSG_CLIENT_BLOCK:
647            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
648            break;
649
650        case TR_PEERMSG_GOT_ERROR:
651            peer->doDisconnect = 1;
652            reconnectSoon( t );
653            break;
654
655        default:
656            assert(0);
657    }
658}
659
660static void
661myHandshakeDoneCB( tr_handshake    * handshake,
662                   tr_peerIo       * io,
663                   int               isConnected,
664                   const uint8_t   * peer_id,
665                   int               peerSupportsEncryption,
666                   void            * vmanager )
667{
668    int ok = isConnected;
669    uint16_t port;
670    const struct in_addr * in_addr;
671    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
672    const uint8_t * hash = NULL;
673    Torrent * t;
674    tr_handshake * ours;
675
676    assert( io != NULL );
677    assert( isConnected==0 || isConnected==1 );
678    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
679
680    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
681                                    handshake,
682                                    handshakeCompare );
683    //assert( ours != NULL );
684    //assert( ours == handshake );
685
686    in_addr = tr_peerIoGetAddress( io, &port );
687
688    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
689    {
690        tr_peerIoFree( io );
691        --manager->connectionCount;
692        return;
693    }
694
695    hash = tr_peerIoGetTorrentHash( io );
696    t = getExistingTorrent( manager, hash );
697    if( !t || !t->isRunning )
698    {
699        tr_peerIoFree( io );
700        --manager->connectionCount;
701        return;
702    }
703
704    /* if we couldn't connect or were snubbed,
705     * the peer's probably not worth remembering. */
706    if( !ok ) {
707        tr_peer * peer = getExistingPeer( t, in_addr );
708        tr_peerIoFree( io );
709        --manager->connectionCount;
710        if( peer ) {
711            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
712            freePeer( peer );
713        }
714        return;
715    }
716
717    if( 1 ) {
718        tr_peer * peer = getPeer( t, in_addr, NULL );
719        if( peer->msgs != NULL ) { /* we already have this peer */
720            tr_peerIoFree( io );
721            --manager->connectionCount;
722        } else {
723            peer->port = port;
724            peer->io = io;
725            peer->msgs = tr_peerMsgsNew( t->tor, peer );
726            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
727            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
728            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
729            peer->connectionChangedAt = time( NULL );
730            rechokeSoon( t );
731        }
732    }
733}
734
735static void
736initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
737{
738    tr_handshake * handshake = tr_handshakeNew( io,
739                                                manager->handle->encryptionMode,
740                                                myHandshakeDoneCB,
741                                                manager );
742    ++manager->connectionCount;
743
744    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
745}
746
747void
748tr_peerMgrAddIncoming( tr_peerMgr      * manager,
749                       struct in_addr  * addr,
750                       int               socket )
751{
752    if( getExistingHandshake( manager, addr ) == NULL )
753    {
754        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, socket );
755        initiateHandshake( manager, io );
756    }
757}
758
759void
760tr_peerMgrAddPex( tr_peerMgr     * manager,
761                  const uint8_t  * torrentHash,
762                  int              from,
763                  const tr_pex   * pex,
764                  int              pexCount )
765{
766    Torrent * t = getExistingTorrent( manager, torrentHash );
767    const tr_pex * end = pex + pexCount;
768    while( pex != end )
769    {
770        int isNew;
771        tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
772        if( isNew ) {
773            peer->port = pex->port;
774            peer->from = from;
775        }
776        ++pex;
777    }
778    reconnectSoon( t );
779}
780
781void
782tr_peerMgrAddPeers( tr_peerMgr    * manager,
783                    const uint8_t * torrentHash,
784                    int             from,
785                    const uint8_t * peerCompact,
786                    int             peerCount )
787{
788    int i;
789    const uint8_t * walk = peerCompact;
790    Torrent * t = getExistingTorrent( manager, torrentHash );
791    for( i=0; t!=NULL && i<peerCount; ++i )
792    {
793        int isNew;
794        tr_peer * peer;
795        struct in_addr addr;
796        uint16_t port;
797        memcpy( &addr, walk, 4 ); walk += 4;
798        memcpy( &port, walk, 2 ); walk += 2;
799        peer = getPeer( t, &addr, &isNew );
800        if( isNew ) {
801            peer->port = port;
802            peer->from = from;
803        }
804    }
805    reconnectSoon( t );
806}
807
808/**
809***
810**/
811
812int
813tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
814{
815    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
816}
817
818void
819tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
820                    const uint8_t  * torrentHash UNUSED,
821                    int              pieceIndex UNUSED,
822                    int              success UNUSED )
823{
824    fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );
825}
826
827int
828tr_pexCompare( const void * va, const void * vb )
829{
830    const tr_pex * a = (const tr_pex *) va;
831    const tr_pex * b = (const tr_pex *) vb;
832    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
833    if( i ) return i;
834    if( a->port < b->port ) return -1;
835    if( a->port > b->port ) return 1;
836    return 0;
837}
838
839int tr_pexCompare( const void * a, const void * b );
840
841
842int
843tr_peerMgrGetPeers( tr_peerMgr      * manager,
844                    const uint8_t   * torrentHash,
845                    tr_pex         ** setme_pex )
846{
847    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
848    int i, peerCount;
849    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
850    tr_pex * pex = tr_new( tr_pex, peerCount );
851    tr_pex * walk = pex;
852
853    for( i=0; i<peerCount; ++i, ++walk )
854    {
855        const tr_peer * peer = peers[i];
856
857        walk->in_addr = peer->in_addr;
858
859        walk->port = peer->port;
860
861        walk->flags = 0;
862        if( peer->peerSupportsEncryption ) walk->flags |= 1;
863        if( peer->progress >= 1.0 )        walk->flags |= 2;
864    }
865
866    assert( ( walk - pex ) == peerCount );
867    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
868    *setme_pex = pex;
869    return peerCount;
870}
871
872void
873tr_peerMgrStartTorrent( tr_peerMgr     * manager,
874                        const uint8_t  * torrentHash )
875{
876    Torrent * t = getExistingTorrent( manager, torrentHash );
877    t->isRunning = 1;
878    reconnectNow( t );
879}
880
881void
882tr_peerMgrStopTorrent( tr_peerMgr     * manager,
883                       const uint8_t  * torrentHash)
884{
885    Torrent * t = getExistingTorrent( manager, torrentHash );
886    t->isRunning = 0;
887    reconnectNow( t );
888}
889
890void
891tr_peerMgrAddTorrent( tr_peerMgr * manager,
892                      tr_torrent * tor )
893{
894    Torrent * t;
895
896    assert( tor != NULL );
897    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
898
899    t = tr_new0( Torrent, 1 );
900    t->manager = manager;
901    t->tor = tor;
902    t->peers = tr_ptrArrayNew( );
903    t->requested = tr_bitfieldNew( tor->blockCount );
904    t->rechokeTimer = tr_timerNew( manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
905    t->reconnectTimer = tr_timerNew( manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
906
907    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
908    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
909}
910
911void
912tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
913                         const uint8_t  * torrentHash )
914{
915    Torrent * t = getExistingTorrent( manager, torrentHash );
916    assert( t != NULL );
917    tr_peerMgrStopTorrent( manager, torrentHash );
918    freeTorrent( manager, t );
919}
920
921void
922tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
923                               const uint8_t    * torrentHash,
924                               int8_t           * tab,
925                               int                tabCount )
926{
927    int i;
928    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
929    const tr_torrent * tor = t->tor;
930    const float interval = tor->info.pieceCount / (float)tabCount;
931
932    memset( tab, 0, tabCount );
933
934    for( i=0; i<tabCount; ++i )
935    {
936        const int piece = i * interval;
937
938        if( tor == NULL )
939            tab[i] = 0;
940        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
941            tab[i] = -1;
942        else {
943            int j, peerCount;
944            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
945            for( j=0; j<peerCount; ++j )
946                if( tr_bitfieldHas( peers[j]->have, i ) )
947                    ++tab[i];
948        }
949    }
950}
951
952
953void
954tr_peerMgrTorrentStats( const tr_peerMgr * manager,
955                        const uint8_t    * torrentHash,
956                        int              * setmePeersTotal,
957                        int              * setmePeersConnected,
958                        int              * setmePeersSendingToUs,
959                        int              * setmePeersGettingFromUs,
960                        int              * setmePeersFrom )
961{
962    int i, size;
963    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
964    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
965
966    *setmePeersTotal          = size;
967    *setmePeersConnected      = 0;
968    *setmePeersSendingToUs    = 0;
969    *setmePeersGettingFromUs  = 0;
970
971    for( i=0; i<TR_PEER_FROM__MAX; ++i )
972        setmePeersFrom[i] = 0;
973
974    for( i=0; i<size; ++i )
975    {
976        const tr_peer * peer = peers[i];
977
978        if( peer->io == NULL ) /* not connected */
979            continue;
980
981        ++*setmePeersConnected;
982
983        ++setmePeersFrom[peer->from];
984
985        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
986            ++*setmePeersGettingFromUs;
987
988        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
989            ++*setmePeersSendingToUs;
990    }
991}
992
993struct tr_peer_stat *
994tr_peerMgrPeerStats( const tr_peerMgr  * manager,
995                     const uint8_t     * torrentHash,
996                     int               * setmeCount UNUSED )
997{
998    int i, size;
999    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1000    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1001    tr_peer_stat * ret;
1002
1003    ret = tr_new0( tr_peer_stat, size );
1004
1005    for( i=0; i<size; ++i )
1006    {
1007        const tr_peer * peer = peers[i];
1008        const int live = peer->io != NULL;
1009        tr_peer_stat * stat = ret + i;
1010
1011        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1012        stat->port             = peer->port;
1013        stat->from             = peer->from;
1014        stat->client           = peer->client;
1015        stat->progress         = peer->progress;
1016        stat->isConnected      = live ? 1 : 0;
1017        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1018        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1019        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1020        stat->isDownloading    = stat->uploadToRate > 0.01;
1021        stat->isUploading      = stat->downloadFromRate > 0.01;
1022    }
1023
1024    *setmeCount = size;
1025    return ret;
1026}
1027
1028/**
1029***
1030**/
1031
1032typedef struct
1033{
1034    tr_peer * peer;
1035    float rate;
1036    int randomKey;
1037    int preferred;
1038    int doUnchoke;
1039}
1040ChokeData;
1041
1042static int
1043compareChoke( const void * va, const void * vb )
1044{
1045    const ChokeData * a = ( const ChokeData * ) va;
1046    const ChokeData * b = ( const ChokeData * ) vb;
1047
1048    if( a->preferred != b->preferred )
1049        return a->preferred ? -1 : 1;
1050
1051    if( a->preferred )
1052    {
1053        if( a->rate > b->rate ) return -1;
1054        if( a->rate < b->rate ) return 1;
1055        return 0;
1056    }
1057    else
1058    {
1059        return a->randomKey - b->randomKey;
1060    }
1061}
1062
1063static int
1064clientIsSnubbedBy( const tr_peer * peer )
1065{
1066    assert( peer != NULL );
1067
1068    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1069}
1070
1071/**
1072***
1073**/
1074
1075static void
1076rechokeLeech( Torrent * t )
1077{
1078    int i, peerCount, size=0, unchoked=0;
1079    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1080    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1081    ChokeData * choke = tr_new0( ChokeData, peerCount );
1082
1083    /* sort the peers by preference and rate */
1084    for( i=0; i<peerCount; ++i )
1085    {
1086        tr_peer * peer = peers[i];
1087        ChokeData * node;
1088        if( peer->chokeChangedAt > ignorePeersNewerThan )
1089            continue;
1090
1091        node = &choke[size++];
1092        node->peer = peer;
1093        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1094        node->randomKey = tr_rand( INT_MAX );
1095        node->rate = tr_peerIoGetRateToClient( peer->io );
1096    }
1097
1098    qsort( choke, size, sizeof(ChokeData), compareChoke );
1099
1100    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1101        choke[i].doUnchoke = 1;
1102        ++unchoked;
1103    }
1104
1105    for( ; i<size; ++i ) {
1106        choke[i].doUnchoke = 1;
1107        ++unchoked;
1108        if( choke[i].peer->peerIsInterested )
1109            break;
1110    }
1111
1112    for( i=0; i<size; ++i )
1113        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1114
1115    /* cleanup */
1116    tr_free( choke );
1117    tr_free( peers );
1118}
1119
1120static void
1121rechokeSeed( Torrent * t )
1122{
1123    int i, size;
1124    tr_peer ** peers = getConnectedPeers( t, &size );
1125
1126    /* FIXME */
1127    for( i=0; i<size; ++i )
1128        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1129
1130    tr_free( peers );
1131}
1132
1133static int
1134rechokePulse( void * vtorrent )
1135{
1136    Torrent * t = vtorrent;
1137    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1138    if( done )
1139        rechokeLeech( vtorrent );
1140    else
1141        rechokeSeed( vtorrent );
1142    return TRUE;
1143}
1144
1145/**
1146***
1147**/
1148
1149static int
1150shouldPeerBeDisconnected( Torrent * t, tr_peer * peer, int peerCount, int isSeeding )
1151{
1152    const time_t now = time( NULL );
1153    int relaxStrictnessIfFewerThanN;
1154    double strictness;
1155
1156    if( peer->io == NULL ) /* not connected */
1157        return FALSE;
1158
1159    if( !t->isRunning ) /* the torrent is stopped... nobody should be connected */
1160        return TRUE;
1161
1162    if( peer->doDisconnect ) /* someone set a `doDisconnect' flag somewhere */
1163        return TRUE;
1164
1165    /* when deciding whether or not to keep a peer, judge its responsiveness
1166       on a sliding scale that's based on how many other peers are available */
1167    relaxStrictnessIfFewerThanN =
1168        (int)(((TR_MAX_PEER_COUNT * RELAX_RULES_PERCENTAGE) / 100.0) + 0.5);
1169
1170    /* if we have >= relaxIfFewerThan, strictness is 100%.
1171       if we have zero connections, strictness is 0% */
1172    if( peerCount >= relaxStrictnessIfFewerThanN )
1173        strictness = 1.0;
1174    else
1175        strictness = peerCount / (double)relaxStrictnessIfFewerThanN;
1176
1177    /* test: has it been too long since we exchanged piece data? */
1178    if( ( now - peer->connectionChangedAt ) >= MAX_TRANSFER_IDLE ) {
1179        const uint64_t lo = MIN_TRANSFER_IDLE;
1180        const uint64_t hi = MAX_TRANSFER_IDLE;
1181        const uint64_t limit = lo + ((hi-lo) * strictness);
1182        const uint64_t interval = now - (isSeeding ? peer->clientSentPieceDataAt : peer->peerSentPieceDataAt);
1183        if( interval > limit )
1184            return TRUE;
1185    }
1186
1187    /* FIXME: SWE had other tests too... */
1188
1189    return FALSE;
1190}
1191
1192static int
1193comparePeerByConnectionDate( const void * va, const void * vb )
1194{
1195    const tr_peer * a = *(const tr_peer**) va;
1196    const tr_peer * b = *(const tr_peer**) vb;
1197    return tr_compareUint64( a->connectionChangedAt, b->connectionChangedAt );
1198}
1199
1200static int
1201reconnectPulse( void * vt UNUSED )
1202{
1203    int i, size, liveCount;
1204    Torrent * t = vt;
1205    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &size );
1206    const int isSeeding = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1207
1208    /* how many connections do we have? */
1209    for( i=liveCount=0; i<size; ++i )
1210        if( peers[i]->msgs != NULL )
1211            ++liveCount;
1212
1213    /* disconnect from some peers */
1214    for( i=0; i<size; ++i ) {
1215        tr_peer * peer = peers[i];
1216        if( shouldPeerBeDisconnected( t, peer, liveCount, isSeeding ) ) {
1217            disconnectPeer( peer );
1218            --liveCount;
1219        }
1220    }
1221 
1222    /* maybe connect to some new peers */ 
1223    if( t->isRunning && (liveCount<MAX_CONNECTED_PEERS_PER_TORRENT) )
1224    {
1225        int poolSize;
1226        int left = MAX_CONNECTED_PEERS_PER_TORRENT - liveCount;
1227        tr_peer ** pool;
1228        tr_peerMgr * manager = t->manager;
1229        const time_t now = time( NULL );
1230
1231        /* make a list of peers we know about but aren't connected to */
1232        poolSize = 0;
1233        pool = tr_new0( tr_peer*, size );
1234        for( i=0; i<size; ++i ) {
1235            tr_peer * peer = peers[i];
1236            if( peer->msgs == NULL )
1237                pool[poolSize++] = peer;
1238        }
1239
1240        /* sort them s.t. the ones we've already tried are at the last of the list */
1241        qsort( pool, poolSize, sizeof(tr_peer*), comparePeerByConnectionDate );
1242
1243        /* make some connections */
1244        for( i=0; i<poolSize && left>0; ++i )
1245        {
1246            tr_peer * peer = pool[i];
1247            tr_peerIo * io;
1248
1249            if( ( now - peer->connectionChangedAt ) < MIN_HANGUP_PERIOD_SEC )
1250                break;
1251
1252            /* already have a handshake pending */
1253            if( getExistingHandshake( manager, &peer->in_addr ) != NULL )
1254                continue;
1255
1256            /* initiate a connection to the peer */
1257            io = tr_peerIoNewOutgoing( manager->handle, &peer->in_addr, peer->port, t->hash );
1258            fprintf( stderr, "[%s] connecting to potential peer %s\n", t->tor->info.name, tr_peerIoGetAddrStr(io) );
1259            peer->connectionChangedAt = time( NULL );
1260            initiateHandshake( manager, io );
1261            --left;
1262        }
1263
1264        tr_free( pool );
1265    }
1266
1267    return TRUE;
1268}
Note: See TracBrowser for help on using the repository browser.