source: trunk/libtransmission/peer-mgr.c @ 3197

Last change on this file since 3197 was 3197, checked in by charles, 15 years ago

some experimental code. (1) try to improve throughput to peers. (2) add first draft of new tr_stat fields requested by BentMyWookie? (3) raise the per-torrent peer limit to 100 to match LibTorrent?'s defaults

  • Property svn:keywords set to Date Rev Author Id
File size: 35.8 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3197 2007-09-27 03:03:38Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32enum
33{
34    /* how frequently to change which peers are choked */
35    RECHOKE_PERIOD_MSEC = (15 * 1000),
36
37    /* how frequently to decide which peers live and die */
38    RECONNECT_PERIOD_MSEC = (15 * 1000),
39
40    /* how frequently to refill peers' request lists */
41    REFILL_PERIOD_MSEC = 1000,
42
43    /* how many peers to unchoke per-torrent. */
44    /* FIXME: make this user-configurable? */
45    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
46
47    /* don't change a peer's choke status more often than this */
48    MIN_CHOKE_PERIOD_SEC = 10,
49
50    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
51    SOON_MSEC = 1000,
52
53    /* following the BT spec, we consider ourselves `snubbed' if
54     * we're we don't get piece data from a peer in this long */
55    SNUBBED_SEC = 60,
56
57    /* if our connection count for a torrent is <= N% of what we wanted,
58     * start relaxing the rules that decide when to disconnect a peer */
59    RELAX_RULES_PERCENTAGE = 25,
60
61    /* if we're not relaxing the rules, disconnect a peer that hasn't
62     * given us anything (or taken, if we're seeding) in this long */
63    MIN_TRANSFER_IDLE = 90000,
64
65    /* even if we're relaxing the rules, disconnect a peer that hasn't
66     * given us anything (or taken, if we're seeding) in this long */
67    MAX_TRANSFER_IDLE = 240000,
68
69    /* this is arbitrary and, hopefully, temporary until we come up
70     * with a better idea for managing the connection limits */
71    MAX_CONNECTED_PEERS_PER_TORRENT = 100,
72
73    /* if we hang up on a peer for being worthless, don't try to
74     * reconnect to it for this long. */
75    MIN_HANGUP_PERIOD_SEC = 120
76};
77
78/**
79***
80**/
81
82typedef struct
83{
84    uint8_t hash[SHA_DIGEST_LENGTH];
85    tr_ptrArray * peers; /* tr_peer */
86    tr_timer * reconnectTimer;
87    tr_timer * reconnectSoonTimer;
88    tr_timer * rechokeTimer;
89    tr_timer * rechokeSoonTimer;
90    tr_timer * refillTimer;
91    tr_torrent * tor;
92    tr_bitfield * requested;
93
94    unsigned int isRunning : 1;
95
96    struct tr_peerMgr * manager;
97}
98Torrent;
99
100struct tr_peerMgr
101{
102    tr_handle * handle;
103    tr_ptrArray * torrents; /* Torrent */
104    int connectionCount;
105    tr_ptrArray * handshakes; /* in-process */
106};
107
108/**
109***
110**/
111
112static int
113handshakeCompareToAddr( const void * va, const void * vb )
114{
115    const tr_handshake * a = va;
116    const struct in_addr * b = vb;
117    return memcmp( tr_handshakeGetAddr( a, NULL ), b, sizeof( struct in_addr ) );
118}
119
120static int
121handshakeCompare( const void * a, const void * b )
122{
123    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
124}
125
126static tr_handshake*
127getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
128{
129    return tr_ptrArrayFindSorted( mgr->handshakes,
130                                  in_addr,
131                                  handshakeCompareToAddr );
132}
133
134/**
135***
136**/
137
138static int
139torrentCompare( const void * va, const void * vb )
140{
141    const Torrent * a = va;
142    const Torrent * b = vb;
143    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
144}
145
146static int
147torrentCompareToHash( const void * va, const void * vb )
148{
149    const Torrent * a = (const Torrent*) va;
150    const uint8_t * b_hash = (const uint8_t*) vb;
151    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
152}
153
154static Torrent*
155getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
156{
157    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
158                                             hash,
159                                             torrentCompareToHash );
160}
161
162static int
163peerCompare( const void * va, const void * vb )
164{
165    const tr_peer * a = (const tr_peer *) va;
166    const tr_peer * b = (const tr_peer *) vb;
167    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
168}
169
170static int
171peerCompareToAddr( const void * va, const void * vb )
172{
173    const tr_peer * a = (const tr_peer *) va;
174    const struct in_addr * b = (const struct in_addr *) vb;
175    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
176}
177
178static tr_peer*
179getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
180{
181    assert( torrent != NULL );
182    assert( torrent->peers != NULL );
183    assert( in_addr != NULL );
184
185    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
186                                             in_addr,
187                                             peerCompareToAddr );
188}
189
190static tr_peer*
191getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
192{
193    tr_peer * peer = getExistingPeer( torrent, in_addr );
194
195    if( isNew )
196        *isNew = peer == NULL;
197
198    if( peer == NULL )
199    {
200        peer = tr_new0( tr_peer, 1 );
201        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
202        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
203    }
204
205    return peer;
206}
207
208static void
209disconnectPeer( tr_peer * peer )
210{
211    assert( peer != NULL );
212
213    tr_peerIoFree( peer->io );
214    peer->io = NULL;
215
216    if( peer->msgs != NULL )
217    {
218        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
219        tr_peerMsgsFree( peer->msgs );
220        peer->msgs = NULL;
221    }
222
223    tr_bitfieldFree( peer->have );
224    peer->have = NULL;
225
226    tr_bitfieldFree( peer->blame );
227    peer->blame = NULL;
228
229    tr_bitfieldFree( peer->banned );
230    peer->banned = NULL;
231}
232
233static void
234freePeer( tr_peer * peer )
235{
236    disconnectPeer( peer );
237    tr_free( peer->client );
238    tr_free( peer );
239}
240
241static void
242freeTorrent( tr_peerMgr * manager, Torrent * t )
243{
244    int i, size;
245    tr_peer ** peers;
246    uint8_t hash[SHA_DIGEST_LENGTH];
247
248    assert( manager != NULL );
249    assert( t != NULL );
250    assert( t->peers != NULL );
251    assert( getExistingTorrent( manager, t->hash ) != NULL );
252
253    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
254
255    tr_timerFree( &t->reconnectTimer );
256    tr_timerFree( &t->reconnectSoonTimer );
257    tr_timerFree( &t->rechokeTimer );
258    tr_timerFree( &t->rechokeSoonTimer );
259    tr_timerFree( &t->refillTimer );
260
261    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
262    for( i=0; i<size; ++i )
263        freePeer( peers[i] );
264
265    tr_bitfieldFree( t->requested );
266    tr_ptrArrayFree( t->peers );
267    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
268    tr_free( t );
269
270    assert( getExistingTorrent( manager, hash ) == NULL );
271}
272
273/**
274***
275**/
276
277tr_peerMgr*
278tr_peerMgrNew( tr_handle * handle )
279{
280    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
281    m->handle = handle;
282    m->torrents = tr_ptrArrayNew( );
283    m->handshakes = tr_ptrArrayNew( );
284    return m;
285}
286
287void
288tr_peerMgrFree( tr_peerMgr * manager )
289{
290    while( !tr_ptrArrayEmpty( manager->handshakes ) )
291        tr_handshakeAbort( (tr_handshake*)tr_ptrArrayNth( manager->handshakes, 0) );
292    tr_ptrArrayFree( manager->handshakes );
293
294    while( !tr_ptrArrayEmpty( manager->torrents ) )
295        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
296    tr_ptrArrayFree( manager->torrents );
297
298    tr_free( manager );
299}
300
301static tr_peer**
302getConnectedPeers( Torrent * t, int * setmeCount )
303{
304    int i, peerCount, connectionCount;
305    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
306    tr_peer **ret = tr_new( tr_peer*, peerCount );
307
308    for( i=connectionCount=0; i<peerCount; ++i )
309        if( peers[i]->msgs != NULL )
310            ret[connectionCount++] = peers[i];
311
312    *setmeCount = connectionCount;
313    return ret;
314}
315
316/***
317****  Refill
318***/
319
320struct tr_refill_piece
321{
322    tr_priority_t priority;
323    uint32_t piece;
324    uint32_t peerCount;
325};
326
327static int
328compareRefillPiece (const void * aIn, const void * bIn)
329{
330    const struct tr_refill_piece * a = aIn;
331    const struct tr_refill_piece * b = bIn;
332
333    /* if one piece has a higher priority, it goes first */
334    if (a->priority != b->priority)
335        return a->priority > b->priority ? -1 : 1;
336
337    /* otherwise if one has fewer peers, it goes first */
338    if (a->peerCount != b->peerCount)
339        return a->peerCount < b->peerCount ? -1 : 1;
340
341    /* otherwise go with the earlier piece */
342    return a->piece - b->piece;
343}
344
345static int
346isPieceInteresting( const tr_torrent  * tor,
347                    int                 piece )
348{
349    if( tor->info.pieces[piece].dnd ) /* we don't want it */
350        return 0;
351
352    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
353        return 0;
354
355    return 1;
356}
357
358static uint32_t*
359getPreferredPieces( Torrent     * t,
360                    uint32_t    * pieceCount )
361{
362    const tr_torrent * tor = t->tor;
363    const tr_info * inf = &tor->info;
364
365    int i;
366    uint32_t poolSize = 0;
367    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
368
369    int peerCount;
370    tr_peer** peers = getConnectedPeers( t, &peerCount );
371
372    for( i=0; i<inf->pieceCount; ++i )
373        if( isPieceInteresting( tor, i ) )
374            pool[poolSize++] = i;
375
376    /* sort the pool from most interesting to least... */
377    if( poolSize > 1 )
378    {
379        uint32_t j;
380        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
381
382        for( j=0; j<poolSize; ++j )
383        {
384            int k;
385            const int piece = pool[j];
386            struct tr_refill_piece * setme = p + j;
387
388            setme->piece = piece;
389            setme->priority = inf->pieces[piece].priority;
390            setme->peerCount = 0;
391
392            for( k=0; k<peerCount; ++k ) {
393                const tr_peer * peer = peers[k];
394                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
395                    ++setme->peerCount;
396            }
397        }
398
399        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
400
401        for( j=0; j<poolSize; ++j )
402            pool[j] = p[j].piece;
403
404        tr_free( p );
405    }
406
407#if 0
408fprintf (stderr, "new pool: ");
409for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
410fprintf (stderr, "\n");
411#endif
412    tr_free( peers );
413
414    *pieceCount = poolSize;
415    return pool;
416}
417
418static uint64_t*
419getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
420{
421    uint32_t i;
422    uint32_t pieceCount;
423    uint32_t * pieces;
424    uint64_t *req, *unreq, *ret, *walk;
425    int reqCount, unreqCount;
426    const tr_torrent * tor = t->tor;
427
428    pieces = getPreferredPieces( t, &pieceCount );
429/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
430
431    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
432    reqCount = 0;
433    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
434    unreqCount = 0;
435
436    for( i=0; i<pieceCount; ++i ) {
437        const uint32_t index = pieces[i];
438        const int begin = tr_torPieceFirstBlock( tor, index );
439        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
440        int block;
441        for( block=begin; block<end; ++block )
442            if( tr_cpBlockIsComplete( tor->completion, block ) )
443                continue;
444            else if( tr_bitfieldHas( t->requested, block ) )
445                req[reqCount++] = block;
446            else
447                unreq[unreqCount++] = block;
448    }
449
450/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
451    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
452    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
453    walk += unreqCount;
454    memcpy( walk, req, sizeof(uint64_t) * reqCount );
455    walk += reqCount;
456    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
457    *setmeCount = walk - ret;
458
459    tr_free( req );
460    tr_free( unreq );
461    tr_free( pieces );
462
463    return ret;
464}
465
466static int
467refillPulse( void * vtorrent )
468{
469    Torrent * t = vtorrent;
470    tr_torrent * tor = t->tor;
471    uint32_t i;
472    int peerCount;
473    tr_peer ** peers;
474    uint64_t blockCount;
475    uint64_t * blocks;
476
477    if( !t->isRunning )
478        return TRUE;
479    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
480        return TRUE;
481
482    blocks = getPreferredBlocks( t, &blockCount );
483    peers = getConnectedPeers( t, &peerCount );
484
485/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
486
487    for( i=0; peerCount && i<blockCount; ++i )
488    {
489        const int block = blocks[i];
490        const uint32_t index = tr_torBlockPiece( tor, block );
491        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
492        const uint32_t length = tr_torBlockCountBytes( tor, block );
493        int j;
494        assert( _tr_block( tor, index, begin ) == block );
495        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
496        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
497
498
499        /* find a peer who can ask for this block */
500        for( j=0; j<peerCount; )
501        {
502            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
503            switch( val )
504            {
505                case TR_ADDREQ_FULL: 
506                case TR_ADDREQ_CLIENT_CHOKED:
507                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
508                    break;
509
510                case TR_ADDREQ_MISSING: 
511                    ++j;
512                    break;
513
514                case TR_ADDREQ_OK:
515                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
516                    tr_bitfieldAdd( t->requested, block );
517                    j = peerCount;
518                    break;
519
520                default:
521                    assert( 0 && "unhandled value" );
522                    break;
523            }
524        }
525    }
526
527    /* cleanup */
528    tr_free( peers );
529    tr_free( blocks );
530
531    t->refillTimer = NULL;
532    return FALSE;
533}
534
535static void
536broadcastClientHave( Torrent * t, uint32_t index )
537{
538    int i, size;
539    tr_peer ** peers = getConnectedPeers( t, &size );
540    for( i=0; i<size; ++i )
541        tr_peerMsgsHave( peers[i]->msgs, index );
542    tr_free( peers );
543}
544
545static void
546broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
547{
548    int i, size;
549    tr_peer ** peers = getConnectedPeers( t, &size );
550    for( i=0; i<size; ++i )
551        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
552    tr_free( peers );
553}
554
555/**
556***
557**/
558
559static int reconnectPulse( void * vtorrent );
560
561static void
562restartReconnectTimer( Torrent * t )
563{
564    tr_timerFree( &t->reconnectTimer );
565    if( t->isRunning )
566        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
567}
568
569static void
570reconnectNow( Torrent * t )
571{
572    reconnectPulse( t );
573    restartReconnectTimer( t );
574}
575
576static int
577reconnectSoonCB( void * vt )
578{
579    Torrent * t = vt;
580    reconnectNow( t );
581    t->reconnectSoonTimer = NULL;
582    return FALSE;
583}
584
585static void
586reconnectSoon( Torrent * t )
587{
588    if( t->reconnectSoonTimer == NULL )
589        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
590                                             reconnectSoonCB, t, SOON_MSEC );
591}
592
593/**
594***
595**/
596
597static int rechokePulse( void * vtorrent );
598
599static void
600restartChokeTimer( Torrent * t )
601{
602    tr_timerFree( &t->rechokeTimer );
603    if( t->isRunning )
604        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
605}
606
607static void
608rechokeNow( Torrent * t )
609{
610    rechokePulse( t );
611    restartChokeTimer( t );
612}
613
614static int
615rechokeSoonCB( void * vt )
616{
617    Torrent * t = vt;
618    rechokeNow( t );
619    t->rechokeSoonTimer = NULL;
620    return FALSE;
621}
622
623static void
624rechokeSoon( Torrent * t )
625{
626    if( t->rechokeSoonTimer == NULL )
627        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
628                                           rechokeSoonCB, t, SOON_MSEC );
629}
630
631static void
632msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
633{
634    tr_peer * peer = vpeer;
635    Torrent * t = (Torrent *) vt;
636    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
637
638    switch( e->eventType )
639    {
640        case TR_PEERMSG_NEED_REQ:
641            if( t->refillTimer == NULL )
642                t->refillTimer = tr_timerNew( t->manager->handle,
643                                              refillPulse, t,
644                                              REFILL_PERIOD_MSEC );
645            break;
646
647        case TR_PEERMSG_CLIENT_HAVE:
648            broadcastClientHave( t, e->pieceIndex );
649            tr_torrentRecheckCompleteness( t->tor );
650            break;
651
652        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
653#if 0
654            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
655            const int peerIsSeed = e->progress >= 1.0;
656            if( clientIsSeed && peerIsSeed )
657                peer->doPurge = 1;
658#endif
659            break;
660        }
661
662        case TR_PEERMSG_CLIENT_BLOCK:
663            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
664            break;
665
666        case TR_PEERMSG_GOT_ERROR:
667            peer->doPurge = 1;
668            reconnectSoon( t );
669            break;
670
671        default:
672            assert(0);
673    }
674}
675
676static void
677myHandshakeDoneCB( tr_handshake    * handshake,
678                   tr_peerIo       * io,
679                   int               isConnected,
680                   const uint8_t   * peer_id,
681                   int               peerSupportsEncryption,
682                   void            * vmanager )
683{
684    int ok = isConnected;
685    uint16_t port;
686    const struct in_addr * in_addr;
687    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
688    const uint8_t * hash = NULL;
689    Torrent * t;
690    tr_handshake * ours;
691
692    assert( io != NULL );
693    assert( isConnected==0 || isConnected==1 );
694    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
695
696    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
697                                    handshake,
698                                    handshakeCompare );
699    //assert( ours != NULL );
700    //assert( ours == handshake );
701
702    in_addr = tr_peerIoGetAddress( io, &port );
703
704    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
705    {
706        tr_peerIoFree( io );
707        --manager->connectionCount;
708        return;
709    }
710
711    hash = tr_peerIoGetTorrentHash( io );
712    t = getExistingTorrent( manager, hash );
713    if( !t || !t->isRunning )
714    {
715        tr_peerIoFree( io );
716        --manager->connectionCount;
717        return;
718    }
719
720    /* if we couldn't connect or were snubbed,
721     * the peer's probably not worth remembering. */
722    if( !ok ) {
723        tr_peer * peer = getExistingPeer( t, in_addr );
724        tr_peerIoFree( io );
725        --manager->connectionCount;
726        if( peer )
727            peer->doPurge = 1;
728        return;
729    }
730
731    if( 1 ) {
732        tr_peer * peer = getPeer( t, in_addr, NULL );
733        if( peer->msgs != NULL ) { /* we already have this peer */
734            tr_peerIoFree( io );
735            --manager->connectionCount;
736        } else {
737            peer->port = port;
738            peer->io = io;
739            peer->msgs = tr_peerMsgsNew( t->tor, peer );
740            tr_free( peer->client );
741            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
742            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
743            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
744            peer->connectionChangedAt = time( NULL );
745            rechokeSoon( t );
746        }
747    }
748}
749
750static void
751initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
752{
753    tr_handshake * handshake = tr_handshakeNew( io,
754                                                manager->handle->encryptionMode,
755                                                myHandshakeDoneCB,
756                                                manager );
757    ++manager->connectionCount;
758
759    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
760}
761
762void
763tr_peerMgrAddIncoming( tr_peerMgr      * manager,
764                       struct in_addr  * addr,
765                       uint16_t          port,
766                       int               socket )
767{
768    if( getExistingHandshake( manager, addr ) == NULL )
769    {
770        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
771        initiateHandshake( manager, io );
772    }
773}
774
775void
776tr_peerMgrAddPex( tr_peerMgr     * manager,
777                  const uint8_t  * torrentHash,
778                  int              from,
779                  const tr_pex   * pex,
780                  int              pexCount )
781{
782    Torrent * t = getExistingTorrent( manager, torrentHash );
783    const tr_pex * end = pex + pexCount;
784    while( pex != end )
785    {
786        int isNew;
787        tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
788        if( isNew ) {
789            peer->port = pex->port;
790            peer->from = from;
791        }
792        ++pex;
793    }
794    reconnectSoon( t );
795}
796
797void
798tr_peerMgrAddPeers( tr_peerMgr    * manager,
799                    const uint8_t * torrentHash,
800                    int             from,
801                    const uint8_t * peerCompact,
802                    int             peerCount )
803{
804    int i;
805    const uint8_t * walk = peerCompact;
806    Torrent * t = getExistingTorrent( manager, torrentHash );
807    for( i=0; t!=NULL && i<peerCount; ++i )
808    {
809        int isNew;
810        tr_peer * peer;
811        struct in_addr addr;
812        uint16_t port;
813        memcpy( &addr, walk, 4 ); walk += 4;
814        memcpy( &port, walk, 2 ); walk += 2;
815        peer = getPeer( t, &addr, &isNew );
816        if( isNew ) {
817            peer->port = port;
818            peer->from = from;
819        }
820    }
821    reconnectSoon( t );
822}
823
824/**
825***
826**/
827
828int
829tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
830{
831    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
832}
833
834void
835tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
836                    const uint8_t  * torrentHash UNUSED,
837                    int              pieceIndex UNUSED,
838                    int              success UNUSED )
839{
840    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
841}
842
843int
844tr_pexCompare( const void * va, const void * vb )
845{
846    const tr_pex * a = (const tr_pex *) va;
847    const tr_pex * b = (const tr_pex *) vb;
848    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
849    if( i ) return i;
850    if( a->port < b->port ) return -1;
851    if( a->port > b->port ) return 1;
852    return 0;
853}
854
855int tr_pexCompare( const void * a, const void * b );
856
857
858int
859tr_peerMgrGetPeers( tr_peerMgr      * manager,
860                    const uint8_t   * torrentHash,
861                    tr_pex         ** setme_pex )
862{
863    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
864    int i, peerCount;
865    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
866    tr_pex * pex = tr_new( tr_pex, peerCount );
867    tr_pex * walk = pex;
868
869    for( i=0; i<peerCount; ++i, ++walk )
870    {
871        const tr_peer * peer = peers[i];
872
873        walk->in_addr = peer->in_addr;
874
875        walk->port = peer->port;
876
877        walk->flags = 0;
878        if( peer->peerSupportsEncryption ) walk->flags |= 1;
879        if( peer->progress >= 1.0 )        walk->flags |= 2;
880    }
881
882    assert( ( walk - pex ) == peerCount );
883    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
884    *setme_pex = pex;
885    return peerCount;
886}
887
888void
889tr_peerMgrStartTorrent( tr_peerMgr     * manager,
890                        const uint8_t  * torrentHash )
891{
892    Torrent * t = getExistingTorrent( manager, torrentHash );
893    t->isRunning = 1;
894    restartChokeTimer( t );
895    reconnectNow( t );
896}
897
898void
899tr_peerMgrStopTorrent( tr_peerMgr     * manager,
900                       const uint8_t  * torrentHash)
901{
902    Torrent * t = getExistingTorrent( manager, torrentHash );
903    t->isRunning = 0;
904    tr_timerFree( &t->rechokeTimer );
905    tr_timerFree( &t->reconnectTimer );
906    reconnectPulse( t );
907}
908
909void
910tr_peerMgrAddTorrent( tr_peerMgr * manager,
911                      tr_torrent * tor )
912{
913    Torrent * t;
914
915    assert( tor != NULL );
916    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
917
918    t = tr_new0( Torrent, 1 );
919    t->manager = manager;
920    t->tor = tor;
921    t->peers = tr_ptrArrayNew( );
922    t->requested = tr_bitfieldNew( tor->blockCount );
923    restartChokeTimer( t );
924    restartReconnectTimer( t );
925
926    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
927    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
928}
929
930void
931tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
932                         const uint8_t  * torrentHash )
933{
934    Torrent * t = getExistingTorrent( manager, torrentHash );
935    assert( t != NULL );
936    tr_peerMgrStopTorrent( manager, torrentHash );
937    freeTorrent( manager, t );
938}
939
940void
941tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
942                               const uint8_t    * torrentHash,
943                               int8_t           * tab,
944                               int                tabCount )
945{
946    int i;
947    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
948    const tr_torrent * tor = t->tor;
949    const float interval = tor->info.pieceCount / (float)tabCount;
950
951    memset( tab, 0, tabCount );
952
953    for( i=0; i<tabCount; ++i )
954    {
955        const int piece = i * interval;
956
957        if( tor == NULL )
958            tab[i] = 0;
959        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
960            tab[i] = -1;
961        else {
962            int j, peerCount;
963            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
964            for( j=0; j<peerCount; ++j )
965                if( tr_bitfieldHas( peers[j]->have, i ) )
966                    ++tab[i];
967        }
968    }
969}
970
971/* Returns the pieces that we and/or a connected peer has */
972tr_bitfield*
973tr_peerMgrGetAvailable( const tr_peerMgr * manager,
974                        const uint8_t    * torrentHash )
975{
976    int i, size;
977    const Torrent * t;
978    const tr_peer ** peers;
979    tr_bitfield * pieces;
980
981    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
982    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
983    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
984    for( i=0; i<size; ++i )
985        if( peers[i]->io != NULL )
986            tr_bitfieldAnd( pieces, peers[i]->have );
987
988    return pieces;
989}
990
991void
992tr_peerMgrTorrentStats( const tr_peerMgr * manager,
993                        const uint8_t    * torrentHash,
994                        int              * setmePeersTotal,
995                        int              * setmePeersConnected,
996                        int              * setmePeersSendingToUs,
997                        int              * setmePeersGettingFromUs,
998                        int              * setmePeersFrom )
999{
1000    int i, size;
1001    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1002    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1003
1004    *setmePeersTotal          = size;
1005    *setmePeersConnected      = 0;
1006    *setmePeersSendingToUs    = 0;
1007    *setmePeersGettingFromUs  = 0;
1008
1009    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1010        setmePeersFrom[i] = 0;
1011
1012    for( i=0; i<size; ++i )
1013    {
1014        const tr_peer * peer = peers[i];
1015
1016        if( peer->io == NULL ) /* not connected */
1017            continue;
1018
1019        ++*setmePeersConnected;
1020
1021        ++setmePeersFrom[peer->from];
1022
1023        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
1024            ++*setmePeersGettingFromUs;
1025
1026        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
1027            ++*setmePeersSendingToUs;
1028    }
1029}
1030
1031struct tr_peer_stat *
1032tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1033                     const uint8_t     * torrentHash,
1034                     int               * setmeCount UNUSED )
1035{
1036    int i, size;
1037    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1038    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1039    tr_peer_stat * ret;
1040
1041    ret = tr_new0( tr_peer_stat, size );
1042
1043    for( i=0; i<size; ++i )
1044    {
1045        const tr_peer * peer = peers[i];
1046        const int live = peer->io != NULL;
1047        tr_peer_stat * stat = ret + i;
1048
1049        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1050        stat->port             = peer->port;
1051        stat->from             = peer->from;
1052        stat->client           = peer->client;
1053        stat->progress         = peer->progress;
1054        stat->isConnected      = live ? 1 : 0;
1055        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1056        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1057        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1058        stat->isDownloading    = stat->uploadToRate > 0.01;
1059        stat->isUploading      = stat->downloadFromRate > 0.01;
1060    }
1061
1062    *setmeCount = size;
1063    return ret;
1064}
1065
1066/**
1067***
1068**/
1069
1070typedef struct
1071{
1072    tr_peer * peer;
1073    float rate;
1074    int randomKey;
1075    int preferred;
1076    int doUnchoke;
1077}
1078ChokeData;
1079
1080static int
1081compareChoke( const void * va, const void * vb )
1082{
1083    const ChokeData * a = ( const ChokeData * ) va;
1084    const ChokeData * b = ( const ChokeData * ) vb;
1085
1086    if( a->preferred != b->preferred )
1087        return a->preferred ? -1 : 1;
1088
1089    if( a->preferred )
1090    {
1091        if( a->rate > b->rate ) return -1;
1092        if( a->rate < b->rate ) return 1;
1093        return 0;
1094    }
1095    else
1096    {
1097        return a->randomKey - b->randomKey;
1098    }
1099}
1100
1101static int
1102clientIsSnubbedBy( const tr_peer * peer )
1103{
1104    assert( peer != NULL );
1105
1106    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1107}
1108
1109/**
1110***
1111**/
1112
1113static void
1114rechokeLeech( Torrent * t )
1115{
1116    int i, peerCount, size=0, unchoked=0;
1117    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1118    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1119    ChokeData * choke = tr_new0( ChokeData, peerCount );
1120
1121    /* sort the peers by preference and rate */
1122    for( i=0; i<peerCount; ++i )
1123    {
1124        tr_peer * peer = peers[i];
1125        ChokeData * node;
1126        if( peer->chokeChangedAt > ignorePeersNewerThan )
1127            continue;
1128
1129        node = &choke[size++];
1130        node->peer = peer;
1131        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1132        node->randomKey = tr_rand( INT_MAX );
1133        node->rate = tr_peerIoGetRateToClient( peer->io );
1134    }
1135
1136    qsort( choke, size, sizeof(ChokeData), compareChoke );
1137
1138    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1139        choke[i].doUnchoke = 1;
1140        ++unchoked;
1141    }
1142
1143    for( ; i<size; ++i ) {
1144        choke[i].doUnchoke = 1;
1145        ++unchoked;
1146        if( choke[i].peer->peerIsInterested )
1147            break;
1148    }
1149
1150    for( i=0; i<size; ++i )
1151        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1152
1153    /* cleanup */
1154    tr_free( choke );
1155    tr_free( peers );
1156}
1157
1158static void
1159rechokeSeed( Torrent * t )
1160{
1161    int i, size;
1162    tr_peer ** peers = getConnectedPeers( t, &size );
1163
1164    /* FIXME */
1165    for( i=0; i<size; ++i )
1166        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1167
1168    tr_free( peers );
1169}
1170
1171static int
1172rechokePulse( void * vtorrent )
1173{
1174    Torrent * t = vtorrent;
1175    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1176    if( done )
1177        rechokeLeech( vtorrent );
1178    else
1179        rechokeSeed( vtorrent );
1180    return TRUE;
1181}
1182
1183/**
1184***
1185**/
1186
1187static int
1188shouldPeerBeDisconnected( Torrent * t, tr_peer * peer, int peerCount, int isSeeding )
1189{
1190    const time_t now = time( NULL );
1191    int relaxStrictnessIfFewerThanN;
1192    double strictness;
1193
1194    if( peer->io == NULL ) /* not connected */
1195        return FALSE;
1196
1197    if( !t->isRunning ) /* the torrent is stopped... nobody should be connected */
1198        return TRUE;
1199
1200    /* not enough peers to go around... might as well keep this one;
1201     * they might unchoke us or give us a pex or something */
1202    if( peerCount < MAX_CONNECTED_PEERS_PER_TORRENT )
1203        return FALSE;
1204
1205    /* when deciding whether or not to keep a peer, judge its responsiveness
1206       on a sliding scale that's based on how many other peers are available */
1207    relaxStrictnessIfFewerThanN =
1208        (int)(((TR_MAX_PEER_COUNT * RELAX_RULES_PERCENTAGE) / 100.0) + 0.5);
1209
1210    /* if we have >= relaxIfFewerThan, strictness is 100%.
1211       if we have zero connections, strictness is 0% */
1212    if( peerCount >= relaxStrictnessIfFewerThanN )
1213        strictness = 1.0;
1214    else
1215        strictness = peerCount / (double)relaxStrictnessIfFewerThanN;
1216
1217    /* test: has it been too long since we exchanged piece data? */
1218    if( ( now - peer->connectionChangedAt ) >= MAX_TRANSFER_IDLE ) {
1219        const uint64_t lo = MIN_TRANSFER_IDLE;
1220        const uint64_t hi = MAX_TRANSFER_IDLE;
1221        const uint64_t limit = lo + ((hi-lo) * strictness);
1222        const uint64_t interval = now - (isSeeding ? peer->clientSentPieceDataAt : peer->peerSentPieceDataAt);
1223        if( interval > limit )
1224            return TRUE;
1225    }
1226
1227    /* FIXME: SWE had other tests too... */
1228
1229    return FALSE;
1230}
1231
1232static int
1233comparePeerByConnectionDate( const void * va, const void * vb )
1234{
1235    const tr_peer * a = *(const tr_peer**) va;
1236    const tr_peer * b = *(const tr_peer**) vb;
1237    return tr_compareUint64( a->connectionChangedAt, b->connectionChangedAt );
1238}
1239
1240static int
1241reconnectPulse( void * vt UNUSED )
1242{
1243    int i, size, liveCount;
1244    Torrent * t = vt;
1245    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &size );
1246    const int isSeeding = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1247
1248    /* how many connections do we have? */
1249    for( i=liveCount=0; i<size; ++i )
1250        if( peers[i]->msgs != NULL )
1251            ++liveCount;
1252
1253    /* destroy and/or disconnect from some peers */
1254    for( i=0; i<size; )
1255    {
1256        tr_peer * peer = peers[i];
1257
1258        if( peer->doPurge ) {
1259            tr_ptrArrayErase( t->peers, i, i+1 );
1260            freePeer( peer );
1261            --size;
1262            --liveCount;
1263            continue;
1264        }
1265
1266        if( shouldPeerBeDisconnected( t, peer, liveCount, isSeeding ) ) {
1267            disconnectPeer( peer );
1268            --liveCount;
1269        }
1270
1271        ++i;
1272    }
1273
1274    /* maybe connect to some new peers */ 
1275    if( t->isRunning && (liveCount<MAX_CONNECTED_PEERS_PER_TORRENT) )
1276    {
1277        int poolSize;
1278        int left = MAX_CONNECTED_PEERS_PER_TORRENT - liveCount;
1279        tr_peer ** pool;
1280        tr_peerMgr * manager = t->manager;
1281        const time_t now = time( NULL );
1282
1283        /* make a list of peers we know about but aren't connected to */
1284        poolSize = 0;
1285        pool = tr_new0( tr_peer*, size );
1286        for( i=0; i<size; ++i ) {
1287            tr_peer * peer = peers[i];
1288            if( peer->msgs == NULL )
1289                pool[poolSize++] = peer;
1290        }
1291
1292        /* sort them s.t. the ones we've already tried are at the last of the list */
1293        qsort( pool, poolSize, sizeof(tr_peer*), comparePeerByConnectionDate );
1294
1295        /* make some connections */
1296        for( i=0; i<poolSize && left>0; ++i )
1297        {
1298            tr_peer * peer = pool[i];
1299            tr_peerIo * io;
1300
1301            if( ( now - peer->connectionChangedAt ) < MIN_HANGUP_PERIOD_SEC )
1302                break;
1303
1304            /* already have a handshake pending */
1305            if( getExistingHandshake( manager, &peer->in_addr ) != NULL )
1306                continue;
1307
1308            /* initiate a connection to the peer */
1309            io = tr_peerIoNewOutgoing( manager->handle, &peer->in_addr, peer->port, t->hash );
1310            /*fprintf( stderr, "[%s] connecting to potential peer %s\n", t->tor->info.name, tr_peerIoGetAddrStr(io) );*/
1311            peer->connectionChangedAt = time( NULL );
1312            initiateHandshake( manager, io );
1313            --left;
1314        }
1315
1316        tr_free( pool );
1317    }
1318
1319    return TRUE;
1320}
Note: See TracBrowser for help on using the repository browser.