source: trunk/libtransmission/peer-mgr.c @ 3144

Last change on this file since 3144 was 3144, checked in by charles, 15 years ago

fix bug that tended to disconnect from valid peers when we were seeding. also, follow the BT spec's terminology a little closer.

  • Property svn:keywords set to Date Rev Author Id
File size: 34.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3144 2007-09-23 02:19:59Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32enum
33{
34    /* how frequently to change which peers are choked */
35    RECHOKE_PERIOD_MSEC = (15 * 1000),
36
37    /* how frequently to decide which peers live and die */
38    RECONNECT_PERIOD_MSEC = (15 * 1000),
39
40    /* how frequently to refill peers' request lists */
41    REFILL_PERIOD_MSEC = 1000,
42
43    /* how many peers to unchoke per-torrent. */
44    /* FIXME: make this user-configurable? */
45    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
46
47    /* don't change a peer's choke status more often than this */
48    MIN_CHOKE_PERIOD_SEC = 10,
49
50    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
51    SOON_MSEC = 1000,
52
53    /* following the BT spec, we consider ourselves `snubbed' if
54     * we're we don't get piece data from a peer in this long */
55    SNUBBED_SEC = 60,
56
57    /* if our connection count for a torrent is <= N% of what we wanted,
58     * start relaxing the rules that decide when to disconnect a peer */
59    RELAX_RULES_PERCENTAGE = 25,
60
61    /* if we're not relaxing the rules, disconnect a peer that hasn't
62     * given us anything (or taken, if we're seeding) in this long */
63    MIN_TRANSFER_IDLE = 90000,
64
65    /* even if we're relaxing the rules, disconnect a peer that hasn't
66     * given us anything (or taken, if we're seeding) in this long */
67    MAX_TRANSFER_IDLE = 240000,
68
69    /* this is arbitrary and, hopefully, temporary until we come up
70     * with a better idea for managing the connection limits */
71    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
72
73    /* if we hang up on a peer for being worthless, don't try to
74     * reconnect to it for this long. */
75    MIN_HANGUP_PERIOD_SEC = 120
76};
77
78/**
79***
80**/
81
82typedef struct
83{
84    uint8_t hash[SHA_DIGEST_LENGTH];
85    tr_ptrArray * peers; /* tr_peer */
86    tr_timer * reconnectTimer;
87    tr_timer * reconnectSoonTimer;
88    tr_timer * rechokeTimer;
89    tr_timer * rechokeSoonTimer;
90    tr_timer * refillTimer;
91    tr_torrent * tor;
92    tr_bitfield * requested;
93
94    unsigned int isRunning : 1;
95
96    struct tr_peerMgr * manager;
97}
98Torrent;
99
100struct tr_peerMgr
101{
102    tr_handle * handle;
103    tr_ptrArray * torrents; /* Torrent */
104    int connectionCount;
105    tr_ptrArray * handshakes; /* in-process */
106};
107
108/**
109***
110**/
111
112static int
113handshakeCompareToAddr( const void * va, const void * vb )
114{
115    const tr_handshake * a = va;
116    const struct in_addr * b = vb;
117    return memcmp( tr_handshakeGetAddr( a, NULL ), b, sizeof( struct in_addr ) );
118}
119
120static int
121handshakeCompare( const void * a, const void * b )
122{
123    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
124}
125
126static tr_handshake*
127getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
128{
129    return tr_ptrArrayFindSorted( mgr->handshakes,
130                                  in_addr,
131                                  handshakeCompareToAddr );
132}
133
134/**
135***
136**/
137
138static int
139torrentCompare( const void * va, const void * vb )
140{
141    const Torrent * a = va;
142    const Torrent * b = vb;
143    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
144}
145
146static int
147torrentCompareToHash( const void * va, const void * vb )
148{
149    const Torrent * a = (const Torrent*) va;
150    const uint8_t * b_hash = (const uint8_t*) vb;
151    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
152}
153
154static Torrent*
155getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
156{
157    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
158                                             hash,
159                                             torrentCompareToHash );
160}
161
162static int
163peerCompare( const void * va, const void * vb )
164{
165    const tr_peer * a = (const tr_peer *) va;
166    const tr_peer * b = (const tr_peer *) vb;
167    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
168}
169
170static int
171peerCompareToAddr( const void * va, const void * vb )
172{
173    const tr_peer * a = (const tr_peer *) va;
174    const struct in_addr * b = (const struct in_addr *) vb;
175    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
176}
177
178static tr_peer*
179getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
180{
181    assert( torrent != NULL );
182    assert( torrent->peers != NULL );
183    assert( in_addr != NULL );
184
185    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
186                                             in_addr,
187                                             peerCompareToAddr );
188}
189
190static tr_peer*
191getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
192{
193    tr_peer * peer = getExistingPeer( torrent, in_addr );
194
195    if( isNew )
196        *isNew = peer == NULL;
197
198    if( peer == NULL )
199    {
200        peer = tr_new0( tr_peer, 1 );
201        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
202        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
203    }
204
205    return peer;
206}
207
208static void
209disconnectPeer( tr_peer * peer )
210{
211    assert( peer != NULL );
212
213    tr_peerIoFree( peer->io );
214    peer->io = NULL;
215
216    if( peer->msgs != NULL )
217    {
218        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
219        tr_peerMsgsFree( peer->msgs );
220        peer->msgs = NULL;
221    }
222
223    tr_bitfieldFree( peer->have );
224    peer->have = NULL;
225
226    tr_bitfieldFree( peer->blame );
227    peer->blame = NULL;
228
229    tr_bitfieldFree( peer->banned );
230    peer->banned = NULL;
231}
232
233static void
234freePeer( tr_peer * peer )
235{
236    disconnectPeer( peer );
237    tr_free( peer->client );
238    tr_free( peer );
239}
240
241static void
242freeTorrent( tr_peerMgr * manager, Torrent * t )
243{
244    int i, size;
245    tr_peer ** peers;
246    uint8_t hash[SHA_DIGEST_LENGTH];
247
248    assert( manager != NULL );
249    assert( t != NULL );
250    assert( t->peers != NULL );
251    assert( getExistingTorrent( manager, t->hash ) != NULL );
252
253    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
254
255    tr_timerFree( &t->reconnectTimer );
256    tr_timerFree( &t->reconnectSoonTimer );
257    tr_timerFree( &t->rechokeTimer );
258    tr_timerFree( &t->rechokeSoonTimer );
259    tr_timerFree( &t->refillTimer );
260
261    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
262    for( i=0; i<size; ++i )
263        freePeer( peers[i] );
264
265    tr_bitfieldFree( t->requested );
266    tr_ptrArrayFree( t->peers );
267    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
268    tr_free( t );
269
270    assert( getExistingTorrent( manager, hash ) == NULL );
271}
272
273/**
274***
275**/
276
277tr_peerMgr*
278tr_peerMgrNew( tr_handle * handle )
279{
280    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
281    m->handle = handle;
282    m->torrents = tr_ptrArrayNew( );
283    m->handshakes = tr_ptrArrayNew( );
284    return m;
285}
286
287void
288tr_peerMgrFree( tr_peerMgr * manager )
289{
290    while( !tr_ptrArrayEmpty( manager->handshakes ) )
291        tr_handshakeAbort( (tr_handshake*)tr_ptrArrayNth( manager->handshakes, 0) );
292    tr_ptrArrayFree( manager->handshakes );
293
294    while( !tr_ptrArrayEmpty( manager->torrents ) )
295        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
296    tr_ptrArrayFree( manager->torrents );
297
298    tr_free( manager );
299}
300
301static tr_peer**
302getConnectedPeers( Torrent * t, int * setmeCount )
303{
304    int i, peerCount, connectionCount;
305    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
306    tr_peer **ret = tr_new( tr_peer*, peerCount );
307
308    for( i=connectionCount=0; i<peerCount; ++i )
309        if( peers[i]->msgs != NULL )
310            ret[connectionCount++] = peers[i];
311
312    *setmeCount = connectionCount;
313    return ret;
314}
315
316/***
317****  Refill
318***/
319
320struct tr_refill_piece
321{
322    tr_priority_t priority;
323    uint32_t piece;
324    uint32_t peerCount;
325};
326
327static int
328compareRefillPiece (const void * aIn, const void * bIn)
329{
330    const struct tr_refill_piece * a = aIn;
331    const struct tr_refill_piece * b = bIn;
332
333    /* if one piece has a higher priority, it goes first */
334    if (a->priority != b->priority)
335        return a->priority > b->priority ? -1 : 1;
336
337    /* otherwise if one has fewer peers, it goes first */
338    if (a->peerCount != b->peerCount)
339        return a->peerCount < b->peerCount ? -1 : 1;
340
341    /* otherwise go with the earlier piece */
342    return a->piece - b->piece;
343}
344
345static int
346isPieceInteresting( const tr_torrent  * tor,
347                    int                 piece )
348{
349    if( tor->info.pieces[piece].dnd ) /* we don't want it */
350        return 0;
351
352    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
353        return 0;
354
355    return 1;
356}
357
358static uint32_t*
359getPreferredPieces( Torrent     * t,
360                    uint32_t    * pieceCount )
361{
362    const tr_torrent * tor = t->tor;
363    const tr_info * inf = &tor->info;
364
365    int i;
366    uint32_t poolSize = 0;
367    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
368
369    int peerCount;
370    tr_peer** peers = getConnectedPeers( t, &peerCount );
371
372    for( i=0; i<inf->pieceCount; ++i )
373        if( isPieceInteresting( tor, i ) )
374            pool[poolSize++] = i;
375
376    /* sort the pool from most interesting to least... */
377    if( poolSize > 1 )
378    {
379        uint32_t j;
380        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
381
382        for( j=0; j<poolSize; ++j )
383        {
384            int k;
385            const int piece = pool[j];
386            struct tr_refill_piece * setme = p + j;
387
388            setme->piece = piece;
389            setme->priority = inf->pieces[piece].priority;
390            setme->peerCount = 0;
391
392            for( k=0; k<peerCount; ++k ) {
393                const tr_peer * peer = peers[k];
394                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
395                    ++setme->peerCount;
396            }
397        }
398
399        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
400
401        for( j=0; j<poolSize; ++j )
402            pool[j] = p[j].piece;
403
404        tr_free( p );
405    }
406
407#if 0
408fprintf (stderr, "new pool: ");
409for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
410fprintf (stderr, "\n");
411#endif
412    tr_free( peers );
413
414    *pieceCount = poolSize;
415    return pool;
416}
417
418static uint64_t*
419getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
420{
421    uint32_t i;
422    uint32_t pieceCount;
423    uint32_t * pieces;
424    uint64_t *req, *unreq, *ret, *walk;
425    int reqCount, unreqCount;
426    const tr_torrent * tor = t->tor;
427
428    pieces = getPreferredPieces( t, &pieceCount );
429/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
430
431    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
432    reqCount = 0;
433    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
434    unreqCount = 0;
435
436    for( i=0; i<pieceCount; ++i ) {
437        const uint32_t index = pieces[i];
438        const int begin = tr_torPieceFirstBlock( tor, index );
439        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
440        int block;
441        for( block=begin; block<end; ++block )
442            if( tr_cpBlockIsComplete( tor->completion, block ) )
443                continue;
444            else if( tr_bitfieldHas( t->requested, block ) )
445                req[reqCount++] = block;
446            else
447                unreq[unreqCount++] = block;
448    }
449
450/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
451    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
452    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
453    walk += unreqCount;
454    memcpy( walk, req, sizeof(uint64_t) * reqCount );
455    walk += reqCount;
456    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
457    *setmeCount = walk - ret;
458
459    tr_free( req );
460    tr_free( unreq );
461    tr_free( pieces );
462
463    return ret;
464}
465
466static int
467refillPulse( void * vtorrent )
468{
469    Torrent * t = vtorrent;
470    tr_torrent * tor = t->tor;
471    uint32_t i;
472    int peerCount;
473    tr_peer ** peers;
474    uint64_t blockCount;
475    uint64_t * blocks;
476
477    if( !t->isRunning )
478        return TRUE;
479    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
480        return TRUE;
481
482    blocks = getPreferredBlocks( t, &blockCount );
483    peers = getConnectedPeers( t, &peerCount );
484
485/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
486
487    for( i=0; peerCount && i<blockCount; ++i )
488    {
489        const int block = blocks[i];
490        const uint32_t index = tr_torBlockPiece( tor, block );
491        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
492        const uint32_t length = tr_torBlockCountBytes( tor, block );
493        int j;
494        assert( _tr_block( tor, index, begin ) == block );
495        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
496        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
497
498
499        /* find a peer who can ask for this block */
500        for( j=0; j<peerCount; )
501        {
502            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
503            switch( val )
504            {
505                case TR_ADDREQ_FULL: 
506                case TR_ADDREQ_CLIENT_CHOKED:
507                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
508                    break;
509
510                case TR_ADDREQ_MISSING: 
511                    ++j;
512                    break;
513
514                case TR_ADDREQ_OK:
515                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
516                    tr_bitfieldAdd( t->requested, block );
517                    j = peerCount;
518                    break;
519
520                default:
521                    assert( 0 && "unhandled value" );
522                    break;
523            }
524        }
525    }
526
527    /* cleanup */
528    tr_free( peers );
529    tr_free( blocks );
530
531    t->refillTimer = NULL;
532    return FALSE;
533}
534
535static void
536broadcastClientHave( Torrent * t, uint32_t index )
537{
538    int i, size;
539    tr_peer ** peers = getConnectedPeers( t, &size );
540    for( i=0; i<size; ++i )
541        tr_peerMsgsHave( peers[i]->msgs, index );
542    tr_free( peers );
543}
544
545static void
546broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
547{
548    int i, size;
549    tr_peer ** peers = getConnectedPeers( t, &size );
550    for( i=0; i<size; ++i )
551        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
552    tr_free( peers );
553}
554
555/**
556***
557**/
558
559static int reconnectPulse( void * vtorrent );
560
561static void
562reconnectNow( Torrent * t )
563{
564    reconnectPulse( t );
565    tr_timerFree( &t->reconnectTimer );
566    t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
567}
568
569static int
570reconnectSoonCB( void * vt )
571{
572    Torrent * t = vt;
573    reconnectNow( t );
574    t->reconnectSoonTimer = NULL;
575    return FALSE;
576}
577
578static void
579reconnectSoon( Torrent * t )
580{
581    if( t->reconnectSoonTimer == NULL )
582        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
583                                             reconnectSoonCB, t, SOON_MSEC );
584}
585
586/**
587***
588**/
589
590static int rechokePulse( void * vtorrent );
591
592static void
593rechokeNow( Torrent * t )
594{
595    rechokePulse( t );
596    tr_timerFree( &t->rechokeTimer );
597    t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
598}
599
600static int
601rechokeSoonCB( void * vt )
602{
603    Torrent * t = vt;
604    rechokeNow( t );
605    t->rechokeSoonTimer = NULL;
606    return FALSE;
607}
608
609static void
610rechokeSoon( Torrent * t )
611{
612    if( t->rechokeSoonTimer == NULL )
613        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
614                                           rechokeSoonCB, t, SOON_MSEC );
615}
616
617static void
618msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
619{
620    tr_peer * peer = vpeer;
621    Torrent * t = (Torrent *) vt;
622    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
623
624    switch( e->eventType )
625    {
626        case TR_PEERMSG_NEED_REQ:
627            if( t->refillTimer == NULL )
628                t->refillTimer = tr_timerNew( t->manager->handle,
629                                              refillPulse, t,
630                                              REFILL_PERIOD_MSEC );
631            break;
632
633        case TR_PEERMSG_CLIENT_HAVE:
634            broadcastClientHave( t, e->pieceIndex );
635            break;
636
637        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
638            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
639            const int peerIsSeed = e->progress >= 1.0;
640            if( clientIsSeed && peerIsSeed )
641                peer->doDisconnect = 1;
642            break;
643        }
644
645        case TR_PEERMSG_CLIENT_BLOCK:
646            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
647            break;
648
649        case TR_PEERMSG_GOT_ERROR:
650            peer->doDisconnect = 1;
651            reconnectSoon( t );
652            break;
653
654        default:
655            assert(0);
656    }
657}
658
659static void
660myHandshakeDoneCB( tr_handshake    * handshake,
661                   tr_peerIo       * io,
662                   int               isConnected,
663                   const uint8_t   * peer_id,
664                   int               peerSupportsEncryption,
665                   void            * vmanager )
666{
667    int ok = isConnected;
668    uint16_t port;
669    const struct in_addr * in_addr;
670    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
671    const uint8_t * hash = NULL;
672    Torrent * t;
673    tr_handshake * ours;
674
675    assert( io != NULL );
676    assert( isConnected==0 || isConnected==1 );
677    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
678
679    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
680                                    handshake,
681                                    handshakeCompare );
682    //assert( ours != NULL );
683    //assert( ours == handshake );
684
685    in_addr = tr_peerIoGetAddress( io, &port );
686
687    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
688    {
689        tr_peerIoFree( io );
690        --manager->connectionCount;
691        return;
692    }
693
694    hash = tr_peerIoGetTorrentHash( io );
695    t = getExistingTorrent( manager, hash );
696    if( !t || !t->isRunning )
697    {
698        tr_peerIoFree( io );
699        --manager->connectionCount;
700        return;
701    }
702
703    fprintf( stderr, "peer-mgr: torrent [%s] finished a handshake. Connected? %s.\n", t->tor->info.name, (isConnected?"yes":"no") );
704
705    /* if we couldn't connect or were snubbed,
706     * the peer's probably not worth remembering. */
707    if( !ok ) {
708        tr_peer * peer = getExistingPeer( t, in_addr );
709        tr_peerIoFree( io );
710        --manager->connectionCount;
711        if( peer ) {
712            tr_ptrArrayRemoveSorted( t->peers, peer, peerCompare );
713            freePeer( peer );
714        }
715        return;
716    }
717
718    if( 1 ) {
719        tr_peer * peer = getPeer( t, in_addr, NULL );
720        if( peer->msgs != NULL ) { /* we already have this peer */
721            tr_peerIoFree( io );
722            --manager->connectionCount;
723        } else {
724            peer->port = port;
725            peer->io = io;
726            peer->msgs = tr_peerMsgsNew( t->tor, peer );
727            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
728            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
729            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
730            peer->connectionChangedAt = time( NULL );
731            rechokeSoon( t );
732        }
733    }
734}
735
736static void
737initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
738{
739    tr_handshake * handshake = tr_handshakeNew( io,
740                                                manager->handle->encryptionMode,
741                                                myHandshakeDoneCB,
742                                                manager );
743    ++manager->connectionCount;
744
745    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
746}
747
748void
749tr_peerMgrAddIncoming( tr_peerMgr      * manager,
750                       struct in_addr  * addr,
751                       int               socket )
752{
753    if( getExistingHandshake( manager, addr ) == NULL )
754    {
755        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, socket );
756        initiateHandshake( manager, io );
757    }
758}
759
760void
761tr_peerMgrAddPex( tr_peerMgr     * manager,
762                  const uint8_t  * torrentHash,
763                  int              from,
764                  const tr_pex   * pex,
765                  int              pexCount )
766{
767    Torrent * t = getExistingTorrent( manager, torrentHash );
768    const tr_pex * end = pex + pexCount;
769    while( pex != end )
770    {
771        int isNew;
772        tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
773        if( isNew ) {
774            peer->port = pex->port;
775            peer->from = from;
776        }
777        ++pex;
778    }
779    reconnectSoon( t );
780}
781
782void
783tr_peerMgrAddPeers( tr_peerMgr    * manager,
784                    const uint8_t * torrentHash,
785                    int             from,
786                    const uint8_t * peerCompact,
787                    int             peerCount )
788{
789    int i;
790    const uint8_t * walk = peerCompact;
791    Torrent * t = getExistingTorrent( manager, torrentHash );
792    for( i=0; t!=NULL && i<peerCount; ++i )
793    {
794        int isNew;
795        tr_peer * peer;
796        struct in_addr addr;
797        uint16_t port;
798        memcpy( &addr, walk, 4 ); walk += 4;
799        memcpy( &port, walk, 2 ); walk += 2;
800        peer = getPeer( t, &addr, &isNew );
801        if( isNew ) {
802            peer->port = port;
803            peer->from = from;
804        }
805    }
806    reconnectSoon( t );
807}
808
809/**
810***
811**/
812
813int
814tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
815{
816    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
817}
818
819void
820tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
821                    const uint8_t  * torrentHash UNUSED,
822                    int              pieceIndex UNUSED,
823                    int              success UNUSED )
824{
825    fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );
826}
827
828int
829tr_pexCompare( const void * va, const void * vb )
830{
831    const tr_pex * a = (const tr_pex *) va;
832    const tr_pex * b = (const tr_pex *) vb;
833    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
834    if( i ) return i;
835    if( a->port < b->port ) return -1;
836    if( a->port > b->port ) return 1;
837    return 0;
838}
839
840int tr_pexCompare( const void * a, const void * b );
841
842
843int
844tr_peerMgrGetPeers( tr_peerMgr      * manager,
845                    const uint8_t   * torrentHash,
846                    tr_pex         ** setme_pex )
847{
848    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
849    int i, peerCount;
850    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
851    tr_pex * pex = tr_new( tr_pex, peerCount );
852    tr_pex * walk = pex;
853
854    for( i=0; i<peerCount; ++i, ++walk )
855    {
856        const tr_peer * peer = peers[i];
857
858        walk->in_addr = peer->in_addr;
859
860        walk->port = peer->port;
861
862        walk->flags = 0;
863        if( peer->peerSupportsEncryption ) walk->flags |= 1;
864        if( peer->progress >= 1.0 )        walk->flags |= 2;
865    }
866
867    assert( ( walk - pex ) == peerCount );
868    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
869    *setme_pex = pex;
870    return peerCount;
871}
872
873void
874tr_peerMgrStartTorrent( tr_peerMgr     * manager,
875                        const uint8_t  * torrentHash )
876{
877    Torrent * t = getExistingTorrent( manager, torrentHash );
878    t->isRunning = 1;
879    reconnectNow( t );
880}
881
882void
883tr_peerMgrStopTorrent( tr_peerMgr     * manager,
884                       const uint8_t  * torrentHash)
885{
886    Torrent * t = getExistingTorrent( manager, torrentHash );
887    t->isRunning = 0;
888    reconnectNow( t );
889}
890
891void
892tr_peerMgrAddTorrent( tr_peerMgr * manager,
893                      tr_torrent * tor )
894{
895    Torrent * t;
896
897    assert( tor != NULL );
898    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
899
900    t = tr_new0( Torrent, 1 );
901    t->manager = manager;
902    t->tor = tor;
903    t->peers = tr_ptrArrayNew( );
904    t->requested = tr_bitfieldNew( tor->blockCount );
905    t->rechokeTimer = tr_timerNew( manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
906    t->reconnectTimer = tr_timerNew( manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
907
908    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
909    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
910}
911
912void
913tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
914                         const uint8_t  * torrentHash )
915{
916    Torrent * t = getExistingTorrent( manager, torrentHash );
917    assert( t != NULL );
918    tr_peerMgrStopTorrent( manager, torrentHash );
919    freeTorrent( manager, t );
920}
921
922void
923tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
924                               const uint8_t    * torrentHash,
925                               int8_t           * tab,
926                               int                tabCount )
927{
928    int i;
929    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
930    const tr_torrent * tor = t->tor;
931    const float interval = tor->info.pieceCount / (float)tabCount;
932
933    memset( tab, 0, tabCount );
934
935    for( i=0; i<tabCount; ++i )
936    {
937        const int piece = i * interval;
938
939        if( tor == NULL )
940            tab[i] = 0;
941        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
942            tab[i] = -1;
943        else {
944            int j, peerCount;
945            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
946            for( j=0; j<peerCount; ++j )
947                if( tr_bitfieldHas( peers[j]->have, i ) )
948                    ++tab[i];
949        }
950    }
951}
952
953
954void
955tr_peerMgrTorrentStats( const tr_peerMgr * manager,
956                        const uint8_t    * torrentHash,
957                        int              * setmePeersTotal,
958                        int              * setmePeersConnected,
959                        int              * setmePeersSendingToUs,
960                        int              * setmePeersGettingFromUs,
961                        int              * setmePeersFrom )
962{
963    int i, size;
964    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
965    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
966
967    *setmePeersTotal          = size;
968    *setmePeersConnected      = 0;
969    *setmePeersSendingToUs    = 0;
970    *setmePeersGettingFromUs  = 0;
971
972    for( i=0; i<TR_PEER_FROM__MAX; ++i )
973        setmePeersFrom[i] = 0;
974
975    for( i=0; i<size; ++i )
976    {
977        const tr_peer * peer = peers[i];
978
979        if( peer->io == NULL ) /* not connected */
980            continue;
981
982        ++*setmePeersConnected;
983
984        ++setmePeersFrom[peer->from];
985
986        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
987            ++*setmePeersGettingFromUs;
988
989        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
990            ++*setmePeersSendingToUs;
991    }
992}
993
994struct tr_peer_stat *
995tr_peerMgrPeerStats( const tr_peerMgr  * manager,
996                     const uint8_t     * torrentHash,
997                     int               * setmeCount UNUSED )
998{
999    int i, size;
1000    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1001    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1002    tr_peer_stat * ret;
1003
1004    ret = tr_new0( tr_peer_stat, size );
1005
1006    for( i=0; i<size; ++i )
1007    {
1008        const tr_peer * peer = peers[i];
1009        const int live = peer->io != NULL;
1010        tr_peer_stat * stat = ret + i;
1011
1012        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1013        stat->port             = peer->port;
1014        stat->from             = peer->from;
1015        stat->client           = peer->client;
1016        stat->progress         = peer->progress;
1017        stat->isConnected      = live ? 1 : 0;
1018        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1019        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1020        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1021        stat->isDownloading    = stat->uploadToRate > 0.01;
1022        stat->isUploading      = stat->downloadFromRate > 0.01;
1023    }
1024
1025    *setmeCount = size;
1026    return ret;
1027}
1028
1029/**
1030***
1031**/
1032
1033typedef struct
1034{
1035    tr_peer * peer;
1036    float rate;
1037    int randomKey;
1038    int preferred;
1039    int doUnchoke;
1040}
1041ChokeData;
1042
1043static int
1044compareChoke( const void * va, const void * vb )
1045{
1046    const ChokeData * a = ( const ChokeData * ) va;
1047    const ChokeData * b = ( const ChokeData * ) vb;
1048
1049    if( a->preferred != b->preferred )
1050        return a->preferred ? -1 : 1;
1051
1052    if( a->preferred )
1053    {
1054        if( a->rate > b->rate ) return -1;
1055        if( a->rate < b->rate ) return 1;
1056        return 0;
1057    }
1058    else
1059    {
1060        return a->randomKey - b->randomKey;
1061    }
1062}
1063
1064static int
1065clientIsSnubbedBy( const tr_peer * peer )
1066{
1067    assert( peer != NULL );
1068
1069    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1070}
1071
1072/**
1073***
1074**/
1075
1076static void
1077rechokeLeech( Torrent * t )
1078{
1079    int i, peerCount, size=0, unchoked=0;
1080    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1081    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1082    ChokeData * choke = tr_new0( ChokeData, peerCount );
1083
1084    /* sort the peers by preference and rate */
1085    for( i=0; i<peerCount; ++i )
1086    {
1087        tr_peer * peer = peers[i];
1088        ChokeData * node;
1089        if( peer->chokeChangedAt > ignorePeersNewerThan )
1090            continue;
1091
1092        node = &choke[size++];
1093        node->peer = peer;
1094        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1095        node->randomKey = tr_rand( INT_MAX );
1096        node->rate = tr_peerIoGetRateToClient( peer->io );
1097    }
1098
1099    qsort( choke, size, sizeof(ChokeData), compareChoke );
1100
1101    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1102        choke[i].doUnchoke = 1;
1103        ++unchoked;
1104    }
1105
1106    for( ; i<size; ++i ) {
1107        choke[i].doUnchoke = 1;
1108        ++unchoked;
1109        if( choke[i].peer->peerIsInterested )
1110            break;
1111    }
1112
1113    for( i=0; i<size; ++i )
1114        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1115
1116    /* cleanup */
1117    tr_free( choke );
1118    tr_free( peers );
1119}
1120
1121static void
1122rechokeSeed( Torrent * t )
1123{
1124    int i, size;
1125    tr_peer ** peers = getConnectedPeers( t, &size );
1126
1127    /* FIXME */
1128    for( i=0; i<size; ++i )
1129        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1130
1131    tr_free( peers );
1132}
1133
1134static int
1135rechokePulse( void * vtorrent )
1136{
1137    Torrent * t = vtorrent;
1138    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1139    if( done )
1140        rechokeLeech( vtorrent );
1141    else
1142        rechokeSeed( vtorrent );
1143    return TRUE;
1144}
1145
1146/**
1147***
1148**/
1149
1150static int
1151shouldPeerBeDisconnected( Torrent * t, tr_peer * peer, int peerCount, int isSeeding )
1152{
1153    const time_t now = time( NULL );
1154    int relaxStrictnessIfFewerThanN;
1155    double strictness;
1156
1157    if( peer->io == NULL ) /* not connected */
1158        return FALSE;
1159
1160    if( !t->isRunning ) /* the torrent is stopped... nobody should be connected */
1161        return TRUE;
1162
1163    if( peer->doDisconnect ) /* someone set a `doDisconnect' flag somewhere */
1164        return TRUE;
1165
1166    /* when deciding whether or not to keep a peer, judge its responsiveness
1167       on a sliding scale that's based on how many other peers are available */
1168    relaxStrictnessIfFewerThanN =
1169        (int)(((TR_MAX_PEER_COUNT * RELAX_RULES_PERCENTAGE) / 100.0) + 0.5);
1170
1171    /* if we have >= relaxIfFewerThan, strictness is 100%.
1172       if we have zero connections, strictness is 0% */
1173    if( peerCount >= relaxStrictnessIfFewerThanN )
1174        strictness = 1.0;
1175    else
1176        strictness = peerCount / (double)relaxStrictnessIfFewerThanN;
1177
1178    /* test: has it been too long since we exchanged piece data? */
1179    if( ( now - peer->connectionChangedAt ) >= MAX_TRANSFER_IDLE ) {
1180        const uint64_t lo = MIN_TRANSFER_IDLE;
1181        const uint64_t hi = MAX_TRANSFER_IDLE;
1182        const uint64_t limit = lo + ((hi-lo) * strictness);
1183        const uint64_t interval = now - (isSeeding ? peer->clientSentPieceDataAt : peer->peerSentPieceDataAt);
1184        if( interval > limit )
1185            return TRUE;
1186    }
1187
1188    /* FIXME: SWE had other tests too... */
1189
1190    return FALSE;
1191}
1192
1193static int
1194comparePeerByConnectionDate( const void * va, const void * vb )
1195{
1196    const tr_peer * a = *(const tr_peer**) va;
1197    const tr_peer * b = *(const tr_peer**) vb;
1198    return tr_compareUint64( a->connectionChangedAt, b->connectionChangedAt );
1199}
1200
1201static int
1202reconnectPulse( void * vt UNUSED )
1203{
1204    int i, size, liveCount;
1205    Torrent * t = vt;
1206    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &size );
1207    const int isSeeding = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1208
1209    /* how many connections do we have? */
1210    for( i=liveCount=0; i<size; ++i )
1211        if( peers[i]->msgs != NULL )
1212            ++liveCount;
1213
1214    /* disconnect from some peers */
1215    for( i=0; i<size; ++i ) {
1216        tr_peer * peer = peers[i];
1217        if( shouldPeerBeDisconnected( t, peer, liveCount, isSeeding ) ) {
1218            disconnectPeer( peer );
1219            --liveCount;
1220        }
1221    }
1222 
1223    /* maybe connect to some new peers */ 
1224    if( t->isRunning && (liveCount<MAX_CONNECTED_PEERS_PER_TORRENT) )
1225    {
1226        int poolSize;
1227        int left = MAX_CONNECTED_PEERS_PER_TORRENT - liveCount;
1228        tr_peer ** pool;
1229        tr_peerMgr * manager = t->manager;
1230        const time_t now = time( NULL );
1231
1232        /* make a list of peers we know about but aren't connected to */
1233        poolSize = 0;
1234        pool = tr_new0( tr_peer*, size );
1235        for( i=0; i<size; ++i ) {
1236            tr_peer * peer = peers[i];
1237            if( peer->msgs == NULL )
1238                pool[poolSize++] = peer;
1239        }
1240
1241        /* sort them s.t. the ones we've already tried are at the last of the list */
1242        qsort( pool, poolSize, sizeof(tr_peer*), comparePeerByConnectionDate );
1243
1244        /* make some connections */
1245        for( i=0; i<poolSize && left>0; ++i )
1246        {
1247            tr_peer * peer = pool[i];
1248            tr_peerIo * io;
1249
1250            if( ( now - peer->connectionChangedAt ) < MIN_HANGUP_PERIOD_SEC )
1251                break;
1252
1253            /* already have a handshake pending */
1254            if( getExistingHandshake( manager, &peer->in_addr ) != NULL )
1255                continue;
1256
1257            /* initiate a connection to the peer */
1258            io = tr_peerIoNewOutgoing( manager->handle, &peer->in_addr, peer->port, t->hash );
1259            fprintf( stderr, "[%s] connecting to potential peer %s\n", t->tor->info.name, tr_peerIoGetAddrStr(io) );
1260            peer->connectionChangedAt = time( NULL );
1261            initiateHandshake( manager, io );
1262            --left;
1263        }
1264
1265        tr_free( pool );
1266    }
1267
1268    return TRUE;
1269}
Note: See TracBrowser for help on using the repository browser.