source: trunk/libtransmission/peer-mgr.c @ 3184

Last change on this file since 3184 was 3184, checked in by charles, 15 years ago

peer i/o cleanup

  • Property svn:keywords set to Date Rev Author Id
File size: 35.2 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3184 2007-09-26 14:42:03Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32enum
33{
34    /* how frequently to change which peers are choked */
35    RECHOKE_PERIOD_MSEC = (15 * 1000),
36
37    /* how frequently to decide which peers live and die */
38    RECONNECT_PERIOD_MSEC = (15 * 1000),
39
40    /* how frequently to refill peers' request lists */
41    REFILL_PERIOD_MSEC = 1000,
42
43    /* how many peers to unchoke per-torrent. */
44    /* FIXME: make this user-configurable? */
45    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
46
47    /* don't change a peer's choke status more often than this */
48    MIN_CHOKE_PERIOD_SEC = 10,
49
50    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
51    SOON_MSEC = 1000,
52
53    /* following the BT spec, we consider ourselves `snubbed' if
54     * we're we don't get piece data from a peer in this long */
55    SNUBBED_SEC = 60,
56
57    /* if our connection count for a torrent is <= N% of what we wanted,
58     * start relaxing the rules that decide when to disconnect a peer */
59    RELAX_RULES_PERCENTAGE = 25,
60
61    /* if we're not relaxing the rules, disconnect a peer that hasn't
62     * given us anything (or taken, if we're seeding) in this long */
63    MIN_TRANSFER_IDLE = 90000,
64
65    /* even if we're relaxing the rules, disconnect a peer that hasn't
66     * given us anything (or taken, if we're seeding) in this long */
67    MAX_TRANSFER_IDLE = 240000,
68
69    /* this is arbitrary and, hopefully, temporary until we come up
70     * with a better idea for managing the connection limits */
71    MAX_CONNECTED_PEERS_PER_TORRENT = 60,
72
73    /* if we hang up on a peer for being worthless, don't try to
74     * reconnect to it for this long. */
75    MIN_HANGUP_PERIOD_SEC = 120
76};
77
78/**
79***
80**/
81
82typedef struct
83{
84    uint8_t hash[SHA_DIGEST_LENGTH];
85    tr_ptrArray * peers; /* tr_peer */
86    tr_timer * reconnectTimer;
87    tr_timer * reconnectSoonTimer;
88    tr_timer * rechokeTimer;
89    tr_timer * rechokeSoonTimer;
90    tr_timer * refillTimer;
91    tr_torrent * tor;
92    tr_bitfield * requested;
93
94    unsigned int isRunning : 1;
95
96    struct tr_peerMgr * manager;
97}
98Torrent;
99
100struct tr_peerMgr
101{
102    tr_handle * handle;
103    tr_ptrArray * torrents; /* Torrent */
104    int connectionCount;
105    tr_ptrArray * handshakes; /* in-process */
106};
107
108/**
109***
110**/
111
112static int
113handshakeCompareToAddr( const void * va, const void * vb )
114{
115    const tr_handshake * a = va;
116    const struct in_addr * b = vb;
117    return memcmp( tr_handshakeGetAddr( a, NULL ), b, sizeof( struct in_addr ) );
118}
119
120static int
121handshakeCompare( const void * a, const void * b )
122{
123    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
124}
125
126static tr_handshake*
127getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
128{
129    return tr_ptrArrayFindSorted( mgr->handshakes,
130                                  in_addr,
131                                  handshakeCompareToAddr );
132}
133
134/**
135***
136**/
137
138static int
139torrentCompare( const void * va, const void * vb )
140{
141    const Torrent * a = va;
142    const Torrent * b = vb;
143    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
144}
145
146static int
147torrentCompareToHash( const void * va, const void * vb )
148{
149    const Torrent * a = (const Torrent*) va;
150    const uint8_t * b_hash = (const uint8_t*) vb;
151    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
152}
153
154static Torrent*
155getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
156{
157    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
158                                             hash,
159                                             torrentCompareToHash );
160}
161
162static int
163peerCompare( const void * va, const void * vb )
164{
165    const tr_peer * a = (const tr_peer *) va;
166    const tr_peer * b = (const tr_peer *) vb;
167    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
168}
169
170static int
171peerCompareToAddr( const void * va, const void * vb )
172{
173    const tr_peer * a = (const tr_peer *) va;
174    const struct in_addr * b = (const struct in_addr *) vb;
175    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
176}
177
178static tr_peer*
179getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
180{
181    assert( torrent != NULL );
182    assert( torrent->peers != NULL );
183    assert( in_addr != NULL );
184
185    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
186                                             in_addr,
187                                             peerCompareToAddr );
188}
189
190static tr_peer*
191getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
192{
193    tr_peer * peer = getExistingPeer( torrent, in_addr );
194
195    if( isNew )
196        *isNew = peer == NULL;
197
198    if( peer == NULL )
199    {
200        peer = tr_new0( tr_peer, 1 );
201        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
202        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
203    }
204
205    return peer;
206}
207
208static void
209disconnectPeer( tr_peer * peer )
210{
211    assert( peer != NULL );
212
213    tr_peerIoFree( peer->io );
214    peer->io = NULL;
215
216    if( peer->msgs != NULL )
217    {
218        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
219        tr_peerMsgsFree( peer->msgs );
220        peer->msgs = NULL;
221    }
222
223    tr_bitfieldFree( peer->have );
224    peer->have = NULL;
225
226    tr_bitfieldFree( peer->blame );
227    peer->blame = NULL;
228
229    tr_bitfieldFree( peer->banned );
230    peer->banned = NULL;
231}
232
233static void
234freePeer( tr_peer * peer )
235{
236    disconnectPeer( peer );
237    tr_free( peer->client );
238    tr_free( peer );
239}
240
241static void
242freeTorrent( tr_peerMgr * manager, Torrent * t )
243{
244    int i, size;
245    tr_peer ** peers;
246    uint8_t hash[SHA_DIGEST_LENGTH];
247
248    assert( manager != NULL );
249    assert( t != NULL );
250    assert( t->peers != NULL );
251    assert( getExistingTorrent( manager, t->hash ) != NULL );
252
253    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
254
255    tr_timerFree( &t->reconnectTimer );
256    tr_timerFree( &t->reconnectSoonTimer );
257    tr_timerFree( &t->rechokeTimer );
258    tr_timerFree( &t->rechokeSoonTimer );
259    tr_timerFree( &t->refillTimer );
260
261    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
262    for( i=0; i<size; ++i )
263        freePeer( peers[i] );
264
265    tr_bitfieldFree( t->requested );
266    tr_ptrArrayFree( t->peers );
267    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
268    tr_free( t );
269
270    assert( getExistingTorrent( manager, hash ) == NULL );
271}
272
273/**
274***
275**/
276
277tr_peerMgr*
278tr_peerMgrNew( tr_handle * handle )
279{
280    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
281    m->handle = handle;
282    m->torrents = tr_ptrArrayNew( );
283    m->handshakes = tr_ptrArrayNew( );
284    return m;
285}
286
287void
288tr_peerMgrFree( tr_peerMgr * manager )
289{
290    while( !tr_ptrArrayEmpty( manager->handshakes ) )
291        tr_handshakeAbort( (tr_handshake*)tr_ptrArrayNth( manager->handshakes, 0) );
292    tr_ptrArrayFree( manager->handshakes );
293
294    while( !tr_ptrArrayEmpty( manager->torrents ) )
295        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
296    tr_ptrArrayFree( manager->torrents );
297
298    tr_free( manager );
299}
300
301static tr_peer**
302getConnectedPeers( Torrent * t, int * setmeCount )
303{
304    int i, peerCount, connectionCount;
305    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
306    tr_peer **ret = tr_new( tr_peer*, peerCount );
307
308    for( i=connectionCount=0; i<peerCount; ++i )
309        if( peers[i]->msgs != NULL )
310            ret[connectionCount++] = peers[i];
311
312    *setmeCount = connectionCount;
313    return ret;
314}
315
316/***
317****  Refill
318***/
319
320struct tr_refill_piece
321{
322    tr_priority_t priority;
323    uint32_t piece;
324    uint32_t peerCount;
325};
326
327static int
328compareRefillPiece (const void * aIn, const void * bIn)
329{
330    const struct tr_refill_piece * a = aIn;
331    const struct tr_refill_piece * b = bIn;
332
333    /* if one piece has a higher priority, it goes first */
334    if (a->priority != b->priority)
335        return a->priority > b->priority ? -1 : 1;
336
337    /* otherwise if one has fewer peers, it goes first */
338    if (a->peerCount != b->peerCount)
339        return a->peerCount < b->peerCount ? -1 : 1;
340
341    /* otherwise go with the earlier piece */
342    return a->piece - b->piece;
343}
344
345static int
346isPieceInteresting( const tr_torrent  * tor,
347                    int                 piece )
348{
349    if( tor->info.pieces[piece].dnd ) /* we don't want it */
350        return 0;
351
352    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
353        return 0;
354
355    return 1;
356}
357
358static uint32_t*
359getPreferredPieces( Torrent     * t,
360                    uint32_t    * pieceCount )
361{
362    const tr_torrent * tor = t->tor;
363    const tr_info * inf = &tor->info;
364
365    int i;
366    uint32_t poolSize = 0;
367    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
368
369    int peerCount;
370    tr_peer** peers = getConnectedPeers( t, &peerCount );
371
372    for( i=0; i<inf->pieceCount; ++i )
373        if( isPieceInteresting( tor, i ) )
374            pool[poolSize++] = i;
375
376    /* sort the pool from most interesting to least... */
377    if( poolSize > 1 )
378    {
379        uint32_t j;
380        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
381
382        for( j=0; j<poolSize; ++j )
383        {
384            int k;
385            const int piece = pool[j];
386            struct tr_refill_piece * setme = p + j;
387
388            setme->piece = piece;
389            setme->priority = inf->pieces[piece].priority;
390            setme->peerCount = 0;
391
392            for( k=0; k<peerCount; ++k ) {
393                const tr_peer * peer = peers[k];
394                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
395                    ++setme->peerCount;
396            }
397        }
398
399        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
400
401        for( j=0; j<poolSize; ++j )
402            pool[j] = p[j].piece;
403
404        tr_free( p );
405    }
406
407#if 0
408fprintf (stderr, "new pool: ");
409for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
410fprintf (stderr, "\n");
411#endif
412    tr_free( peers );
413
414    *pieceCount = poolSize;
415    return pool;
416}
417
418static uint64_t*
419getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
420{
421    uint32_t i;
422    uint32_t pieceCount;
423    uint32_t * pieces;
424    uint64_t *req, *unreq, *ret, *walk;
425    int reqCount, unreqCount;
426    const tr_torrent * tor = t->tor;
427
428    pieces = getPreferredPieces( t, &pieceCount );
429/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
430
431    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
432    reqCount = 0;
433    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
434    unreqCount = 0;
435
436    for( i=0; i<pieceCount; ++i ) {
437        const uint32_t index = pieces[i];
438        const int begin = tr_torPieceFirstBlock( tor, index );
439        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
440        int block;
441        for( block=begin; block<end; ++block )
442            if( tr_cpBlockIsComplete( tor->completion, block ) )
443                continue;
444            else if( tr_bitfieldHas( t->requested, block ) )
445                req[reqCount++] = block;
446            else
447                unreq[unreqCount++] = block;
448    }
449
450/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
451    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
452    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
453    walk += unreqCount;
454    memcpy( walk, req, sizeof(uint64_t) * reqCount );
455    walk += reqCount;
456    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
457    *setmeCount = walk - ret;
458
459    tr_free( req );
460    tr_free( unreq );
461    tr_free( pieces );
462
463    return ret;
464}
465
466static int
467refillPulse( void * vtorrent )
468{
469    Torrent * t = vtorrent;
470    tr_torrent * tor = t->tor;
471    uint32_t i;
472    int peerCount;
473    tr_peer ** peers;
474    uint64_t blockCount;
475    uint64_t * blocks;
476
477    if( !t->isRunning )
478        return TRUE;
479    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
480        return TRUE;
481
482    blocks = getPreferredBlocks( t, &blockCount );
483    peers = getConnectedPeers( t, &peerCount );
484
485/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
486
487    for( i=0; peerCount && i<blockCount; ++i )
488    {
489        const int block = blocks[i];
490        const uint32_t index = tr_torBlockPiece( tor, block );
491        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
492        const uint32_t length = tr_torBlockCountBytes( tor, block );
493        int j;
494        assert( _tr_block( tor, index, begin ) == block );
495        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
496        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
497
498
499        /* find a peer who can ask for this block */
500        for( j=0; j<peerCount; )
501        {
502            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
503            switch( val )
504            {
505                case TR_ADDREQ_FULL: 
506                case TR_ADDREQ_CLIENT_CHOKED:
507                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
508                    break;
509
510                case TR_ADDREQ_MISSING: 
511                    ++j;
512                    break;
513
514                case TR_ADDREQ_OK:
515                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
516                    tr_bitfieldAdd( t->requested, block );
517                    j = peerCount;
518                    break;
519
520                default:
521                    assert( 0 && "unhandled value" );
522                    break;
523            }
524        }
525    }
526
527    /* cleanup */
528    tr_free( peers );
529    tr_free( blocks );
530
531    t->refillTimer = NULL;
532    return FALSE;
533}
534
535static void
536broadcastClientHave( Torrent * t, uint32_t index )
537{
538    int i, size;
539    tr_peer ** peers = getConnectedPeers( t, &size );
540    for( i=0; i<size; ++i )
541        tr_peerMsgsHave( peers[i]->msgs, index );
542    tr_free( peers );
543}
544
545static void
546broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
547{
548    int i, size;
549    tr_peer ** peers = getConnectedPeers( t, &size );
550    for( i=0; i<size; ++i )
551        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
552    tr_free( peers );
553}
554
555/**
556***
557**/
558
559static int reconnectPulse( void * vtorrent );
560
561static void
562restartReconnectTimer( Torrent * t )
563{
564    tr_timerFree( &t->reconnectTimer );
565    if( t->isRunning )
566        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
567}
568
569static void
570reconnectNow( Torrent * t )
571{
572    reconnectPulse( t );
573    restartReconnectTimer( t );
574}
575
576static int
577reconnectSoonCB( void * vt )
578{
579    Torrent * t = vt;
580    reconnectNow( t );
581    t->reconnectSoonTimer = NULL;
582    return FALSE;
583}
584
585static void
586reconnectSoon( Torrent * t )
587{
588    if( t->reconnectSoonTimer == NULL )
589        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
590                                             reconnectSoonCB, t, SOON_MSEC );
591}
592
593/**
594***
595**/
596
597static int rechokePulse( void * vtorrent );
598
599static void
600restartChokeTimer( Torrent * t )
601{
602    tr_timerFree( &t->rechokeTimer );
603    if( t->isRunning )
604        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
605}
606
607static void
608rechokeNow( Torrent * t )
609{
610    rechokePulse( t );
611    restartChokeTimer( t );
612}
613
614static int
615rechokeSoonCB( void * vt )
616{
617    Torrent * t = vt;
618    rechokeNow( t );
619    t->rechokeSoonTimer = NULL;
620    return FALSE;
621}
622
623static void
624rechokeSoon( Torrent * t )
625{
626    if( t->rechokeSoonTimer == NULL )
627        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
628                                           rechokeSoonCB, t, SOON_MSEC );
629}
630
631static void
632msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
633{
634    tr_peer * peer = vpeer;
635    Torrent * t = (Torrent *) vt;
636    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
637
638    switch( e->eventType )
639    {
640        case TR_PEERMSG_NEED_REQ:
641            if( t->refillTimer == NULL )
642                t->refillTimer = tr_timerNew( t->manager->handle,
643                                              refillPulse, t,
644                                              REFILL_PERIOD_MSEC );
645            break;
646
647        case TR_PEERMSG_CLIENT_HAVE:
648            broadcastClientHave( t, e->pieceIndex );
649            tr_torrentRecheckCompleteness( t->tor );
650            break;
651
652        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
653            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
654            const int peerIsSeed = e->progress >= 1.0;
655            if( clientIsSeed && peerIsSeed )
656                peer->doPurge = 1;
657            break;
658        }
659
660        case TR_PEERMSG_CLIENT_BLOCK:
661            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
662            break;
663
664        case TR_PEERMSG_GOT_ERROR:
665            peer->doPurge = 1;
666            reconnectSoon( t );
667            break;
668
669        default:
670            assert(0);
671    }
672}
673
674static void
675myHandshakeDoneCB( tr_handshake    * handshake,
676                   tr_peerIo       * io,
677                   int               isConnected,
678                   const uint8_t   * peer_id,
679                   int               peerSupportsEncryption,
680                   void            * vmanager )
681{
682    int ok = isConnected;
683    uint16_t port;
684    const struct in_addr * in_addr;
685    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
686    const uint8_t * hash = NULL;
687    Torrent * t;
688    tr_handshake * ours;
689
690    assert( io != NULL );
691    assert( isConnected==0 || isConnected==1 );
692    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
693
694    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
695                                    handshake,
696                                    handshakeCompare );
697    //assert( ours != NULL );
698    //assert( ours == handshake );
699
700    in_addr = tr_peerIoGetAddress( io, &port );
701
702    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
703    {
704        tr_peerIoFree( io );
705        --manager->connectionCount;
706        return;
707    }
708
709    hash = tr_peerIoGetTorrentHash( io );
710    t = getExistingTorrent( manager, hash );
711    if( !t || !t->isRunning )
712    {
713        tr_peerIoFree( io );
714        --manager->connectionCount;
715        return;
716    }
717
718    /* if we couldn't connect or were snubbed,
719     * the peer's probably not worth remembering. */
720    if( !ok ) {
721        tr_peer * peer = getExistingPeer( t, in_addr );
722        tr_peerIoFree( io );
723        --manager->connectionCount;
724        if( peer )
725            peer->doPurge = 1;
726        return;
727    }
728
729    if( 1 ) {
730        tr_peer * peer = getPeer( t, in_addr, NULL );
731        if( peer->msgs != NULL ) { /* we already have this peer */
732            tr_peerIoFree( io );
733            --manager->connectionCount;
734        } else {
735            peer->port = port;
736            peer->io = io;
737            peer->msgs = tr_peerMsgsNew( t->tor, peer );
738            tr_free( peer->client );
739            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
740            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
741            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
742            peer->connectionChangedAt = time( NULL );
743            rechokeSoon( t );
744        }
745    }
746}
747
748static void
749initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
750{
751    tr_handshake * handshake = tr_handshakeNew( io,
752                                                manager->handle->encryptionMode,
753                                                myHandshakeDoneCB,
754                                                manager );
755    ++manager->connectionCount;
756
757    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
758}
759
760void
761tr_peerMgrAddIncoming( tr_peerMgr      * manager,
762                       struct in_addr  * addr,
763                       uint16_t          port,
764                       int               socket )
765{
766    if( getExistingHandshake( manager, addr ) == NULL )
767    {
768        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
769        initiateHandshake( manager, io );
770    }
771}
772
773void
774tr_peerMgrAddPex( tr_peerMgr     * manager,
775                  const uint8_t  * torrentHash,
776                  int              from,
777                  const tr_pex   * pex,
778                  int              pexCount )
779{
780    Torrent * t = getExistingTorrent( manager, torrentHash );
781    const tr_pex * end = pex + pexCount;
782    while( pex != end )
783    {
784        int isNew;
785        tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
786        if( isNew ) {
787            peer->port = pex->port;
788            peer->from = from;
789        }
790        ++pex;
791    }
792    reconnectSoon( t );
793}
794
795void
796tr_peerMgrAddPeers( tr_peerMgr    * manager,
797                    const uint8_t * torrentHash,
798                    int             from,
799                    const uint8_t * peerCompact,
800                    int             peerCount )
801{
802    int i;
803    const uint8_t * walk = peerCompact;
804    Torrent * t = getExistingTorrent( manager, torrentHash );
805    for( i=0; t!=NULL && i<peerCount; ++i )
806    {
807        int isNew;
808        tr_peer * peer;
809        struct in_addr addr;
810        uint16_t port;
811        memcpy( &addr, walk, 4 ); walk += 4;
812        memcpy( &port, walk, 2 ); walk += 2;
813        peer = getPeer( t, &addr, &isNew );
814        if( isNew ) {
815            peer->port = port;
816            peer->from = from;
817        }
818    }
819    reconnectSoon( t );
820}
821
822/**
823***
824**/
825
826int
827tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
828{
829    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
830}
831
832void
833tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
834                    const uint8_t  * torrentHash UNUSED,
835                    int              pieceIndex UNUSED,
836                    int              success UNUSED )
837{
838    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
839}
840
841int
842tr_pexCompare( const void * va, const void * vb )
843{
844    const tr_pex * a = (const tr_pex *) va;
845    const tr_pex * b = (const tr_pex *) vb;
846    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
847    if( i ) return i;
848    if( a->port < b->port ) return -1;
849    if( a->port > b->port ) return 1;
850    return 0;
851}
852
853int tr_pexCompare( const void * a, const void * b );
854
855
856int
857tr_peerMgrGetPeers( tr_peerMgr      * manager,
858                    const uint8_t   * torrentHash,
859                    tr_pex         ** setme_pex )
860{
861    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
862    int i, peerCount;
863    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
864    tr_pex * pex = tr_new( tr_pex, peerCount );
865    tr_pex * walk = pex;
866
867    for( i=0; i<peerCount; ++i, ++walk )
868    {
869        const tr_peer * peer = peers[i];
870
871        walk->in_addr = peer->in_addr;
872
873        walk->port = peer->port;
874
875        walk->flags = 0;
876        if( peer->peerSupportsEncryption ) walk->flags |= 1;
877        if( peer->progress >= 1.0 )        walk->flags |= 2;
878    }
879
880    assert( ( walk - pex ) == peerCount );
881    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
882    *setme_pex = pex;
883    return peerCount;
884}
885
886void
887tr_peerMgrStartTorrent( tr_peerMgr     * manager,
888                        const uint8_t  * torrentHash )
889{
890    Torrent * t = getExistingTorrent( manager, torrentHash );
891    t->isRunning = 1;
892    restartChokeTimer( t );
893    reconnectNow( t );
894}
895
896void
897tr_peerMgrStopTorrent( tr_peerMgr     * manager,
898                       const uint8_t  * torrentHash)
899{
900    Torrent * t = getExistingTorrent( manager, torrentHash );
901    t->isRunning = 0;
902    tr_timerFree( &t->rechokeTimer );
903    tr_timerFree( &t->reconnectTimer );
904    reconnectPulse( t );
905}
906
907void
908tr_peerMgrAddTorrent( tr_peerMgr * manager,
909                      tr_torrent * tor )
910{
911    Torrent * t;
912
913    assert( tor != NULL );
914    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
915
916    t = tr_new0( Torrent, 1 );
917    t->manager = manager;
918    t->tor = tor;
919    t->peers = tr_ptrArrayNew( );
920    t->requested = tr_bitfieldNew( tor->blockCount );
921    restartChokeTimer( t );
922    restartReconnectTimer( t );
923
924    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
925    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
926}
927
928void
929tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
930                         const uint8_t  * torrentHash )
931{
932    Torrent * t = getExistingTorrent( manager, torrentHash );
933    assert( t != NULL );
934    tr_peerMgrStopTorrent( manager, torrentHash );
935    freeTorrent( manager, t );
936}
937
938void
939tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
940                               const uint8_t    * torrentHash,
941                               int8_t           * tab,
942                               int                tabCount )
943{
944    int i;
945    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
946    const tr_torrent * tor = t->tor;
947    const float interval = tor->info.pieceCount / (float)tabCount;
948
949    memset( tab, 0, tabCount );
950
951    for( i=0; i<tabCount; ++i )
952    {
953        const int piece = i * interval;
954
955        if( tor == NULL )
956            tab[i] = 0;
957        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
958            tab[i] = -1;
959        else {
960            int j, peerCount;
961            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
962            for( j=0; j<peerCount; ++j )
963                if( tr_bitfieldHas( peers[j]->have, i ) )
964                    ++tab[i];
965        }
966    }
967}
968
969
970void
971tr_peerMgrTorrentStats( const tr_peerMgr * manager,
972                        const uint8_t    * torrentHash,
973                        int              * setmePeersTotal,
974                        int              * setmePeersConnected,
975                        int              * setmePeersSendingToUs,
976                        int              * setmePeersGettingFromUs,
977                        int              * setmePeersFrom )
978{
979    int i, size;
980    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
981    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
982
983    *setmePeersTotal          = size;
984    *setmePeersConnected      = 0;
985    *setmePeersSendingToUs    = 0;
986    *setmePeersGettingFromUs  = 0;
987
988    for( i=0; i<TR_PEER_FROM__MAX; ++i )
989        setmePeersFrom[i] = 0;
990
991    for( i=0; i<size; ++i )
992    {
993        const tr_peer * peer = peers[i];
994
995        if( peer->io == NULL ) /* not connected */
996            continue;
997
998        ++*setmePeersConnected;
999
1000        ++setmePeersFrom[peer->from];
1001
1002        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
1003            ++*setmePeersGettingFromUs;
1004
1005        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
1006            ++*setmePeersSendingToUs;
1007    }
1008}
1009
1010struct tr_peer_stat *
1011tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1012                     const uint8_t     * torrentHash,
1013                     int               * setmeCount UNUSED )
1014{
1015    int i, size;
1016    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1017    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1018    tr_peer_stat * ret;
1019
1020    ret = tr_new0( tr_peer_stat, size );
1021
1022    for( i=0; i<size; ++i )
1023    {
1024        const tr_peer * peer = peers[i];
1025        const int live = peer->io != NULL;
1026        tr_peer_stat * stat = ret + i;
1027
1028        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1029        stat->port             = peer->port;
1030        stat->from             = peer->from;
1031        stat->client           = peer->client;
1032        stat->progress         = peer->progress;
1033        stat->isConnected      = live ? 1 : 0;
1034        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1035        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1036        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1037        stat->isDownloading    = stat->uploadToRate > 0.01;
1038        stat->isUploading      = stat->downloadFromRate > 0.01;
1039    }
1040
1041    *setmeCount = size;
1042    return ret;
1043}
1044
1045/**
1046***
1047**/
1048
1049typedef struct
1050{
1051    tr_peer * peer;
1052    float rate;
1053    int randomKey;
1054    int preferred;
1055    int doUnchoke;
1056}
1057ChokeData;
1058
1059static int
1060compareChoke( const void * va, const void * vb )
1061{
1062    const ChokeData * a = ( const ChokeData * ) va;
1063    const ChokeData * b = ( const ChokeData * ) vb;
1064
1065    if( a->preferred != b->preferred )
1066        return a->preferred ? -1 : 1;
1067
1068    if( a->preferred )
1069    {
1070        if( a->rate > b->rate ) return -1;
1071        if( a->rate < b->rate ) return 1;
1072        return 0;
1073    }
1074    else
1075    {
1076        return a->randomKey - b->randomKey;
1077    }
1078}
1079
1080static int
1081clientIsSnubbedBy( const tr_peer * peer )
1082{
1083    assert( peer != NULL );
1084
1085    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1086}
1087
1088/**
1089***
1090**/
1091
1092static void
1093rechokeLeech( Torrent * t )
1094{
1095    int i, peerCount, size=0, unchoked=0;
1096    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1097    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1098    ChokeData * choke = tr_new0( ChokeData, peerCount );
1099
1100    /* sort the peers by preference and rate */
1101    for( i=0; i<peerCount; ++i )
1102    {
1103        tr_peer * peer = peers[i];
1104        ChokeData * node;
1105        if( peer->chokeChangedAt > ignorePeersNewerThan )
1106            continue;
1107
1108        node = &choke[size++];
1109        node->peer = peer;
1110        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1111        node->randomKey = tr_rand( INT_MAX );
1112        node->rate = tr_peerIoGetRateToClient( peer->io );
1113    }
1114
1115    qsort( choke, size, sizeof(ChokeData), compareChoke );
1116
1117    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1118        choke[i].doUnchoke = 1;
1119        ++unchoked;
1120    }
1121
1122    for( ; i<size; ++i ) {
1123        choke[i].doUnchoke = 1;
1124        ++unchoked;
1125        if( choke[i].peer->peerIsInterested )
1126            break;
1127    }
1128
1129    for( i=0; i<size; ++i )
1130        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1131
1132    /* cleanup */
1133    tr_free( choke );
1134    tr_free( peers );
1135}
1136
1137static void
1138rechokeSeed( Torrent * t )
1139{
1140    int i, size;
1141    tr_peer ** peers = getConnectedPeers( t, &size );
1142
1143    /* FIXME */
1144    for( i=0; i<size; ++i )
1145        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1146
1147    tr_free( peers );
1148}
1149
1150static int
1151rechokePulse( void * vtorrent )
1152{
1153    Torrent * t = vtorrent;
1154    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1155    if( done )
1156        rechokeLeech( vtorrent );
1157    else
1158        rechokeSeed( vtorrent );
1159    return TRUE;
1160}
1161
1162/**
1163***
1164**/
1165
1166static int
1167shouldPeerBeDisconnected( Torrent * t, tr_peer * peer, int peerCount, int isSeeding )
1168{
1169    const time_t now = time( NULL );
1170    int relaxStrictnessIfFewerThanN;
1171    double strictness;
1172
1173    if( peer->io == NULL ) /* not connected */
1174        return FALSE;
1175
1176    if( !t->isRunning ) /* the torrent is stopped... nobody should be connected */
1177        return TRUE;
1178
1179    /* not enough peers to go around... might as well keep this one;
1180     * they might unchoke us or give us a pex or something */
1181    if( peerCount < MAX_CONNECTED_PEERS_PER_TORRENT )
1182        return FALSE;
1183
1184    /* when deciding whether or not to keep a peer, judge its responsiveness
1185       on a sliding scale that's based on how many other peers are available */
1186    relaxStrictnessIfFewerThanN =
1187        (int)(((TR_MAX_PEER_COUNT * RELAX_RULES_PERCENTAGE) / 100.0) + 0.5);
1188
1189    /* if we have >= relaxIfFewerThan, strictness is 100%.
1190       if we have zero connections, strictness is 0% */
1191    if( peerCount >= relaxStrictnessIfFewerThanN )
1192        strictness = 1.0;
1193    else
1194        strictness = peerCount / (double)relaxStrictnessIfFewerThanN;
1195
1196    /* test: has it been too long since we exchanged piece data? */
1197    if( ( now - peer->connectionChangedAt ) >= MAX_TRANSFER_IDLE ) {
1198        const uint64_t lo = MIN_TRANSFER_IDLE;
1199        const uint64_t hi = MAX_TRANSFER_IDLE;
1200        const uint64_t limit = lo + ((hi-lo) * strictness);
1201        const uint64_t interval = now - (isSeeding ? peer->clientSentPieceDataAt : peer->peerSentPieceDataAt);
1202        if( interval > limit )
1203            return TRUE;
1204    }
1205
1206    /* FIXME: SWE had other tests too... */
1207
1208    return FALSE;
1209}
1210
1211static int
1212comparePeerByConnectionDate( const void * va, const void * vb )
1213{
1214    const tr_peer * a = *(const tr_peer**) va;
1215    const tr_peer * b = *(const tr_peer**) vb;
1216    return tr_compareUint64( a->connectionChangedAt, b->connectionChangedAt );
1217}
1218
1219static int
1220reconnectPulse( void * vt UNUSED )
1221{
1222    int i, size, liveCount;
1223    Torrent * t = vt;
1224    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &size );
1225    const int isSeeding = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1226
1227    /* how many connections do we have? */
1228    for( i=liveCount=0; i<size; ++i )
1229        if( peers[i]->msgs != NULL )
1230            ++liveCount;
1231
1232    /* destroy and/or disconnect from some peers */
1233    for( i=0; i<size; )
1234    {
1235        tr_peer * peer = peers[i];
1236
1237        if( peer->doPurge ) {
1238            tr_ptrArrayErase( t->peers, i, i+1 );
1239            freePeer( peer );
1240            --size;
1241            --liveCount;
1242            continue;
1243        }
1244
1245        if( shouldPeerBeDisconnected( t, peer, liveCount, isSeeding ) ) {
1246            disconnectPeer( peer );
1247            --liveCount;
1248        }
1249
1250        ++i;
1251    }
1252
1253    /* maybe connect to some new peers */ 
1254    if( t->isRunning && (liveCount<MAX_CONNECTED_PEERS_PER_TORRENT) )
1255    {
1256        int poolSize;
1257        int left = MAX_CONNECTED_PEERS_PER_TORRENT - liveCount;
1258        tr_peer ** pool;
1259        tr_peerMgr * manager = t->manager;
1260        const time_t now = time( NULL );
1261
1262        /* make a list of peers we know about but aren't connected to */
1263        poolSize = 0;
1264        pool = tr_new0( tr_peer*, size );
1265        for( i=0; i<size; ++i ) {
1266            tr_peer * peer = peers[i];
1267            if( peer->msgs == NULL )
1268                pool[poolSize++] = peer;
1269        }
1270
1271        /* sort them s.t. the ones we've already tried are at the last of the list */
1272        qsort( pool, poolSize, sizeof(tr_peer*), comparePeerByConnectionDate );
1273
1274        /* make some connections */
1275        for( i=0; i<poolSize && left>0; ++i )
1276        {
1277            tr_peer * peer = pool[i];
1278            tr_peerIo * io;
1279
1280            if( ( now - peer->connectionChangedAt ) < MIN_HANGUP_PERIOD_SEC )
1281                break;
1282
1283            /* already have a handshake pending */
1284            if( getExistingHandshake( manager, &peer->in_addr ) != NULL )
1285                continue;
1286
1287            /* initiate a connection to the peer */
1288            io = tr_peerIoNewOutgoing( manager->handle, &peer->in_addr, peer->port, t->hash );
1289            /*fprintf( stderr, "[%s] connecting to potential peer %s\n", t->tor->info.name, tr_peerIoGetAddrStr(io) );*/
1290            peer->connectionChangedAt = time( NULL );
1291            initiateHandshake( manager, io );
1292            --left;
1293        }
1294
1295        tr_free( pool );
1296    }
1297
1298    return TRUE;
1299}
Note: See TracBrowser for help on using the repository browser.