source: trunk/libtransmission/peer-mgr.c @ 9816

Last change on this file since 9816 was 9816, checked in by charles, 12 years ago

(trunk libT) #2632 "Add streaming capability to libtransmission (but not the Transmission GUI clients)" -- implemented

  • Property svn:keywords set to Date Rev Author Id
File size: 92.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9816 2009-12-24 01:02:54Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16
17#include <event.h>
18
19#include "transmission.h"
20#include "announcer.h"
21#include "bandwidth.h"
22#include "bencode.h"
23#include "blocklist.h"
24#include "clients.h"
25#include "completion.h"
26#include "crypto.h"
27#include "handshake.h"
28#include "inout.h" /* tr_ioTestPiece */
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "session.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "utils.h"
38#include "webseed.h"
39
40enum
41{
42    /* how frequently to cull old atoms */
43    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
44
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* how frequently to reallocate bandwidth */
49    BANDWIDTH_PERIOD_MSEC = 500,
50
51    /* how frequently to age out old piece request lists */
52    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = 500,
56
57    /* when many peers are available, keep idle ones this long */
58    MIN_UPLOAD_IDLE_SECS = ( 60 ),
59
60    /* when few peers are available, keep idle ones this long */
61    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 8,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 16,
69
70    /* number of bad pieces a peer is allowed to send before we ban them */
71    MAX_BAD_PIECES_PER_PEER = 5,
72
73    /* amount of time to keep a list of request pieces lying around
74       before it's considered too old and needs to be rebuilt */
75    PIECE_LIST_SHELF_LIFE_SECS = 60,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* use for bitwise operations w/peer_atom.myflags */
81    /* unreachable for now... but not banned.
82     * if they try to connect to us it's okay */
83    MYFLAG_UNREACHABLE = 2,
84
85    /* the minimum we'll wait before attempting to reconnect to a peer */
86    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
87
88    /** how long we'll let requests we've made linger before we cancel them */
89    REQUEST_TTL_SECS = 45
90};
91
92
93/**
94***
95**/
96
97enum
98{
99    UPLOAD_ONLY_UKNOWN,
100    UPLOAD_ONLY_YES,
101    UPLOAD_ONLY_NO
102};
103
104/**
105 * Peer information that should be kept even before we've connected and
106 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
107 * which ones would make good candidates for connecting to, and to watch out
108 * for banned peers.
109 *
110 * @see tr_peer
111 * @see tr_peermsgs
112 */
113struct peer_atom
114{
115    tr_peer   * peer;        /* will be NULL if not connected */
116    uint8_t     from;
117    uint8_t     flags;       /* these match the added_f flags */
118    uint8_t     myflags;     /* flags that aren't defined in added_f */
119    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
120    tr_port     port;
121    uint16_t    numFails;
122    tr_address  addr;
123    time_t      time;        /* when the peer's connection status last changed */
124    time_t      piece_data_time;
125
126    /* similar to a TTL field, but less rigid --
127     * if the swarm is small, the atom will be kept past this date. */
128    time_t      shelf_date;
129};
130
131static tr_bool
132tr_isAtom( const struct peer_atom * atom )
133{
134    return ( atom != NULL )
135        && ( atom->from < TR_PEER_FROM__MAX )
136        && ( tr_isAddress( &atom->addr ) );
137}
138
139static const char*
140tr_atomAddrStr( const struct peer_atom * atom )
141{
142    return tr_peerIoAddrStr( &atom->addr, atom->port );
143}
144
145struct block_request
146{
147    tr_block_index_t block;
148    tr_peer * peer;
149    time_t sentAt;
150};
151
152struct weighted_piece
153{
154    tr_piece_index_t index;
155    tr_piece_index_t salt;
156    int16_t requestCount;
157};
158
159typedef struct tr_torrent_peers
160{
161    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
162    tr_ptrArray                pool; /* struct peer_atom */
163    tr_ptrArray                peers; /* tr_peer */
164    tr_ptrArray                webseeds; /* tr_webseed */
165
166    tr_torrent               * tor;
167    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
168    struct tr_peerMgr        * manager;
169
170    tr_bool                    isRunning;
171    tr_bool                    needsCompletenessCheck;
172
173    struct block_request     * requests;
174    int                        requestsSort;
175    int                        requestCount;
176    int                        requestAlloc;
177
178    struct weighted_piece    * pieces;
179    int                        piecesSort;
180    int                        pieceCount;
181
182    tr_bool                    isInEndgame;
183}
184Torrent;
185
186struct tr_peerMgr
187{
188    tr_session    * session;
189    tr_ptrArray     incomingHandshakes; /* tr_handshake */
190    struct event  * bandwidthTimer;
191    struct event  * rechokeTimer;
192    struct event  * reconnectTimer;
193    struct event  * refillUpkeepTimer;
194    struct event  * atomTimer;
195};
196
197#define tordbg( t, ... ) \
198    do { \
199        if( tr_deepLoggingIsActive( ) ) \
200            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
201    } while( 0 )
202
203#define dbgmsg( ... ) \
204    do { \
205        if( tr_deepLoggingIsActive( ) ) \
206            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
207    } while( 0 )
208
209/**
210***
211**/
212
213static TR_INLINE void
214managerLock( const struct tr_peerMgr * manager )
215{
216    tr_globalLock( manager->session );
217}
218
219static TR_INLINE void
220managerUnlock( const struct tr_peerMgr * manager )
221{
222    tr_globalUnlock( manager->session );
223}
224
225static TR_INLINE void
226torrentLock( Torrent * torrent )
227{
228    managerLock( torrent->manager );
229}
230
231static TR_INLINE void
232torrentUnlock( Torrent * torrent )
233{
234    managerUnlock( torrent->manager );
235}
236
237static TR_INLINE int
238torrentIsLocked( const Torrent * t )
239{
240    return tr_globalIsLocked( t->manager->session );
241}
242
243/**
244***
245**/
246
247static int
248handshakeCompareToAddr( const void * va, const void * vb )
249{
250    const tr_handshake * a = va;
251
252    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
253}
254
255static int
256handshakeCompare( const void * a, const void * b )
257{
258    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
259}
260
261static tr_handshake*
262getExistingHandshake( tr_ptrArray      * handshakes,
263                      const tr_address * addr )
264{
265    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
266}
267
268static int
269comparePeerAtomToAddress( const void * va, const void * vb )
270{
271    const struct peer_atom * a = va;
272
273    return tr_compareAddresses( &a->addr, vb );
274}
275
276static int
277compareAtomsByAddress( const void * va, const void * vb )
278{
279    const struct peer_atom * b = vb;
280
281    assert( tr_isAtom( b ) );
282
283    return comparePeerAtomToAddress( va, &b->addr );
284}
285
286/**
287***
288**/
289
290const tr_address *
291tr_peerAddress( const tr_peer * peer )
292{
293    return &peer->atom->addr;
294}
295
296static Torrent*
297getExistingTorrent( tr_peerMgr *    manager,
298                    const uint8_t * hash )
299{
300    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
301
302    return tor == NULL ? NULL : tor->torrentPeers;
303}
304
305static int
306peerCompare( const void * a, const void * b )
307{
308    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
309}
310
311static struct peer_atom*
312getExistingAtom( const Torrent    * t,
313                 const tr_address * addr )
314{
315    Torrent * tt = (Torrent*)t;
316    assert( torrentIsLocked( t ) );
317    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
318}
319
320static tr_bool
321peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
322{
323    Torrent * t = (Torrent*) ct;
324
325    assert( torrentIsLocked ( t ) );
326
327    return ( atom->peer != NULL )
328        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
329        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
330}
331
332static tr_peer*
333peerConstructor( struct peer_atom * atom )
334{
335    tr_peer * peer = tr_new0( tr_peer, 1 );
336    tr_bitsetConstructor( &peer->have, 0 );
337    peer->atom = atom;
338    atom->peer = peer;
339    return peer;
340}
341
342static tr_peer*
343getPeer( Torrent * torrent, struct peer_atom * atom )
344{
345    tr_peer * peer;
346
347    assert( torrentIsLocked( torrent ) );
348
349    peer = atom->peer;
350
351    if( peer == NULL )
352    {
353        peer = peerConstructor( atom );
354        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
355    }
356
357    return peer;
358}
359
360static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
361
362static void
363peerDestructor( Torrent * t, tr_peer * peer )
364{
365    assert( peer != NULL );
366
367    peerDeclinedAllRequests( t, peer );
368
369    if( peer->msgs != NULL )
370    {
371        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
372        tr_peerMsgsFree( peer->msgs );
373    }
374
375    tr_peerIoClear( peer->io );
376    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
377
378    tr_bitsetDestructor( &peer->have );
379    tr_bitfieldFree( peer->blame );
380    tr_free( peer->client );
381    peer->atom->peer = NULL;
382
383    tr_free( peer );
384}
385
386static void
387removePeer( Torrent * t, tr_peer * peer )
388{
389    tr_peer * removed;
390    struct peer_atom * atom = peer->atom;
391
392    assert( torrentIsLocked( t ) );
393    assert( atom );
394
395    atom->time = tr_time( );
396
397    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
398    assert( removed == peer );
399    peerDestructor( t, removed );
400}
401
402static void
403removeAllPeers( Torrent * t )
404{
405    while( !tr_ptrArrayEmpty( &t->peers ) )
406        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
407}
408
409static void
410torrentDestructor( void * vt )
411{
412    Torrent * t = vt;
413
414    assert( t );
415    assert( !t->isRunning );
416    assert( torrentIsLocked( t ) );
417    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
418    assert( tr_ptrArrayEmpty( &t->peers ) );
419
420    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
421    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
422    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
423    tr_ptrArrayDestruct( &t->peers, NULL );
424
425    tr_free( t->requests );
426    tr_free( t->pieces );
427    tr_free( t );
428}
429
430static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
431
432static Torrent*
433torrentConstructor( tr_peerMgr * manager,
434                    tr_torrent * tor )
435{
436    int       i;
437    Torrent * t;
438
439    t = tr_new0( Torrent, 1 );
440    t->manager = manager;
441    t->tor = tor;
442    t->pool = TR_PTR_ARRAY_INIT;
443    t->peers = TR_PTR_ARRAY_INIT;
444    t->webseeds = TR_PTR_ARRAY_INIT;
445    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
446    t->requests = 0;
447
448    for( i = 0; i < tor->info.webseedCount; ++i )
449    {
450        tr_webseed * w =
451            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
452        tr_ptrArrayAppend( &t->webseeds, w );
453    }
454
455    return t;
456}
457
458tr_peerMgr*
459tr_peerMgrNew( tr_session * session )
460{
461    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
462    m->session = session;
463    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
464    return m;
465}
466
467static void
468deleteTimer( struct event ** t )
469{
470    if( *t != NULL )
471    {
472        evtimer_del( *t );
473        tr_free( *t );
474        *t = NULL;
475    }
476}
477
478static void
479deleteTimers( struct tr_peerMgr * m )
480{
481    deleteTimer( &m->atomTimer );
482    deleteTimer( &m->bandwidthTimer );
483    deleteTimer( &m->rechokeTimer );
484    deleteTimer( &m->reconnectTimer );
485    deleteTimer( &m->refillUpkeepTimer );
486}
487
488void
489tr_peerMgrFree( tr_peerMgr * manager )
490{
491    managerLock( manager );
492
493    deleteTimers( manager );
494
495    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
496     * the item from manager->handshakes, so this is a little roundabout... */
497    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
498        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
499
500    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
501
502    managerUnlock( manager );
503    tr_free( manager );
504}
505
506static int
507clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
508{
509    if( !tr_torrentHasMetadata( tor ) )
510        return TRUE;
511
512    return peer->clientIsInterested && !peer->clientIsChoked;
513}
514
515static int
516clientIsUploadingTo( const tr_peer * peer )
517{
518    return peer->peerIsInterested && !peer->peerIsChoked;
519}
520
521/***
522****
523***/
524
525tr_bool
526tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
527                      const tr_address  * addr )
528{
529    tr_bool isSeed = FALSE;
530    const Torrent * t = tor->torrentPeers;
531    const struct peer_atom * atom = getExistingAtom( t, addr );
532
533    if( atom )
534        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
535
536    return isSeed;
537}
538
539/**
540***  REQUESTS
541***
542*** There are two data structures associated with managing block requests:
543***
544*** 1. Torrent::requests, an array of "struct block_request" which keeps
545***    track of which blocks have been requested, and when, and by which peers.
546***    This is list is used for (a) cancelling requests that have been pending
547***    for too long and (b) avoiding duplicate requests before endgame.
548***
549*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
550***    pieces that we want to request.  It's used to decide which pieces to
551***    return next when tr_peerMgrGetBlockRequests() is called.
552**/
553
554/**
555*** struct block_request
556**/
557
558enum
559{
560    REQ_UNSORTED,
561    REQ_SORTED_BY_BLOCK,
562    REQ_SORTED_BY_TIME
563};
564
565static int
566compareReqByBlock( const void * va, const void * vb )
567{
568    const struct block_request * a = va;
569    const struct block_request * b = vb;
570
571    /* primary key: block */
572    if( a->block < b->block ) return -1;
573    if( a->block > b->block ) return 1;
574
575    /* secondary key: peer */
576    if( a->peer < b->peer ) return -1;
577    if( a->peer > b->peer ) return 1;
578
579    return 0;
580}
581
582static int
583compareReqByTime( const void * va, const void * vb )
584{
585    const struct block_request * a = va;
586    const struct block_request * b = vb;
587
588    /* primary key: time */
589    if( a->sentAt < b->sentAt ) return -1;
590    if( a->sentAt > b->sentAt ) return 1;
591
592    /* secondary key: peer */
593    if( a->peer < b->peer ) return -1;
594    if( a->peer > b->peer ) return 1;
595
596    return 0;
597}
598
599static void
600requestListSort( Torrent * t, int mode )
601{
602    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
603
604    if( t->requestsSort != mode )
605    {
606        int(*compar)(const void *, const void *);
607
608        t->requestsSort = mode;
609
610        switch( mode ) {
611            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
612            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
613            default: assert( 0 && "unhandled" );
614        }
615
616        qsort( t->requests, t->requestCount,
617               sizeof( struct block_request ), compar );
618    }
619}
620
621static void
622requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
623{
624    struct block_request key;
625
626    /* ensure enough room is available... */
627    if( t->requestCount + 1 >= t->requestAlloc )
628    {
629        const int CHUNK_SIZE = 128;
630        t->requestAlloc += CHUNK_SIZE;
631        t->requests = tr_renew( struct block_request,
632                                t->requests, t->requestAlloc );
633    }
634
635    /* populate the record we're inserting */
636    key.block = block;
637    key.peer = peer;
638    key.sentAt = tr_time( );
639
640    /* insert the request to our array... */
641    switch( t->requestsSort )
642    {
643        case REQ_UNSORTED:
644        case REQ_SORTED_BY_TIME:
645            t->requests[t->requestCount++] = key;
646            break;
647
648        case REQ_SORTED_BY_BLOCK: {
649            tr_bool exact;
650            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
651                                           sizeof( struct block_request ),
652                                           compareReqByBlock, &exact );
653            assert( !exact );
654            memmove( t->requests + pos + 1,
655                     t->requests + pos,
656                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
657            t->requests[pos] = key;
658            break;
659        }
660    }
661
662    if( peer != NULL )
663    {
664        ++peer->pendingReqsToPeer;
665        assert( peer->pendingReqsToPeer >= 0 );
666    }
667
668    /*fprintf( stderr, "added request of block %lu from peer %s... "
669                       "there are now %d block\n",
670                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
671}
672
673static struct block_request *
674requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
675{
676    struct block_request key;
677    key.block = block;
678    key.peer = (tr_peer*) peer;
679
680    requestListSort( t, REQ_SORTED_BY_BLOCK );
681
682    return bsearch( &key, t->requests, t->requestCount,
683                    sizeof( struct block_request ),
684                    compareReqByBlock );
685}
686
687/* how many peers are we currently requesting this block from... */
688static int
689countBlockRequests( Torrent * t, tr_block_index_t block )
690{
691    tr_bool exact;
692    int i, n, pos;
693    struct block_request key;
694
695    requestListSort( t, REQ_SORTED_BY_BLOCK );
696    key.block = block;
697    key.peer = NULL;
698    pos = tr_lowerBound( &key, t->requests, t->requestCount,
699                         sizeof( struct block_request ),
700                         compareReqByBlock, &exact );
701
702    assert( !exact ); /* shouldn't have a request with .peer == NULL */
703
704    n = 0;
705    for( i=pos; i<t->requestCount; ++i ) {
706        if( t->requests[i].block == block )
707            ++n;
708        else
709            break;
710    }
711
712    return n;
713}
714
715static void
716requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
717{
718    const struct block_request * b = requestListLookup( t, block, peer );
719    if( b != NULL )
720    {
721        const int pos = b - t->requests;
722        assert( pos < t->requestCount );
723
724        if( b->peer != NULL )
725        {
726            --b->peer->pendingReqsToPeer;
727            assert( b->peer->pendingReqsToPeer >= 0 );
728        }
729
730        memmove( t->requests + pos,
731                 t->requests + pos + 1,
732                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
733        /*fprintf( stderr, "removing request of block %lu from peer %s... "
734                           "there are now %d block requests left\n",
735                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
736    }
737}
738
739/**
740*** struct weighted_piece
741**/
742
743enum
744{
745    PIECES_UNSORTED,
746    PIECES_SORTED_BY_INDEX,
747    PIECES_SORTED_BY_WEIGHT
748};
749
750const tr_torrent * weightTorrent;
751
752/* we try to create a "weight" s.t. high-priority pieces come before others,
753 * and that partially-complete pieces come before empty ones. */
754static int
755comparePieceByWeight( const void * va, const void * vb )
756{
757    const struct weighted_piece * a = va;
758    const struct weighted_piece * b = vb;
759    int ia, ib, missing, pending;
760    const tr_torrent * tor = weightTorrent;
761
762    /* primary key: weight */
763    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
764    pending = a->requestCount;
765    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
766    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
767    pending = b->requestCount;
768    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
769    if( ia < ib ) return -1;
770    if( ia > ib ) return 1;
771
772    /* secondary key: higher priorities go first */
773    ia = tor->info.pieces[a->index].priority;
774    ib = tor->info.pieces[b->index].priority;
775    if( ia > ib ) return -1;
776    if( ia < ib ) return 1;
777
778    /* tertiary key: random */
779    if( a->salt < b->salt ) return -1;
780    if( a->salt > b->salt ) return 1;
781
782    /* okay, they're equal */
783    return 0;
784}
785
786static int
787comparePieceByIndex( const void * va, const void * vb )
788{
789    const struct weighted_piece * a = va;
790    const struct weighted_piece * b = vb;
791    if( a->index < b->index ) return -1;
792    if( a->index > b->index ) return 1;
793    return 0;
794}
795
796static void
797pieceListSort( Torrent * t, int mode )
798{
799    int(*compar)(const void *, const void *);
800
801    assert( mode==PIECES_SORTED_BY_INDEX
802         || mode==PIECES_SORTED_BY_WEIGHT );
803
804    if( t->piecesSort != mode )
805    {
806        t->piecesSort = mode;
807
808        switch( mode ) {
809            case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
810            case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
811            default: assert( 0 && "unhandled" );  break;
812        }
813
814        weightTorrent = t->tor;
815        qsort( t->pieces, t->pieceCount,
816               sizeof( struct weighted_piece ), compar );
817    }
818
819    /* Also, as long as we've got the pieces sorted by weight,
820     * let's also update t.isInEndgame */
821    if( t->piecesSort == PIECES_SORTED_BY_WEIGHT )
822    {
823        tr_bool endgame = TRUE;
824
825        if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
826        {
827            const tr_completion * cp = &t->tor->completion;
828            const struct weighted_piece * p = t->pieces;
829            const int pending = p->requestCount;
830            const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
831            endgame = pending >= missing;
832        }
833
834        t->isInEndgame = endgame;
835        /*if( t->isInEndgame ) fprintf( stderr, "ENDGAME reached\n" );*/
836    }
837}
838
839static struct weighted_piece *
840pieceListLookup( Torrent * t, tr_piece_index_t index )
841{
842    struct weighted_piece key;
843    key.index = index;
844
845    pieceListSort( t, PIECES_SORTED_BY_INDEX );
846
847    return bsearch( &key, t->pieces, t->pieceCount,
848                    sizeof( struct weighted_piece ),
849                    comparePieceByIndex );
850}
851
852static void
853pieceListRebuild( Torrent * t )
854{
855    if( !tr_torrentIsSeed( t->tor ) )
856    {
857        tr_piece_index_t i;
858        tr_piece_index_t * pool;
859        tr_piece_index_t poolCount = 0;
860        const tr_torrent * tor = t->tor;
861        const tr_info * inf = tr_torrentInfo( tor );
862        const tr_bool isStreaming = tr_torrentIsStreaming( tor );
863        struct weighted_piece * pieces;
864        int pieceCount;
865
866        /* build the new list */
867        pool = tr_new( tr_piece_index_t, inf->pieceCount );
868        for( i=0; i<inf->pieceCount; ++i )
869            if( !inf->pieces[i].dnd )
870                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
871                    pool[poolCount++] = i;
872        pieceCount = poolCount;
873        pieces = tr_new0( struct weighted_piece, pieceCount );
874        for( i=0; i<poolCount; ++i ) {
875            struct weighted_piece * piece = pieces + i;
876            piece->index = pool[i];
877            piece->requestCount = 0;
878            piece->salt = isStreaming ? piece->index
879                                      : (tr_piece_index_t)tr_cryptoWeakRandInt( 4096 );
880        }
881
882        /* if we already had a list of pieces, merge it into
883         * the new list so we don't lose its requestCounts */
884        if( t->pieces != NULL )
885        {
886            struct weighted_piece * o = t->pieces;
887            struct weighted_piece * oend = o + t->pieceCount;
888            struct weighted_piece * n = pieces;
889            struct weighted_piece * nend = n + pieceCount;
890
891            pieceListSort( t, PIECES_SORTED_BY_INDEX );
892
893            while( o!=oend && n!=nend ) {
894                if( o->index < n->index )
895                    ++o;
896                else if( o->index > n->index )
897                    ++n;
898                else
899                    *n++ = *o++;
900            }
901
902            tr_free( t->pieces );
903        }
904
905        t->pieces = pieces;
906        t->pieceCount = pieceCount;
907        t->piecesSort = PIECES_SORTED_BY_INDEX;
908
909        /* cleanup */
910        tr_free( pool );
911    }
912}
913
914static void
915pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
916{
917    struct weighted_piece * p = pieceListLookup( t, piece );
918
919    if( p != NULL )
920    {
921        const int pos = p - t->pieces;
922
923        memmove( t->pieces + pos,
924                 t->pieces + pos + 1,
925                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
926
927        if( t->pieceCount == 0 )
928        {
929            tr_free( t->pieces );
930            t->pieces = NULL;
931        }
932    }
933}
934
935static void
936pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
937{
938    struct weighted_piece * p;
939    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
940
941    if(( p = pieceListLookup( t, index )))
942        if( p->requestCount > 0 )
943            --p->requestCount;
944
945    /* note: this invalidates the weighted.piece.weight field,
946     * but that's OK since the call to pieceListLookup ensured
947     * that we were sorted by index anyway.. next time we resort
948     * by weight, pieceListSort() will update the weights */
949}
950
951/**
952***
953**/
954
955void
956tr_peerMgrRebuildRequests( tr_torrent * tor )
957{
958    assert( tr_isTorrent( tor ) );
959
960    pieceListRebuild( tor->torrentPeers );
961}
962
963void
964tr_peerMgrGetNextRequests( tr_torrent           * tor,
965                           tr_peer              * peer,
966                           int                    numwant,
967                           tr_block_index_t     * setme,
968                           int                  * numgot )
969{
970    int i;
971    int got;
972    Torrent * t;
973    struct weighted_piece * pieces;
974    const tr_bitset * have = &peer->have;
975
976    /* sanity clause */
977    assert( tr_isTorrent( tor ) );
978    assert( numwant > 0 );
979
980    /* walk through the pieces and find blocks that should be requested */
981    got = 0;
982    t = tor->torrentPeers;
983
984    /* prep the pieces list */
985    if( t->pieces == NULL )
986        pieceListRebuild( t );
987    pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
988
989    pieces = t->pieces;
990    for( i=0; i<t->pieceCount && got<numwant; ++i )
991    {
992        struct weighted_piece * p = pieces + i;
993        const int missing = tr_cpMissingBlocksInPiece( &tor->completion, p->index );
994        const int maxDuplicatesPerBlock = t->isInEndgame ? 3 : 1;
995
996        if( p->requestCount > ( missing * maxDuplicatesPerBlock ) )
997            continue;
998
999        /* if the peer has this piece that we want... */
1000        if( tr_bitsetHasFast( have, p->index ) )
1001        {
1002            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1003            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1004
1005            for( ; b!=e && got<numwant; ++b )
1006            {
1007                /* don't request blocks we've already got */
1008                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1009                    continue;
1010
1011                /* don't send the same request to the same peer twice */
1012                if( tr_peerMgrDidPeerRequest( tor, peer, b ) )
1013                    continue;
1014
1015                /* don't send the same request to any peer too many times */
1016                if( countBlockRequests( t, b ) >= maxDuplicatesPerBlock )
1017                    continue;
1018
1019                /* update the caller's table */
1020                setme[got++] = b;
1021
1022                /* update our own tables */
1023                requestListAdd( t, b, peer );
1024                ++p->requestCount;
1025            }
1026        }
1027    }
1028
1029    /* In most cases we've just changed the weights of a small number of pieces.
1030     * So rather than qsort()ing the entire array, it's faster to sort only the
1031     * changed ones, then do a standard merge-two-sorted-arrays pass on the
1032     * changed and unchanged pieces. */
1033    if( got > 0 )
1034    {
1035        struct weighted_piece * p;
1036        struct weighted_piece * pieces;
1037        struct weighted_piece * a = t->pieces;
1038        struct weighted_piece * a_end = t->pieces + i;
1039        struct weighted_piece * b = a_end;
1040        struct weighted_piece * b_end = t->pieces + t->pieceCount;
1041
1042        /* resort the pieces that we changed */
1043        weightTorrent = t->tor;
1044        qsort( a, a_end-a, sizeof( struct weighted_piece ), comparePieceByWeight );
1045
1046        /* allocate a new array */
1047        p = pieces = tr_new( struct weighted_piece, t->pieceCount );
1048
1049        /* merge the two sorted arrays into this new array */
1050        weightTorrent = t->tor;
1051        while( a!=a_end && b!=b_end )
1052            *p++ = comparePieceByWeight( a, b ) < 0 ? *a++ : *b++;
1053        while( a!=a_end ) *p++ = *a++;
1054        while( b!=b_end ) *p++ = *b++;
1055
1056#if 0
1057        /* make sure we did it right */
1058        assert( p - pieces == t->pieceCount );
1059        for( it=pieces; it+1<p; ++it )
1060            assert( it->weight <= it[1].weight );
1061#endif
1062
1063        /* update */
1064        tr_free( t->pieces );
1065        t->pieces = pieces;
1066    }
1067
1068    *numgot = got;
1069}
1070
1071tr_bool
1072tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1073                          const tr_peer     * peer,
1074                          tr_block_index_t    block )
1075{
1076    const Torrent * t = tor->torrentPeers;
1077    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1078}
1079
1080/* cancel requests that are too old */
1081static void
1082refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1083{
1084    time_t now;
1085    time_t too_old;
1086    tr_torrent * tor;
1087    tr_peerMgr * mgr = vmgr;
1088    managerLock( mgr );
1089
1090    now = tr_time( );
1091    too_old = now - REQUEST_TTL_SECS;
1092
1093    tor = NULL;
1094    while(( tor = tr_torrentNext( mgr->session, tor )))
1095    {
1096        Torrent * t = tor->torrentPeers;
1097        const int n = t->requestCount;
1098        if( n > 0 )
1099        {
1100            int keepCount = 0;
1101            int cancelCount = 0;
1102            struct block_request * keep = tr_new( struct block_request, n );
1103            struct block_request * cancel = tr_new( struct block_request, n );
1104            const struct block_request * it;
1105            const struct block_request * end;
1106
1107            for( it=t->requests, end=it+n; it!=end; ++it )
1108                if( it->sentAt <= too_old )
1109                    cancel[cancelCount++] = *it;
1110                else
1111                    keep[keepCount++] = *it;
1112
1113            /* prune out the ones we aren't keeping */
1114            tr_free( t->requests );
1115            t->requests = keep;
1116            t->requestCount = keepCount;
1117            t->requestAlloc = n;
1118
1119            /* send cancel messages for all the "cancel" ones */
1120            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1121                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) )
1122                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1123
1124            /* decrement the pending request counts for the timed-out blocks */
1125            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1126                pieceListRemoveRequest( t, it->block );
1127
1128            /* cleanup loop */
1129            tr_free( cancel );
1130        }
1131    }
1132
1133    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1134    managerUnlock( mgr );
1135}
1136
1137static void
1138addStrike( Torrent * t, tr_peer * peer )
1139{
1140    tordbg( t, "increasing peer %s strike count to %d",
1141            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1142
1143    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1144    {
1145        struct peer_atom * atom = peer->atom;
1146        atom->myflags |= MYFLAG_BANNED;
1147        peer->doPurge = 1;
1148        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1149    }
1150}
1151
1152static void
1153gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1154{
1155    tr_torrent *   tor = t->tor;
1156    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1157
1158    tor->corruptCur += byteCount;
1159    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1160}
1161
1162static void
1163peerSuggestedPiece( Torrent            * t UNUSED,
1164                    tr_peer            * peer UNUSED,
1165                    tr_piece_index_t     pieceIndex UNUSED,
1166                    int                  isFastAllowed UNUSED )
1167{
1168#if 0
1169    assert( t );
1170    assert( peer );
1171    assert( peer->msgs );
1172
1173    /* is this a valid piece? */
1174    if(  pieceIndex >= t->tor->info.pieceCount )
1175        return;
1176
1177    /* don't ask for it if we've already got it */
1178    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1179        return;
1180
1181    /* don't ask for it if they don't have it */
1182    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1183        return;
1184
1185    /* don't ask for it if we're choked and it's not fast */
1186    if( !isFastAllowed && peer->clientIsChoked )
1187        return;
1188
1189    /* request the blocks that we don't have in this piece */
1190    {
1191        tr_block_index_t block;
1192        const tr_torrent * tor = t->tor;
1193        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1194        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1195
1196        for( block=start; block<end; ++block )
1197        {
1198            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1199            {
1200                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1201                const uint32_t length = tr_torBlockCountBytes( tor, block );
1202                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1203                incrementPieceRequests( t, pieceIndex );
1204            }
1205        }
1206    }
1207#endif
1208}
1209
1210static void
1211decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
1212{
1213    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1214}
1215
1216static void
1217clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
1218{
1219    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
1220}
1221
1222static void
1223removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1224{
1225    requestListRemove( t, block, peer );
1226    pieceListRemoveRequest( t, block );
1227}
1228
1229/* peer choked us, or maybe it disconnected.
1230   either way we need to remove all its requests */
1231static void
1232peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1233{
1234    int i, n;
1235    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1236
1237    for( i=n=0; i<t->requestCount; ++i )
1238        if( peer == t->requests[i].peer )
1239            blocks[n++] = t->requests[i].block;
1240
1241    for( i=0; i<n; ++i )
1242        removeRequestFromTables( t, blocks[i], peer );
1243
1244    tr_free( blocks );
1245}
1246
1247static void
1248peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1249{
1250    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1251    Torrent * t = vt;
1252    const tr_peer_event * e = vevent;
1253
1254    torrentLock( t );
1255
1256    switch( e->eventType )
1257    {
1258        case TR_PEER_UPLOAD_ONLY:
1259            /* update our atom */
1260            if( peer ) {
1261                if( e->uploadOnly ) {
1262                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1263                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1264                } else {
1265                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1266                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1267                }
1268            }
1269            break;
1270
1271        case TR_PEER_PEER_GOT_DATA:
1272        {
1273            const time_t now = tr_time( );
1274            tr_torrent * tor = t->tor;
1275
1276            tr_torrentSetActivityDate( tor, now );
1277
1278            if( e->wasPieceData ) {
1279                tor->uploadedCur += e->length;
1280                tr_torrentSetDirty( tor );
1281            }
1282
1283            /* update the stats */
1284            if( e->wasPieceData )
1285                tr_statsAddUploaded( tor->session, e->length );
1286
1287            /* update our atom */
1288            if( peer && e->wasPieceData )
1289                peer->atom->piece_data_time = now;
1290
1291            tor->needsSeedRatioCheck = TRUE;
1292
1293            break;
1294        }
1295
1296        case TR_PEER_CLIENT_GOT_REJ:
1297            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1298            break;
1299
1300        case TR_PEER_CLIENT_GOT_CHOKE:
1301            peerDeclinedAllRequests( t, peer );
1302            break;
1303
1304        case TR_PEER_CLIENT_GOT_PORT:
1305            if( peer )
1306                peer->atom->port = e->port;
1307            break;
1308
1309        case TR_PEER_CLIENT_GOT_SUGGEST:
1310            if( peer )
1311                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1312            break;
1313
1314        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1315            if( peer )
1316                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1317            break;
1318
1319        case TR_PEER_CLIENT_GOT_DATA:
1320        {
1321            const time_t now = tr_time( );
1322            tr_torrent * tor = t->tor;
1323
1324            tr_torrentSetActivityDate( tor, now );
1325
1326            /* only add this to downloadedCur if we got it from a peer --
1327             * webseeds shouldn't count against our ratio.  As one tracker
1328             * admin put it, "Those pieces are downloaded directly from the
1329             * content distributor, not the peers, it is the tracker's job
1330             * to manage the swarms, not the web server and does not fit
1331             * into the jurisdiction of the tracker." */
1332            if( peer && e->wasPieceData ) {
1333                tor->downloadedCur += e->length;
1334                tr_torrentSetDirty( tor );
1335            }
1336
1337            /* update the stats */
1338            if( e->wasPieceData )
1339                tr_statsAddDownloaded( tor->session, e->length );
1340
1341            /* update our atom */
1342            if( peer && e->wasPieceData )
1343                peer->atom->piece_data_time = now;
1344
1345            break;
1346        }
1347
1348        case TR_PEER_PEER_PROGRESS:
1349        {
1350            if( peer )
1351            {
1352                struct peer_atom * atom = peer->atom;
1353                if( e->progress >= 1.0 ) {
1354                    tordbg( t, "marking peer %s as a seed",
1355                            tr_atomAddrStr( atom ) );
1356                    atom->flags |= ADDED_F_SEED_FLAG;
1357                }
1358            }
1359            break;
1360        }
1361
1362        case TR_PEER_CLIENT_GOT_BLOCK:
1363        {
1364            tr_torrent * tor = t->tor;
1365            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1366
1367            requestListRemove( t, block, peer );
1368            pieceListRemoveRequest( t, block );
1369
1370            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1371            {
1372                tordbg( t, "we have this block already..." );
1373                clientGotUnwantedBlock( tor, block );
1374            }
1375            else
1376            {
1377                tr_cpBlockAdd( &tor->completion, block );
1378                tr_torrentSetDirty( tor );
1379
1380                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1381                {
1382                    const tr_piece_index_t p = e->pieceIndex;
1383                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1384
1385                    if( !ok )
1386                    {
1387                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1388                                   (unsigned long)p );
1389                    }
1390
1391                    tr_torrentSetHasPiece( tor, p, ok );
1392                    tr_torrentSetPieceChecked( tor, p, TRUE );
1393                    tr_peerMgrSetBlame( tor, p, ok );
1394
1395                    if( !ok )
1396                    {
1397                        gotBadPiece( t, p );
1398                    }
1399                    else
1400                    {
1401                        int i;
1402                        int peerCount;
1403                        tr_peer ** peers;
1404                        tr_file_index_t fileIndex;
1405
1406                        peerCount = tr_ptrArraySize( &t->peers );
1407                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1408                        for( i=0; i<peerCount; ++i )
1409                            tr_peerMsgsHave( peers[i]->msgs, p );
1410
1411                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1412                            const tr_file * file = &tor->info.files[fileIndex];
1413                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1414                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1415                                    tr_torrentFileCompleted( tor, fileIndex );
1416                        }
1417
1418                        pieceListRemovePiece( t, p );
1419                    }
1420                }
1421
1422                t->needsCompletenessCheck = TRUE;
1423            }
1424            break;
1425        }
1426
1427        case TR_PEER_ERROR:
1428            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1429            {
1430                /* some protocol error from the peer */
1431                peer->doPurge = 1;
1432                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1433                        tr_atomAddrStr( peer->atom ) );
1434            }
1435            else
1436            {
1437                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1438            }
1439            break;
1440
1441        default:
1442            assert( 0 );
1443    }
1444
1445    torrentUnlock( t );
1446}
1447
1448static int
1449getDefaultShelfLife( uint8_t from )
1450{
1451    /* in general, peers obtained from firsthand contact
1452     * are better than those from secondhand, etc etc */
1453    switch( from )
1454    {
1455        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1456        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1457        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1458        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1459        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1460        case TR_PEER_FROM_RESUME   : return 60 * 60;
1461        default                    : return 60 * 60;
1462    }
1463}
1464
1465
1466static void
1467ensureAtomExists( Torrent           * t,
1468                  const tr_address  * addr,
1469                  const tr_port       port,
1470                  const uint8_t       flags,
1471                  const uint8_t       from )
1472{
1473    assert( tr_isAddress( addr ) );
1474    assert( from < TR_PEER_FROM__MAX );
1475
1476    if( getExistingAtom( t, addr ) == NULL )
1477    {
1478        struct peer_atom * a;
1479        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1480
1481        a = tr_new0( struct peer_atom, 1 );
1482        a->addr = *addr;
1483        a->port = port;
1484        a->flags = flags;
1485        a->from = from;
1486        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1487        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1488
1489        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1490    }
1491}
1492
1493static int
1494getMaxPeerCount( const tr_torrent * tor )
1495{
1496    return tor->maxConnectedPeers;
1497}
1498
1499static int
1500getPeerCount( const Torrent * t )
1501{
1502    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1503}
1504
1505/* FIXME: this is kind of a mess. */
1506static tr_bool
1507myHandshakeDoneCB( tr_handshake  * handshake,
1508                   tr_peerIo     * io,
1509                   tr_bool         isConnected,
1510                   const uint8_t * peer_id,
1511                   void          * vmanager )
1512{
1513    tr_bool            ok = isConnected;
1514    tr_bool            success = FALSE;
1515    tr_port            port;
1516    const tr_address * addr;
1517    tr_peerMgr       * manager = vmanager;
1518    Torrent          * t;
1519    tr_handshake     * ours;
1520
1521    assert( io );
1522    assert( tr_isBool( ok ) );
1523
1524    t = tr_peerIoHasTorrentHash( io )
1525        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1526        : NULL;
1527
1528    if( tr_peerIoIsIncoming ( io ) )
1529        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1530                                        handshake, handshakeCompare );
1531    else if( t )
1532        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1533                                        handshake, handshakeCompare );
1534    else
1535        ours = handshake;
1536
1537    assert( ours );
1538    assert( ours == handshake );
1539
1540    if( t )
1541        torrentLock( t );
1542
1543    addr = tr_peerIoGetAddress( io, &port );
1544
1545    if( !ok || !t || !t->isRunning )
1546    {
1547        if( t )
1548        {
1549            struct peer_atom * atom = getExistingAtom( t, addr );
1550            if( atom )
1551                ++atom->numFails;
1552        }
1553    }
1554    else /* looking good */
1555    {
1556        struct peer_atom * atom;
1557
1558        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1559        atom = getExistingAtom( t, addr );
1560        atom->time = tr_time( );
1561        atom->piece_data_time = 0;
1562
1563        if( atom->myflags & MYFLAG_BANNED )
1564        {
1565            tordbg( t, "banned peer %s tried to reconnect",
1566                    tr_atomAddrStr( atom ) );
1567        }
1568        else if( tr_peerIoIsIncoming( io )
1569               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1570
1571        {
1572        }
1573        else
1574        {
1575            tr_peer * peer = atom->peer;
1576
1577            if( peer ) /* we already have this peer */
1578            {
1579            }
1580            else
1581            {
1582                peer = getPeer( t, atom );
1583                tr_free( peer->client );
1584
1585                if( !peer_id )
1586                    peer->client = NULL;
1587                else {
1588                    char client[128];
1589                    tr_clientForId( client, sizeof( client ), peer_id );
1590                    peer->client = tr_strdup( client );
1591                }
1592
1593                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1594                                                                balanced by our unref in peerDestructor()  */
1595                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1596                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1597
1598                success = TRUE;
1599            }
1600        }
1601    }
1602
1603    if( t )
1604        torrentUnlock( t );
1605
1606    return success;
1607}
1608
1609void
1610tr_peerMgrAddIncoming( tr_peerMgr * manager,
1611                       tr_address * addr,
1612                       tr_port      port,
1613                       int          socket )
1614{
1615    tr_session * session;
1616
1617    managerLock( manager );
1618
1619    assert( tr_isSession( manager->session ) );
1620    session = manager->session;
1621
1622    if( tr_sessionIsAddressBlocked( session, addr ) )
1623    {
1624        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1625        tr_netClose( session, socket );
1626    }
1627    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1628    {
1629        tr_netClose( session, socket );
1630    }
1631    else /* we don't have a connection to them yet... */
1632    {
1633        tr_peerIo *    io;
1634        tr_handshake * handshake;
1635
1636        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1637
1638        handshake = tr_handshakeNew( io,
1639                                     session->encryptionMode,
1640                                     myHandshakeDoneCB,
1641                                     manager );
1642
1643        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1644
1645        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1646                                 handshakeCompare );
1647    }
1648
1649    managerUnlock( manager );
1650}
1651
1652static tr_bool
1653tr_isPex( const tr_pex * pex )
1654{
1655    return pex && tr_isAddress( &pex->addr );
1656}
1657
1658void
1659tr_peerMgrAddPex( tr_torrent   *  tor,
1660                  uint8_t         from,
1661                  const tr_pex *  pex )
1662{
1663    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1664    {
1665        Torrent * t = tor->torrentPeers;
1666        managerLock( t->manager );
1667
1668        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1669            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1670                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1671
1672        managerUnlock( t->manager );
1673    }
1674}
1675
1676tr_pex *
1677tr_peerMgrCompactToPex( const void *    compact,
1678                        size_t          compactLen,
1679                        const uint8_t * added_f,
1680                        size_t          added_f_len,
1681                        size_t *        pexCount )
1682{
1683    size_t          i;
1684    size_t          n = compactLen / 6;
1685    const uint8_t * walk = compact;
1686    tr_pex *        pex = tr_new0( tr_pex, n );
1687
1688    for( i = 0; i < n; ++i )
1689    {
1690        pex[i].addr.type = TR_AF_INET;
1691        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1692        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1693        if( added_f && ( n == added_f_len ) )
1694            pex[i].flags = added_f[i];
1695    }
1696
1697    *pexCount = n;
1698    return pex;
1699}
1700
1701tr_pex *
1702tr_peerMgrCompact6ToPex( const void    * compact,
1703                         size_t          compactLen,
1704                         const uint8_t * added_f,
1705                         size_t          added_f_len,
1706                         size_t        * pexCount )
1707{
1708    size_t          i;
1709    size_t          n = compactLen / 18;
1710    const uint8_t * walk = compact;
1711    tr_pex *        pex = tr_new0( tr_pex, n );
1712
1713    for( i = 0; i < n; ++i )
1714    {
1715        pex[i].addr.type = TR_AF_INET6;
1716        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1717        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1718        if( added_f && ( n == added_f_len ) )
1719            pex[i].flags = added_f[i];
1720    }
1721
1722    *pexCount = n;
1723    return pex;
1724}
1725
1726tr_pex *
1727tr_peerMgrArrayToPex( const void * array,
1728                      size_t       arrayLen,
1729                      size_t      * pexCount )
1730{
1731    size_t          i;
1732    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1733    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1734    const uint8_t * walk = array;
1735    tr_pex        * pex = tr_new0( tr_pex, n );
1736
1737    for( i = 0 ; i < n ; i++ ) {
1738        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1739        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1740        pex[i].flags = 0x00;
1741        walk += sizeof( tr_address ) + 2;
1742    }
1743
1744    *pexCount = n;
1745    return pex;
1746}
1747
1748/**
1749***
1750**/
1751
1752void
1753tr_peerMgrSetBlame( tr_torrent     * tor,
1754                    tr_piece_index_t pieceIndex,
1755                    int              success )
1756{
1757    if( !success )
1758    {
1759        int        peerCount, i;
1760        Torrent *  t = tor->torrentPeers;
1761        tr_peer ** peers;
1762
1763        assert( torrentIsLocked( t ) );
1764
1765        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1766        for( i = 0; i < peerCount; ++i )
1767        {
1768            tr_peer * peer = peers[i];
1769            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1770            {
1771                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1772                        tr_atomAddrStr( peer->atom ),
1773                        pieceIndex, (int)peer->strikes + 1 );
1774                addStrike( t, peer );
1775            }
1776        }
1777    }
1778}
1779
1780int
1781tr_pexCompare( const void * va, const void * vb )
1782{
1783    const tr_pex * a = va;
1784    const tr_pex * b = vb;
1785    int i;
1786
1787    assert( tr_isPex( a ) );
1788    assert( tr_isPex( b ) );
1789
1790    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1791        return i;
1792
1793    if( a->port != b->port )
1794        return a->port < b->port ? -1 : 1;
1795
1796    return 0;
1797}
1798
1799#if 0
1800static int
1801peerPrefersCrypto( const tr_peer * peer )
1802{
1803    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1804        return TRUE;
1805
1806    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1807        return FALSE;
1808
1809    return tr_peerIoIsEncrypted( peer->io );
1810}
1811#endif
1812
1813/* better goes first */
1814static int
1815compareAtomsByUsefulness( const void * va, const void *vb )
1816{
1817    const struct peer_atom * a = * (const struct peer_atom**) va;
1818    const struct peer_atom * b = * (const struct peer_atom**) vb;
1819
1820    assert( tr_isAtom( a ) );
1821    assert( tr_isAtom( b ) );
1822
1823    if( a->piece_data_time != b->piece_data_time )
1824        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1825    if( a->from != b->from )
1826        return a->from < b->from ? -1 : 1;
1827    if( a->numFails != b->numFails )
1828        return a->numFails < b->numFails ? -1 : 1;
1829
1830    return 0;
1831}
1832
1833int
1834tr_peerMgrGetPeers( tr_torrent   * tor,
1835                    tr_pex      ** setme_pex,
1836                    uint8_t        af,
1837                    uint8_t        list_mode,
1838                    int            maxCount )
1839{
1840    int i;
1841    int n;
1842    int count = 0;
1843    int atomCount = 0;
1844    const Torrent * t = tor->torrentPeers;
1845    struct peer_atom ** atoms = NULL;
1846    tr_pex * pex;
1847    tr_pex * walk;
1848
1849    assert( tr_isTorrent( tor ) );
1850    assert( setme_pex != NULL );
1851    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1852    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1853
1854    managerLock( t->manager );
1855
1856    /**
1857    ***  build a list of atoms
1858    **/
1859
1860    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1861    {
1862        int i;
1863        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1864        atomCount = tr_ptrArraySize( &t->peers );
1865        atoms = tr_new( struct peer_atom *, atomCount );
1866        for( i=0; i<atomCount; ++i )
1867            atoms[i] = peers[i]->atom;
1868    }
1869    else /* TR_PEERS_ALL */
1870    {
1871        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1872        atomCount = tr_ptrArraySize( &t->pool );
1873        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1874    }
1875
1876    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1877
1878    /**
1879    ***  add the first N of them into our return list
1880    **/
1881
1882    n = MIN( atomCount, maxCount );
1883    pex = walk = tr_new0( tr_pex, n );
1884
1885    for( i=0; i<atomCount && count<n; ++i )
1886    {
1887        const struct peer_atom * atom = atoms[i];
1888        if( atom->addr.type == af )
1889        {
1890            assert( tr_isAddress( &atom->addr ) );
1891            walk->addr = atom->addr;
1892            walk->port = atom->port;
1893            walk->flags = atom->flags;
1894            ++count;
1895            ++walk;
1896        }
1897    }
1898
1899    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1900
1901    assert( ( walk - pex ) == count );
1902    *setme_pex = pex;
1903
1904    /* cleanup */
1905    tr_free( atoms );
1906    managerUnlock( t->manager );
1907    return count;
1908}
1909
1910static void atomPulse      ( int, short, void * );
1911static void bandwidthPulse ( int, short, void * );
1912static void rechokePulse   ( int, short, void * );
1913static void reconnectPulse ( int, short, void * );
1914
1915static struct event *
1916createTimer( int msec, void (*callback)(int, short, void *), void * cbdata )
1917{
1918    struct event * timer = tr_new0( struct event, 1 );
1919    evtimer_set( timer, callback, cbdata );
1920    tr_timerAddMsec( timer, msec );
1921    return timer;
1922}
1923
1924static void
1925ensureMgrTimersExist( struct tr_peerMgr * m )
1926{
1927    if( m->atomTimer == NULL )
1928        m->atomTimer = createTimer( ATOM_PERIOD_MSEC, atomPulse, m );
1929
1930    if( m->bandwidthTimer == NULL )
1931        m->bandwidthTimer = createTimer( BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
1932
1933    if( m->rechokeTimer == NULL )
1934        m->rechokeTimer = createTimer( RECHOKE_PERIOD_MSEC, rechokePulse, m );
1935
1936    if( m->reconnectTimer == NULL )
1937        m->reconnectTimer = createTimer( RECONNECT_PERIOD_MSEC, reconnectPulse, m );
1938
1939    if( m->refillUpkeepTimer == NULL )
1940        m->refillUpkeepTimer = createTimer( REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
1941}
1942
1943void
1944tr_peerMgrStartTorrent( tr_torrent * tor )
1945{
1946    Torrent * t = tor->torrentPeers;
1947
1948    assert( t != NULL );
1949    managerLock( t->manager );
1950    ensureMgrTimersExist( t->manager );
1951
1952    t->isRunning = TRUE;
1953
1954    rechokePulse( 0, 0, t->manager );
1955    managerUnlock( t->manager );
1956}
1957
1958static void
1959stopTorrent( Torrent * t )
1960{
1961    int i, n;
1962
1963    assert( torrentIsLocked( t ) );
1964
1965    t->isRunning = FALSE;
1966
1967    /* disconnect the peers. */
1968    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
1969        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
1970    tr_ptrArrayClear( &t->peers );
1971
1972    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1973     * which removes the handshake from t->outgoingHandshakes... */
1974    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1975        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1976}
1977
1978void
1979tr_peerMgrStopTorrent( tr_torrent * tor )
1980{
1981    Torrent * t = tor->torrentPeers;
1982
1983    managerLock( t->manager );
1984
1985    stopTorrent( t );
1986
1987    managerUnlock( t->manager );
1988}
1989
1990void
1991tr_peerMgrAddTorrent( tr_peerMgr * manager,
1992                      tr_torrent * tor )
1993{
1994    managerLock( manager );
1995
1996    assert( tor );
1997    assert( tor->torrentPeers == NULL );
1998
1999    tor->torrentPeers = torrentConstructor( manager, tor );
2000
2001    managerUnlock( manager );
2002}
2003
2004void
2005tr_peerMgrRemoveTorrent( tr_torrent * tor )
2006{
2007    tr_torrentLock( tor );
2008
2009    stopTorrent( tor->torrentPeers );
2010    torrentDestructor( tor->torrentPeers );
2011
2012    tr_torrentUnlock( tor );
2013}
2014
2015void
2016tr_peerMgrTorrentAvailability( const tr_torrent * tor,
2017                               int8_t           * tab,
2018                               unsigned int       tabCount )
2019{
2020    tr_piece_index_t   i;
2021    const Torrent *    t;
2022    float              interval;
2023    tr_bool            isSeed;
2024    int                peerCount;
2025    const tr_peer **   peers;
2026    tr_torrentLock( tor );
2027
2028    t = tor->torrentPeers;
2029    tor = t->tor;
2030    interval = tor->info.pieceCount / (float)tabCount;
2031    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
2032    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2033    peerCount = tr_ptrArraySize( &t->peers );
2034
2035    memset( tab, 0, tabCount );
2036
2037    for( i = 0; tor && i < tabCount; ++i )
2038    {
2039        const int piece = i * interval;
2040
2041        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2042            tab[i] = -1;
2043        else if( peerCount ) {
2044            int j;
2045            for( j = 0; j < peerCount; ++j )
2046                if( tr_bitsetHas( &peers[j]->have, i ) )
2047                    ++tab[i];
2048        }
2049    }
2050
2051    tr_torrentUnlock( tor );
2052}
2053
2054/* Returns the pieces that are available from peers */
2055tr_bitfield*
2056tr_peerMgrGetAvailable( const tr_torrent * tor )
2057{
2058    int i;
2059    int peerCount;
2060    Torrent * t = tor->torrentPeers;
2061    const tr_peer ** peers;
2062    tr_bitfield * pieces;
2063    managerLock( t->manager );
2064
2065    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2066    peerCount = tr_ptrArraySize( &t->peers );
2067    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2068    for( i=0; i<peerCount; ++i )
2069        tr_bitsetOr( pieces, &peers[i]->have );
2070
2071    managerUnlock( t->manager );
2072    return pieces;
2073}
2074
2075void
2076tr_peerMgrTorrentStats( tr_torrent       * tor,
2077                        int              * setmePeersKnown,
2078                        int              * setmePeersConnected,
2079                        int              * setmeSeedsConnected,
2080                        int              * setmeWebseedsSendingToUs,
2081                        int              * setmePeersSendingToUs,
2082                        int              * setmePeersGettingFromUs,
2083                        int              * setmePeersFrom )
2084{
2085    int i, size;
2086    const Torrent * t = tor->torrentPeers;
2087    const tr_peer ** peers;
2088    const tr_webseed ** webseeds;
2089
2090    managerLock( t->manager );
2091
2092    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2093    size = tr_ptrArraySize( &t->peers );
2094
2095    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2096    *setmePeersConnected       = 0;
2097    *setmeSeedsConnected       = 0;
2098    *setmePeersGettingFromUs   = 0;
2099    *setmePeersSendingToUs     = 0;
2100    *setmeWebseedsSendingToUs  = 0;
2101
2102    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2103        setmePeersFrom[i] = 0;
2104
2105    for( i=0; i<size; ++i )
2106    {
2107        const tr_peer * peer = peers[i];
2108        const struct peer_atom * atom = peer->atom;
2109
2110        if( peer->io == NULL ) /* not connected */
2111            continue;
2112
2113        ++*setmePeersConnected;
2114
2115        ++setmePeersFrom[atom->from];
2116
2117        if( clientIsDownloadingFrom( tor, peer ) )
2118            ++*setmePeersSendingToUs;
2119
2120        if( clientIsUploadingTo( peer ) )
2121            ++*setmePeersGettingFromUs;
2122
2123        if( atom->flags & ADDED_F_SEED_FLAG )
2124            ++*setmeSeedsConnected;
2125    }
2126
2127    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2128    size = tr_ptrArraySize( &t->webseeds );
2129    for( i=0; i<size; ++i )
2130        if( tr_webseedIsActive( webseeds[i] ) )
2131            ++*setmeWebseedsSendingToUs;
2132
2133    managerUnlock( t->manager );
2134}
2135
2136float
2137tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2138{
2139    int i;
2140    float tmp;
2141    float ret = 0;
2142
2143    const Torrent * t = tor->torrentPeers;
2144    const int n = tr_ptrArraySize( &t->webseeds );
2145    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2146
2147    for( i=0; i<n; ++i )
2148        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2149            ret += tmp;
2150
2151    return ret;
2152}
2153
2154
2155float*
2156tr_peerMgrWebSpeeds( const tr_torrent * tor )
2157{
2158    const Torrent * t = tor->torrentPeers;
2159    const tr_webseed ** webseeds;
2160    int i;
2161    int webseedCount;
2162    float * ret;
2163    uint64_t now;
2164
2165    assert( t->manager );
2166    managerLock( t->manager );
2167
2168    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2169    webseedCount = tr_ptrArraySize( &t->webseeds );
2170    assert( webseedCount == tor->info.webseedCount );
2171    ret = tr_new0( float, webseedCount );
2172    now = tr_date( );
2173
2174    for( i=0; i<webseedCount; ++i )
2175        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2176            ret[i] = -1.0;
2177
2178    managerUnlock( t->manager );
2179    return ret;
2180}
2181
2182double
2183tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2184{
2185    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2186}
2187
2188
2189struct tr_peer_stat *
2190tr_peerMgrPeerStats( const tr_torrent    * tor,
2191                     int                 * setmeCount )
2192{
2193    int i, size;
2194    const Torrent * t = tor->torrentPeers;
2195    const tr_peer ** peers;
2196    tr_peer_stat * ret;
2197    uint64_t now;
2198
2199    assert( t->manager );
2200    managerLock( t->manager );
2201
2202    size = tr_ptrArraySize( &t->peers );
2203    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2204    ret = tr_new0( tr_peer_stat, size );
2205    now = tr_date( );
2206
2207    for( i=0; i<size; ++i )
2208    {
2209        char *                   pch;
2210        const tr_peer *          peer = peers[i];
2211        const struct peer_atom * atom = peer->atom;
2212        tr_peer_stat *           stat = ret + i;
2213
2214        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2215        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2216                   sizeof( stat->client ) );
2217        stat->port                = ntohs( peer->atom->port );
2218        stat->from                = atom->from;
2219        stat->progress            = peer->progress;
2220        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2221        stat->rateToPeer          = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2222        stat->rateToClient        = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2223        stat->peerIsChoked        = peer->peerIsChoked;
2224        stat->peerIsInterested    = peer->peerIsInterested;
2225        stat->clientIsChoked      = peer->clientIsChoked;
2226        stat->clientIsInterested  = peer->clientIsInterested;
2227        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2228        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2229        stat->isUploadingTo       = clientIsUploadingTo( peer );
2230        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2231        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2232        stat->pendingReqsToClient = peer->pendingReqsToClient;
2233
2234        pch = stat->flagStr;
2235        if( t->optimistic == peer ) *pch++ = 'O';
2236        if( stat->isDownloadingFrom ) *pch++ = 'D';
2237        else if( stat->clientIsInterested ) *pch++ = 'd';
2238        if( stat->isUploadingTo ) *pch++ = 'U';
2239        else if( stat->peerIsInterested ) *pch++ = 'u';
2240        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2241        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2242        if( stat->isEncrypted ) *pch++ = 'E';
2243        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2244        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2245        if( stat->isIncoming ) *pch++ = 'I';
2246        *pch = '\0';
2247    }
2248
2249    *setmeCount = size;
2250
2251    managerUnlock( t->manager );
2252    return ret;
2253}
2254
2255/**
2256***
2257**/
2258
2259struct ChokeData
2260{
2261    tr_bool         doUnchoke;
2262    tr_bool         isInterested;
2263    tr_bool         isChoked;
2264    int             rate;
2265    tr_peer *       peer;
2266};
2267
2268static int
2269compareChoke( const void * va,
2270              const void * vb )
2271{
2272    const struct ChokeData * a = va;
2273    const struct ChokeData * b = vb;
2274
2275    if( a->rate != b->rate ) /* prefer higher overall speeds */
2276        return a->rate > b->rate ? -1 : 1;
2277
2278    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2279        return a->isChoked ? 1 : -1;
2280
2281    return 0;
2282}
2283
2284static int
2285isNew( const tr_peer * peer )
2286{
2287    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2288}
2289
2290static int
2291isSame( const tr_peer * peer )
2292{
2293    return peer && peer->client && strstr( peer->client, "Transmission" );
2294}
2295
2296/**
2297***
2298**/
2299
2300static void
2301rechokeTorrent( Torrent * t, const uint64_t now )
2302{
2303    int i, size, unchokedInterested;
2304    const int peerCount = tr_ptrArraySize( &t->peers );
2305    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2306    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2307    const tr_session * session = t->manager->session;
2308    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2309
2310    assert( torrentIsLocked( t ) );
2311
2312    /* sort the peers by preference and rate */
2313    for( i = 0, size = 0; i < peerCount; ++i )
2314    {
2315        tr_peer * peer = peers[i];
2316        struct peer_atom * atom = peer->atom;
2317
2318        if( peer->progress >= 1.0 ) /* choke all seeds */
2319        {
2320            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2321        }
2322        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2323        {
2324            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2325        }
2326        else if( chokeAll ) /* choke everyone if we're not uploading */
2327        {
2328            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2329        }
2330        else
2331        {
2332            struct ChokeData * n = &choke[size++];
2333            n->peer         = peer;
2334            n->isInterested = peer->peerIsInterested;
2335            n->isChoked     = peer->peerIsChoked;
2336            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2337        }
2338    }
2339
2340    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2341
2342    /**
2343     * Reciprocation and number of uploads capping is managed by unchoking
2344     * the N peers which have the best upload rate and are interested.
2345     * This maximizes the client's download rate. These N peers are
2346     * referred to as downloaders, because they are interested in downloading
2347     * from the client.
2348     *
2349     * Peers which have a better upload rate (as compared to the downloaders)
2350     * but aren't interested get unchoked. If they become interested, the
2351     * downloader with the worst upload rate gets choked. If a client has
2352     * a complete file, it uses its upload rate rather than its download
2353     * rate to decide which peers to unchoke.
2354     */
2355    unchokedInterested = 0;
2356    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2357        choke[i].doUnchoke = 1;
2358        if( choke[i].isInterested )
2359            ++unchokedInterested;
2360    }
2361
2362    /* optimistic unchoke */
2363    if( i < size )
2364    {
2365        int n;
2366        struct ChokeData * c;
2367        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2368
2369        for( ; i<size; ++i )
2370        {
2371            if( choke[i].isInterested )
2372            {
2373                const tr_peer * peer = choke[i].peer;
2374                int x = 1, y;
2375                if( isNew( peer ) ) x *= 3;
2376                if( isSame( peer ) ) x *= 3;
2377                for( y=0; y<x; ++y )
2378                    tr_ptrArrayAppend( &randPool, &choke[i] );
2379            }
2380        }
2381
2382        if(( n = tr_ptrArraySize( &randPool )))
2383        {
2384            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2385            c->doUnchoke = 1;
2386            t->optimistic = c->peer;
2387        }
2388
2389        tr_ptrArrayDestruct( &randPool, NULL );
2390    }
2391
2392    for( i=0; i<size; ++i )
2393        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2394
2395    /* cleanup */
2396    tr_free( choke );
2397}
2398
2399static void
2400rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2401{
2402    uint64_t now;
2403    tr_torrent * tor = NULL;
2404    tr_peerMgr * mgr = vmgr;
2405    managerLock( mgr );
2406
2407    now = tr_date( );
2408    while(( tor = tr_torrentNext( mgr->session, tor )))
2409        if( tor->isRunning )
2410            rechokeTorrent( tor->torrentPeers, now );
2411
2412    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
2413    managerUnlock( mgr );
2414}
2415
2416/***
2417****
2418****  Life and Death
2419****
2420***/
2421
2422typedef enum
2423{
2424    TR_CAN_KEEP,
2425    TR_CAN_CLOSE,
2426    TR_MUST_CLOSE,
2427}
2428tr_close_type_t;
2429
2430static tr_close_type_t
2431shouldPeerBeClosed( const Torrent    * t,
2432                    const tr_peer    * peer,
2433                    int                peerCount,
2434                    const time_t       now )
2435{
2436    const tr_torrent *       tor = t->tor;
2437    const struct peer_atom * atom = peer->atom;
2438
2439    /* if it's marked for purging, close it */
2440    if( peer->doPurge )
2441    {
2442        tordbg( t, "purging peer %s because its doPurge flag is set",
2443                tr_atomAddrStr( atom ) );
2444        return TR_MUST_CLOSE;
2445    }
2446
2447    /* if we're seeding and the peer has everything we have,
2448     * and enough time has passed for a pex exchange, then disconnect */
2449    if( tr_torrentIsSeed( tor ) )
2450    {
2451        int peerHasEverything;
2452        if( atom->flags & ADDED_F_SEED_FLAG )
2453            peerHasEverything = TRUE;
2454        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2455            peerHasEverything = FALSE;
2456        else {
2457            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2458            tr_bitsetDifference( tmp, &peer->have );
2459            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2460            tr_bitfieldFree( tmp );
2461        }
2462
2463        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2464        {
2465            tordbg( t, "purging peer %s because we're both seeds",
2466                    tr_atomAddrStr( atom ) );
2467            return TR_MUST_CLOSE;
2468        }
2469    }
2470
2471    /* disconnect if it's been too long since piece data has been transferred.
2472     * this is on a sliding scale based on number of available peers... */
2473    {
2474        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2475        /* if we have >= relaxIfFewerThan, strictness is 100%.
2476         * if we have zero connections, strictness is 0% */
2477        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2478                               ? 1.0
2479                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2480        const int lo = MIN_UPLOAD_IDLE_SECS;
2481        const int hi = MAX_UPLOAD_IDLE_SECS;
2482        const int limit = hi - ( ( hi - lo ) * strictness );
2483        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2484/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2485        if( idleTime > limit ) {
2486            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2487                       tr_atomAddrStr( atom ), idleTime );
2488            return TR_CAN_CLOSE;
2489        }
2490    }
2491
2492    return TR_CAN_KEEP;
2493}
2494
2495static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2496
2497static tr_peer **
2498getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2499{
2500    int i, peerCount, outsize;
2501    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2502    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2503
2504    assert( torrentIsLocked( t ) );
2505
2506    for( i = outsize = 0; i < peerCount; ++i )
2507        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2508            ret[outsize++] = peers[i];
2509
2510    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2511
2512    *setmeSize = outsize;
2513    return ret;
2514}
2515
2516static int
2517compareCandidates( const void * va, const void * vb )
2518{
2519    const struct peer_atom * a = *(const struct peer_atom**) va;
2520    const struct peer_atom * b = *(const struct peer_atom**) vb;
2521
2522    /* <Charles> Here we would probably want to try reconnecting to
2523     * peers that had most recently given us data. Lots of users have
2524     * trouble with resets due to their routers and/or ISPs. This way we
2525     * can quickly recover from an unwanted reset. So we sort
2526     * piece_data_time in descending order.
2527     */
2528
2529    if( a->piece_data_time != b->piece_data_time )
2530        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2531
2532    if( a->numFails != b->numFails )
2533        return a->numFails < b->numFails ? -1 : 1;
2534
2535    if( a->time != b->time )
2536        return a->time < b->time ? -1 : 1;
2537
2538    /* In order to avoid fragmenting the swarm, peers from trackers and
2539     * from the DHT should be preferred to peers from PEX. */
2540    if( a->from != b->from )
2541        return a->from < b->from ? -1 : 1;
2542
2543    return 0;
2544}
2545
2546static int
2547getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2548{
2549    int sec;
2550
2551    /* if we were recently connected to this peer and transferring piece
2552     * data, try to reconnect to them sooner rather that later -- we don't
2553     * want network troubles to get in the way of a good peer. */
2554    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2555        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2556
2557    /* don't allow reconnects more often than our minimum */
2558    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2559        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2560
2561    /* otherwise, the interval depends on how many times we've tried
2562     * and failed to connect to the peer */
2563    else switch( atom->numFails ) {
2564        case 0: sec = 0; break;
2565        case 1: sec = 5; break;
2566        case 2: sec = 2 * 60; break;
2567        case 3: sec = 15 * 60; break;
2568        case 4: sec = 30 * 60; break;
2569        case 5: sec = 60 * 60; break;
2570        default: sec = 120 * 60; break;
2571    }
2572
2573    return sec;
2574}
2575
2576static struct peer_atom **
2577getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2578{
2579    int                 i, atomCount, retCount;
2580    struct peer_atom ** atoms;
2581    struct peer_atom ** ret;
2582    const int           seed = tr_torrentIsSeed( t->tor );
2583
2584    assert( torrentIsLocked( t ) );
2585
2586    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2587    ret = tr_new( struct peer_atom*, atomCount );
2588    for( i = retCount = 0; i < atomCount; ++i )
2589    {
2590        int                interval;
2591        struct peer_atom * atom = atoms[i];
2592
2593        /* peer fed us too much bad data ... we only keep it around
2594         * now to weed it out in case someone sends it to us via pex */
2595        if( atom->myflags & MYFLAG_BANNED )
2596            continue;
2597
2598        /* peer was unconnectable before, so we're not going to keep trying.
2599         * this is needs a separate flag from `banned', since if they try
2600         * to connect to us later, we'll let them in */
2601        if( atom->myflags & MYFLAG_UNREACHABLE )
2602            continue;
2603
2604        /* no need to connect if we're both seeds... */
2605        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2606                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2607            continue;
2608
2609        /* don't reconnect too often */
2610        interval = getReconnectIntervalSecs( atom, now );
2611        if( ( now - atom->time ) < interval )
2612        {
2613            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2614                    i, tr_atomAddrStr( atom ), interval );
2615            continue;
2616        }
2617
2618        /* Don't connect to peers in our blocklist */
2619        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2620            continue;
2621
2622        /* we don't need two connections to the same peer... */
2623        if( peerIsInUse( t, atom ) )
2624            continue;
2625
2626        ret[retCount++] = atom;
2627    }
2628
2629    if( retCount != 0 )
2630        qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2631    *setmeSize = retCount;
2632    return ret;
2633}
2634
2635static void
2636closePeer( Torrent * t, tr_peer * peer )
2637{
2638    struct peer_atom * atom;
2639
2640    assert( t != NULL );
2641    assert( peer != NULL );
2642
2643    atom = peer->atom;
2644
2645    /* if we transferred piece data, then they might be good peers,
2646       so reset their `numFails' weight to zero.  otherwise we connected
2647       to them fruitlessly, so mark it as another fail */
2648    if( atom->piece_data_time )
2649        atom->numFails = 0;
2650    else
2651        ++atom->numFails;
2652
2653    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2654    removePeer( t, peer );
2655}
2656
2657static void
2658reconnectTorrent( Torrent * t )
2659{
2660    static time_t prevTime = 0;
2661    static int    newConnectionsThisSecond = 0;
2662    const time_t  now = tr_time( );
2663
2664    if( prevTime != now )
2665    {
2666        prevTime = now;
2667        newConnectionsThisSecond = 0;
2668    }
2669
2670    if( !t->isRunning )
2671    {
2672        removeAllPeers( t );
2673    }
2674    else
2675    {
2676        int i;
2677        int mustCloseCount;
2678        int maxCandidates;
2679        struct tr_peer ** mustClose;
2680
2681        /* disconnect the really bad peers */
2682        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2683        for( i=0; i<mustCloseCount; ++i )
2684            closePeer( t, mustClose[i] );
2685        tr_free( mustClose );
2686
2687        /* decide how many peers can we try to add in this pass */
2688        maxCandidates = MAX_RECONNECTIONS_PER_PULSE;
2689        if( tr_announcerHasBacklog( t->manager->session->announcer ) )
2690            maxCandidates /= 2;
2691        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2692        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2693
2694        /* select the best candidates, if they are requested */
2695        if( maxCandidates == 0 )
2696        {
2697            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2698                       "NO connection candidates needed, %d atoms, "
2699                       "max per pulse is %d",
2700                       t->tor->info.name, mustCloseCount,
2701                       tr_ptrArraySize( &t->pool ),
2702                       MAX_RECONNECTIONS_PER_PULSE );
2703
2704            tordbg( t, "maxCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, "
2705                       "getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2706                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2707                       maxCandidates, MAX_RECONNECTIONS_PER_PULSE,
2708                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2709                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2710        }
2711        else
2712        {
2713            int canCloseCount = 0;
2714            int candidateCount;
2715            struct peer_atom ** candidates;
2716
2717            candidates = getPeerCandidates( t, now, &candidateCount );
2718            maxCandidates = MIN( maxCandidates, candidateCount );
2719
2720            /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2721            if( maxCandidates != 0 )
2722            {
2723                struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
2724                for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2725                   closePeer( t, canClose[i] );
2726                tr_free( canClose );
2727            }
2728
2729            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2730                       "%d can-close connections, %d connection candidates, "
2731                       "%d atoms, max per pulse is %d",
2732                       t->tor->info.name, mustCloseCount,
2733                       canCloseCount, candidateCount,
2734                       tr_ptrArraySize( &t->pool ), MAX_RECONNECTIONS_PER_PULSE );
2735
2736            tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2737                       " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2738                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2739                       candidateCount, MAX_RECONNECTIONS_PER_PULSE,
2740                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2741                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2742
2743            /* add some new ones */
2744            for( i=0; i<maxCandidates; ++i )
2745            {
2746                tr_peerMgr        * mgr = t->manager;
2747                struct peer_atom  * atom = candidates[i];
2748                tr_peerIo         * io;
2749
2750                tordbg( t, "Starting an OUTGOING connection with %s",
2751                        tr_atomAddrStr( atom ) );
2752
2753                io = tr_peerIoNewOutgoing( mgr->session,
2754                                           mgr->session->bandwidth,
2755                                           &atom->addr,
2756                                           atom->port,
2757                                           t->tor->info.hash,
2758                                           t->tor->completeness == TR_SEED );
2759
2760                if( io == NULL )
2761                {
2762                    tordbg( t, "peerIo not created; marking peer %s as unreachable",
2763                            tr_atomAddrStr( atom ) );
2764                    atom->myflags |= MYFLAG_UNREACHABLE;
2765                }
2766                else
2767                {
2768                    tr_handshake * handshake = tr_handshakeNew( io,
2769                                                                mgr->session->encryptionMode,
2770                                                                myHandshakeDoneCB,
2771                                                                mgr );
2772
2773                    assert( tr_peerIoGetTorrentHash( io ) );
2774
2775                    tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2776
2777                    ++newConnectionsThisSecond;
2778
2779                    tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2780                                             handshakeCompare );
2781                }
2782
2783                atom->time = now;
2784            }
2785            tr_free( candidates );
2786        }
2787    }
2788}
2789
2790struct peer_liveliness
2791{
2792    tr_peer * peer;
2793    void * clientData;
2794    time_t pieceDataTime;
2795    time_t time;
2796    int speed;
2797    tr_bool doPurge;
2798};
2799
2800static int
2801comparePeerLiveliness( const void * va, const void * vb )
2802{
2803    const struct peer_liveliness * a = va;
2804    const struct peer_liveliness * b = vb;
2805
2806    if( a->doPurge != b->doPurge )
2807        return a->doPurge ? 1 : -1;
2808
2809    if( a->speed != b->speed ) /* faster goes first */
2810        return a->speed > b->speed ? -1 : 1;
2811
2812    /* the one to give us data more recently goes first */
2813    if( a->pieceDataTime != b->pieceDataTime )
2814        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2815
2816    /* the one we connected to most recently goes first */
2817    if( a->time != b->time )
2818        return a->time > b->time ? -1 : 1;
2819
2820    return 0;
2821}
2822
2823static int
2824comparePeerLivelinessReverse( const void * va, const void * vb )
2825{
2826    return -comparePeerLiveliness (va, vb);
2827}
2828
2829static void
2830sortPeersByLivelinessImpl( tr_peer  ** peers,
2831                           void     ** clientData,
2832                           int         n,
2833                           uint64_t    now,
2834                           int (*compare) ( const void *va, const void *vb ) )
2835{
2836    int i;
2837    struct peer_liveliness *lives, *l;
2838
2839    /* build a sortable array of peer + extra info */
2840    lives = l = tr_new0( struct peer_liveliness, n );
2841    for( i=0; i<n; ++i, ++l )
2842    {
2843        tr_peer * p = peers[i];
2844        l->peer = p;
2845        l->doPurge = p->doPurge;
2846        l->pieceDataTime = p->atom->piece_data_time;
2847        l->time = p->atom->time;
2848        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2849                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2850        if( clientData )
2851            l->clientData = clientData[i];
2852    }
2853
2854    /* sort 'em */
2855    assert( n == ( l - lives ) );
2856    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2857
2858    /* build the peer array */
2859    for( i=0, l=lives; i<n; ++i, ++l ) {
2860        peers[i] = l->peer;
2861        if( clientData )
2862            clientData[i] = l->clientData;
2863    }
2864    assert( n == ( l - lives ) );
2865
2866    /* cleanup */
2867    tr_free( lives );
2868}
2869
2870static void
2871sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2872{
2873    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2874}
2875
2876static void
2877sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2878{
2879    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2880}
2881
2882
2883static void
2884enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2885{
2886    int n = tr_ptrArraySize( &t->peers );
2887    const int max = tr_torrentGetPeerLimit( t->tor );
2888    if( n > max )
2889    {
2890        void * base = tr_ptrArrayBase( &t->peers );
2891        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2892        sortPeersByLiveliness( peers, NULL, n, now );
2893        while( n > max )
2894            closePeer( t, peers[--n] );
2895        tr_free( peers );
2896    }
2897}
2898
2899static void
2900enforceSessionPeerLimit( tr_session * session, uint64_t now )
2901{
2902    int n = 0;
2903    tr_torrent * tor = NULL;
2904    const int max = tr_sessionGetPeerLimit( session );
2905
2906    /* count the total number of peers */
2907    while(( tor = tr_torrentNext( session, tor )))
2908        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2909
2910    /* if there are too many, prune out the worst */
2911    if( n > max )
2912    {
2913        tr_peer ** peers = tr_new( tr_peer*, n );
2914        Torrent ** torrents = tr_new( Torrent*, n );
2915
2916        /* populate the peer array */
2917        n = 0;
2918        tor = NULL;
2919        while(( tor = tr_torrentNext( session, tor ))) {
2920            int i;
2921            Torrent * t = tor->torrentPeers;
2922            const int tn = tr_ptrArraySize( &t->peers );
2923            for( i=0; i<tn; ++i, ++n ) {
2924                peers[n] = tr_ptrArrayNth( &t->peers, i );
2925                torrents[n] = t;
2926            }
2927        }
2928
2929        /* sort 'em */
2930        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2931
2932        /* cull out the crappiest */
2933        while( n-- > max )
2934            closePeer( torrents[n], peers[n] );
2935
2936        /* cleanup */
2937        tr_free( torrents );
2938        tr_free( peers );
2939    }
2940}
2941
2942struct reconnectTorrentStruct
2943{
2944    tr_torrent * torrent;
2945    int salt;
2946};
2947
2948static int
2949compareReconnectTorrents( const void * va, const void * vb )
2950{
2951    int ai, bi;
2952    const struct reconnectTorrentStruct * a = va;
2953    const struct reconnectTorrentStruct * b = vb;
2954
2955    /* primary key: higher priority goes first */
2956    ai = tr_torrentGetPriority( a->torrent );
2957    bi = tr_torrentGetPriority( b->torrent );
2958    if( ai != bi )
2959        return ai > bi ? -1 : 1;
2960
2961    /* secondary key: since users tend to stare at the screens
2962     * watching their downloads' progress, give downloads a
2963     * first shot at attempting outbound peer connections. */
2964    ai = tr_torrentIsSeed( a->torrent );
2965    bi = tr_torrentIsSeed( b->torrent );
2966    if( ai != bi )
2967        return bi ? -1 : 1;
2968
2969    /* tertiary key: random */
2970    if( a->salt != b->salt )
2971        return a->salt - b->salt;
2972
2973    return 0;
2974}
2975   
2976static void
2977reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2978{
2979    tr_torrent * tor;
2980    tr_peerMgr * mgr = vmgr;
2981    struct reconnectTorrentStruct * torrents;
2982    int torrentCount;
2983    int i;
2984    uint64_t now;
2985    managerLock( mgr );
2986
2987    now = tr_date( );
2988
2989    /**
2990    ***  enforce the per-session and per-torrent peer limits
2991    **/
2992
2993    /* if we're over the per-torrent peer limits, cull some peers */
2994    tor = NULL;
2995    while(( tor = tr_torrentNext( mgr->session, tor )))
2996        if( tor->isRunning )
2997            enforceTorrentPeerLimit( tor->torrentPeers, now );
2998
2999    /* if we're over the per-session peer limits, cull some peers */
3000    enforceSessionPeerLimit( mgr->session, now );
3001
3002    /**
3003    ***  try to make new peer connections
3004    **/
3005
3006    torrentCount = 0;
3007    torrents = tr_new( struct reconnectTorrentStruct,
3008                       mgr->session->torrentCount );
3009    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3010        if( tor->isRunning ) {
3011            struct reconnectTorrentStruct * r = torrents + torrentCount++;
3012            r->torrent = tor;
3013            r->salt = tr_cryptoWeakRandInt( 1024 );
3014        }
3015    }
3016    qsort( torrents,
3017           torrentCount, sizeof( struct reconnectTorrentStruct ),
3018           compareReconnectTorrents );
3019    for( i=0; i<torrentCount; ++i )
3020        reconnectTorrent( torrents[i].torrent->torrentPeers );
3021
3022
3023    /* cleanup */
3024    tr_free( torrents );
3025    tr_timerAddMsec( mgr->reconnectTimer, RECONNECT_PERIOD_MSEC );
3026    managerUnlock( mgr );
3027}
3028
3029/****
3030*****
3031*****  BANDWIDTH ALLOCATION
3032*****
3033****/
3034
3035static void
3036pumpAllPeers( tr_peerMgr * mgr )
3037{
3038    tr_torrent * tor = NULL;
3039
3040    while(( tor = tr_torrentNext( mgr->session, tor )))
3041    {
3042        int j;
3043        Torrent * t = tor->torrentPeers;
3044
3045        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3046        {
3047            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3048            tr_peerMsgsPulse( peer->msgs );
3049        }
3050    }
3051}
3052
3053static void
3054bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3055{
3056    tr_torrent * tor = NULL;
3057    tr_peerMgr * mgr = vmgr;
3058    managerLock( mgr );
3059
3060    /* FIXME: this next line probably isn't necessary... */
3061    pumpAllPeers( mgr );
3062
3063    /* allocate bandwidth to the peers */
3064    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3065    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3066
3067    /* possibly stop torrents that have seeded enough */
3068    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3069        if( tor->needsSeedRatioCheck ) {
3070            tor->needsSeedRatioCheck = FALSE;
3071            tr_torrentCheckSeedRatio( tor );
3072        }
3073    }
3074
3075    /* run the completeness check for any torrents that need it */
3076    tor = NULL;
3077    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3078        if( tor->torrentPeers->needsCompletenessCheck ) {
3079            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3080            tr_torrentRecheckCompleteness( tor );
3081        }
3082    }
3083
3084    /* possibly stop torrents that have an error */
3085    tor = NULL;
3086    while(( tor = tr_torrentNext( mgr->session, tor )))
3087        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
3088            tr_torrentStop( tor );
3089
3090    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3091    managerUnlock( mgr );
3092}
3093
3094/***
3095****
3096***/
3097
3098static int
3099compareAtomPtrsByAddress( const void * va, const void *vb )
3100{
3101    const struct peer_atom * a = * (const struct peer_atom**) va;
3102    const struct peer_atom * b = * (const struct peer_atom**) vb;
3103
3104    assert( tr_isAtom( a ) );
3105    assert( tr_isAtom( b ) );
3106
3107    return tr_compareAddresses( &a->addr, &b->addr );
3108}
3109
3110/* best come first, worst go last */
3111static int
3112compareAtomPtrsByShelfDate( const void * va, const void *vb )
3113{
3114    time_t atime;
3115    time_t btime;
3116    const struct peer_atom * a = * (const struct peer_atom**) va;
3117    const struct peer_atom * b = * (const struct peer_atom**) vb;
3118    const int data_time_cutoff_secs = 60 * 60;
3119    const time_t tr_now = tr_time( );
3120
3121    assert( tr_isAtom( a ) );
3122    assert( tr_isAtom( b ) );
3123
3124    /* primary key: the last piece data time *if* it was within the last hour */
3125    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3126    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3127    if( atime != btime )
3128        return atime > btime ? -1 : 1;
3129
3130    /* secondary key: shelf date. */
3131    if( a->shelf_date != b->shelf_date )
3132        return a->shelf_date > b->shelf_date ? -1 : 1;
3133
3134    return 0;
3135}
3136
3137static int
3138getMaxAtomCount( const tr_torrent * tor )
3139{
3140    /* FIXME: this curve should be smoother... */
3141    const int n = tor->maxConnectedPeers;
3142    if( n >= 200 ) return n * 1.5;
3143    if( n >= 100 ) return n * 2;
3144    if( n >=  50 ) return n * 3;
3145    if( n >=  20 ) return n * 5;
3146    return n * 10;
3147}
3148
3149static void
3150atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3151{
3152    tr_torrent * tor = NULL;
3153    tr_peerMgr * mgr = vmgr;
3154    managerLock( mgr );
3155
3156    while(( tor = tr_torrentNext( mgr->session, tor )))
3157    {
3158        int atomCount;
3159        Torrent * t = tor->torrentPeers;
3160        const int maxAtomCount = getMaxAtomCount( tor );
3161        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3162
3163        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3164        {
3165            int i;
3166            int keepCount = 0;
3167            int testCount = 0;
3168            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3169            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3170
3171            /* keep the ones that are in use */
3172            for( i=0; i<atomCount; ++i ) {
3173                struct peer_atom * atom = atoms[i];
3174                if( peerIsInUse( t, atom ) )
3175                    keep[keepCount++] = atom;
3176                else
3177                    test[testCount++] = atom;
3178            }
3179
3180            /* if there's room, keep the best of what's left */
3181            i = 0;
3182            if( keepCount < maxAtomCount ) {
3183                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3184                while( i<testCount && keepCount<maxAtomCount )
3185                    keep[keepCount++] = test[i++];
3186            }
3187
3188            /* free the culled atoms */
3189            while( i<testCount )
3190                tr_free( test[i++] );
3191
3192            /* rebuild Torrent.pool with what's left */
3193            tr_ptrArrayDestruct( &t->pool, NULL );
3194            t->pool = TR_PTR_ARRAY_INIT;
3195            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3196            for( i=0; i<keepCount; ++i )
3197                tr_ptrArrayAppend( &t->pool, keep[i] );
3198
3199            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3200
3201            /* cleanup */
3202            tr_free( test );
3203            tr_free( keep );
3204        }
3205    }
3206
3207    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3208    managerUnlock( mgr );
3209}
Note: See TracBrowser for help on using the repository browser.