source: trunk/libtransmission/peer-mgr.c @ 9965

Last change on this file since 9965 was 9965, checked in by charles, 12 years ago

(trunk) No code changes here... filling in some of the blanks in the "peers" and "utils" doxygen groups' documentation.

  • Property svn:keywords set to Date Rev Author Id
File size: 92.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9965 2010-01-19 19:37:00Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16
17#include <event.h>
18
19#include "transmission.h"
20#include "announcer.h"
21#include "bandwidth.h"
22#include "bencode.h"
23#include "blocklist.h"
24#include "clients.h"
25#include "completion.h"
26#include "crypto.h"
27#include "handshake.h"
28#include "inout.h" /* tr_ioTestPiece */
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "session.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "utils.h"
38#include "webseed.h"
39
40enum
41{
42    /* how frequently to cull old atoms */
43    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
44
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* how frequently to reallocate bandwidth */
49    BANDWIDTH_PERIOD_MSEC = 500,
50
51    /* how frequently to age out old piece request lists */
52    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = 500,
56
57    /* when many peers are available, keep idle ones this long */
58    MIN_UPLOAD_IDLE_SECS = ( 60 ),
59
60    /* when few peers are available, keep idle ones this long */
61    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 8,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 16,
69
70    /* number of bad pieces a peer is allowed to send before we ban them */
71    MAX_BAD_PIECES_PER_PEER = 5,
72
73    /* amount of time to keep a list of request pieces lying around
74       before it's considered too old and needs to be rebuilt */
75    PIECE_LIST_SHELF_LIFE_SECS = 60,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* use for bitwise operations w/peer_atom.myflags */
81    /* unreachable for now... but not banned.
82     * if they try to connect to us it's okay */
83    MYFLAG_UNREACHABLE = 2,
84
85    /* the minimum we'll wait before attempting to reconnect to a peer */
86    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
87
88    /** how long we'll let requests we've made linger before we cancel them */
89    REQUEST_TTL_SECS = 45
90};
91
92
93/**
94***
95**/
96
97enum
98{
99    UPLOAD_ONLY_UKNOWN,
100    UPLOAD_ONLY_YES,
101    UPLOAD_ONLY_NO
102};
103
104/**
105 * Peer information that should be kept even before we've connected and
106 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
107 * which ones would make good candidates for connecting to, and to watch out
108 * for banned peers.
109 *
110 * @see tr_peer
111 * @see tr_peermsgs
112 */
113struct peer_atom
114{
115    tr_peer   * peer;        /* will be NULL if not connected */
116    uint8_t     from;
117    uint8_t     flags;       /* these match the added_f flags */
118    uint8_t     myflags;     /* flags that aren't defined in added_f */
119    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
120    tr_port     port;
121    uint16_t    numFails;
122    tr_address  addr;
123    time_t      time;        /* when the peer's connection status last changed */
124    time_t      piece_data_time;
125
126    /* similar to a TTL field, but less rigid --
127     * if the swarm is small, the atom will be kept past this date. */
128    time_t      shelf_date;
129};
130
131#ifdef NDEBUG
132#define tr_isAtom(a) (TRUE)
133#else
134static tr_bool
135tr_isAtom( const struct peer_atom * atom )
136{
137    return ( atom != NULL )
138        && ( atom->from < TR_PEER_FROM__MAX )
139        && ( tr_isAddress( &atom->addr ) );
140}
141#endif
142
143static const char*
144tr_atomAddrStr( const struct peer_atom * atom )
145{
146    return tr_peerIoAddrStr( &atom->addr, atom->port );
147}
148
149struct block_request
150{
151    tr_block_index_t block;
152    tr_peer * peer;
153    time_t sentAt;
154};
155
156struct weighted_piece
157{
158    tr_piece_index_t index;
159    int16_t salt;
160    int16_t requestCount;
161};
162
163/** @brief Opaque, per-torrent data structure for peer connection information */
164typedef struct tr_torrent_peers
165{
166    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
167    tr_ptrArray                pool; /* struct peer_atom */
168    tr_ptrArray                peers; /* tr_peer */
169    tr_ptrArray                webseeds; /* tr_webseed */
170
171    tr_torrent               * tor;
172    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
173    struct tr_peerMgr        * manager;
174
175    tr_bool                    isRunning;
176    tr_bool                    needsCompletenessCheck;
177
178    struct block_request     * requests;
179    int                        requestsSort;
180    int                        requestCount;
181    int                        requestAlloc;
182
183    struct weighted_piece    * pieces;
184    int                        pieceCount;
185}
186Torrent;
187
188struct tr_peerMgr
189{
190    tr_session    * session;
191    tr_ptrArray     incomingHandshakes; /* tr_handshake */
192    struct event  * bandwidthTimer;
193    struct event  * rechokeTimer;
194    struct event  * reconnectTimer;
195    struct event  * refillUpkeepTimer;
196    struct event  * atomTimer;
197};
198
199#define tordbg( t, ... ) \
200    do { \
201        if( tr_deepLoggingIsActive( ) ) \
202            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
203    } while( 0 )
204
205#define dbgmsg( ... ) \
206    do { \
207        if( tr_deepLoggingIsActive( ) ) \
208            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
209    } while( 0 )
210
211/**
212***
213**/
214
215static inline void
216managerLock( const struct tr_peerMgr * manager )
217{
218    tr_sessionLock( manager->session );
219}
220
221static inline void
222managerUnlock( const struct tr_peerMgr * manager )
223{
224    tr_sessionUnlock( manager->session );
225}
226
227static inline void
228torrentLock( Torrent * torrent )
229{
230    managerLock( torrent->manager );
231}
232
233static inline void
234torrentUnlock( Torrent * torrent )
235{
236    managerUnlock( torrent->manager );
237}
238
239static inline int
240torrentIsLocked( const Torrent * t )
241{
242    return tr_sessionIsLocked( t->manager->session );
243}
244
245/**
246***
247**/
248
249static int
250handshakeCompareToAddr( const void * va, const void * vb )
251{
252    const tr_handshake * a = va;
253
254    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
255}
256
257static int
258handshakeCompare( const void * a, const void * b )
259{
260    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
261}
262
263static tr_handshake*
264getExistingHandshake( tr_ptrArray      * handshakes,
265                      const tr_address * addr )
266{
267    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
268}
269
270static int
271comparePeerAtomToAddress( const void * va, const void * vb )
272{
273    const struct peer_atom * a = va;
274
275    return tr_compareAddresses( &a->addr, vb );
276}
277
278static int
279compareAtomsByAddress( const void * va, const void * vb )
280{
281    const struct peer_atom * b = vb;
282
283    assert( tr_isAtom( b ) );
284
285    return comparePeerAtomToAddress( va, &b->addr );
286}
287
288/**
289***
290**/
291
292const tr_address *
293tr_peerAddress( const tr_peer * peer )
294{
295    return &peer->atom->addr;
296}
297
298static Torrent*
299getExistingTorrent( tr_peerMgr *    manager,
300                    const uint8_t * hash )
301{
302    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
303
304    return tor == NULL ? NULL : tor->torrentPeers;
305}
306
307static int
308peerCompare( const void * a, const void * b )
309{
310    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
311}
312
313static struct peer_atom*
314getExistingAtom( const Torrent    * t,
315                 const tr_address * addr )
316{
317    Torrent * tt = (Torrent*)t;
318    assert( torrentIsLocked( t ) );
319    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
320}
321
322static tr_bool
323peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
324{
325    Torrent * t = (Torrent*) ct;
326
327    assert( torrentIsLocked ( t ) );
328
329    return ( atom->peer != NULL )
330        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
331        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
332}
333
334static tr_peer*
335peerConstructor( struct peer_atom * atom )
336{
337    tr_peer * peer = tr_new0( tr_peer, 1 );
338    tr_bitsetConstructor( &peer->have, 0 );
339    peer->atom = atom;
340    atom->peer = peer;
341    return peer;
342}
343
344static tr_peer*
345getPeer( Torrent * torrent, struct peer_atom * atom )
346{
347    tr_peer * peer;
348
349    assert( torrentIsLocked( torrent ) );
350
351    peer = atom->peer;
352
353    if( peer == NULL )
354    {
355        peer = peerConstructor( atom );
356        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
357    }
358
359    return peer;
360}
361
362static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
363
364static void
365peerDestructor( Torrent * t, tr_peer * peer )
366{
367    assert( peer != NULL );
368
369    peerDeclinedAllRequests( t, peer );
370
371    if( peer->msgs != NULL )
372    {
373        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
374        tr_peerMsgsFree( peer->msgs );
375    }
376
377    tr_peerIoClear( peer->io );
378    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
379
380    tr_bitsetDestructor( &peer->have );
381    tr_bitfieldFree( peer->blame );
382    tr_free( peer->client );
383    peer->atom->peer = NULL;
384
385    tr_free( peer );
386}
387
388static void
389removePeer( Torrent * t, tr_peer * peer )
390{
391    tr_peer * removed;
392    struct peer_atom * atom = peer->atom;
393
394    assert( torrentIsLocked( t ) );
395    assert( atom );
396
397    atom->time = tr_time( );
398
399    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
400    assert( removed == peer );
401    peerDestructor( t, removed );
402}
403
404static void
405removeAllPeers( Torrent * t )
406{
407    while( !tr_ptrArrayEmpty( &t->peers ) )
408        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
409}
410
411static void
412torrentDestructor( void * vt )
413{
414    Torrent * t = vt;
415
416    assert( t );
417    assert( !t->isRunning );
418    assert( torrentIsLocked( t ) );
419    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
420    assert( tr_ptrArrayEmpty( &t->peers ) );
421
422    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
423    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
424    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
425    tr_ptrArrayDestruct( &t->peers, NULL );
426
427    tr_free( t->requests );
428    tr_free( t->pieces );
429    tr_free( t );
430}
431
432static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
433
434static Torrent*
435torrentConstructor( tr_peerMgr * manager,
436                    tr_torrent * tor )
437{
438    int       i;
439    Torrent * t;
440
441    t = tr_new0( Torrent, 1 );
442    t->manager = manager;
443    t->tor = tor;
444    t->pool = TR_PTR_ARRAY_INIT;
445    t->peers = TR_PTR_ARRAY_INIT;
446    t->webseeds = TR_PTR_ARRAY_INIT;
447    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
448    t->requests = 0;
449
450    for( i = 0; i < tor->info.webseedCount; ++i )
451    {
452        tr_webseed * w =
453            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
454        tr_ptrArrayAppend( &t->webseeds, w );
455    }
456
457    return t;
458}
459
460tr_peerMgr*
461tr_peerMgrNew( tr_session * session )
462{
463    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
464    m->session = session;
465    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
466    return m;
467}
468
469static void
470deleteTimer( struct event ** t )
471{
472    if( *t != NULL )
473    {
474        evtimer_del( *t );
475        tr_free( *t );
476        *t = NULL;
477    }
478}
479
480static void
481deleteTimers( struct tr_peerMgr * m )
482{
483    deleteTimer( &m->atomTimer );
484    deleteTimer( &m->bandwidthTimer );
485    deleteTimer( &m->rechokeTimer );
486    deleteTimer( &m->reconnectTimer );
487    deleteTimer( &m->refillUpkeepTimer );
488}
489
490void
491tr_peerMgrFree( tr_peerMgr * manager )
492{
493    managerLock( manager );
494
495    deleteTimers( manager );
496
497    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
498     * the item from manager->handshakes, so this is a little roundabout... */
499    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
500        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
501
502    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
503
504    managerUnlock( manager );
505    tr_free( manager );
506}
507
508static int
509clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
510{
511    if( !tr_torrentHasMetadata( tor ) )
512        return TRUE;
513
514    return peer->clientIsInterested && !peer->clientIsChoked;
515}
516
517static int
518clientIsUploadingTo( const tr_peer * peer )
519{
520    return peer->peerIsInterested && !peer->peerIsChoked;
521}
522
523/***
524****
525***/
526
527tr_bool
528tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
529                      const tr_address  * addr )
530{
531    tr_bool isSeed = FALSE;
532    const Torrent * t = tor->torrentPeers;
533    const struct peer_atom * atom = getExistingAtom( t, addr );
534
535    if( atom )
536        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
537
538    return isSeed;
539}
540
541/**
542***  REQUESTS
543***
544*** There are two data structures associated with managing block requests:
545***
546*** 1. Torrent::requests, an array of "struct block_request" which keeps
547***    track of which blocks have been requested, and when, and by which peers.
548***    This is list is used for (a) cancelling requests that have been pending
549***    for too long and (b) avoiding duplicate requests before endgame.
550***
551*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
552***    pieces that we want to request.  It's used to decide which pieces to
553***    return next when tr_peerMgrGetBlockRequests() is called.
554**/
555
556/**
557*** struct block_request
558**/
559
560enum
561{
562    REQ_UNSORTED,
563    REQ_SORTED_BY_BLOCK,
564    REQ_SORTED_BY_TIME
565};
566
567static int
568compareReqByBlock( const void * va, const void * vb )
569{
570    const struct block_request * a = va;
571    const struct block_request * b = vb;
572
573    /* primary key: block */
574    if( a->block < b->block ) return -1;
575    if( a->block > b->block ) return 1;
576
577    /* secondary key: peer */
578    if( a->peer < b->peer ) return -1;
579    if( a->peer > b->peer ) return 1;
580
581    return 0;
582}
583
584static int
585compareReqByTime( const void * va, const void * vb )
586{
587    const struct block_request * a = va;
588    const struct block_request * b = vb;
589
590    /* primary key: time */
591    if( a->sentAt < b->sentAt ) return -1;
592    if( a->sentAt > b->sentAt ) return 1;
593
594    /* secondary key: peer */
595    if( a->peer < b->peer ) return -1;
596    if( a->peer > b->peer ) return 1;
597
598    return 0;
599}
600
601static void
602requestListSort( Torrent * t, int mode )
603{
604    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
605
606    if( t->requestsSort != mode )
607    {
608        int(*compar)(const void *, const void *);
609
610        t->requestsSort = mode;
611
612        switch( mode ) {
613            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
614            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
615            default: assert( 0 && "unhandled" );
616        }
617
618        qsort( t->requests, t->requestCount,
619               sizeof( struct block_request ), compar );
620    }
621}
622
623static void
624requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
625{
626    struct block_request key;
627
628    /* ensure enough room is available... */
629    if( t->requestCount + 1 >= t->requestAlloc )
630    {
631        const int CHUNK_SIZE = 128;
632        t->requestAlloc += CHUNK_SIZE;
633        t->requests = tr_renew( struct block_request,
634                                t->requests, t->requestAlloc );
635    }
636
637    /* populate the record we're inserting */
638    key.block = block;
639    key.peer = peer;
640    key.sentAt = tr_time( );
641
642    /* insert the request to our array... */
643    switch( t->requestsSort )
644    {
645        case REQ_UNSORTED:
646        case REQ_SORTED_BY_TIME:
647            t->requests[t->requestCount++] = key;
648            break;
649
650        case REQ_SORTED_BY_BLOCK: {
651            tr_bool exact;
652            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
653                                           sizeof( struct block_request ),
654                                           compareReqByBlock, &exact );
655            assert( !exact );
656            memmove( t->requests + pos + 1,
657                     t->requests + pos,
658                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
659            t->requests[pos] = key;
660            break;
661        }
662    }
663
664    if( peer != NULL )
665    {
666        ++peer->pendingReqsToPeer;
667        assert( peer->pendingReqsToPeer >= 0 );
668    }
669
670    /*fprintf( stderr, "added request of block %lu from peer %s... "
671                       "there are now %d block\n",
672                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
673}
674
675static struct block_request *
676requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
677{
678    struct block_request key;
679    key.block = block;
680    key.peer = (tr_peer*) peer;
681
682    requestListSort( t, REQ_SORTED_BY_BLOCK );
683
684    return bsearch( &key, t->requests, t->requestCount,
685                    sizeof( struct block_request ),
686                    compareReqByBlock );
687}
688
689/* how many peers are we currently requesting this block from... */
690static int
691countBlockRequests( Torrent * t, tr_block_index_t block )
692{
693    tr_bool exact;
694    int i, n, pos;
695    struct block_request key;
696
697    requestListSort( t, REQ_SORTED_BY_BLOCK );
698    key.block = block;
699    key.peer = NULL;
700    pos = tr_lowerBound( &key, t->requests, t->requestCount,
701                         sizeof( struct block_request ),
702                         compareReqByBlock, &exact );
703
704    assert( !exact ); /* shouldn't have a request with .peer == NULL */
705
706    n = 0;
707    for( i=pos; i<t->requestCount; ++i ) {
708        if( t->requests[i].block == block )
709            ++n;
710        else
711            break;
712    }
713
714    return n;
715}
716
717static void
718requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
719{
720    const struct block_request * b = requestListLookup( t, block, peer );
721    if( b != NULL )
722    {
723        const int pos = b - t->requests;
724        assert( pos < t->requestCount );
725
726        if( b->peer != NULL )
727        {
728            --b->peer->pendingReqsToPeer;
729            assert( b->peer->pendingReqsToPeer >= 0 );
730        }
731
732        memmove( t->requests + pos,
733                 t->requests + pos + 1,
734                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
735        /*fprintf( stderr, "removing request of block %lu from peer %s... "
736                           "there are now %d block requests left\n",
737                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
738    }
739}
740
741/**
742*** struct weighted_piece
743**/
744
745enum
746{
747    PIECES_UNSORTED,
748    PIECES_SORTED_BY_INDEX,
749    PIECES_SORTED_BY_WEIGHT
750};
751
752const tr_torrent * weightTorrent;
753
754/* we try to create a "weight" s.t. high-priority pieces come before others,
755 * and that partially-complete pieces come before empty ones. */
756static int
757comparePieceByWeight( const void * va, const void * vb )
758{
759    const struct weighted_piece * a = va;
760    const struct weighted_piece * b = vb;
761    int ia, ib, missing, pending;
762    const tr_torrent * tor = weightTorrent;
763
764    /* primary key: weight */
765    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
766    pending = a->requestCount;
767    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
768    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
769    pending = b->requestCount;
770    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
771    if( ia < ib ) return -1;
772    if( ia > ib ) return 1;
773
774    /* secondary key: higher priorities go first */
775    ia = tor->info.pieces[a->index].priority;
776    ib = tor->info.pieces[b->index].priority;
777    if( ia > ib ) return -1;
778    if( ia < ib ) return 1;
779
780    /* tertiary key: random */
781    if( a->salt < b->salt ) return -1;
782    if( a->salt > b->salt ) return 1;
783
784    /* okay, they're equal */
785    return 0;
786}
787
788static int
789comparePieceByIndex( const void * va, const void * vb )
790{
791    const struct weighted_piece * a = va;
792    const struct weighted_piece * b = vb;
793    if( a->index < b->index ) return -1;
794    if( a->index > b->index ) return 1;
795    return 0;
796}
797
798static void
799pieceListSort( Torrent * t, int mode )
800{
801    int(*compar)(const void *, const void *);
802
803    assert( mode==PIECES_SORTED_BY_INDEX
804         || mode==PIECES_SORTED_BY_WEIGHT );
805
806    switch( mode ) {
807        case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
808        case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
809        default: assert( 0 && "unhandled" );  break;
810    }
811
812    weightTorrent = t->tor;
813    qsort( t->pieces, t->pieceCount,
814           sizeof( struct weighted_piece ), compar );
815}
816
817static tr_bool
818isInEndgame( Torrent * t )
819{
820    tr_bool endgame = TRUE;
821
822    if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
823    {
824        const tr_completion * cp = &t->tor->completion;
825        const struct weighted_piece * p = t->pieces;
826        const int pending = p->requestCount;
827        const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
828        endgame = pending >= missing;
829    }
830
831    /*if( endgame ) fprintf( stderr, "ENDGAME reached\n" );*/
832    return endgame;
833}
834
835static struct weighted_piece *
836pieceListLookup( Torrent * t, tr_piece_index_t index )
837{
838    const struct weighted_piece * limit = t->pieces;
839    struct weighted_piece * piece = t->pieces + t->pieceCount - 1;
840
841    if ( t->pieceCount == 0 ) return NULL;
842
843    /* reverse linear search */
844    for( ;; )
845    { 
846        if( piece < limit ) return NULL;
847        if( index == piece->index ) return piece; else --piece;
848    }
849}
850
851static void
852pieceListRebuild( Torrent * t )
853{
854    if( !tr_torrentIsSeed( t->tor ) )
855    {
856        tr_piece_index_t i;
857        tr_piece_index_t * pool;
858        tr_piece_index_t poolCount = 0;
859        const tr_torrent * tor = t->tor;
860        const tr_info * inf = tr_torrentInfo( tor );
861        struct weighted_piece * pieces;
862        int pieceCount;
863
864        /* build the new list */
865        pool = tr_new( tr_piece_index_t, inf->pieceCount );
866        for( i=0; i<inf->pieceCount; ++i )
867            if( !inf->pieces[i].dnd )
868                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
869                    pool[poolCount++] = i;
870        pieceCount = poolCount;
871        pieces = tr_new0( struct weighted_piece, pieceCount );
872        for( i=0; i<poolCount; ++i ) {
873            struct weighted_piece * piece = pieces + i;
874            piece->index = pool[i];
875            piece->requestCount = 0;
876            piece->salt = tr_cryptoWeakRandInt( 4096 );
877        }
878
879        /* if we already had a list of pieces, merge it into
880         * the new list so we don't lose its requestCounts */
881        if( t->pieces != NULL )
882        {
883            struct weighted_piece * o = t->pieces;
884            struct weighted_piece * oend = o + t->pieceCount;
885            struct weighted_piece * n = pieces;
886            struct weighted_piece * nend = n + pieceCount;
887
888            pieceListSort( t, PIECES_SORTED_BY_INDEX );
889
890            while( o!=oend && n!=nend ) {
891                if( o->index < n->index )
892                    ++o;
893                else if( o->index > n->index )
894                    ++n;
895                else
896                    *n++ = *o++;
897            }
898
899            tr_free( t->pieces );
900        }
901
902        t->pieces = pieces;
903        t->pieceCount = pieceCount;
904
905        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
906
907        /* cleanup */
908        tr_free( pool );
909    }
910}
911
912static void
913pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
914{
915    struct weighted_piece * p = pieceListLookup( t, piece );
916
917    if( p != NULL )
918    {
919        const int pos = p - t->pieces;
920
921        memmove( t->pieces + pos,
922                 t->pieces + pos + 1,
923                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
924
925        if( t->pieceCount == 0 )
926        {
927            tr_free( t->pieces );
928            t->pieces = NULL;
929        }
930    }
931}
932
933static void
934pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
935{
936    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
937    const struct weighted_piece * p = pieceListLookup( t, index );
938
939    if( p != NULL )
940    {
941        const int pos = p - t->pieces;
942        struct weighted_piece piece = t->pieces[pos];
943        int newpos;
944        tr_bool exact;
945
946        /* remove request */
947        if( piece.requestCount > 0 )
948            --piece.requestCount;
949
950        /* List is nearly sorted (by weight) : insert piece into the right place. */
951
952        weightTorrent = t->tor;
953        newpos = tr_lowerBound( &piece, t->pieces, t->pieceCount,
954                                sizeof( struct weighted_piece ),
955                                comparePieceByWeight, &exact );
956
957        if( newpos == pos || newpos == pos + 1 )
958        {
959            /* it's VERY likely that piece keeps its position */
960            t->pieces[pos].requestCount = piece.requestCount;
961        }
962        else
963        {
964            /* piece is removed temporally to make insertion easier */
965            memmove( &t->pieces[pos],
966                     &t->pieces[pos + 1],
967                     sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
968
969            if( newpos > pos ) --newpos;
970
971            memmove( &t->pieces[newpos + 1],
972                     &t->pieces[newpos],
973                     sizeof( struct weighted_piece ) * ( t->pieceCount++ - newpos ) );
974
975            t->pieces[newpos] = piece;
976        }
977    }
978}
979
980/**
981***
982**/
983
984void
985tr_peerMgrRebuildRequests( tr_torrent * tor )
986{
987    assert( tr_isTorrent( tor ) );
988
989    pieceListRebuild( tor->torrentPeers );
990}
991
992void
993tr_peerMgrGetNextRequests( tr_torrent           * tor,
994                           tr_peer              * peer,
995                           int                    numwant,
996                           tr_block_index_t     * setme,
997                           int                  * numgot )
998{
999    int i;
1000    int got;
1001    Torrent * t;
1002    tr_bool endgame;
1003    struct weighted_piece * pieces;
1004    const tr_bitset * have = &peer->have;
1005
1006    /* sanity clause */
1007    assert( tr_isTorrent( tor ) );
1008    assert( numwant > 0 );
1009
1010    /* walk through the pieces and find blocks that should be requested */
1011    got = 0;
1012    t = tor->torrentPeers;
1013
1014    /* prep the pieces list */
1015    if( t->pieces == NULL )
1016        pieceListRebuild( t );
1017
1018    endgame = isInEndgame( t );
1019
1020    pieces = t->pieces;
1021    for( i=0; i<t->pieceCount && got<numwant; ++i )
1022    {
1023        struct weighted_piece * p = pieces + i;
1024        const int missing = tr_cpMissingBlocksInPiece( &tor->completion, p->index );
1025        const int maxDuplicatesPerBlock = endgame ? 3 : 1;
1026
1027        if( p->requestCount > ( missing * maxDuplicatesPerBlock ) )
1028            continue;
1029
1030        /* if the peer has this piece that we want... */
1031        if( tr_bitsetHasFast( have, p->index ) )
1032        {
1033            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1034            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1035
1036            for( ; b!=e && got<numwant; ++b )
1037            {
1038                /* don't request blocks we've already got */
1039                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1040                    continue;
1041
1042                /* don't send the same request to the same peer twice */
1043                if( tr_peerMgrDidPeerRequest( tor, peer, b ) )
1044                    continue;
1045
1046                /* don't send the same request to any peer too many times */
1047                if( countBlockRequests( t, b ) >= maxDuplicatesPerBlock )
1048                    continue;
1049
1050                /* update the caller's table */
1051                setme[got++] = b;
1052
1053                /* update our own tables */
1054                requestListAdd( t, b, peer );
1055                ++p->requestCount;
1056            }
1057        }
1058    }
1059
1060    /* In most cases we've just changed the weights of a small number of pieces.
1061     * So rather than qsort()ing the entire array, it's faster to apply an
1062     * adaptive insertion sort algorithm. */
1063    if( got > 0 )
1064    {
1065        /* not enough requests || last piece modified */
1066        if ( i == t->pieceCount ) --i;
1067
1068        weightTorrent = t->tor;
1069        while( --i >= 0 )
1070        {
1071            tr_bool exact;
1072
1073            /* relative position! */
1074            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1075                                              t->pieceCount - (i + 1),
1076                                              sizeof( struct weighted_piece ),
1077                                              comparePieceByWeight, &exact );
1078            if( newpos > 0 )
1079            {
1080                const struct weighted_piece piece = t->pieces[i];
1081                memmove( &t->pieces[i],
1082                         &t->pieces[i + 1],
1083                         sizeof( struct weighted_piece ) * ( newpos ) );
1084                t->pieces[i + newpos] = piece;
1085            }
1086        }
1087    }
1088
1089    *numgot = got;
1090}
1091
1092tr_bool
1093tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1094                          const tr_peer     * peer,
1095                          tr_block_index_t    block )
1096{
1097    const Torrent * t = tor->torrentPeers;
1098    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1099}
1100
1101/* cancel requests that are too old */
1102static void
1103refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1104{
1105    time_t now;
1106    time_t too_old;
1107    tr_torrent * tor;
1108    tr_peerMgr * mgr = vmgr;
1109    managerLock( mgr );
1110
1111    now = tr_time( );
1112    too_old = now - REQUEST_TTL_SECS;
1113
1114    tor = NULL;
1115    while(( tor = tr_torrentNext( mgr->session, tor )))
1116    {
1117        Torrent * t = tor->torrentPeers;
1118        const int n = t->requestCount;
1119        if( n > 0 )
1120        {
1121            int keepCount = 0;
1122            int cancelCount = 0;
1123            struct block_request * keep = tr_new( struct block_request, n );
1124            struct block_request * cancel = tr_new( struct block_request, n );
1125            const struct block_request * it;
1126            const struct block_request * end;
1127
1128            for( it=t->requests, end=it+n; it!=end; ++it )
1129                if( it->sentAt <= too_old )
1130                    cancel[cancelCount++] = *it;
1131                else
1132                    keep[keepCount++] = *it;
1133
1134            /* prune out the ones we aren't keeping */
1135            tr_free( t->requests );
1136            t->requests = keep;
1137            t->requestCount = keepCount;
1138            t->requestAlloc = n;
1139
1140            /* send cancel messages for all the "cancel" ones */
1141            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1142                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) )
1143                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1144
1145            /* decrement the pending request counts for the timed-out blocks */
1146            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1147                pieceListRemoveRequest( t, it->block );
1148
1149            /* cleanup loop */
1150            tr_free( cancel );
1151        }
1152    }
1153
1154    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1155    managerUnlock( mgr );
1156}
1157
1158static void
1159addStrike( Torrent * t, tr_peer * peer )
1160{
1161    tordbg( t, "increasing peer %s strike count to %d",
1162            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1163
1164    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1165    {
1166        struct peer_atom * atom = peer->atom;
1167        atom->myflags |= MYFLAG_BANNED;
1168        peer->doPurge = 1;
1169        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1170    }
1171}
1172
1173static void
1174gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1175{
1176    tr_torrent *   tor = t->tor;
1177    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1178
1179    tor->corruptCur += byteCount;
1180    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1181}
1182
1183static void
1184peerSuggestedPiece( Torrent            * t UNUSED,
1185                    tr_peer            * peer UNUSED,
1186                    tr_piece_index_t     pieceIndex UNUSED,
1187                    int                  isFastAllowed UNUSED )
1188{
1189#if 0
1190    assert( t );
1191    assert( peer );
1192    assert( peer->msgs );
1193
1194    /* is this a valid piece? */
1195    if(  pieceIndex >= t->tor->info.pieceCount )
1196        return;
1197
1198    /* don't ask for it if we've already got it */
1199    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1200        return;
1201
1202    /* don't ask for it if they don't have it */
1203    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1204        return;
1205
1206    /* don't ask for it if we're choked and it's not fast */
1207    if( !isFastAllowed && peer->clientIsChoked )
1208        return;
1209
1210    /* request the blocks that we don't have in this piece */
1211    {
1212        tr_block_index_t block;
1213        const tr_torrent * tor = t->tor;
1214        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1215        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1216
1217        for( block=start; block<end; ++block )
1218        {
1219            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1220            {
1221                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1222                const uint32_t length = tr_torBlockCountBytes( tor, block );
1223                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1224                incrementPieceRequests( t, pieceIndex );
1225            }
1226        }
1227    }
1228#endif
1229}
1230
1231static void
1232decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
1233{
1234    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1235}
1236
1237static void
1238clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
1239{
1240    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
1241}
1242
1243static void
1244removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1245{
1246    requestListRemove( t, block, peer );
1247    pieceListRemoveRequest( t, block );
1248}
1249
1250/* peer choked us, or maybe it disconnected.
1251   either way we need to remove all its requests */
1252static void
1253peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1254{
1255    int i, n;
1256    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1257
1258    for( i=n=0; i<t->requestCount; ++i )
1259        if( peer == t->requests[i].peer )
1260            blocks[n++] = t->requests[i].block;
1261
1262    for( i=0; i<n; ++i )
1263        removeRequestFromTables( t, blocks[i], peer );
1264
1265    tr_free( blocks );
1266}
1267
1268static void
1269peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1270{
1271    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1272    Torrent * t = vt;
1273    const tr_peer_event * e = vevent;
1274
1275    torrentLock( t );
1276
1277    switch( e->eventType )
1278    {
1279        case TR_PEER_UPLOAD_ONLY:
1280            /* update our atom */
1281            if( peer ) {
1282                if( e->uploadOnly ) {
1283                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1284                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1285                } else {
1286                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1287                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1288                }
1289            }
1290            break;
1291
1292        case TR_PEER_PEER_GOT_DATA:
1293        {
1294            const time_t now = tr_time( );
1295            tr_torrent * tor = t->tor;
1296
1297            tr_torrentSetActivityDate( tor, now );
1298
1299            if( e->wasPieceData ) {
1300                tor->uploadedCur += e->length;
1301                tr_torrentSetDirty( tor );
1302            }
1303
1304            /* update the stats */
1305            if( e->wasPieceData )
1306                tr_statsAddUploaded( tor->session, e->length );
1307
1308            /* update our atom */
1309            if( peer && e->wasPieceData )
1310                peer->atom->piece_data_time = now;
1311
1312            tor->needsSeedRatioCheck = TRUE;
1313
1314            break;
1315        }
1316
1317        case TR_PEER_CLIENT_GOT_REJ:
1318            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1319            break;
1320
1321        case TR_PEER_CLIENT_GOT_CHOKE:
1322            peerDeclinedAllRequests( t, peer );
1323            break;
1324
1325        case TR_PEER_CLIENT_GOT_PORT:
1326            if( peer )
1327                peer->atom->port = e->port;
1328            break;
1329
1330        case TR_PEER_CLIENT_GOT_SUGGEST:
1331            if( peer )
1332                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1333            break;
1334
1335        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1336            if( peer )
1337                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1338            break;
1339
1340        case TR_PEER_CLIENT_GOT_DATA:
1341        {
1342            const time_t now = tr_time( );
1343            tr_torrent * tor = t->tor;
1344
1345            tr_torrentSetActivityDate( tor, now );
1346
1347            /* only add this to downloadedCur if we got it from a peer --
1348             * webseeds shouldn't count against our ratio.  As one tracker
1349             * admin put it, "Those pieces are downloaded directly from the
1350             * content distributor, not the peers, it is the tracker's job
1351             * to manage the swarms, not the web server and does not fit
1352             * into the jurisdiction of the tracker." */
1353            if( peer && e->wasPieceData ) {
1354                tor->downloadedCur += e->length;
1355                tr_torrentSetDirty( tor );
1356            }
1357
1358            /* update the stats */
1359            if( e->wasPieceData )
1360                tr_statsAddDownloaded( tor->session, e->length );
1361
1362            /* update our atom */
1363            if( peer && e->wasPieceData )
1364                peer->atom->piece_data_time = now;
1365
1366            break;
1367        }
1368
1369        case TR_PEER_PEER_PROGRESS:
1370        {
1371            if( peer )
1372            {
1373                struct peer_atom * atom = peer->atom;
1374                if( e->progress >= 1.0 ) {
1375                    tordbg( t, "marking peer %s as a seed",
1376                            tr_atomAddrStr( atom ) );
1377                    atom->flags |= ADDED_F_SEED_FLAG;
1378                }
1379            }
1380            break;
1381        }
1382
1383        case TR_PEER_CLIENT_GOT_BLOCK:
1384        {
1385            tr_torrent * tor = t->tor;
1386            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1387
1388            requestListRemove( t, block, peer );
1389
1390            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1391            {
1392                tordbg( t, "we have this block already..." );
1393                clientGotUnwantedBlock( tor, block );
1394                pieceListRemoveRequest( t, block );
1395            }
1396            else
1397            {
1398                tr_cpBlockAdd( &tor->completion, block );
1399                pieceListRemoveRequest( t, block );
1400                tr_torrentSetDirty( tor );
1401
1402                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1403                {
1404                    const tr_piece_index_t p = e->pieceIndex;
1405                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1406
1407                    if( !ok )
1408                    {
1409                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1410                                   (unsigned long)p );
1411                    }
1412
1413                    tr_torrentSetHasPiece( tor, p, ok );
1414                    tr_torrentSetPieceChecked( tor, p, TRUE );
1415                    tr_peerMgrSetBlame( tor, p, ok );
1416
1417                    if( !ok )
1418                    {
1419                        gotBadPiece( t, p );
1420                    }
1421                    else
1422                    {
1423                        int i;
1424                        int peerCount;
1425                        tr_peer ** peers;
1426                        tr_file_index_t fileIndex;
1427
1428                        peerCount = tr_ptrArraySize( &t->peers );
1429                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1430                        for( i=0; i<peerCount; ++i )
1431                            tr_peerMsgsHave( peers[i]->msgs, p );
1432
1433                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1434                            const tr_file * file = &tor->info.files[fileIndex];
1435                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1436                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1437                                    tr_torrentFileCompleted( tor, fileIndex );
1438                        }
1439
1440                        pieceListRemovePiece( t, p );
1441                    }
1442                }
1443
1444                t->needsCompletenessCheck = TRUE;
1445            }
1446            break;
1447        }
1448
1449        case TR_PEER_ERROR:
1450            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1451            {
1452                /* some protocol error from the peer */
1453                peer->doPurge = 1;
1454                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1455                        tr_atomAddrStr( peer->atom ) );
1456            }
1457            else
1458            {
1459                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1460            }
1461            break;
1462
1463        default:
1464            assert( 0 );
1465    }
1466
1467    torrentUnlock( t );
1468}
1469
1470static int
1471getDefaultShelfLife( uint8_t from )
1472{
1473    /* in general, peers obtained from firsthand contact
1474     * are better than those from secondhand, etc etc */
1475    switch( from )
1476    {
1477        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1478        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1479        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1480        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1481        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1482        case TR_PEER_FROM_RESUME   : return 60 * 60;
1483        default                    : return 60 * 60;
1484    }
1485}
1486
1487
1488static void
1489ensureAtomExists( Torrent           * t,
1490                  const tr_address  * addr,
1491                  const tr_port       port,
1492                  const uint8_t       flags,
1493                  const uint8_t       from )
1494{
1495    assert( tr_isAddress( addr ) );
1496    assert( from < TR_PEER_FROM__MAX );
1497
1498    if( getExistingAtom( t, addr ) == NULL )
1499    {
1500        struct peer_atom * a;
1501        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1502
1503        a = tr_new0( struct peer_atom, 1 );
1504        a->addr = *addr;
1505        a->port = port;
1506        a->flags = flags;
1507        a->from = from;
1508        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1509        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1510
1511        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1512    }
1513}
1514
1515static int
1516getMaxPeerCount( const tr_torrent * tor )
1517{
1518    return tor->maxConnectedPeers;
1519}
1520
1521static int
1522getPeerCount( const Torrent * t )
1523{
1524    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1525}
1526
1527/* FIXME: this is kind of a mess. */
1528static tr_bool
1529myHandshakeDoneCB( tr_handshake  * handshake,
1530                   tr_peerIo     * io,
1531                   tr_bool         isConnected,
1532                   const uint8_t * peer_id,
1533                   void          * vmanager )
1534{
1535    tr_bool            ok = isConnected;
1536    tr_bool            success = FALSE;
1537    tr_port            port;
1538    const tr_address * addr;
1539    tr_peerMgr       * manager = vmanager;
1540    Torrent          * t;
1541    tr_handshake     * ours;
1542
1543    assert( io );
1544    assert( tr_isBool( ok ) );
1545
1546    t = tr_peerIoHasTorrentHash( io )
1547        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1548        : NULL;
1549
1550    if( tr_peerIoIsIncoming ( io ) )
1551        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1552                                        handshake, handshakeCompare );
1553    else if( t )
1554        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1555                                        handshake, handshakeCompare );
1556    else
1557        ours = handshake;
1558
1559    assert( ours );
1560    assert( ours == handshake );
1561
1562    if( t )
1563        torrentLock( t );
1564
1565    addr = tr_peerIoGetAddress( io, &port );
1566
1567    if( !ok || !t || !t->isRunning )
1568    {
1569        if( t )
1570        {
1571            struct peer_atom * atom = getExistingAtom( t, addr );
1572            if( atom )
1573                ++atom->numFails;
1574        }
1575    }
1576    else /* looking good */
1577    {
1578        struct peer_atom * atom;
1579
1580        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1581        atom = getExistingAtom( t, addr );
1582        atom->time = tr_time( );
1583        atom->piece_data_time = 0;
1584
1585        if( atom->myflags & MYFLAG_BANNED )
1586        {
1587            tordbg( t, "banned peer %s tried to reconnect",
1588                    tr_atomAddrStr( atom ) );
1589        }
1590        else if( tr_peerIoIsIncoming( io )
1591               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1592
1593        {
1594        }
1595        else
1596        {
1597            tr_peer * peer = atom->peer;
1598
1599            if( peer ) /* we already have this peer */
1600            {
1601            }
1602            else
1603            {
1604                peer = getPeer( t, atom );
1605                tr_free( peer->client );
1606
1607                if( !peer_id )
1608                    peer->client = NULL;
1609                else {
1610                    char client[128];
1611                    tr_clientForId( client, sizeof( client ), peer_id );
1612                    peer->client = tr_strdup( client );
1613                }
1614
1615                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1616                                                                balanced by our unref in peerDestructor()  */
1617                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1618                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1619
1620                success = TRUE;
1621            }
1622        }
1623    }
1624
1625    if( t )
1626        torrentUnlock( t );
1627
1628    return success;
1629}
1630
1631void
1632tr_peerMgrAddIncoming( tr_peerMgr * manager,
1633                       tr_address * addr,
1634                       tr_port      port,
1635                       int          socket )
1636{
1637    tr_session * session;
1638
1639    managerLock( manager );
1640
1641    assert( tr_isSession( manager->session ) );
1642    session = manager->session;
1643
1644    if( tr_sessionIsAddressBlocked( session, addr ) )
1645    {
1646        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1647        tr_netClose( session, socket );
1648    }
1649    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1650    {
1651        tr_netClose( session, socket );
1652    }
1653    else /* we don't have a connection to them yet... */
1654    {
1655        tr_peerIo *    io;
1656        tr_handshake * handshake;
1657
1658        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1659
1660        handshake = tr_handshakeNew( io,
1661                                     session->encryptionMode,
1662                                     myHandshakeDoneCB,
1663                                     manager );
1664
1665        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1666
1667        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1668                                 handshakeCompare );
1669    }
1670
1671    managerUnlock( manager );
1672}
1673
1674static tr_bool
1675tr_isPex( const tr_pex * pex )
1676{
1677    return pex && tr_isAddress( &pex->addr );
1678}
1679
1680void
1681tr_peerMgrAddPex( tr_torrent   *  tor,
1682                  uint8_t         from,
1683                  const tr_pex *  pex )
1684{
1685    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1686    {
1687        Torrent * t = tor->torrentPeers;
1688        managerLock( t->manager );
1689
1690        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1691            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1692                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1693
1694        managerUnlock( t->manager );
1695    }
1696}
1697
1698tr_pex *
1699tr_peerMgrCompactToPex( const void *    compact,
1700                        size_t          compactLen,
1701                        const uint8_t * added_f,
1702                        size_t          added_f_len,
1703                        size_t *        pexCount )
1704{
1705    size_t          i;
1706    size_t          n = compactLen / 6;
1707    const uint8_t * walk = compact;
1708    tr_pex *        pex = tr_new0( tr_pex, n );
1709
1710    for( i = 0; i < n; ++i )
1711    {
1712        pex[i].addr.type = TR_AF_INET;
1713        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1714        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1715        if( added_f && ( n == added_f_len ) )
1716            pex[i].flags = added_f[i];
1717    }
1718
1719    *pexCount = n;
1720    return pex;
1721}
1722
1723tr_pex *
1724tr_peerMgrCompact6ToPex( const void    * compact,
1725                         size_t          compactLen,
1726                         const uint8_t * added_f,
1727                         size_t          added_f_len,
1728                         size_t        * pexCount )
1729{
1730    size_t          i;
1731    size_t          n = compactLen / 18;
1732    const uint8_t * walk = compact;
1733    tr_pex *        pex = tr_new0( tr_pex, n );
1734
1735    for( i = 0; i < n; ++i )
1736    {
1737        pex[i].addr.type = TR_AF_INET6;
1738        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1739        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1740        if( added_f && ( n == added_f_len ) )
1741            pex[i].flags = added_f[i];
1742    }
1743
1744    *pexCount = n;
1745    return pex;
1746}
1747
1748tr_pex *
1749tr_peerMgrArrayToPex( const void * array,
1750                      size_t       arrayLen,
1751                      size_t      * pexCount )
1752{
1753    size_t          i;
1754    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1755    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1756    const uint8_t * walk = array;
1757    tr_pex        * pex = tr_new0( tr_pex, n );
1758
1759    for( i = 0 ; i < n ; i++ ) {
1760        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1761        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1762        pex[i].flags = 0x00;
1763        walk += sizeof( tr_address ) + 2;
1764    }
1765
1766    *pexCount = n;
1767    return pex;
1768}
1769
1770/**
1771***
1772**/
1773
1774void
1775tr_peerMgrSetBlame( tr_torrent     * tor,
1776                    tr_piece_index_t pieceIndex,
1777                    int              success )
1778{
1779    if( !success )
1780    {
1781        int        peerCount, i;
1782        Torrent *  t = tor->torrentPeers;
1783        tr_peer ** peers;
1784
1785        assert( torrentIsLocked( t ) );
1786
1787        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1788        for( i = 0; i < peerCount; ++i )
1789        {
1790            tr_peer * peer = peers[i];
1791            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1792            {
1793                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1794                        tr_atomAddrStr( peer->atom ),
1795                        pieceIndex, (int)peer->strikes + 1 );
1796                addStrike( t, peer );
1797            }
1798        }
1799    }
1800}
1801
1802int
1803tr_pexCompare( const void * va, const void * vb )
1804{
1805    const tr_pex * a = va;
1806    const tr_pex * b = vb;
1807    int i;
1808
1809    assert( tr_isPex( a ) );
1810    assert( tr_isPex( b ) );
1811
1812    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1813        return i;
1814
1815    if( a->port != b->port )
1816        return a->port < b->port ? -1 : 1;
1817
1818    return 0;
1819}
1820
1821#if 0
1822static int
1823peerPrefersCrypto( const tr_peer * peer )
1824{
1825    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1826        return TRUE;
1827
1828    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1829        return FALSE;
1830
1831    return tr_peerIoIsEncrypted( peer->io );
1832}
1833#endif
1834
1835/* better goes first */
1836static int
1837compareAtomsByUsefulness( const void * va, const void *vb )
1838{
1839    const struct peer_atom * a = * (const struct peer_atom**) va;
1840    const struct peer_atom * b = * (const struct peer_atom**) vb;
1841
1842    assert( tr_isAtom( a ) );
1843    assert( tr_isAtom( b ) );
1844
1845    if( a->piece_data_time != b->piece_data_time )
1846        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1847    if( a->from != b->from )
1848        return a->from < b->from ? -1 : 1;
1849    if( a->numFails != b->numFails )
1850        return a->numFails < b->numFails ? -1 : 1;
1851
1852    return 0;
1853}
1854
1855int
1856tr_peerMgrGetPeers( tr_torrent   * tor,
1857                    tr_pex      ** setme_pex,
1858                    uint8_t        af,
1859                    uint8_t        list_mode,
1860                    int            maxCount )
1861{
1862    int i;
1863    int n;
1864    int count = 0;
1865    int atomCount = 0;
1866    const Torrent * t = tor->torrentPeers;
1867    struct peer_atom ** atoms = NULL;
1868    tr_pex * pex;
1869    tr_pex * walk;
1870
1871    assert( tr_isTorrent( tor ) );
1872    assert( setme_pex != NULL );
1873    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1874    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1875
1876    managerLock( t->manager );
1877
1878    /**
1879    ***  build a list of atoms
1880    **/
1881
1882    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1883    {
1884        int i;
1885        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1886        atomCount = tr_ptrArraySize( &t->peers );
1887        atoms = tr_new( struct peer_atom *, atomCount );
1888        for( i=0; i<atomCount; ++i )
1889            atoms[i] = peers[i]->atom;
1890    }
1891    else /* TR_PEERS_ALL */
1892    {
1893        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1894        atomCount = tr_ptrArraySize( &t->pool );
1895        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1896    }
1897
1898    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1899
1900    /**
1901    ***  add the first N of them into our return list
1902    **/
1903
1904    n = MIN( atomCount, maxCount );
1905    pex = walk = tr_new0( tr_pex, n );
1906
1907    for( i=0; i<atomCount && count<n; ++i )
1908    {
1909        const struct peer_atom * atom = atoms[i];
1910        if( atom->addr.type == af )
1911        {
1912            assert( tr_isAddress( &atom->addr ) );
1913            walk->addr = atom->addr;
1914            walk->port = atom->port;
1915            walk->flags = atom->flags;
1916            ++count;
1917            ++walk;
1918        }
1919    }
1920
1921    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1922
1923    assert( ( walk - pex ) == count );
1924    *setme_pex = pex;
1925
1926    /* cleanup */
1927    tr_free( atoms );
1928    managerUnlock( t->manager );
1929    return count;
1930}
1931
1932static void atomPulse      ( int, short, void * );
1933static void bandwidthPulse ( int, short, void * );
1934static void rechokePulse   ( int, short, void * );
1935static void reconnectPulse ( int, short, void * );
1936
1937static struct event *
1938createTimer( int msec, void (*callback)(int, short, void *), void * cbdata )
1939{
1940    struct event * timer = tr_new0( struct event, 1 );
1941    evtimer_set( timer, callback, cbdata );
1942    tr_timerAddMsec( timer, msec );
1943    return timer;
1944}
1945
1946static void
1947ensureMgrTimersExist( struct tr_peerMgr * m )
1948{
1949    if( m->atomTimer == NULL )
1950        m->atomTimer = createTimer( ATOM_PERIOD_MSEC, atomPulse, m );
1951
1952    if( m->bandwidthTimer == NULL )
1953        m->bandwidthTimer = createTimer( BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
1954
1955    if( m->rechokeTimer == NULL )
1956        m->rechokeTimer = createTimer( RECHOKE_PERIOD_MSEC, rechokePulse, m );
1957
1958    if( m->reconnectTimer == NULL )
1959        m->reconnectTimer = createTimer( RECONNECT_PERIOD_MSEC, reconnectPulse, m );
1960
1961    if( m->refillUpkeepTimer == NULL )
1962        m->refillUpkeepTimer = createTimer( REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
1963}
1964
1965void
1966tr_peerMgrStartTorrent( tr_torrent * tor )
1967{
1968    Torrent * t = tor->torrentPeers;
1969
1970    assert( t != NULL );
1971    managerLock( t->manager );
1972    ensureMgrTimersExist( t->manager );
1973
1974    t->isRunning = TRUE;
1975
1976    rechokePulse( 0, 0, t->manager );
1977    managerUnlock( t->manager );
1978}
1979
1980static void
1981stopTorrent( Torrent * t )
1982{
1983    int i, n;
1984
1985    assert( torrentIsLocked( t ) );
1986
1987    t->isRunning = FALSE;
1988
1989    /* disconnect the peers. */
1990    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
1991        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
1992    tr_ptrArrayClear( &t->peers );
1993
1994    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1995     * which removes the handshake from t->outgoingHandshakes... */
1996    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1997        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1998}
1999
2000void
2001tr_peerMgrStopTorrent( tr_torrent * tor )
2002{
2003    Torrent * t = tor->torrentPeers;
2004
2005    managerLock( t->manager );
2006
2007    stopTorrent( t );
2008
2009    managerUnlock( t->manager );
2010}
2011
2012void
2013tr_peerMgrAddTorrent( tr_peerMgr * manager,
2014                      tr_torrent * tor )
2015{
2016    managerLock( manager );
2017
2018    assert( tor );
2019    assert( tor->torrentPeers == NULL );
2020
2021    tor->torrentPeers = torrentConstructor( manager, tor );
2022
2023    managerUnlock( manager );
2024}
2025
2026void
2027tr_peerMgrRemoveTorrent( tr_torrent * tor )
2028{
2029    tr_torrentLock( tor );
2030
2031    stopTorrent( tor->torrentPeers );
2032    torrentDestructor( tor->torrentPeers );
2033
2034    tr_torrentUnlock( tor );
2035}
2036
2037void
2038tr_peerMgrTorrentAvailability( const tr_torrent * tor,
2039                               int8_t           * tab,
2040                               unsigned int       tabCount )
2041{
2042    tr_piece_index_t   i;
2043    const Torrent *    t;
2044    float              interval;
2045    tr_bool            isSeed;
2046    int                peerCount;
2047    const tr_peer **   peers;
2048    tr_torrentLock( tor );
2049
2050    t = tor->torrentPeers;
2051    tor = t->tor;
2052    interval = tor->info.pieceCount / (float)tabCount;
2053    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
2054    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2055    peerCount = tr_ptrArraySize( &t->peers );
2056
2057    memset( tab, 0, tabCount );
2058
2059    for( i = 0; tor && i < tabCount; ++i )
2060    {
2061        const int piece = i * interval;
2062
2063        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2064            tab[i] = -1;
2065        else if( peerCount ) {
2066            int j;
2067            for( j = 0; j < peerCount; ++j )
2068                if( tr_bitsetHas( &peers[j]->have, i ) )
2069                    ++tab[i];
2070        }
2071    }
2072
2073    tr_torrentUnlock( tor );
2074}
2075
2076/* Returns the pieces that are available from peers */
2077tr_bitfield*
2078tr_peerMgrGetAvailable( const tr_torrent * tor )
2079{
2080    int i;
2081    int peerCount;
2082    Torrent * t = tor->torrentPeers;
2083    const tr_peer ** peers;
2084    tr_bitfield * pieces;
2085    managerLock( t->manager );
2086
2087    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2088    peerCount = tr_ptrArraySize( &t->peers );
2089    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2090    for( i=0; i<peerCount; ++i )
2091        tr_bitsetOr( pieces, &peers[i]->have );
2092
2093    managerUnlock( t->manager );
2094    return pieces;
2095}
2096
2097void
2098tr_peerMgrTorrentStats( tr_torrent       * tor,
2099                        int              * setmePeersKnown,
2100                        int              * setmePeersConnected,
2101                        int              * setmeSeedsConnected,
2102                        int              * setmeWebseedsSendingToUs,
2103                        int              * setmePeersSendingToUs,
2104                        int              * setmePeersGettingFromUs,
2105                        int              * setmePeersFrom )
2106{
2107    int i, size;
2108    const Torrent * t = tor->torrentPeers;
2109    const tr_peer ** peers;
2110    const tr_webseed ** webseeds;
2111
2112    managerLock( t->manager );
2113
2114    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2115    size = tr_ptrArraySize( &t->peers );
2116
2117    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2118    *setmePeersConnected       = 0;
2119    *setmeSeedsConnected       = 0;
2120    *setmePeersGettingFromUs   = 0;
2121    *setmePeersSendingToUs     = 0;
2122    *setmeWebseedsSendingToUs  = 0;
2123
2124    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2125        setmePeersFrom[i] = 0;
2126
2127    for( i=0; i<size; ++i )
2128    {
2129        const tr_peer * peer = peers[i];
2130        const struct peer_atom * atom = peer->atom;
2131
2132        if( peer->io == NULL ) /* not connected */
2133            continue;
2134
2135        ++*setmePeersConnected;
2136
2137        ++setmePeersFrom[atom->from];
2138
2139        if( clientIsDownloadingFrom( tor, peer ) )
2140            ++*setmePeersSendingToUs;
2141
2142        if( clientIsUploadingTo( peer ) )
2143            ++*setmePeersGettingFromUs;
2144
2145        if( atom->flags & ADDED_F_SEED_FLAG )
2146            ++*setmeSeedsConnected;
2147    }
2148
2149    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2150    size = tr_ptrArraySize( &t->webseeds );
2151    for( i=0; i<size; ++i )
2152        if( tr_webseedIsActive( webseeds[i] ) )
2153            ++*setmeWebseedsSendingToUs;
2154
2155    managerUnlock( t->manager );
2156}
2157
2158float
2159tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2160{
2161    int i;
2162    float tmp;
2163    float ret = 0;
2164
2165    const Torrent * t = tor->torrentPeers;
2166    const int n = tr_ptrArraySize( &t->webseeds );
2167    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2168
2169    for( i=0; i<n; ++i )
2170        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2171            ret += tmp;
2172
2173    return ret;
2174}
2175
2176
2177float*
2178tr_peerMgrWebSpeeds( const tr_torrent * tor )
2179{
2180    const Torrent * t = tor->torrentPeers;
2181    const tr_webseed ** webseeds;
2182    int i;
2183    int webseedCount;
2184    float * ret;
2185    uint64_t now;
2186
2187    assert( t->manager );
2188    managerLock( t->manager );
2189
2190    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2191    webseedCount = tr_ptrArraySize( &t->webseeds );
2192    assert( webseedCount == tor->info.webseedCount );
2193    ret = tr_new0( float, webseedCount );
2194    now = tr_date( );
2195
2196    for( i=0; i<webseedCount; ++i )
2197        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2198            ret[i] = -1.0;
2199
2200    managerUnlock( t->manager );
2201    return ret;
2202}
2203
2204double
2205tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2206{
2207    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2208}
2209
2210
2211struct tr_peer_stat *
2212tr_peerMgrPeerStats( const tr_torrent    * tor,
2213                     int                 * setmeCount )
2214{
2215    int i, size;
2216    const Torrent * t = tor->torrentPeers;
2217    const tr_peer ** peers;
2218    tr_peer_stat * ret;
2219    uint64_t now;
2220
2221    assert( t->manager );
2222    managerLock( t->manager );
2223
2224    size = tr_ptrArraySize( &t->peers );
2225    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2226    ret = tr_new0( tr_peer_stat, size );
2227    now = tr_date( );
2228
2229    for( i=0; i<size; ++i )
2230    {
2231        char *                   pch;
2232        const tr_peer *          peer = peers[i];
2233        const struct peer_atom * atom = peer->atom;
2234        tr_peer_stat *           stat = ret + i;
2235
2236        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2237        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2238                   sizeof( stat->client ) );
2239        stat->port                = ntohs( peer->atom->port );
2240        stat->from                = atom->from;
2241        stat->progress            = peer->progress;
2242        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2243        stat->rateToPeer          = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2244        stat->rateToClient        = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2245        stat->peerIsChoked        = peer->peerIsChoked;
2246        stat->peerIsInterested    = peer->peerIsInterested;
2247        stat->clientIsChoked      = peer->clientIsChoked;
2248        stat->clientIsInterested  = peer->clientIsInterested;
2249        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2250        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2251        stat->isUploadingTo       = clientIsUploadingTo( peer );
2252        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2253        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2254        stat->pendingReqsToClient = peer->pendingReqsToClient;
2255
2256        pch = stat->flagStr;
2257        if( t->optimistic == peer ) *pch++ = 'O';
2258        if( stat->isDownloadingFrom ) *pch++ = 'D';
2259        else if( stat->clientIsInterested ) *pch++ = 'd';
2260        if( stat->isUploadingTo ) *pch++ = 'U';
2261        else if( stat->peerIsInterested ) *pch++ = 'u';
2262        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2263        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2264        if( stat->isEncrypted ) *pch++ = 'E';
2265        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2266        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2267        if( stat->isIncoming ) *pch++ = 'I';
2268        *pch = '\0';
2269    }
2270
2271    *setmeCount = size;
2272
2273    managerUnlock( t->manager );
2274    return ret;
2275}
2276
2277/**
2278***
2279**/
2280
2281struct ChokeData
2282{
2283    tr_bool         doUnchoke;
2284    tr_bool         isInterested;
2285    tr_bool         isChoked;
2286    int             rate;
2287    tr_peer *       peer;
2288};
2289
2290static int
2291compareChoke( const void * va,
2292              const void * vb )
2293{
2294    const struct ChokeData * a = va;
2295    const struct ChokeData * b = vb;
2296
2297    if( a->rate != b->rate ) /* prefer higher overall speeds */
2298        return a->rate > b->rate ? -1 : 1;
2299
2300    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2301        return a->isChoked ? 1 : -1;
2302
2303    return 0;
2304}
2305
2306static int
2307isNew( const tr_peer * peer )
2308{
2309    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2310}
2311
2312static int
2313isSame( const tr_peer * peer )
2314{
2315    return peer && peer->client && strstr( peer->client, "Transmission" );
2316}
2317
2318/**
2319***
2320**/
2321
2322static void
2323rechokeTorrent( Torrent * t, const uint64_t now )
2324{
2325    int i, size, unchokedInterested;
2326    const int peerCount = tr_ptrArraySize( &t->peers );
2327    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2328    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2329    const tr_session * session = t->manager->session;
2330    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2331
2332    assert( torrentIsLocked( t ) );
2333
2334    /* sort the peers by preference and rate */
2335    for( i = 0, size = 0; i < peerCount; ++i )
2336    {
2337        tr_peer * peer = peers[i];
2338        struct peer_atom * atom = peer->atom;
2339
2340        if( peer->progress >= 1.0 ) /* choke all seeds */
2341        {
2342            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2343        }
2344        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2345        {
2346            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2347        }
2348        else if( chokeAll ) /* choke everyone if we're not uploading */
2349        {
2350            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2351        }
2352        else
2353        {
2354            struct ChokeData * n = &choke[size++];
2355            n->peer         = peer;
2356            n->isInterested = peer->peerIsInterested;
2357            n->isChoked     = peer->peerIsChoked;
2358            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2359        }
2360    }
2361
2362    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2363
2364    /**
2365     * Reciprocation and number of uploads capping is managed by unchoking
2366     * the N peers which have the best upload rate and are interested.
2367     * This maximizes the client's download rate. These N peers are
2368     * referred to as downloaders, because they are interested in downloading
2369     * from the client.
2370     *
2371     * Peers which have a better upload rate (as compared to the downloaders)
2372     * but aren't interested get unchoked. If they become interested, the
2373     * downloader with the worst upload rate gets choked. If a client has
2374     * a complete file, it uses its upload rate rather than its download
2375     * rate to decide which peers to unchoke.
2376     */
2377    unchokedInterested = 0;
2378    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2379        choke[i].doUnchoke = 1;
2380        if( choke[i].isInterested )
2381            ++unchokedInterested;
2382    }
2383
2384    /* optimistic unchoke */
2385    if( i < size )
2386    {
2387        int n;
2388        struct ChokeData * c;
2389        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2390
2391        for( ; i<size; ++i )
2392        {
2393            if( choke[i].isInterested )
2394            {
2395                const tr_peer * peer = choke[i].peer;
2396                int x = 1, y;
2397                if( isNew( peer ) ) x *= 3;
2398                if( isSame( peer ) ) x *= 3;
2399                for( y=0; y<x; ++y )
2400                    tr_ptrArrayAppend( &randPool, &choke[i] );
2401            }
2402        }
2403
2404        if(( n = tr_ptrArraySize( &randPool )))
2405        {
2406            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2407            c->doUnchoke = 1;
2408            t->optimistic = c->peer;
2409        }
2410
2411        tr_ptrArrayDestruct( &randPool, NULL );
2412    }
2413
2414    for( i=0; i<size; ++i )
2415        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2416
2417    /* cleanup */
2418    tr_free( choke );
2419}
2420
2421static void
2422rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2423{
2424    uint64_t now;
2425    tr_torrent * tor = NULL;
2426    tr_peerMgr * mgr = vmgr;
2427    managerLock( mgr );
2428
2429    now = tr_date( );
2430    while(( tor = tr_torrentNext( mgr->session, tor )))
2431        if( tor->isRunning )
2432            rechokeTorrent( tor->torrentPeers, now );
2433
2434    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
2435    managerUnlock( mgr );
2436}
2437
2438/***
2439****
2440****  Life and Death
2441****
2442***/
2443
2444typedef enum
2445{
2446    TR_CAN_KEEP,
2447    TR_CAN_CLOSE,
2448    TR_MUST_CLOSE,
2449}
2450tr_close_type_t;
2451
2452static tr_close_type_t
2453shouldPeerBeClosed( const Torrent    * t,
2454                    const tr_peer    * peer,
2455                    int                peerCount,
2456                    const time_t       now )
2457{
2458    const tr_torrent *       tor = t->tor;
2459    const struct peer_atom * atom = peer->atom;
2460
2461    /* if it's marked for purging, close it */
2462    if( peer->doPurge )
2463    {
2464        tordbg( t, "purging peer %s because its doPurge flag is set",
2465                tr_atomAddrStr( atom ) );
2466        return TR_MUST_CLOSE;
2467    }
2468
2469    /* if we're seeding and the peer has everything we have,
2470     * and enough time has passed for a pex exchange, then disconnect */
2471    if( tr_torrentIsSeed( tor ) )
2472    {
2473        int peerHasEverything;
2474        if( atom->flags & ADDED_F_SEED_FLAG )
2475            peerHasEverything = TRUE;
2476        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2477            peerHasEverything = FALSE;
2478        else {
2479            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2480            tr_bitsetDifference( tmp, &peer->have );
2481            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2482            tr_bitfieldFree( tmp );
2483        }
2484
2485        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2486        {
2487            tordbg( t, "purging peer %s because we're both seeds",
2488                    tr_atomAddrStr( atom ) );
2489            return TR_MUST_CLOSE;
2490        }
2491    }
2492
2493    /* disconnect if it's been too long since piece data has been transferred.
2494     * this is on a sliding scale based on number of available peers... */
2495    {
2496        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2497        /* if we have >= relaxIfFewerThan, strictness is 100%.
2498         * if we have zero connections, strictness is 0% */
2499        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2500                               ? 1.0
2501                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2502        const int lo = MIN_UPLOAD_IDLE_SECS;
2503        const int hi = MAX_UPLOAD_IDLE_SECS;
2504        const int limit = hi - ( ( hi - lo ) * strictness );
2505        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2506/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2507        if( idleTime > limit ) {
2508            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2509                       tr_atomAddrStr( atom ), idleTime );
2510            return TR_CAN_CLOSE;
2511        }
2512    }
2513
2514    return TR_CAN_KEEP;
2515}
2516
2517static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2518
2519static tr_peer **
2520getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2521{
2522    int i, peerCount, outsize;
2523    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2524    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2525
2526    assert( torrentIsLocked( t ) );
2527
2528    for( i = outsize = 0; i < peerCount; ++i )
2529        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2530            ret[outsize++] = peers[i];
2531
2532    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2533
2534    *setmeSize = outsize;
2535    return ret;
2536}
2537
2538static int
2539compareCandidates( const void * va, const void * vb )
2540{
2541    const struct peer_atom * a = *(const struct peer_atom**) va;
2542    const struct peer_atom * b = *(const struct peer_atom**) vb;
2543
2544    /* <Charles> Here we would probably want to try reconnecting to
2545     * peers that had most recently given us data. Lots of users have
2546     * trouble with resets due to their routers and/or ISPs. This way we
2547     * can quickly recover from an unwanted reset. So we sort
2548     * piece_data_time in descending order.
2549     */
2550
2551    if( a->piece_data_time != b->piece_data_time )
2552        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2553
2554    if( a->numFails != b->numFails )
2555        return a->numFails < b->numFails ? -1 : 1;
2556
2557    if( a->time != b->time )
2558        return a->time < b->time ? -1 : 1;
2559
2560    /* In order to avoid fragmenting the swarm, peers from trackers and
2561     * from the DHT should be preferred to peers from PEX. */
2562    if( a->from != b->from )
2563        return a->from < b->from ? -1 : 1;
2564
2565    return 0;
2566}
2567
2568static int
2569getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2570{
2571    int sec;
2572
2573    /* if we were recently connected to this peer and transferring piece
2574     * data, try to reconnect to them sooner rather that later -- we don't
2575     * want network troubles to get in the way of a good peer. */
2576    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2577        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2578
2579    /* don't allow reconnects more often than our minimum */
2580    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2581        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2582
2583    /* otherwise, the interval depends on how many times we've tried
2584     * and failed to connect to the peer */
2585    else switch( atom->numFails ) {
2586        case 0: sec = 0; break;
2587        case 1: sec = 5; break;
2588        case 2: sec = 2 * 60; break;
2589        case 3: sec = 15 * 60; break;
2590        case 4: sec = 30 * 60; break;
2591        case 5: sec = 60 * 60; break;
2592        default: sec = 120 * 60; break;
2593    }
2594
2595    return sec;
2596}
2597
2598static struct peer_atom **
2599getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2600{
2601    int                 i, atomCount, retCount;
2602    struct peer_atom ** atoms;
2603    struct peer_atom ** ret;
2604    const int           seed = tr_torrentIsSeed( t->tor );
2605
2606    assert( torrentIsLocked( t ) );
2607
2608    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2609    ret = tr_new( struct peer_atom*, atomCount );
2610    for( i = retCount = 0; i < atomCount; ++i )
2611    {
2612        int                interval;
2613        struct peer_atom * atom = atoms[i];
2614
2615        /* peer fed us too much bad data ... we only keep it around
2616         * now to weed it out in case someone sends it to us via pex */
2617        if( atom->myflags & MYFLAG_BANNED )
2618            continue;
2619
2620        /* peer was unconnectable before, so we're not going to keep trying.
2621         * this is needs a separate flag from `banned', since if they try
2622         * to connect to us later, we'll let them in */
2623        if( atom->myflags & MYFLAG_UNREACHABLE )
2624            continue;
2625
2626        /* no need to connect if we're both seeds... */
2627        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2628                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2629            continue;
2630
2631        /* don't reconnect too often */
2632        interval = getReconnectIntervalSecs( atom, now );
2633        if( ( now - atom->time ) < interval )
2634        {
2635            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2636                    i, tr_atomAddrStr( atom ), interval );
2637            continue;
2638        }
2639
2640        /* Don't connect to peers in our blocklist */
2641        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2642            continue;
2643
2644        /* we don't need two connections to the same peer... */
2645        if( peerIsInUse( t, atom ) )
2646            continue;
2647
2648        ret[retCount++] = atom;
2649    }
2650
2651    if( retCount != 0 )
2652        qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2653    *setmeSize = retCount;
2654    return ret;
2655}
2656
2657static void
2658closePeer( Torrent * t, tr_peer * peer )
2659{
2660    struct peer_atom * atom;
2661
2662    assert( t != NULL );
2663    assert( peer != NULL );
2664
2665    atom = peer->atom;
2666
2667    /* if we transferred piece data, then they might be good peers,
2668       so reset their `numFails' weight to zero.  otherwise we connected
2669       to them fruitlessly, so mark it as another fail */
2670    if( atom->piece_data_time )
2671        atom->numFails = 0;
2672    else
2673        ++atom->numFails;
2674
2675    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2676    removePeer( t, peer );
2677}
2678
2679static void
2680reconnectTorrent( Torrent * t )
2681{
2682    static time_t prevTime = 0;
2683    static int    newConnectionsThisSecond = 0;
2684    const time_t  now = tr_time( );
2685
2686    if( prevTime != now )
2687    {
2688        prevTime = now;
2689        newConnectionsThisSecond = 0;
2690    }
2691
2692    if( !t->isRunning )
2693    {
2694        removeAllPeers( t );
2695    }
2696    else
2697    {
2698        int i;
2699        int mustCloseCount;
2700        int maxCandidates;
2701        struct tr_peer ** mustClose;
2702
2703        /* disconnect the really bad peers */
2704        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2705        for( i=0; i<mustCloseCount; ++i )
2706            closePeer( t, mustClose[i] );
2707        tr_free( mustClose );
2708
2709        /* decide how many peers can we try to add in this pass */
2710        maxCandidates = MAX_RECONNECTIONS_PER_PULSE;
2711        if( tr_announcerHasBacklog( t->manager->session->announcer ) )
2712            maxCandidates /= 2;
2713        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2714        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2715
2716        /* select the best candidates, if they are requested */
2717        if( maxCandidates == 0 )
2718        {
2719            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2720                       "NO connection candidates needed, %d atoms, "
2721                       "max per pulse is %d",
2722                       t->tor->info.name, mustCloseCount,
2723                       tr_ptrArraySize( &t->pool ),
2724                       MAX_RECONNECTIONS_PER_PULSE );
2725
2726            tordbg( t, "maxCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, "
2727                       "getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2728                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2729                       maxCandidates, MAX_RECONNECTIONS_PER_PULSE,
2730                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2731                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2732        }
2733        else
2734        {
2735            int canCloseCount = 0;
2736            int candidateCount;
2737            struct peer_atom ** candidates;
2738
2739            candidates = getPeerCandidates( t, now, &candidateCount );
2740            maxCandidates = MIN( maxCandidates, candidateCount );
2741
2742            /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2743            if( maxCandidates != 0 )
2744            {
2745                struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
2746                for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2747                   closePeer( t, canClose[i] );
2748                tr_free( canClose );
2749            }
2750
2751            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2752                       "%d can-close connections, %d connection candidates, "
2753                       "%d atoms, max per pulse is %d",
2754                       t->tor->info.name, mustCloseCount,
2755                       canCloseCount, candidateCount,
2756                       tr_ptrArraySize( &t->pool ), MAX_RECONNECTIONS_PER_PULSE );
2757
2758            tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2759                       " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2760                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2761                       candidateCount, MAX_RECONNECTIONS_PER_PULSE,
2762                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2763                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2764
2765            /* add some new ones */
2766            for( i=0; i<maxCandidates; ++i )
2767            {
2768                tr_peerMgr        * mgr = t->manager;
2769                struct peer_atom  * atom = candidates[i];
2770                tr_peerIo         * io;
2771
2772                tordbg( t, "Starting an OUTGOING connection with %s",
2773                        tr_atomAddrStr( atom ) );
2774
2775                io = tr_peerIoNewOutgoing( mgr->session,
2776                                           mgr->session->bandwidth,
2777                                           &atom->addr,
2778                                           atom->port,
2779                                           t->tor->info.hash,
2780                                           t->tor->completeness == TR_SEED );
2781
2782                if( io == NULL )
2783                {
2784                    tordbg( t, "peerIo not created; marking peer %s as unreachable",
2785                            tr_atomAddrStr( atom ) );
2786                    atom->myflags |= MYFLAG_UNREACHABLE;
2787                }
2788                else
2789                {
2790                    tr_handshake * handshake = tr_handshakeNew( io,
2791                                                                mgr->session->encryptionMode,
2792                                                                myHandshakeDoneCB,
2793                                                                mgr );
2794
2795                    assert( tr_peerIoGetTorrentHash( io ) );
2796
2797                    tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2798
2799                    ++newConnectionsThisSecond;
2800
2801                    tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2802                                             handshakeCompare );
2803                }
2804
2805                atom->time = now;
2806            }
2807            tr_free( candidates );
2808        }
2809    }
2810}
2811
2812struct peer_liveliness
2813{
2814    tr_peer * peer;
2815    void * clientData;
2816    time_t pieceDataTime;
2817    time_t time;
2818    int speed;
2819    tr_bool doPurge;
2820};
2821
2822static int
2823comparePeerLiveliness( const void * va, const void * vb )
2824{
2825    const struct peer_liveliness * a = va;
2826    const struct peer_liveliness * b = vb;
2827
2828    if( a->doPurge != b->doPurge )
2829        return a->doPurge ? 1 : -1;
2830
2831    if( a->speed != b->speed ) /* faster goes first */
2832        return a->speed > b->speed ? -1 : 1;
2833
2834    /* the one to give us data more recently goes first */
2835    if( a->pieceDataTime != b->pieceDataTime )
2836        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2837
2838    /* the one we connected to most recently goes first */
2839    if( a->time != b->time )
2840        return a->time > b->time ? -1 : 1;
2841
2842    return 0;
2843}
2844
2845static int
2846comparePeerLivelinessReverse( const void * va, const void * vb )
2847{
2848    return -comparePeerLiveliness (va, vb);
2849}
2850
2851static void
2852sortPeersByLivelinessImpl( tr_peer  ** peers,
2853                           void     ** clientData,
2854                           int         n,
2855                           uint64_t    now,
2856                           int (*compare) ( const void *va, const void *vb ) )
2857{
2858    int i;
2859    struct peer_liveliness *lives, *l;
2860
2861    /* build a sortable array of peer + extra info */
2862    lives = l = tr_new0( struct peer_liveliness, n );
2863    for( i=0; i<n; ++i, ++l )
2864    {
2865        tr_peer * p = peers[i];
2866        l->peer = p;
2867        l->doPurge = p->doPurge;
2868        l->pieceDataTime = p->atom->piece_data_time;
2869        l->time = p->atom->time;
2870        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2871                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2872        if( clientData )
2873            l->clientData = clientData[i];
2874    }
2875
2876    /* sort 'em */
2877    assert( n == ( l - lives ) );
2878    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2879
2880    /* build the peer array */
2881    for( i=0, l=lives; i<n; ++i, ++l ) {
2882        peers[i] = l->peer;
2883        if( clientData )
2884            clientData[i] = l->clientData;
2885    }
2886    assert( n == ( l - lives ) );
2887
2888    /* cleanup */
2889    tr_free( lives );
2890}
2891
2892static void
2893sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2894{
2895    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2896}
2897
2898static void
2899sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2900{
2901    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2902}
2903
2904
2905static void
2906enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2907{
2908    int n = tr_ptrArraySize( &t->peers );
2909    const int max = tr_torrentGetPeerLimit( t->tor );
2910    if( n > max )
2911    {
2912        void * base = tr_ptrArrayBase( &t->peers );
2913        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2914        sortPeersByLiveliness( peers, NULL, n, now );
2915        while( n > max )
2916            closePeer( t, peers[--n] );
2917        tr_free( peers );
2918    }
2919}
2920
2921static void
2922enforceSessionPeerLimit( tr_session * session, uint64_t now )
2923{
2924    int n = 0;
2925    tr_torrent * tor = NULL;
2926    const int max = tr_sessionGetPeerLimit( session );
2927
2928    /* count the total number of peers */
2929    while(( tor = tr_torrentNext( session, tor )))
2930        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2931
2932    /* if there are too many, prune out the worst */
2933    if( n > max )
2934    {
2935        tr_peer ** peers = tr_new( tr_peer*, n );
2936        Torrent ** torrents = tr_new( Torrent*, n );
2937
2938        /* populate the peer array */
2939        n = 0;
2940        tor = NULL;
2941        while(( tor = tr_torrentNext( session, tor ))) {
2942            int i;
2943            Torrent * t = tor->torrentPeers;
2944            const int tn = tr_ptrArraySize( &t->peers );
2945            for( i=0; i<tn; ++i, ++n ) {
2946                peers[n] = tr_ptrArrayNth( &t->peers, i );
2947                torrents[n] = t;
2948            }
2949        }
2950
2951        /* sort 'em */
2952        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2953
2954        /* cull out the crappiest */
2955        while( n-- > max )
2956            closePeer( torrents[n], peers[n] );
2957
2958        /* cleanup */
2959        tr_free( torrents );
2960        tr_free( peers );
2961    }
2962}
2963
2964struct reconnectTorrentStruct
2965{
2966    tr_torrent * torrent;
2967    int salt;
2968};
2969
2970static int
2971compareReconnectTorrents( const void * va, const void * vb )
2972{
2973    int ai, bi;
2974    const struct reconnectTorrentStruct * a = va;
2975    const struct reconnectTorrentStruct * b = vb;
2976
2977    /* primary key: higher priority goes first */
2978    ai = tr_torrentGetPriority( a->torrent );
2979    bi = tr_torrentGetPriority( b->torrent );
2980    if( ai != bi )
2981        return ai > bi ? -1 : 1;
2982
2983    /* secondary key: since users tend to stare at the screens
2984     * watching their downloads' progress, give downloads a
2985     * first shot at attempting outbound peer connections. */
2986    ai = tr_torrentIsSeed( a->torrent );
2987    bi = tr_torrentIsSeed( b->torrent );
2988    if( ai != bi )
2989        return bi ? -1 : 1;
2990
2991    /* tertiary key: random */
2992    if( a->salt != b->salt )
2993        return a->salt - b->salt;
2994
2995    return 0;
2996}
2997
2998static void
2999reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3000{
3001    tr_torrent * tor;
3002    tr_peerMgr * mgr = vmgr;
3003    struct reconnectTorrentStruct * torrents;
3004    int torrentCount;
3005    int i;
3006    uint64_t now;
3007    managerLock( mgr );
3008
3009    now = tr_date( );
3010
3011    /**
3012    ***  enforce the per-session and per-torrent peer limits
3013    **/
3014
3015    /* if we're over the per-torrent peer limits, cull some peers */
3016    tor = NULL;
3017    while(( tor = tr_torrentNext( mgr->session, tor )))
3018        if( tor->isRunning )
3019            enforceTorrentPeerLimit( tor->torrentPeers, now );
3020
3021    /* if we're over the per-session peer limits, cull some peers */
3022    enforceSessionPeerLimit( mgr->session, now );
3023
3024    /**
3025    ***  try to make new peer connections
3026    **/
3027
3028    torrentCount = 0;
3029    torrents = tr_new( struct reconnectTorrentStruct,
3030                       mgr->session->torrentCount );
3031    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3032        if( tor->isRunning ) {
3033            struct reconnectTorrentStruct * r = torrents + torrentCount++;
3034            r->torrent = tor;
3035            r->salt = tr_cryptoWeakRandInt( 1024 );
3036        }
3037    }
3038    qsort( torrents,
3039           torrentCount, sizeof( struct reconnectTorrentStruct ),
3040           compareReconnectTorrents );
3041    for( i=0; i<torrentCount; ++i )
3042        reconnectTorrent( torrents[i].torrent->torrentPeers );
3043
3044
3045    /* cleanup */
3046    tr_free( torrents );
3047    tr_timerAddMsec( mgr->reconnectTimer, RECONNECT_PERIOD_MSEC );
3048    managerUnlock( mgr );
3049}
3050
3051/****
3052*****
3053*****  BANDWIDTH ALLOCATION
3054*****
3055****/
3056
3057static void
3058pumpAllPeers( tr_peerMgr * mgr )
3059{
3060    tr_torrent * tor = NULL;
3061
3062    while(( tor = tr_torrentNext( mgr->session, tor )))
3063    {
3064        int j;
3065        Torrent * t = tor->torrentPeers;
3066
3067        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3068        {
3069            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3070            tr_peerMsgsPulse( peer->msgs );
3071        }
3072    }
3073}
3074
3075static void
3076bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3077{
3078    tr_torrent * tor = NULL;
3079    tr_peerMgr * mgr = vmgr;
3080    managerLock( mgr );
3081
3082    /* FIXME: this next line probably isn't necessary... */
3083    pumpAllPeers( mgr );
3084
3085    /* allocate bandwidth to the peers */
3086    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3087    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3088
3089    /* possibly stop torrents that have seeded enough */
3090    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3091        if( tor->needsSeedRatioCheck ) {
3092            tor->needsSeedRatioCheck = FALSE;
3093            tr_torrentCheckSeedRatio( tor );
3094        }
3095    }
3096
3097    /* run the completeness check for any torrents that need it */
3098    tor = NULL;
3099    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3100        if( tor->torrentPeers->needsCompletenessCheck ) {
3101            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3102            tr_torrentRecheckCompleteness( tor );
3103        }
3104    }
3105
3106    /* possibly stop torrents that have an error */
3107    tor = NULL;
3108    while(( tor = tr_torrentNext( mgr->session, tor )))
3109        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
3110            tr_torrentStop( tor );
3111
3112    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3113    managerUnlock( mgr );
3114}
3115
3116/***
3117****
3118***/
3119
3120static int
3121compareAtomPtrsByAddress( const void * va, const void *vb )
3122{
3123    const struct peer_atom * a = * (const struct peer_atom**) va;
3124    const struct peer_atom * b = * (const struct peer_atom**) vb;
3125
3126    assert( tr_isAtom( a ) );
3127    assert( tr_isAtom( b ) );
3128
3129    return tr_compareAddresses( &a->addr, &b->addr );
3130}
3131
3132/* best come first, worst go last */
3133static int
3134compareAtomPtrsByShelfDate( const void * va, const void *vb )
3135{
3136    time_t atime;
3137    time_t btime;
3138    const struct peer_atom * a = * (const struct peer_atom**) va;
3139    const struct peer_atom * b = * (const struct peer_atom**) vb;
3140    const int data_time_cutoff_secs = 60 * 60;
3141    const time_t tr_now = tr_time( );
3142
3143    assert( tr_isAtom( a ) );
3144    assert( tr_isAtom( b ) );
3145
3146    /* primary key: the last piece data time *if* it was within the last hour */
3147    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3148    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3149    if( atime != btime )
3150        return atime > btime ? -1 : 1;
3151
3152    /* secondary key: shelf date. */
3153    if( a->shelf_date != b->shelf_date )
3154        return a->shelf_date > b->shelf_date ? -1 : 1;
3155
3156    return 0;
3157}
3158
3159static int
3160getMaxAtomCount( const tr_torrent * tor )
3161{
3162    /* FIXME: this curve should be smoother... */
3163    const int n = tor->maxConnectedPeers;
3164    if( n >= 200 ) return n * 1.5;
3165    if( n >= 100 ) return n * 2;
3166    if( n >=  50 ) return n * 3;
3167    if( n >=  20 ) return n * 5;
3168    return n * 10;
3169}
3170
3171static void
3172atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3173{
3174    tr_torrent * tor = NULL;
3175    tr_peerMgr * mgr = vmgr;
3176    managerLock( mgr );
3177
3178    while(( tor = tr_torrentNext( mgr->session, tor )))
3179    {
3180        int atomCount;
3181        Torrent * t = tor->torrentPeers;
3182        const int maxAtomCount = getMaxAtomCount( tor );
3183        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3184
3185        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3186        {
3187            int i;
3188            int keepCount = 0;
3189            int testCount = 0;
3190            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3191            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3192
3193            /* keep the ones that are in use */
3194            for( i=0; i<atomCount; ++i ) {
3195                struct peer_atom * atom = atoms[i];
3196                if( peerIsInUse( t, atom ) )
3197                    keep[keepCount++] = atom;
3198                else
3199                    test[testCount++] = atom;
3200            }
3201
3202            /* if there's room, keep the best of what's left */
3203            i = 0;
3204            if( keepCount < maxAtomCount ) {
3205                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3206                while( i<testCount && keepCount<maxAtomCount )
3207                    keep[keepCount++] = test[i++];
3208            }
3209
3210            /* free the culled atoms */
3211            while( i<testCount )
3212                tr_free( test[i++] );
3213
3214            /* rebuild Torrent.pool with what's left */
3215            tr_ptrArrayDestruct( &t->pool, NULL );
3216            t->pool = TR_PTR_ARRAY_INIT;
3217            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3218            for( i=0; i<keepCount; ++i )
3219                tr_ptrArrayAppend( &t->pool, keep[i] );
3220
3221            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3222
3223            /* cleanup */
3224            tr_free( test );
3225            tr_free( keep );
3226        }
3227    }
3228
3229    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3230    managerUnlock( mgr );
3231}
Note: See TracBrowser for help on using the repository browser.