source: trunk/libtransmission/peer-mgr.c @ 3225

Last change on this file since 3225 was 3225, checked in by charles, 15 years ago

remove obsolete MAX_PEERS in internal.h

  • Property svn:keywords set to Date Rev Author Id
File size: 35.8 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 3225 2007-09-28 16:00:43Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp */
15#include <stdlib.h> /* qsort */
16#include <stdio.h> /* printf */
17#include <limits.h> /* INT_MAX */
18
19#include "transmission.h"
20#include "clients.h"
21#include "completion.h"
22#include "handshake.h"
23#include "net.h"
24#include "peer-io.h"
25#include "peer-mgr.h"
26#include "peer-mgr-private.h"
27#include "peer-msgs.h"
28#include "ptrarray.h"
29#include "trevent.h"
30#include "utils.h"
31
32enum
33{
34    /* how frequently to change which peers are choked */
35    RECHOKE_PERIOD_MSEC = (15 * 1000),
36
37    /* how frequently to decide which peers live and die */
38    RECONNECT_PERIOD_MSEC = (15 * 1000),
39
40    /* how frequently to refill peers' request lists */
41    REFILL_PERIOD_MSEC = 1000,
42
43    /* how many peers to unchoke per-torrent. */
44    /* FIXME: make this user-configurable? */
45    NUM_UNCHOKED_PEERS_PER_TORRENT = 8,
46
47    /* don't change a peer's choke status more often than this */
48    MIN_CHOKE_PERIOD_SEC = 10,
49
50    /* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
51    SOON_MSEC = 1000,
52
53    /* following the BT spec, we consider ourselves `snubbed' if
54     * we're we don't get piece data from a peer in this long */
55    SNUBBED_SEC = 60,
56
57    /* if our connection count for a torrent is <= N% of what we wanted,
58     * start relaxing the rules that decide when to disconnect a peer */
59    RELAX_RULES_PERCENTAGE = 25,
60
61    /* if we're not relaxing the rules, disconnect a peer that hasn't
62     * given us anything (or taken, if we're seeding) in this long */
63    MIN_TRANSFER_IDLE = 90000,
64
65    /* even if we're relaxing the rules, disconnect a peer that hasn't
66     * given us anything (or taken, if we're seeding) in this long */
67    MAX_TRANSFER_IDLE = 240000,
68
69    /* this is arbitrary and, hopefully, temporary until we come up
70     * with a better idea for managing the connection limits */
71    MAX_CONNECTED_PEERS_PER_TORRENT = 100,
72
73    /* if we hang up on a peer for being worthless, don't try to
74     * reconnect to it for this long. */
75    MIN_HANGUP_PERIOD_SEC = 120
76};
77
78/**
79***
80**/
81
82typedef struct
83{
84    uint8_t hash[SHA_DIGEST_LENGTH];
85    tr_ptrArray * peers; /* tr_peer */
86    tr_timer * reconnectTimer;
87    tr_timer * reconnectSoonTimer;
88    tr_timer * rechokeTimer;
89    tr_timer * rechokeSoonTimer;
90    tr_timer * refillTimer;
91    tr_torrent * tor;
92    tr_bitfield * requested;
93
94    unsigned int isRunning : 1;
95
96    struct tr_peerMgr * manager;
97}
98Torrent;
99
100struct tr_peerMgr
101{
102    tr_handle * handle;
103    tr_ptrArray * torrents; /* Torrent */
104    int connectionCount;
105    tr_ptrArray * handshakes; /* in-process */
106};
107
108/**
109***
110**/
111
112static int
113handshakeCompareToAddr( const void * va, const void * vb )
114{
115    const tr_handshake * a = va;
116    const struct in_addr * b = vb;
117    return memcmp( tr_handshakeGetAddr( a, NULL ), b, sizeof( struct in_addr ) );
118}
119
120static int
121handshakeCompare( const void * a, const void * b )
122{
123    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
124}
125
126static tr_handshake*
127getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
128{
129    return tr_ptrArrayFindSorted( mgr->handshakes,
130                                  in_addr,
131                                  handshakeCompareToAddr );
132}
133
134/**
135***
136**/
137
138static int
139torrentCompare( const void * va, const void * vb )
140{
141    const Torrent * a = va;
142    const Torrent * b = vb;
143    return memcmp( a->hash, b->hash, SHA_DIGEST_LENGTH );
144}
145
146static int
147torrentCompareToHash( const void * va, const void * vb )
148{
149    const Torrent * a = (const Torrent*) va;
150    const uint8_t * b_hash = (const uint8_t*) vb;
151    return memcmp( a->hash, b_hash, SHA_DIGEST_LENGTH );
152}
153
154static Torrent*
155getExistingTorrent( tr_peerMgr * manager, const uint8_t * hash )
156{
157    return (Torrent*) tr_ptrArrayFindSorted( manager->torrents,
158                                             hash,
159                                             torrentCompareToHash );
160}
161
162static int
163peerCompare( const void * va, const void * vb )
164{
165    const tr_peer * a = (const tr_peer *) va;
166    const tr_peer * b = (const tr_peer *) vb;
167    return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
168}
169
170static int
171peerCompareToAddr( const void * va, const void * vb )
172{
173    const tr_peer * a = (const tr_peer *) va;
174    const struct in_addr * b = (const struct in_addr *) vb;
175    return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
176}
177
178static tr_peer*
179getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
180{
181    assert( torrent != NULL );
182    assert( torrent->peers != NULL );
183    assert( in_addr != NULL );
184
185    return (tr_peer*) tr_ptrArrayFindSorted( torrent->peers,
186                                             in_addr,
187                                             peerCompareToAddr );
188}
189
190static tr_peer*
191getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
192{
193    tr_peer * peer = getExistingPeer( torrent, in_addr );
194
195    if( isNew )
196        *isNew = peer == NULL;
197
198    if( peer == NULL )
199    {
200        peer = tr_new0( tr_peer, 1 );
201        memcpy( &peer->in_addr, in_addr, sizeof(struct in_addr) );
202        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
203    }
204
205    return peer;
206}
207
208static void
209disconnectPeer( tr_peer * peer )
210{
211    assert( peer != NULL );
212
213    tr_peerIoFree( peer->io );
214    peer->io = NULL;
215
216    if( peer->msgs != NULL )
217    {
218        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
219        tr_peerMsgsFree( peer->msgs );
220        peer->msgs = NULL;
221    }
222
223    tr_bitfieldFree( peer->have );
224    peer->have = NULL;
225
226    tr_bitfieldFree( peer->blame );
227    peer->blame = NULL;
228
229    tr_bitfieldFree( peer->banned );
230    peer->banned = NULL;
231}
232
233static void
234freePeer( tr_peer * peer )
235{
236    disconnectPeer( peer );
237    tr_free( peer->client );
238    tr_free( peer );
239}
240
241static void
242freeTorrent( tr_peerMgr * manager, Torrent * t )
243{
244    int i, size;
245    tr_peer ** peers;
246    uint8_t hash[SHA_DIGEST_LENGTH];
247
248    assert( manager != NULL );
249    assert( t != NULL );
250    assert( t->peers != NULL );
251    assert( getExistingTorrent( manager, t->hash ) != NULL );
252
253    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
254
255    tr_timerFree( &t->reconnectTimer );
256    tr_timerFree( &t->reconnectSoonTimer );
257    tr_timerFree( &t->rechokeTimer );
258    tr_timerFree( &t->rechokeSoonTimer );
259    tr_timerFree( &t->refillTimer );
260
261    peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
262    for( i=0; i<size; ++i )
263        freePeer( peers[i] );
264
265    tr_bitfieldFree( t->requested );
266    tr_ptrArrayFree( t->peers );
267    tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
268    tr_free( t );
269
270    assert( getExistingTorrent( manager, hash ) == NULL );
271}
272
273/**
274***
275**/
276
277tr_peerMgr*
278tr_peerMgrNew( tr_handle * handle )
279{
280    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
281    m->handle = handle;
282    m->torrents = tr_ptrArrayNew( );
283    m->handshakes = tr_ptrArrayNew( );
284    return m;
285}
286
287void
288tr_peerMgrFree( tr_peerMgr * manager )
289{
290    while( !tr_ptrArrayEmpty( manager->handshakes ) )
291        tr_handshakeAbort( (tr_handshake*)tr_ptrArrayNth( manager->handshakes, 0) );
292    tr_ptrArrayFree( manager->handshakes );
293
294    while( !tr_ptrArrayEmpty( manager->torrents ) )
295        freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
296    tr_ptrArrayFree( manager->torrents );
297
298    tr_free( manager );
299}
300
301static tr_peer**
302getConnectedPeers( Torrent * t, int * setmeCount )
303{
304    int i, peerCount, connectionCount;
305    tr_peer **peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
306    tr_peer **ret = tr_new( tr_peer*, peerCount );
307
308    for( i=connectionCount=0; i<peerCount; ++i )
309        if( peers[i]->msgs != NULL )
310            ret[connectionCount++] = peers[i];
311
312    *setmeCount = connectionCount;
313    return ret;
314}
315
316/***
317****  Refill
318***/
319
320struct tr_refill_piece
321{
322    tr_priority_t priority;
323    uint32_t piece;
324    uint32_t peerCount;
325};
326
327static int
328compareRefillPiece (const void * aIn, const void * bIn)
329{
330    const struct tr_refill_piece * a = aIn;
331    const struct tr_refill_piece * b = bIn;
332
333    /* if one piece has a higher priority, it goes first */
334    if (a->priority != b->priority)
335        return a->priority > b->priority ? -1 : 1;
336
337    /* otherwise if one has fewer peers, it goes first */
338    if (a->peerCount != b->peerCount)
339        return a->peerCount < b->peerCount ? -1 : 1;
340
341    /* otherwise go with the earlier piece */
342    return a->piece - b->piece;
343}
344
345static int
346isPieceInteresting( const tr_torrent  * tor,
347                    int                 piece )
348{
349    if( tor->info.pieces[piece].dnd ) /* we don't want it */
350        return 0;
351
352    if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we already have it */
353        return 0;
354
355    return 1;
356}
357
358static uint32_t*
359getPreferredPieces( Torrent     * t,
360                    uint32_t    * pieceCount )
361{
362    const tr_torrent * tor = t->tor;
363    const tr_info * inf = &tor->info;
364
365    int i;
366    uint32_t poolSize = 0;
367    uint32_t * pool = tr_new( uint32_t, inf->pieceCount );
368
369    int peerCount;
370    tr_peer** peers = getConnectedPeers( t, &peerCount );
371
372    for( i=0; i<inf->pieceCount; ++i )
373        if( isPieceInteresting( tor, i ) )
374            pool[poolSize++] = i;
375
376    /* sort the pool from most interesting to least... */
377    if( poolSize > 1 )
378    {
379        uint32_t j;
380        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
381
382        for( j=0; j<poolSize; ++j )
383        {
384            int k;
385            const int piece = pool[j];
386            struct tr_refill_piece * setme = p + j;
387
388            setme->piece = piece;
389            setme->priority = inf->pieces[piece].priority;
390            setme->peerCount = 0;
391
392            for( k=0; k<peerCount; ++k ) {
393                const tr_peer * peer = peers[k];
394                if( peer->peerIsInterested && !peer->clientIsChoked && tr_bitfieldHas( peer->have, piece ) )
395                    ++setme->peerCount;
396            }
397        }
398
399        qsort (p, poolSize, sizeof(struct tr_refill_piece), compareRefillPiece);
400
401        for( j=0; j<poolSize; ++j )
402            pool[j] = p[j].piece;
403
404        tr_free( p );
405    }
406
407#if 0
408fprintf (stderr, "new pool: ");
409for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
410fprintf (stderr, "\n");
411#endif
412    tr_free( peers );
413
414    *pieceCount = poolSize;
415    return pool;
416}
417
418static uint64_t*
419getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
420{
421    uint32_t i;
422    uint32_t pieceCount;
423    uint32_t * pieces;
424    uint64_t *req, *unreq, *ret, *walk;
425    int reqCount, unreqCount;
426    const tr_torrent * tor = t->tor;
427
428    pieces = getPreferredPieces( t, &pieceCount );
429/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
430
431    req = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
432    reqCount = 0;
433    unreq = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
434    unreqCount = 0;
435
436    for( i=0; i<pieceCount; ++i ) {
437        const uint32_t index = pieces[i];
438        const int begin = tr_torPieceFirstBlock( tor, index );
439        const int end = begin + tr_torPieceCountBlocks( tor, (int)index );
440        int block;
441        for( block=begin; block<end; ++block )
442            if( tr_cpBlockIsComplete( tor->completion, block ) )
443                continue;
444            else if( tr_bitfieldHas( t->requested, block ) )
445                req[reqCount++] = block;
446            else
447                unreq[unreqCount++] = block;
448    }
449
450/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
451    ret = walk = tr_new( uint64_t, unreqCount + reqCount );
452    memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
453    walk += unreqCount;
454    memcpy( walk, req, sizeof(uint64_t) * reqCount );
455    walk += reqCount;
456    assert( ( walk - ret ) == ( unreqCount + reqCount ) );
457    *setmeCount = walk - ret;
458
459    tr_free( req );
460    tr_free( unreq );
461    tr_free( pieces );
462
463    return ret;
464}
465
466static int
467refillPulse( void * vtorrent )
468{
469    Torrent * t = vtorrent;
470    tr_torrent * tor = t->tor;
471    uint32_t i;
472    int peerCount;
473    tr_peer ** peers;
474    uint64_t blockCount;
475    uint64_t * blocks;
476
477    if( !t->isRunning )
478        return TRUE;
479    if( tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE )
480        return TRUE;
481
482    blocks = getPreferredBlocks( t, &blockCount );
483    peers = getConnectedPeers( t, &peerCount );
484
485/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
486
487    for( i=0; peerCount && i<blockCount; ++i )
488    {
489        const int block = blocks[i];
490        const uint32_t index = tr_torBlockPiece( tor, block );
491        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
492        const uint32_t length = tr_torBlockCountBytes( tor, block );
493        int j;
494        assert( _tr_block( tor, index, begin ) == block );
495        assert( begin < (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
496        assert( (begin + length) <= (uint32_t)tr_torPieceCountBytes( tor, (int)index ) );
497
498
499        /* find a peer who can ask for this block */
500        for( j=0; j<peerCount; )
501        {
502            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
503            switch( val )
504            {
505                case TR_ADDREQ_FULL: 
506                case TR_ADDREQ_CLIENT_CHOKED:
507                    memmove( peers+j, peers+j+1, sizeof(tr_peer*)*(--peerCount-j) );
508                    break;
509
510                case TR_ADDREQ_MISSING: 
511                    ++j;
512                    break;
513
514                case TR_ADDREQ_OK:
515                    /*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
516                    tr_bitfieldAdd( t->requested, block );
517                    j = peerCount;
518                    break;
519
520                default:
521                    assert( 0 && "unhandled value" );
522                    break;
523            }
524        }
525    }
526
527    /* cleanup */
528    tr_free( peers );
529    tr_free( blocks );
530
531    t->refillTimer = NULL;
532    return FALSE;
533}
534
535static void
536broadcastClientHave( Torrent * t, uint32_t index )
537{
538    int i, size;
539    tr_peer ** peers = getConnectedPeers( t, &size );
540    for( i=0; i<size; ++i )
541        tr_peerMsgsHave( peers[i]->msgs, index );
542    tr_free( peers );
543}
544
545static void
546broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
547{
548    int i, size;
549    tr_peer ** peers = getConnectedPeers( t, &size );
550    for( i=0; i<size; ++i )
551        tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
552    tr_free( peers );
553}
554
555/**
556***
557**/
558
559static int reconnectPulse( void * vtorrent );
560
561static void
562restartReconnectTimer( Torrent * t )
563{
564    tr_timerFree( &t->reconnectTimer );
565    if( t->isRunning )
566        t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
567}
568
569static void
570reconnectNow( Torrent * t )
571{
572    reconnectPulse( t );
573    restartReconnectTimer( t );
574}
575
576static int
577reconnectSoonCB( void * vt )
578{
579    Torrent * t = vt;
580    reconnectNow( t );
581    t->reconnectSoonTimer = NULL;
582    return FALSE;
583}
584
585static void
586reconnectSoon( Torrent * t )
587{
588    if( t->reconnectSoonTimer == NULL )
589        t->reconnectSoonTimer = tr_timerNew( t->manager->handle,
590                                             reconnectSoonCB, t, SOON_MSEC );
591}
592
593/**
594***
595**/
596
597static int rechokePulse( void * vtorrent );
598
599static void
600restartChokeTimer( Torrent * t )
601{
602    tr_timerFree( &t->rechokeTimer );
603    if( t->isRunning )
604        t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
605}
606
607static void
608rechokeNow( Torrent * t )
609{
610    rechokePulse( t );
611    restartChokeTimer( t );
612}
613
614static int
615rechokeSoonCB( void * vt )
616{
617    Torrent * t = vt;
618    rechokeNow( t );
619    t->rechokeSoonTimer = NULL;
620    return FALSE;
621}
622
623static void
624rechokeSoon( Torrent * t )
625{
626    if( t->rechokeSoonTimer == NULL )
627        t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
628                                           rechokeSoonCB, t, SOON_MSEC );
629}
630
631static void
632msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
633{
634    tr_peer * peer = vpeer;
635    Torrent * t = (Torrent *) vt;
636    const tr_peermsgs_event * e = (const tr_peermsgs_event *) vevent;
637
638    switch( e->eventType )
639    {
640        case TR_PEERMSG_NEED_REQ:
641            if( t->refillTimer == NULL )
642                t->refillTimer = tr_timerNew( t->manager->handle,
643                                              refillPulse, t,
644                                              REFILL_PERIOD_MSEC );
645            break;
646
647        case TR_PEERMSG_CLIENT_HAVE:
648            broadcastClientHave( t, e->pieceIndex );
649            tr_torrentRecheckCompleteness( t->tor );
650            break;
651
652        case TR_PEERMSG_PEER_PROGRESS: { /* if we're both seeds, then disconnect. */
653#if 0
654            const int clientIsSeed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
655            const int peerIsSeed = e->progress >= 1.0;
656            if( clientIsSeed && peerIsSeed )
657                peer->doPurge = 1;
658#endif
659            break;
660        }
661
662        case TR_PEERMSG_CLIENT_BLOCK:
663            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
664            break;
665
666        case TR_PEERMSG_GOT_ERROR:
667            peer->doPurge = 1;
668            reconnectSoon( t );
669            break;
670
671        default:
672            assert(0);
673    }
674}
675
676static void
677myHandshakeDoneCB( tr_handshake    * handshake,
678                   tr_peerIo       * io,
679                   int               isConnected,
680                   const uint8_t   * peer_id,
681                   int               peerSupportsEncryption,
682                   void            * vmanager )
683{
684    int ok = isConnected;
685    uint16_t port;
686    const struct in_addr * in_addr;
687    tr_peerMgr * manager = (tr_peerMgr*) vmanager;
688    const uint8_t * hash = NULL;
689    Torrent * t;
690    tr_handshake * ours;
691
692    assert( io != NULL );
693    assert( isConnected==0 || isConnected==1 );
694    assert( peerSupportsEncryption==0 || peerSupportsEncryption==1 );
695
696    ours = tr_ptrArrayRemoveSorted( manager->handshakes,
697                                    handshake,
698                                    handshakeCompare );
699    //assert( ours != NULL );
700    //assert( ours == handshake );
701
702    in_addr = tr_peerIoGetAddress( io, &port );
703
704    if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
705    {
706        tr_peerIoFree( io );
707        --manager->connectionCount;
708        return;
709    }
710
711    hash = tr_peerIoGetTorrentHash( io );
712    t = getExistingTorrent( manager, hash );
713    if( !t || !t->isRunning )
714    {
715        tr_peerIoFree( io );
716        --manager->connectionCount;
717        return;
718    }
719
720    /* if we couldn't connect or were snubbed,
721     * the peer's probably not worth remembering. */
722    if( !ok ) {
723        tr_peer * peer = getExistingPeer( t, in_addr );
724        tr_peerIoFree( io );
725        --manager->connectionCount;
726        if( peer )
727            peer->doPurge = 1;
728        return;
729    }
730
731    if( 1 ) {
732        tr_peer * peer = getPeer( t, in_addr, NULL );
733        if( peer->msgs != NULL ) { /* we already have this peer */
734            tr_peerIoFree( io );
735            --manager->connectionCount;
736        } else {
737            peer->port = port;
738            peer->io = io;
739            peer->msgs = tr_peerMsgsNew( t->tor, peer );
740            tr_free( peer->client );
741            peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
742            peer->peerSupportsEncryption = peerSupportsEncryption ? 1 : 0;
743            peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
744            peer->connectionChangedAt = time( NULL );
745            rechokeSoon( t );
746        }
747    }
748}
749
750static void
751initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
752{
753    tr_handshake * handshake = tr_handshakeNew( io,
754                                                manager->handle->encryptionMode,
755                                                myHandshakeDoneCB,
756                                                manager );
757    ++manager->connectionCount;
758
759    tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
760}
761
762void
763tr_peerMgrAddIncoming( tr_peerMgr      * manager,
764                       struct in_addr  * addr,
765                       uint16_t          port,
766                       int               socket )
767{
768    if( getExistingHandshake( manager, addr ) == NULL )
769    {
770        tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
771        initiateHandshake( manager, io );
772    }
773}
774
775void
776tr_peerMgrAddPex( tr_peerMgr     * manager,
777                  const uint8_t  * torrentHash,
778                  int              from,
779                  const tr_pex   * pex,
780                  int              pexCount )
781{
782    Torrent * t = getExistingTorrent( manager, torrentHash );
783    const tr_pex * end = pex + pexCount;
784    while( pex != end )
785    {
786        int isNew;
787        tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
788        if( isNew ) {
789            peer->port = pex->port;
790            peer->from = from;
791        }
792        ++pex;
793    }
794    reconnectSoon( t );
795}
796
797void
798tr_peerMgrAddPeers( tr_peerMgr    * manager,
799                    const uint8_t * torrentHash,
800                    int             from,
801                    const uint8_t * peerCompact,
802                    int             peerCount )
803{
804    int i;
805    const uint8_t * walk = peerCompact;
806    Torrent * t = getExistingTorrent( manager, torrentHash );
807    for( i=0; t!=NULL && i<peerCount; ++i )
808    {
809        int isNew;
810        tr_peer * peer;
811        struct in_addr addr;
812        uint16_t port;
813        memcpy( &addr, walk, 4 ); walk += 4;
814        memcpy( &port, walk, 2 ); walk += 2;
815        peer = getPeer( t, &addr, &isNew );
816        if( isNew ) {
817            peer->port = port;
818            peer->from = from;
819        }
820    }
821    reconnectSoon( t );
822}
823
824/**
825***
826**/
827
828int
829tr_peerMgrIsAcceptingConnections( const tr_peerMgr * manager UNUSED )
830{
831    return TRUE; /* manager->connectionCount < MAX_CONNECTED_PEERS; */
832}
833
834void
835tr_peerMgrSetBlame( tr_peerMgr     * manager UNUSED,
836                    const uint8_t  * torrentHash UNUSED,
837                    int              pieceIndex UNUSED,
838                    int              success UNUSED )
839{
840    /*fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );*/
841}
842
843int
844tr_pexCompare( const void * va, const void * vb )
845{
846    const tr_pex * a = (const tr_pex *) va;
847    const tr_pex * b = (const tr_pex *) vb;
848    int i = memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
849    if( i ) return i;
850    if( a->port < b->port ) return -1;
851    if( a->port > b->port ) return 1;
852    return 0;
853}
854
855int tr_pexCompare( const void * a, const void * b );
856
857
858int
859tr_peerMgrGetPeers( tr_peerMgr      * manager,
860                    const uint8_t   * torrentHash,
861                    tr_pex         ** setme_pex )
862{
863    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
864    int i, peerCount;
865    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
866    tr_pex * pex = tr_new( tr_pex, peerCount );
867    tr_pex * walk = pex;
868
869    for( i=0; i<peerCount; ++i, ++walk )
870    {
871        const tr_peer * peer = peers[i];
872
873        walk->in_addr = peer->in_addr;
874
875        walk->port = peer->port;
876
877        walk->flags = 0;
878        if( peer->peerSupportsEncryption ) walk->flags |= 1;
879        if( peer->progress >= 1.0 )        walk->flags |= 2;
880    }
881
882    assert( ( walk - pex ) == peerCount );
883    qsort( pex, peerCount, sizeof(tr_pex), tr_pexCompare );
884    *setme_pex = pex;
885    return peerCount;
886}
887
888void
889tr_peerMgrStartTorrent( tr_peerMgr     * manager,
890                        const uint8_t  * torrentHash )
891{
892    Torrent * t = getExistingTorrent( manager, torrentHash );
893    t->isRunning = 1;
894    restartChokeTimer( t );
895    reconnectNow( t );
896}
897
898void
899tr_peerMgrStopTorrent( tr_peerMgr     * manager,
900                       const uint8_t  * torrentHash)
901{
902    Torrent * t = getExistingTorrent( manager, torrentHash );
903    t->isRunning = 0;
904    tr_timerFree( &t->rechokeTimer );
905    tr_timerFree( &t->reconnectTimer );
906    reconnectPulse( t );
907}
908
909void
910tr_peerMgrAddTorrent( tr_peerMgr * manager,
911                      tr_torrent * tor )
912{
913    Torrent * t;
914
915    assert( tor != NULL );
916    assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
917
918    t = tr_new0( Torrent, 1 );
919    t->manager = manager;
920    t->tor = tor;
921    t->peers = tr_ptrArrayNew( );
922    t->requested = tr_bitfieldNew( tor->blockCount );
923    restartChokeTimer( t );
924    restartReconnectTimer( t );
925
926    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
927    tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
928}
929
930void
931tr_peerMgrRemoveTorrent( tr_peerMgr     * manager,
932                         const uint8_t  * torrentHash )
933{
934    Torrent * t = getExistingTorrent( manager, torrentHash );
935    assert( t != NULL );
936    tr_peerMgrStopTorrent( manager, torrentHash );
937    freeTorrent( manager, t );
938}
939
940void
941tr_peerMgrTorrentAvailability( const tr_peerMgr * manager,
942                               const uint8_t    * torrentHash,
943                               int8_t           * tab,
944                               int                tabCount )
945{
946    int i;
947    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
948    const tr_torrent * tor = t->tor;
949    const float interval = tor->info.pieceCount / (float)tabCount;
950
951    memset( tab, 0, tabCount );
952
953    for( i=0; i<tabCount; ++i )
954    {
955        const int piece = i * interval;
956
957        if( tor == NULL )
958            tab[i] = 0;
959        else if( tr_cpPieceIsComplete( tor->completion, piece ) )
960            tab[i] = -1;
961        else {
962            int j, peerCount;
963            const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
964            for( j=0; j<peerCount; ++j )
965                if( tr_bitfieldHas( peers[j]->have, i ) )
966                    ++tab[i];
967        }
968    }
969}
970
971/* Returns the pieces that we and/or a connected peer has */
972tr_bitfield*
973tr_peerMgrGetAvailable( const tr_peerMgr * manager,
974                        const uint8_t    * torrentHash )
975{
976    int i, size;
977    const Torrent * t;
978    const tr_peer ** peers;
979    tr_bitfield * pieces;
980
981    t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
982    peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
983    pieces = tr_bitfieldDup( tr_cpPieceBitfield( t->tor->completion ) );
984    for( i=0; i<size; ++i )
985        if( peers[i]->io != NULL )
986            tr_bitfieldAnd( pieces, peers[i]->have );
987
988    return pieces;
989}
990
991void
992tr_peerMgrTorrentStats( const tr_peerMgr * manager,
993                        const uint8_t    * torrentHash,
994                        int              * setmePeersTotal,
995                        int              * setmePeersConnected,
996                        int              * setmePeersSendingToUs,
997                        int              * setmePeersGettingFromUs,
998                        int              * setmePeersFrom )
999{
1000    int i, size;
1001    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1002    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1003
1004    *setmePeersTotal          = size;
1005    *setmePeersConnected      = 0;
1006    *setmePeersSendingToUs    = 0;
1007    *setmePeersGettingFromUs  = 0;
1008
1009    for( i=0; i<TR_PEER_FROM__MAX; ++i )
1010        setmePeersFrom[i] = 0;
1011
1012    for( i=0; i<size; ++i )
1013    {
1014        const tr_peer * peer = peers[i];
1015
1016        if( peer->io == NULL ) /* not connected */
1017            continue;
1018
1019        ++*setmePeersConnected;
1020
1021        ++setmePeersFrom[peer->from];
1022
1023        if( tr_peerIoGetRateToPeer( peer->io ) > 0.01 )
1024            ++*setmePeersGettingFromUs;
1025
1026        if( tr_peerIoGetRateToClient( peer->io ) > 0.01 )
1027            ++*setmePeersSendingToUs;
1028    }
1029}
1030
1031struct tr_peer_stat *
1032tr_peerMgrPeerStats( const tr_peerMgr  * manager,
1033                     const uint8_t     * torrentHash,
1034                     int               * setmeCount UNUSED )
1035{
1036    int i, size;
1037    const Torrent * t = getExistingTorrent( (tr_peerMgr*)manager, torrentHash );
1038    const tr_peer ** peers = (const tr_peer **) tr_ptrArrayPeek( t->peers, &size );
1039    tr_peer_stat * ret;
1040
1041    ret = tr_new0( tr_peer_stat, size );
1042
1043    for( i=0; i<size; ++i )
1044    {
1045        const tr_peer * peer = peers[i];
1046        const int live = peer->io != NULL;
1047        tr_peer_stat * stat = ret + i;
1048
1049        tr_netNtop( &peer->in_addr, stat->addr, sizeof(stat->addr) );
1050        stat->port             = peer->port;
1051        stat->from             = peer->from;
1052        stat->client           = peer->client;
1053        stat->progress         = peer->progress;
1054        stat->isConnected      = live ? 1 : 0;
1055        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
1056        stat->uploadToRate     = tr_peerIoGetRateToPeer( peer->io );
1057        stat->downloadFromRate = tr_peerIoGetRateToClient( peer->io );
1058        stat->isDownloading    = stat->uploadToRate > 0.01;
1059        stat->isUploading      = stat->downloadFromRate > 0.01;
1060    }
1061
1062    *setmeCount = size;
1063    return ret;
1064}
1065
1066/**
1067***
1068**/
1069
1070typedef struct
1071{
1072    tr_peer * peer;
1073    float rate;
1074    int randomKey;
1075    int preferred;
1076    int doUnchoke;
1077}
1078ChokeData;
1079
1080static int
1081compareChoke( const void * va, const void * vb )
1082{
1083    const ChokeData * a = ( const ChokeData * ) va;
1084    const ChokeData * b = ( const ChokeData * ) vb;
1085
1086    if( a->preferred != b->preferred )
1087        return a->preferred ? -1 : 1;
1088
1089    if( a->preferred )
1090    {
1091        if( a->rate > b->rate ) return -1;
1092        if( a->rate < b->rate ) return 1;
1093        return 0;
1094    }
1095    else
1096    {
1097        return a->randomKey - b->randomKey;
1098    }
1099}
1100
1101static int
1102clientIsSnubbedBy( const tr_peer * peer )
1103{
1104    assert( peer != NULL );
1105
1106    return peer->peerSentPieceDataAt < (time(NULL) - SNUBBED_SEC);
1107}
1108
1109/**
1110***
1111**/
1112
1113static void
1114rechokeLeech( Torrent * t )
1115{
1116    int i, peerCount, size=0, unchoked=0;
1117    const time_t ignorePeersNewerThan = time(NULL) - MIN_CHOKE_PERIOD_SEC;
1118    tr_peer ** peers = getConnectedPeers( t, &peerCount );
1119    ChokeData * choke = tr_new0( ChokeData, peerCount );
1120
1121    /* sort the peers by preference and rate */
1122    for( i=0; i<peerCount; ++i )
1123    {
1124        tr_peer * peer = peers[i];
1125        ChokeData * node;
1126        if( peer->chokeChangedAt > ignorePeersNewerThan )
1127            continue;
1128
1129        node = &choke[size++];
1130        node->peer = peer;
1131        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
1132        node->randomKey = tr_rand( INT_MAX );
1133        node->rate = tr_peerIoGetRateToClient( peer->io );
1134    }
1135
1136    qsort( choke, size, sizeof(ChokeData), compareChoke );
1137
1138    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
1139        choke[i].doUnchoke = 1;
1140        ++unchoked;
1141    }
1142
1143    for( ; i<size; ++i ) {
1144        choke[i].doUnchoke = 1;
1145        ++unchoked;
1146        if( choke[i].peer->peerIsInterested )
1147            break;
1148    }
1149
1150    for( i=0; i<size; ++i )
1151        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
1152
1153    /* cleanup */
1154    tr_free( choke );
1155    tr_free( peers );
1156}
1157
1158static void
1159rechokeSeed( Torrent * t )
1160{
1161    int i, size;
1162    tr_peer ** peers = getConnectedPeers( t, &size );
1163
1164    /* FIXME */
1165    for( i=0; i<size; ++i )
1166        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
1167
1168    tr_free( peers );
1169}
1170
1171static int
1172rechokePulse( void * vtorrent )
1173{
1174    Torrent * t = vtorrent;
1175    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1176    if( done )
1177        rechokeLeech( vtorrent );
1178    else
1179        rechokeSeed( vtorrent );
1180    return TRUE;
1181}
1182
1183/**
1184***
1185**/
1186
1187static int
1188shouldPeerBeDisconnected( Torrent * t, tr_peer * peer, int peerCount, int isSeeding )
1189{
1190    const time_t now = time( NULL );
1191    int relaxStrictnessIfFewerThanN;
1192    double strictness;
1193
1194    if( peer->io == NULL ) /* not connected */
1195        return FALSE;
1196
1197    if( !t->isRunning ) /* the torrent is stopped... nobody should be connected */
1198        return TRUE;
1199
1200    /* not enough peers to go around... might as well keep this one;
1201     * they might unchoke us or give us a pex or something */
1202    if( peerCount < MAX_CONNECTED_PEERS_PER_TORRENT )
1203        return FALSE;
1204
1205    /* when deciding whether or not to keep a peer, judge its responsiveness
1206       on a sliding scale that's based on how many other peers are available */
1207    relaxStrictnessIfFewerThanN =
1208        (int)(((MAX_CONNECTED_PEERS_PER_TORRENT * RELAX_RULES_PERCENTAGE) / 100.0) + 0.5);
1209
1210    /* if we have >= relaxIfFewerThan, strictness is 100%.
1211       if we have zero connections, strictness is 0% */
1212    if( peerCount >= relaxStrictnessIfFewerThanN )
1213        strictness = 1.0;
1214    else
1215        strictness = peerCount / (double)relaxStrictnessIfFewerThanN;
1216
1217    /* test: has it been too long since we exchanged piece data? */
1218    if( ( now - peer->connectionChangedAt ) >= MAX_TRANSFER_IDLE ) {
1219        const uint64_t lo = MIN_TRANSFER_IDLE;
1220        const uint64_t hi = MAX_TRANSFER_IDLE;
1221        const uint64_t limit = lo + ((hi-lo) * strictness);
1222        const uint64_t interval = now - (isSeeding ? peer->clientSentPieceDataAt : peer->peerSentPieceDataAt);
1223        if( interval > limit )
1224            return TRUE;
1225    }
1226
1227    /* FIXME: SWE had other tests too... */
1228
1229    return FALSE;
1230}
1231
1232static int
1233comparePeerByConnectionDate( const void * va, const void * vb )
1234{
1235    const tr_peer * a = *(const tr_peer**) va;
1236    const tr_peer * b = *(const tr_peer**) vb;
1237    return tr_compareUint64( a->connectionChangedAt, b->connectionChangedAt );
1238}
1239
1240static int
1241reconnectPulse( void * vt UNUSED )
1242{
1243    int i, size, liveCount;
1244    Torrent * t = vt;
1245    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &size );
1246    const int isSeeding = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
1247
1248    /* how many connections do we have? */
1249    for( i=liveCount=0; i<size; ++i )
1250        if( peers[i]->msgs != NULL )
1251            ++liveCount;
1252
1253    /* destroy and/or disconnect from some peers */
1254    for( i=0; i<size; )
1255    {
1256        tr_peer * peer = peers[i];
1257
1258        if( peer->doPurge ) {
1259            tr_ptrArrayErase( t->peers, i, i+1 );
1260            freePeer( peer );
1261            --size;
1262            --liveCount;
1263            continue;
1264        }
1265
1266        if( shouldPeerBeDisconnected( t, peer, liveCount, isSeeding ) ) {
1267            disconnectPeer( peer );
1268            --liveCount;
1269        }
1270
1271        ++i;
1272    }
1273
1274    /* maybe connect to some new peers */ 
1275    if( t->isRunning && (liveCount<MAX_CONNECTED_PEERS_PER_TORRENT) )
1276    {
1277        int poolSize;
1278        int left = MAX_CONNECTED_PEERS_PER_TORRENT - liveCount;
1279        tr_peer ** pool;
1280        tr_peerMgr * manager = t->manager;
1281        const time_t now = time( NULL );
1282
1283        /* make a list of peers we know about but aren't connected to */
1284        poolSize = 0;
1285        pool = tr_new0( tr_peer*, size );
1286        for( i=0; i<size; ++i ) {
1287            tr_peer * peer = peers[i];
1288            if( peer->msgs == NULL )
1289                pool[poolSize++] = peer;
1290        }
1291
1292        /* sort them s.t. the ones we've already tried are at the last of the list */
1293        qsort( pool, poolSize, sizeof(tr_peer*), comparePeerByConnectionDate );
1294
1295        /* make some connections */
1296        for( i=0; i<poolSize && left>0; ++i )
1297        {
1298            tr_peer * peer = pool[i];
1299            tr_peerIo * io;
1300
1301            if( ( now - peer->connectionChangedAt ) < MIN_HANGUP_PERIOD_SEC )
1302                break;
1303
1304            /* already have a handshake pending */
1305            if( getExistingHandshake( manager, &peer->in_addr ) != NULL )
1306                continue;
1307
1308            /* initiate a connection to the peer */
1309            io = tr_peerIoNewOutgoing( manager->handle, &peer->in_addr, peer->port, t->hash );
1310            /*fprintf( stderr, "[%s] connecting to potential peer %s\n", t->tor->info.name, tr_peerIoGetAddrStr(io) );*/
1311            peer->connectionChangedAt = time( NULL );
1312            initiateHandshake( manager, io );
1313            --left;
1314        }
1315
1316        tr_free( pool );
1317    }
1318
1319    return TRUE;
1320}
Note: See TracBrowser for help on using the repository browser.