source: trunk/libtransmission/peer-mgr.c @ 10017

Last change on this file since 10017 was 10017, checked in by charles, 12 years ago

(trunk libT) #2792 "Since 1.80 update downloads failing" -- a better fix than r10015...

  • Property svn:keywords set to Date Rev Author Id
File size: 92.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 10017 2010-01-25 05:19:54Z charles $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16
17#include <event.h>
18
19#include "transmission.h"
20#include "announcer.h"
21#include "bandwidth.h"
22#include "bencode.h"
23#include "blocklist.h"
24#include "clients.h"
25#include "completion.h"
26#include "crypto.h"
27#include "handshake.h"
28#include "inout.h" /* tr_ioTestPiece */
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "session.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "utils.h"
38#include "webseed.h"
39
40enum
41{
42    /* how frequently to cull old atoms */
43    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
44
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* how frequently to reallocate bandwidth */
49    BANDWIDTH_PERIOD_MSEC = 500,
50
51    /* how frequently to age out old piece request lists */
52    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = 500,
56
57    /* when many peers are available, keep idle ones this long */
58    MIN_UPLOAD_IDLE_SECS = ( 60 ),
59
60    /* when few peers are available, keep idle ones this long */
61    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
62
63    /* max # of peers to ask fer per torrent per reconnect pulse */
64    MAX_RECONNECTIONS_PER_PULSE = 8,
65
66    /* max number of peers to ask for per second overall.
67    * this throttle is to avoid overloading the router */
68    MAX_CONNECTIONS_PER_SECOND = 16,
69
70    /* number of bad pieces a peer is allowed to send before we ban them */
71    MAX_BAD_PIECES_PER_PEER = 5,
72
73    /* amount of time to keep a list of request pieces lying around
74       before it's considered too old and needs to be rebuilt */
75    PIECE_LIST_SHELF_LIFE_SECS = 60,
76
77    /* use for bitwise operations w/peer_atom.myflags */
78    MYFLAG_BANNED = 1,
79
80    /* use for bitwise operations w/peer_atom.myflags */
81    /* unreachable for now... but not banned.
82     * if they try to connect to us it's okay */
83    MYFLAG_UNREACHABLE = 2,
84
85    /* the minimum we'll wait before attempting to reconnect to a peer */
86    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
87
88    /** how long we'll let requests we've made linger before we cancel them */
89    REQUEST_TTL_SECS = 45
90};
91
92
93/**
94***
95**/
96
97enum
98{
99    UPLOAD_ONLY_UKNOWN,
100    UPLOAD_ONLY_YES,
101    UPLOAD_ONLY_NO
102};
103
104/**
105 * Peer information that should be kept even before we've connected and
106 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
107 * which ones would make good candidates for connecting to, and to watch out
108 * for banned peers.
109 *
110 * @see tr_peer
111 * @see tr_peermsgs
112 */
113struct peer_atom
114{
115    tr_peer   * peer;        /* will be NULL if not connected */
116    uint8_t     from;
117    uint8_t     flags;       /* these match the added_f flags */
118    uint8_t     myflags;     /* flags that aren't defined in added_f */
119    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
120    tr_port     port;
121    uint16_t    numFails;
122    tr_address  addr;
123    time_t      time;        /* when the peer's connection status last changed */
124    time_t      piece_data_time;
125
126    /* similar to a TTL field, but less rigid --
127     * if the swarm is small, the atom will be kept past this date. */
128    time_t      shelf_date;
129};
130
131#ifdef NDEBUG
132#define tr_isAtom(a) (TRUE)
133#else
134static tr_bool
135tr_isAtom( const struct peer_atom * atom )
136{
137    return ( atom != NULL )
138        && ( atom->from < TR_PEER_FROM__MAX )
139        && ( tr_isAddress( &atom->addr ) );
140}
141#endif
142
143static const char*
144tr_atomAddrStr( const struct peer_atom * atom )
145{
146    return tr_peerIoAddrStr( &atom->addr, atom->port );
147}
148
149struct block_request
150{
151    tr_block_index_t block;
152    tr_peer * peer;
153    time_t sentAt;
154};
155
156struct weighted_piece
157{
158    tr_piece_index_t index;
159    int16_t salt;
160    int16_t requestCount;
161};
162
163/** @brief Opaque, per-torrent data structure for peer connection information */
164typedef struct tr_torrent_peers
165{
166    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
167    tr_ptrArray                pool; /* struct peer_atom */
168    tr_ptrArray                peers; /* tr_peer */
169    tr_ptrArray                webseeds; /* tr_webseed */
170
171    tr_torrent               * tor;
172    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
173    struct tr_peerMgr        * manager;
174
175    tr_bool                    isRunning;
176    tr_bool                    needsCompletenessCheck;
177
178    struct block_request     * requests;
179    int                        requestsSort;
180    int                        requestCount;
181    int                        requestAlloc;
182
183    struct weighted_piece    * pieces;
184    int                        pieceCount;
185}
186Torrent;
187
188struct tr_peerMgr
189{
190    tr_session    * session;
191    tr_ptrArray     incomingHandshakes; /* tr_handshake */
192    struct event  * bandwidthTimer;
193    struct event  * rechokeTimer;
194    struct event  * reconnectTimer;
195    struct event  * refillUpkeepTimer;
196    struct event  * atomTimer;
197};
198
199#define tordbg( t, ... ) \
200    do { \
201        if( tr_deepLoggingIsActive( ) ) \
202            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
203    } while( 0 )
204
205#define dbgmsg( ... ) \
206    do { \
207        if( tr_deepLoggingIsActive( ) ) \
208            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
209    } while( 0 )
210
211/**
212***
213**/
214
215static inline void
216managerLock( const struct tr_peerMgr * manager )
217{
218    tr_sessionLock( manager->session );
219}
220
221static inline void
222managerUnlock( const struct tr_peerMgr * manager )
223{
224    tr_sessionUnlock( manager->session );
225}
226
227static inline void
228torrentLock( Torrent * torrent )
229{
230    managerLock( torrent->manager );
231}
232
233static inline void
234torrentUnlock( Torrent * torrent )
235{
236    managerUnlock( torrent->manager );
237}
238
239static inline int
240torrentIsLocked( const Torrent * t )
241{
242    return tr_sessionIsLocked( t->manager->session );
243}
244
245/**
246***
247**/
248
249static int
250handshakeCompareToAddr( const void * va, const void * vb )
251{
252    const tr_handshake * a = va;
253
254    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
255}
256
257static int
258handshakeCompare( const void * a, const void * b )
259{
260    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
261}
262
263static tr_handshake*
264getExistingHandshake( tr_ptrArray      * handshakes,
265                      const tr_address * addr )
266{
267    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
268}
269
270static int
271comparePeerAtomToAddress( const void * va, const void * vb )
272{
273    const struct peer_atom * a = va;
274
275    return tr_compareAddresses( &a->addr, vb );
276}
277
278static int
279compareAtomsByAddress( const void * va, const void * vb )
280{
281    const struct peer_atom * b = vb;
282
283    assert( tr_isAtom( b ) );
284
285    return comparePeerAtomToAddress( va, &b->addr );
286}
287
288/**
289***
290**/
291
292const tr_address *
293tr_peerAddress( const tr_peer * peer )
294{
295    return &peer->atom->addr;
296}
297
298static Torrent*
299getExistingTorrent( tr_peerMgr *    manager,
300                    const uint8_t * hash )
301{
302    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
303
304    return tor == NULL ? NULL : tor->torrentPeers;
305}
306
307static int
308peerCompare( const void * a, const void * b )
309{
310    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
311}
312
313static struct peer_atom*
314getExistingAtom( const Torrent    * t,
315                 const tr_address * addr )
316{
317    Torrent * tt = (Torrent*)t;
318    assert( torrentIsLocked( t ) );
319    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
320}
321
322static tr_bool
323peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
324{
325    Torrent * t = (Torrent*) ct;
326
327    assert( torrentIsLocked ( t ) );
328
329    return ( atom->peer != NULL )
330        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
331        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
332}
333
334static tr_peer*
335peerConstructor( struct peer_atom * atom )
336{
337    tr_peer * peer = tr_new0( tr_peer, 1 );
338    tr_bitsetConstructor( &peer->have, 0 );
339    peer->atom = atom;
340    atom->peer = peer;
341    return peer;
342}
343
344static tr_peer*
345getPeer( Torrent * torrent, struct peer_atom * atom )
346{
347    tr_peer * peer;
348
349    assert( torrentIsLocked( torrent ) );
350
351    peer = atom->peer;
352
353    if( peer == NULL )
354    {
355        peer = peerConstructor( atom );
356        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
357    }
358
359    return peer;
360}
361
362static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
363
364static void
365peerDestructor( Torrent * t, tr_peer * peer )
366{
367    assert( peer != NULL );
368
369    peerDeclinedAllRequests( t, peer );
370
371    if( peer->msgs != NULL )
372    {
373        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
374        tr_peerMsgsFree( peer->msgs );
375    }
376
377    tr_peerIoClear( peer->io );
378    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
379
380    tr_bitsetDestructor( &peer->have );
381    tr_bitfieldFree( peer->blame );
382    tr_free( peer->client );
383    peer->atom->peer = NULL;
384
385    tr_free( peer );
386}
387
388static void
389removePeer( Torrent * t, tr_peer * peer )
390{
391    tr_peer * removed;
392    struct peer_atom * atom = peer->atom;
393
394    assert( torrentIsLocked( t ) );
395    assert( atom );
396
397    atom->time = tr_time( );
398
399    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
400    assert( removed == peer );
401    peerDestructor( t, removed );
402}
403
404static void
405removeAllPeers( Torrent * t )
406{
407    while( !tr_ptrArrayEmpty( &t->peers ) )
408        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
409}
410
411static void
412torrentDestructor( void * vt )
413{
414    Torrent * t = vt;
415
416    assert( t );
417    assert( !t->isRunning );
418    assert( torrentIsLocked( t ) );
419    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
420    assert( tr_ptrArrayEmpty( &t->peers ) );
421
422    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
423    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
424    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
425    tr_ptrArrayDestruct( &t->peers, NULL );
426
427    tr_free( t->requests );
428    tr_free( t->pieces );
429    tr_free( t );
430}
431
432static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
433
434static Torrent*
435torrentConstructor( tr_peerMgr * manager,
436                    tr_torrent * tor )
437{
438    int       i;
439    Torrent * t;
440
441    t = tr_new0( Torrent, 1 );
442    t->manager = manager;
443    t->tor = tor;
444    t->pool = TR_PTR_ARRAY_INIT;
445    t->peers = TR_PTR_ARRAY_INIT;
446    t->webseeds = TR_PTR_ARRAY_INIT;
447    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
448    t->requests = 0;
449
450    for( i = 0; i < tor->info.webseedCount; ++i )
451    {
452        tr_webseed * w =
453            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
454        tr_ptrArrayAppend( &t->webseeds, w );
455    }
456
457    return t;
458}
459
460tr_peerMgr*
461tr_peerMgrNew( tr_session * session )
462{
463    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
464    m->session = session;
465    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
466    return m;
467}
468
469static void
470deleteTimer( struct event ** t )
471{
472    if( *t != NULL )
473    {
474        evtimer_del( *t );
475        tr_free( *t );
476        *t = NULL;
477    }
478}
479
480static void
481deleteTimers( struct tr_peerMgr * m )
482{
483    deleteTimer( &m->atomTimer );
484    deleteTimer( &m->bandwidthTimer );
485    deleteTimer( &m->rechokeTimer );
486    deleteTimer( &m->reconnectTimer );
487    deleteTimer( &m->refillUpkeepTimer );
488}
489
490void
491tr_peerMgrFree( tr_peerMgr * manager )
492{
493    managerLock( manager );
494
495    deleteTimers( manager );
496
497    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
498     * the item from manager->handshakes, so this is a little roundabout... */
499    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
500        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
501
502    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
503
504    managerUnlock( manager );
505    tr_free( manager );
506}
507
508static int
509clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
510{
511    if( !tr_torrentHasMetadata( tor ) )
512        return TRUE;
513
514    return peer->clientIsInterested && !peer->clientIsChoked;
515}
516
517static int
518clientIsUploadingTo( const tr_peer * peer )
519{
520    return peer->peerIsInterested && !peer->peerIsChoked;
521}
522
523/***
524****
525***/
526
527tr_bool
528tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
529                      const tr_address  * addr )
530{
531    tr_bool isSeed = FALSE;
532    const Torrent * t = tor->torrentPeers;
533    const struct peer_atom * atom = getExistingAtom( t, addr );
534
535    if( atom )
536        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
537
538    return isSeed;
539}
540
541/**
542***  REQUESTS
543***
544*** There are two data structures associated with managing block requests:
545***
546*** 1. Torrent::requests, an array of "struct block_request" which keeps
547***    track of which blocks have been requested, and when, and by which peers.
548***    This is list is used for (a) cancelling requests that have been pending
549***    for too long and (b) avoiding duplicate requests before endgame.
550***
551*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
552***    pieces that we want to request.  It's used to decide which pieces to
553***    return next when tr_peerMgrGetBlockRequests() is called.
554**/
555
556/**
557*** struct block_request
558**/
559
560enum
561{
562    REQ_UNSORTED,
563    REQ_SORTED_BY_BLOCK,
564    REQ_SORTED_BY_TIME
565};
566
567static int
568compareReqByBlock( const void * va, const void * vb )
569{
570    const struct block_request * a = va;
571    const struct block_request * b = vb;
572
573    /* primary key: block */
574    if( a->block < b->block ) return -1;
575    if( a->block > b->block ) return 1;
576
577    /* secondary key: peer */
578    if( a->peer < b->peer ) return -1;
579    if( a->peer > b->peer ) return 1;
580
581    return 0;
582}
583
584static int
585compareReqByTime( const void * va, const void * vb )
586{
587    const struct block_request * a = va;
588    const struct block_request * b = vb;
589
590    /* primary key: time */
591    if( a->sentAt < b->sentAt ) return -1;
592    if( a->sentAt > b->sentAt ) return 1;
593
594    /* secondary key: peer */
595    if( a->peer < b->peer ) return -1;
596    if( a->peer > b->peer ) return 1;
597
598    return 0;
599}
600
601static void
602requestListSort( Torrent * t, int mode )
603{
604    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
605
606    if( t->requestsSort != mode )
607    {
608        int(*compar)(const void *, const void *);
609
610        t->requestsSort = mode;
611
612        switch( mode ) {
613            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
614            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
615            default: assert( 0 && "unhandled" );
616        }
617
618        qsort( t->requests, t->requestCount,
619               sizeof( struct block_request ), compar );
620    }
621}
622
623static void
624requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
625{
626    struct block_request key;
627
628    /* ensure enough room is available... */
629    if( t->requestCount + 1 >= t->requestAlloc )
630    {
631        const int CHUNK_SIZE = 128;
632        t->requestAlloc += CHUNK_SIZE;
633        t->requests = tr_renew( struct block_request,
634                                t->requests, t->requestAlloc );
635    }
636
637    /* populate the record we're inserting */
638    key.block = block;
639    key.peer = peer;
640    key.sentAt = tr_time( );
641
642    /* insert the request to our array... */
643    switch( t->requestsSort )
644    {
645        case REQ_UNSORTED:
646        case REQ_SORTED_BY_TIME:
647            t->requests[t->requestCount++] = key;
648            break;
649
650        case REQ_SORTED_BY_BLOCK: {
651            tr_bool exact;
652            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
653                                           sizeof( struct block_request ),
654                                           compareReqByBlock, &exact );
655            assert( !exact );
656            memmove( t->requests + pos + 1,
657                     t->requests + pos,
658                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
659            t->requests[pos] = key;
660            break;
661        }
662    }
663
664    if( peer != NULL )
665    {
666        ++peer->pendingReqsToPeer;
667        assert( peer->pendingReqsToPeer >= 0 );
668    }
669
670    /*fprintf( stderr, "added request of block %lu from peer %s... "
671                       "there are now %d block\n",
672                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
673}
674
675static struct block_request *
676requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
677{
678    struct block_request key;
679    key.block = block;
680    key.peer = (tr_peer*) peer;
681
682    requestListSort( t, REQ_SORTED_BY_BLOCK );
683
684    return bsearch( &key, t->requests, t->requestCount,
685                    sizeof( struct block_request ),
686                    compareReqByBlock );
687}
688
689/* how many peers are we currently requesting this block from... */
690static int
691countBlockRequests( Torrent * t, tr_block_index_t block )
692{
693    tr_bool exact;
694    int i, n, pos;
695    struct block_request key;
696
697    requestListSort( t, REQ_SORTED_BY_BLOCK );
698    key.block = block;
699    key.peer = NULL;
700    pos = tr_lowerBound( &key, t->requests, t->requestCount,
701                         sizeof( struct block_request ),
702                         compareReqByBlock, &exact );
703
704    assert( !exact ); /* shouldn't have a request with .peer == NULL */
705
706    n = 0;
707    for( i=pos; i<t->requestCount; ++i ) {
708        if( t->requests[i].block == block )
709            ++n;
710        else
711            break;
712    }
713
714    return n;
715}
716
717static void
718decrementPendingReqCount( const struct block_request * b )
719{
720    if( b->peer != NULL )
721        if( b->peer->pendingReqsToPeer > 0 )
722            --b->peer->pendingReqsToPeer;
723}
724
725static void
726requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
727{
728    const struct block_request * b = requestListLookup( t, block, peer );
729    if( b != NULL )
730    {
731        const int pos = b - t->requests;
732        assert( pos < t->requestCount );
733
734        decrementPendingReqCount( b );
735
736        memmove( t->requests + pos,
737                 t->requests + pos + 1,
738                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
739        /*fprintf( stderr, "removing request of block %lu from peer %s... "
740                           "there are now %d block requests left\n",
741                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
742    }
743}
744
745/**
746*** struct weighted_piece
747**/
748
749enum
750{
751    PIECES_UNSORTED,
752    PIECES_SORTED_BY_INDEX,
753    PIECES_SORTED_BY_WEIGHT
754};
755
756const tr_torrent * weightTorrent;
757
758/* we try to create a "weight" s.t. high-priority pieces come before others,
759 * and that partially-complete pieces come before empty ones. */
760static int
761comparePieceByWeight( const void * va, const void * vb )
762{
763    const struct weighted_piece * a = va;
764    const struct weighted_piece * b = vb;
765    int ia, ib, missing, pending;
766    const tr_torrent * tor = weightTorrent;
767
768    /* primary key: weight */
769    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
770    pending = a->requestCount;
771    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
772    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
773    pending = b->requestCount;
774    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
775    if( ia < ib ) return -1;
776    if( ia > ib ) return 1;
777
778    /* secondary key: higher priorities go first */
779    ia = tor->info.pieces[a->index].priority;
780    ib = tor->info.pieces[b->index].priority;
781    if( ia > ib ) return -1;
782    if( ia < ib ) return 1;
783
784    /* tertiary key: random */
785    if( a->salt < b->salt ) return -1;
786    if( a->salt > b->salt ) return 1;
787
788    /* okay, they're equal */
789    return 0;
790}
791
792static int
793comparePieceByIndex( const void * va, const void * vb )
794{
795    const struct weighted_piece * a = va;
796    const struct weighted_piece * b = vb;
797    if( a->index < b->index ) return -1;
798    if( a->index > b->index ) return 1;
799    return 0;
800}
801
802static void
803pieceListSort( Torrent * t, int mode )
804{
805    int(*compar)(const void *, const void *);
806
807    assert( mode==PIECES_SORTED_BY_INDEX
808         || mode==PIECES_SORTED_BY_WEIGHT );
809
810    switch( mode ) {
811        case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
812        case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
813        default: assert( 0 && "unhandled" );  break;
814    }
815
816    weightTorrent = t->tor;
817    qsort( t->pieces, t->pieceCount,
818           sizeof( struct weighted_piece ), compar );
819}
820
821static tr_bool
822isInEndgame( Torrent * t )
823{
824    tr_bool endgame = TRUE;
825
826    if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
827    {
828        const tr_completion * cp = &t->tor->completion;
829        const struct weighted_piece * p = t->pieces;
830        const int pending = p->requestCount;
831        const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
832        endgame = pending >= missing;
833    }
834
835    /*if( endgame ) fprintf( stderr, "ENDGAME reached\n" );*/
836    return endgame;
837}
838
839static struct weighted_piece *
840pieceListLookup( Torrent * t, tr_piece_index_t index )
841{
842    const struct weighted_piece * limit = t->pieces;
843    struct weighted_piece * piece = t->pieces + t->pieceCount - 1;
844
845    if ( t->pieceCount == 0 ) return NULL;
846
847    /* reverse linear search */
848    for( ;; )
849    { 
850        if( piece < limit ) return NULL;
851        if( index == piece->index ) return piece; else --piece;
852    }
853}
854
855static void
856pieceListRebuild( Torrent * t )
857{
858    if( !tr_torrentIsSeed( t->tor ) )
859    {
860        tr_piece_index_t i;
861        tr_piece_index_t * pool;
862        tr_piece_index_t poolCount = 0;
863        const tr_torrent * tor = t->tor;
864        const tr_info * inf = tr_torrentInfo( tor );
865        struct weighted_piece * pieces;
866        int pieceCount;
867
868        /* build the new list */
869        pool = tr_new( tr_piece_index_t, inf->pieceCount );
870        for( i=0; i<inf->pieceCount; ++i )
871            if( !inf->pieces[i].dnd )
872                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
873                    pool[poolCount++] = i;
874        pieceCount = poolCount;
875        pieces = tr_new0( struct weighted_piece, pieceCount );
876        for( i=0; i<poolCount; ++i ) {
877            struct weighted_piece * piece = pieces + i;
878            piece->index = pool[i];
879            piece->requestCount = 0;
880            piece->salt = tr_cryptoWeakRandInt( 4096 );
881        }
882
883        /* if we already had a list of pieces, merge it into
884         * the new list so we don't lose its requestCounts */
885        if( t->pieces != NULL )
886        {
887            struct weighted_piece * o = t->pieces;
888            struct weighted_piece * oend = o + t->pieceCount;
889            struct weighted_piece * n = pieces;
890            struct weighted_piece * nend = n + pieceCount;
891
892            pieceListSort( t, PIECES_SORTED_BY_INDEX );
893
894            while( o!=oend && n!=nend ) {
895                if( o->index < n->index )
896                    ++o;
897                else if( o->index > n->index )
898                    ++n;
899                else
900                    *n++ = *o++;
901            }
902
903            tr_free( t->pieces );
904        }
905
906        t->pieces = pieces;
907        t->pieceCount = pieceCount;
908
909        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
910
911        /* cleanup */
912        tr_free( pool );
913    }
914}
915
916static void
917pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
918{
919    struct weighted_piece * p = pieceListLookup( t, piece );
920
921    if( p != NULL )
922    {
923        const int pos = p - t->pieces;
924
925        memmove( t->pieces + pos,
926                 t->pieces + pos + 1,
927                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
928
929        if( t->pieceCount == 0 )
930        {
931            tr_free( t->pieces );
932            t->pieces = NULL;
933        }
934    }
935}
936
937static void
938pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
939{
940    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
941    const struct weighted_piece * p = pieceListLookup( t, index );
942
943    if( p != NULL )
944    {
945        const int pos = p - t->pieces;
946        struct weighted_piece piece = t->pieces[pos];
947        int newpos;
948        tr_bool exact;
949
950        /* remove request */
951        if( piece.requestCount > 0 )
952            --piece.requestCount;
953
954        /* List is nearly sorted (by weight) : insert piece into the right place. */
955
956        weightTorrent = t->tor;
957        newpos = tr_lowerBound( &piece, t->pieces, t->pieceCount,
958                                sizeof( struct weighted_piece ),
959                                comparePieceByWeight, &exact );
960
961        if( newpos == pos || newpos == pos + 1 )
962        {
963            /* it's VERY likely that piece keeps its position */
964            t->pieces[pos].requestCount = piece.requestCount;
965        }
966        else
967        {
968            /* piece is removed temporally to make insertion easier */
969            memmove( &t->pieces[pos],
970                     &t->pieces[pos + 1],
971                     sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
972
973            if( newpos > pos ) --newpos;
974
975            memmove( &t->pieces[newpos + 1],
976                     &t->pieces[newpos],
977                     sizeof( struct weighted_piece ) * ( t->pieceCount++ - newpos ) );
978
979            t->pieces[newpos] = piece;
980        }
981    }
982}
983
984/**
985***
986**/
987
988void
989tr_peerMgrRebuildRequests( tr_torrent * tor )
990{
991    assert( tr_isTorrent( tor ) );
992
993    pieceListRebuild( tor->torrentPeers );
994}
995
996void
997tr_peerMgrGetNextRequests( tr_torrent           * tor,
998                           tr_peer              * peer,
999                           int                    numwant,
1000                           tr_block_index_t     * setme,
1001                           int                  * numgot )
1002{
1003    int i;
1004    int got;
1005    Torrent * t;
1006    tr_bool endgame;
1007    struct weighted_piece * pieces;
1008    const tr_bitset * have = &peer->have;
1009
1010    /* sanity clause */
1011    assert( tr_isTorrent( tor ) );
1012    assert( numwant > 0 );
1013
1014    /* walk through the pieces and find blocks that should be requested */
1015    got = 0;
1016    t = tor->torrentPeers;
1017
1018    /* prep the pieces list */
1019    if( t->pieces == NULL )
1020        pieceListRebuild( t );
1021
1022    endgame = isInEndgame( t );
1023
1024    pieces = t->pieces;
1025    for( i=0; i<t->pieceCount && got<numwant; ++i )
1026    {
1027        struct weighted_piece * p = pieces + i;
1028        const int missing = tr_cpMissingBlocksInPiece( &tor->completion, p->index );
1029        const int maxDuplicatesPerBlock = endgame ? 3 : 1;
1030
1031        if( p->requestCount > ( missing * maxDuplicatesPerBlock ) )
1032            continue;
1033
1034        /* if the peer has this piece that we want... */
1035        if( tr_bitsetHasFast( have, p->index ) )
1036        {
1037            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1038            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1039
1040            for( ; b!=e && got<numwant; ++b )
1041            {
1042                /* don't request blocks we've already got */
1043                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1044                    continue;
1045
1046                /* don't send the same request to the same peer twice */
1047                if( tr_peerMgrDidPeerRequest( tor, peer, b ) )
1048                    continue;
1049
1050                /* don't send the same request to any peer too many times */
1051                if( countBlockRequests( t, b ) >= maxDuplicatesPerBlock )
1052                    continue;
1053
1054                /* update the caller's table */
1055                setme[got++] = b;
1056
1057                /* update our own tables */
1058                requestListAdd( t, b, peer );
1059                ++p->requestCount;
1060            }
1061        }
1062    }
1063
1064    /* In most cases we've just changed the weights of a small number of pieces.
1065     * So rather than qsort()ing the entire array, it's faster to apply an
1066     * adaptive insertion sort algorithm. */
1067    if( got > 0 )
1068    {
1069        /* not enough requests || last piece modified */
1070        if ( i == t->pieceCount ) --i;
1071
1072        weightTorrent = t->tor;
1073        while( --i >= 0 )
1074        {
1075            tr_bool exact;
1076
1077            /* relative position! */
1078            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1079                                              t->pieceCount - (i + 1),
1080                                              sizeof( struct weighted_piece ),
1081                                              comparePieceByWeight, &exact );
1082            if( newpos > 0 )
1083            {
1084                const struct weighted_piece piece = t->pieces[i];
1085                memmove( &t->pieces[i],
1086                         &t->pieces[i + 1],
1087                         sizeof( struct weighted_piece ) * ( newpos ) );
1088                t->pieces[i + newpos] = piece;
1089            }
1090        }
1091    }
1092
1093    *numgot = got;
1094}
1095
1096tr_bool
1097tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1098                          const tr_peer     * peer,
1099                          tr_block_index_t    block )
1100{
1101    const Torrent * t = tor->torrentPeers;
1102    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1103}
1104
1105/* cancel requests that are too old */
1106static void
1107refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1108{
1109    time_t now;
1110    time_t too_old;
1111    tr_torrent * tor;
1112    tr_peerMgr * mgr = vmgr;
1113    managerLock( mgr );
1114
1115    now = tr_time( );
1116    too_old = now - REQUEST_TTL_SECS;
1117
1118    tor = NULL;
1119    while(( tor = tr_torrentNext( mgr->session, tor )))
1120    {
1121        Torrent * t = tor->torrentPeers;
1122        const int n = t->requestCount;
1123        if( n > 0 )
1124        {
1125            int keepCount = 0;
1126            int cancelCount = 0;
1127            struct block_request * cancel = tr_new( struct block_request, n );
1128            const struct block_request * it;
1129            const struct block_request * end;
1130
1131            for( it=t->requests, end=it+n; it!=end; ++it )
1132            {
1133                if( it->sentAt <= too_old )
1134                    cancel[cancelCount++] = *it;
1135                else
1136                {
1137                    if( it != &t->requests[keepCount] )
1138                        t->requests[keepCount] = *it;
1139                    keepCount++;
1140                }
1141            }
1142
1143            /* prune out the ones we aren't keeping */
1144            t->requestCount = keepCount;
1145
1146            /* send cancel messages for all the "cancel" ones */
1147            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1148                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1149                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1150                    decrementPendingReqCount( it );
1151                }
1152            }
1153
1154            /* decrement the pending request counts for the timed-out blocks */
1155            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1156                pieceListRemoveRequest( t, it->block );
1157
1158            /* cleanup loop */
1159            tr_free( cancel );
1160        }
1161    }
1162
1163    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1164    managerUnlock( mgr );
1165}
1166
1167static void
1168addStrike( Torrent * t, tr_peer * peer )
1169{
1170    tordbg( t, "increasing peer %s strike count to %d",
1171            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1172
1173    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1174    {
1175        struct peer_atom * atom = peer->atom;
1176        atom->myflags |= MYFLAG_BANNED;
1177        peer->doPurge = 1;
1178        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1179    }
1180}
1181
1182static void
1183gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1184{
1185    tr_torrent *   tor = t->tor;
1186    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1187
1188    tor->corruptCur += byteCount;
1189    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1190}
1191
1192static void
1193peerSuggestedPiece( Torrent            * t UNUSED,
1194                    tr_peer            * peer UNUSED,
1195                    tr_piece_index_t     pieceIndex UNUSED,
1196                    int                  isFastAllowed UNUSED )
1197{
1198#if 0
1199    assert( t );
1200    assert( peer );
1201    assert( peer->msgs );
1202
1203    /* is this a valid piece? */
1204    if(  pieceIndex >= t->tor->info.pieceCount )
1205        return;
1206
1207    /* don't ask for it if we've already got it */
1208    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1209        return;
1210
1211    /* don't ask for it if they don't have it */
1212    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1213        return;
1214
1215    /* don't ask for it if we're choked and it's not fast */
1216    if( !isFastAllowed && peer->clientIsChoked )
1217        return;
1218
1219    /* request the blocks that we don't have in this piece */
1220    {
1221        tr_block_index_t block;
1222        const tr_torrent * tor = t->tor;
1223        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1224        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1225
1226        for( block=start; block<end; ++block )
1227        {
1228            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1229            {
1230                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1231                const uint32_t length = tr_torBlockCountBytes( tor, block );
1232                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1233                incrementPieceRequests( t, pieceIndex );
1234            }
1235        }
1236    }
1237#endif
1238}
1239
1240static void
1241decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
1242{
1243    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1244}
1245
1246static void
1247clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
1248{
1249    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
1250}
1251
1252static void
1253removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1254{
1255    requestListRemove( t, block, peer );
1256    pieceListRemoveRequest( t, block );
1257}
1258
1259/* peer choked us, or maybe it disconnected.
1260   either way we need to remove all its requests */
1261static void
1262peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1263{
1264    int i, n;
1265    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1266
1267    for( i=n=0; i<t->requestCount; ++i )
1268        if( peer == t->requests[i].peer )
1269            blocks[n++] = t->requests[i].block;
1270
1271    for( i=0; i<n; ++i )
1272        removeRequestFromTables( t, blocks[i], peer );
1273
1274    tr_free( blocks );
1275}
1276
1277static void
1278peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1279{
1280    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1281    Torrent * t = vt;
1282    const tr_peer_event * e = vevent;
1283
1284    torrentLock( t );
1285
1286    switch( e->eventType )
1287    {
1288        case TR_PEER_UPLOAD_ONLY:
1289            /* update our atom */
1290            if( peer ) {
1291                if( e->uploadOnly ) {
1292                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1293                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1294                } else {
1295                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1296                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1297                }
1298            }
1299            break;
1300
1301        case TR_PEER_PEER_GOT_DATA:
1302        {
1303            const time_t now = tr_time( );
1304            tr_torrent * tor = t->tor;
1305
1306            tr_torrentSetActivityDate( tor, now );
1307
1308            if( e->wasPieceData ) {
1309                tor->uploadedCur += e->length;
1310                tr_torrentSetDirty( tor );
1311            }
1312
1313            /* update the stats */
1314            if( e->wasPieceData )
1315                tr_statsAddUploaded( tor->session, e->length );
1316
1317            /* update our atom */
1318            if( peer && e->wasPieceData )
1319                peer->atom->piece_data_time = now;
1320
1321            tor->needsSeedRatioCheck = TRUE;
1322
1323            break;
1324        }
1325
1326        case TR_PEER_CLIENT_GOT_REJ:
1327            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1328            break;
1329
1330        case TR_PEER_CLIENT_GOT_CHOKE:
1331            peerDeclinedAllRequests( t, peer );
1332            break;
1333
1334        case TR_PEER_CLIENT_GOT_PORT:
1335            if( peer )
1336                peer->atom->port = e->port;
1337            break;
1338
1339        case TR_PEER_CLIENT_GOT_SUGGEST:
1340            if( peer )
1341                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1342            break;
1343
1344        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1345            if( peer )
1346                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1347            break;
1348
1349        case TR_PEER_CLIENT_GOT_DATA:
1350        {
1351            const time_t now = tr_time( );
1352            tr_torrent * tor = t->tor;
1353
1354            tr_torrentSetActivityDate( tor, now );
1355
1356            /* only add this to downloadedCur if we got it from a peer --
1357             * webseeds shouldn't count against our ratio.  As one tracker
1358             * admin put it, "Those pieces are downloaded directly from the
1359             * content distributor, not the peers, it is the tracker's job
1360             * to manage the swarms, not the web server and does not fit
1361             * into the jurisdiction of the tracker." */
1362            if( peer && e->wasPieceData ) {
1363                tor->downloadedCur += e->length;
1364                tr_torrentSetDirty( tor );
1365            }
1366
1367            /* update the stats */
1368            if( e->wasPieceData )
1369                tr_statsAddDownloaded( tor->session, e->length );
1370
1371            /* update our atom */
1372            if( peer && e->wasPieceData )
1373                peer->atom->piece_data_time = now;
1374
1375            break;
1376        }
1377
1378        case TR_PEER_PEER_PROGRESS:
1379        {
1380            if( peer )
1381            {
1382                struct peer_atom * atom = peer->atom;
1383                if( e->progress >= 1.0 ) {
1384                    tordbg( t, "marking peer %s as a seed",
1385                            tr_atomAddrStr( atom ) );
1386                    atom->flags |= ADDED_F_SEED_FLAG;
1387                }
1388            }
1389            break;
1390        }
1391
1392        case TR_PEER_CLIENT_GOT_BLOCK:
1393        {
1394            tr_torrent * tor = t->tor;
1395            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1396
1397            requestListRemove( t, block, peer );
1398
1399            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1400            {
1401                tordbg( t, "we have this block already..." );
1402                clientGotUnwantedBlock( tor, block );
1403                pieceListRemoveRequest( t, block );
1404            }
1405            else
1406            {
1407                tr_cpBlockAdd( &tor->completion, block );
1408                pieceListRemoveRequest( t, block );
1409                tr_torrentSetDirty( tor );
1410
1411                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1412                {
1413                    const tr_piece_index_t p = e->pieceIndex;
1414                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1415
1416                    if( !ok )
1417                    {
1418                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1419                                   (unsigned long)p );
1420                    }
1421
1422                    tr_torrentSetHasPiece( tor, p, ok );
1423                    tr_torrentSetPieceChecked( tor, p, TRUE );
1424                    tr_peerMgrSetBlame( tor, p, ok );
1425
1426                    if( !ok )
1427                    {
1428                        gotBadPiece( t, p );
1429                    }
1430                    else
1431                    {
1432                        int i;
1433                        int peerCount;
1434                        tr_peer ** peers;
1435                        tr_file_index_t fileIndex;
1436
1437                        peerCount = tr_ptrArraySize( &t->peers );
1438                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1439                        for( i=0; i<peerCount; ++i )
1440                            tr_peerMsgsHave( peers[i]->msgs, p );
1441
1442                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1443                            const tr_file * file = &tor->info.files[fileIndex];
1444                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1445                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1446                                    tr_torrentFileCompleted( tor, fileIndex );
1447                        }
1448
1449                        pieceListRemovePiece( t, p );
1450                    }
1451                }
1452
1453                t->needsCompletenessCheck = TRUE;
1454            }
1455            break;
1456        }
1457
1458        case TR_PEER_ERROR:
1459            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1460            {
1461                /* some protocol error from the peer */
1462                peer->doPurge = 1;
1463                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1464                        tr_atomAddrStr( peer->atom ) );
1465            }
1466            else
1467            {
1468                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1469            }
1470            break;
1471
1472        default:
1473            assert( 0 );
1474    }
1475
1476    torrentUnlock( t );
1477}
1478
1479static int
1480getDefaultShelfLife( uint8_t from )
1481{
1482    /* in general, peers obtained from firsthand contact
1483     * are better than those from secondhand, etc etc */
1484    switch( from )
1485    {
1486        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1487        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1488        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1489        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1490        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1491        case TR_PEER_FROM_RESUME   : return 60 * 60;
1492        default                    : return 60 * 60;
1493    }
1494}
1495
1496
1497static void
1498ensureAtomExists( Torrent           * t,
1499                  const tr_address  * addr,
1500                  const tr_port       port,
1501                  const uint8_t       flags,
1502                  const uint8_t       from )
1503{
1504    assert( tr_isAddress( addr ) );
1505    assert( from < TR_PEER_FROM__MAX );
1506
1507    if( getExistingAtom( t, addr ) == NULL )
1508    {
1509        struct peer_atom * a;
1510        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1511
1512        a = tr_new0( struct peer_atom, 1 );
1513        a->addr = *addr;
1514        a->port = port;
1515        a->flags = flags;
1516        a->from = from;
1517        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1518        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1519
1520        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1521    }
1522}
1523
1524static int
1525getMaxPeerCount( const tr_torrent * tor )
1526{
1527    return tor->maxConnectedPeers;
1528}
1529
1530static int
1531getPeerCount( const Torrent * t )
1532{
1533    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1534}
1535
1536/* FIXME: this is kind of a mess. */
1537static tr_bool
1538myHandshakeDoneCB( tr_handshake  * handshake,
1539                   tr_peerIo     * io,
1540                   tr_bool         isConnected,
1541                   const uint8_t * peer_id,
1542                   void          * vmanager )
1543{
1544    tr_bool            ok = isConnected;
1545    tr_bool            success = FALSE;
1546    tr_port            port;
1547    const tr_address * addr;
1548    tr_peerMgr       * manager = vmanager;
1549    Torrent          * t;
1550    tr_handshake     * ours;
1551
1552    assert( io );
1553    assert( tr_isBool( ok ) );
1554
1555    t = tr_peerIoHasTorrentHash( io )
1556        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1557        : NULL;
1558
1559    if( tr_peerIoIsIncoming ( io ) )
1560        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1561                                        handshake, handshakeCompare );
1562    else if( t )
1563        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1564                                        handshake, handshakeCompare );
1565    else
1566        ours = handshake;
1567
1568    assert( ours );
1569    assert( ours == handshake );
1570
1571    if( t )
1572        torrentLock( t );
1573
1574    addr = tr_peerIoGetAddress( io, &port );
1575
1576    if( !ok || !t || !t->isRunning )
1577    {
1578        if( t )
1579        {
1580            struct peer_atom * atom = getExistingAtom( t, addr );
1581            if( atom )
1582                ++atom->numFails;
1583        }
1584    }
1585    else /* looking good */
1586    {
1587        struct peer_atom * atom;
1588
1589        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1590        atom = getExistingAtom( t, addr );
1591        atom->time = tr_time( );
1592        atom->piece_data_time = 0;
1593
1594        if( atom->myflags & MYFLAG_BANNED )
1595        {
1596            tordbg( t, "banned peer %s tried to reconnect",
1597                    tr_atomAddrStr( atom ) );
1598        }
1599        else if( tr_peerIoIsIncoming( io )
1600               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1601
1602        {
1603        }
1604        else
1605        {
1606            tr_peer * peer = atom->peer;
1607
1608            if( peer ) /* we already have this peer */
1609            {
1610            }
1611            else
1612            {
1613                peer = getPeer( t, atom );
1614                tr_free( peer->client );
1615
1616                if( !peer_id )
1617                    peer->client = NULL;
1618                else {
1619                    char client[128];
1620                    tr_clientForId( client, sizeof( client ), peer_id );
1621                    peer->client = tr_strdup( client );
1622                }
1623
1624                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1625                                                                balanced by our unref in peerDestructor()  */
1626                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1627                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1628
1629                success = TRUE;
1630            }
1631        }
1632    }
1633
1634    if( t )
1635        torrentUnlock( t );
1636
1637    return success;
1638}
1639
1640void
1641tr_peerMgrAddIncoming( tr_peerMgr * manager,
1642                       tr_address * addr,
1643                       tr_port      port,
1644                       int          socket )
1645{
1646    tr_session * session;
1647
1648    managerLock( manager );
1649
1650    assert( tr_isSession( manager->session ) );
1651    session = manager->session;
1652
1653    if( tr_sessionIsAddressBlocked( session, addr ) )
1654    {
1655        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1656        tr_netClose( session, socket );
1657    }
1658    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1659    {
1660        tr_netClose( session, socket );
1661    }
1662    else /* we don't have a connection to them yet... */
1663    {
1664        tr_peerIo *    io;
1665        tr_handshake * handshake;
1666
1667        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1668
1669        handshake = tr_handshakeNew( io,
1670                                     session->encryptionMode,
1671                                     myHandshakeDoneCB,
1672                                     manager );
1673
1674        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1675
1676        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1677                                 handshakeCompare );
1678    }
1679
1680    managerUnlock( manager );
1681}
1682
1683static tr_bool
1684tr_isPex( const tr_pex * pex )
1685{
1686    return pex && tr_isAddress( &pex->addr );
1687}
1688
1689void
1690tr_peerMgrAddPex( tr_torrent   *  tor,
1691                  uint8_t         from,
1692                  const tr_pex *  pex )
1693{
1694    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1695    {
1696        Torrent * t = tor->torrentPeers;
1697        managerLock( t->manager );
1698
1699        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1700            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1701                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1702
1703        managerUnlock( t->manager );
1704    }
1705}
1706
1707tr_pex *
1708tr_peerMgrCompactToPex( const void *    compact,
1709                        size_t          compactLen,
1710                        const uint8_t * added_f,
1711                        size_t          added_f_len,
1712                        size_t *        pexCount )
1713{
1714    size_t          i;
1715    size_t          n = compactLen / 6;
1716    const uint8_t * walk = compact;
1717    tr_pex *        pex = tr_new0( tr_pex, n );
1718
1719    for( i = 0; i < n; ++i )
1720    {
1721        pex[i].addr.type = TR_AF_INET;
1722        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1723        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1724        if( added_f && ( n == added_f_len ) )
1725            pex[i].flags = added_f[i];
1726    }
1727
1728    *pexCount = n;
1729    return pex;
1730}
1731
1732tr_pex *
1733tr_peerMgrCompact6ToPex( const void    * compact,
1734                         size_t          compactLen,
1735                         const uint8_t * added_f,
1736                         size_t          added_f_len,
1737                         size_t        * pexCount )
1738{
1739    size_t          i;
1740    size_t          n = compactLen / 18;
1741    const uint8_t * walk = compact;
1742    tr_pex *        pex = tr_new0( tr_pex, n );
1743
1744    for( i = 0; i < n; ++i )
1745    {
1746        pex[i].addr.type = TR_AF_INET6;
1747        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1748        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1749        if( added_f && ( n == added_f_len ) )
1750            pex[i].flags = added_f[i];
1751    }
1752
1753    *pexCount = n;
1754    return pex;
1755}
1756
1757tr_pex *
1758tr_peerMgrArrayToPex( const void * array,
1759                      size_t       arrayLen,
1760                      size_t      * pexCount )
1761{
1762    size_t          i;
1763    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1764    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1765    const uint8_t * walk = array;
1766    tr_pex        * pex = tr_new0( tr_pex, n );
1767
1768    for( i = 0 ; i < n ; i++ ) {
1769        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1770        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1771        pex[i].flags = 0x00;
1772        walk += sizeof( tr_address ) + 2;
1773    }
1774
1775    *pexCount = n;
1776    return pex;
1777}
1778
1779/**
1780***
1781**/
1782
1783void
1784tr_peerMgrSetBlame( tr_torrent     * tor,
1785                    tr_piece_index_t pieceIndex,
1786                    int              success )
1787{
1788    if( !success )
1789    {
1790        int        peerCount, i;
1791        Torrent *  t = tor->torrentPeers;
1792        tr_peer ** peers;
1793
1794        assert( torrentIsLocked( t ) );
1795
1796        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1797        for( i = 0; i < peerCount; ++i )
1798        {
1799            tr_peer * peer = peers[i];
1800            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1801            {
1802                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1803                        tr_atomAddrStr( peer->atom ),
1804                        pieceIndex, (int)peer->strikes + 1 );
1805                addStrike( t, peer );
1806            }
1807        }
1808    }
1809}
1810
1811int
1812tr_pexCompare( const void * va, const void * vb )
1813{
1814    const tr_pex * a = va;
1815    const tr_pex * b = vb;
1816    int i;
1817
1818    assert( tr_isPex( a ) );
1819    assert( tr_isPex( b ) );
1820
1821    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1822        return i;
1823
1824    if( a->port != b->port )
1825        return a->port < b->port ? -1 : 1;
1826
1827    return 0;
1828}
1829
1830#if 0
1831static int
1832peerPrefersCrypto( const tr_peer * peer )
1833{
1834    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1835        return TRUE;
1836
1837    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1838        return FALSE;
1839
1840    return tr_peerIoIsEncrypted( peer->io );
1841}
1842#endif
1843
1844/* better goes first */
1845static int
1846compareAtomsByUsefulness( const void * va, const void *vb )
1847{
1848    const struct peer_atom * a = * (const struct peer_atom**) va;
1849    const struct peer_atom * b = * (const struct peer_atom**) vb;
1850
1851    assert( tr_isAtom( a ) );
1852    assert( tr_isAtom( b ) );
1853
1854    if( a->piece_data_time != b->piece_data_time )
1855        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1856    if( a->from != b->from )
1857        return a->from < b->from ? -1 : 1;
1858    if( a->numFails != b->numFails )
1859        return a->numFails < b->numFails ? -1 : 1;
1860
1861    return 0;
1862}
1863
1864int
1865tr_peerMgrGetPeers( tr_torrent   * tor,
1866                    tr_pex      ** setme_pex,
1867                    uint8_t        af,
1868                    uint8_t        list_mode,
1869                    int            maxCount )
1870{
1871    int i;
1872    int n;
1873    int count = 0;
1874    int atomCount = 0;
1875    const Torrent * t = tor->torrentPeers;
1876    struct peer_atom ** atoms = NULL;
1877    tr_pex * pex;
1878    tr_pex * walk;
1879
1880    assert( tr_isTorrent( tor ) );
1881    assert( setme_pex != NULL );
1882    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1883    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1884
1885    managerLock( t->manager );
1886
1887    /**
1888    ***  build a list of atoms
1889    **/
1890
1891    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1892    {
1893        int i;
1894        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1895        atomCount = tr_ptrArraySize( &t->peers );
1896        atoms = tr_new( struct peer_atom *, atomCount );
1897        for( i=0; i<atomCount; ++i )
1898            atoms[i] = peers[i]->atom;
1899    }
1900    else /* TR_PEERS_ALL */
1901    {
1902        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1903        atomCount = tr_ptrArraySize( &t->pool );
1904        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1905    }
1906
1907    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1908
1909    /**
1910    ***  add the first N of them into our return list
1911    **/
1912
1913    n = MIN( atomCount, maxCount );
1914    pex = walk = tr_new0( tr_pex, n );
1915
1916    for( i=0; i<atomCount && count<n; ++i )
1917    {
1918        const struct peer_atom * atom = atoms[i];
1919        if( atom->addr.type == af )
1920        {
1921            assert( tr_isAddress( &atom->addr ) );
1922            walk->addr = atom->addr;
1923            walk->port = atom->port;
1924            walk->flags = atom->flags;
1925            ++count;
1926            ++walk;
1927        }
1928    }
1929
1930    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1931
1932    assert( ( walk - pex ) == count );
1933    *setme_pex = pex;
1934
1935    /* cleanup */
1936    tr_free( atoms );
1937    managerUnlock( t->manager );
1938    return count;
1939}
1940
1941static void atomPulse      ( int, short, void * );
1942static void bandwidthPulse ( int, short, void * );
1943static void rechokePulse   ( int, short, void * );
1944static void reconnectPulse ( int, short, void * );
1945
1946static struct event *
1947createTimer( int msec, void (*callback)(int, short, void *), void * cbdata )
1948{
1949    struct event * timer = tr_new0( struct event, 1 );
1950    evtimer_set( timer, callback, cbdata );
1951    tr_timerAddMsec( timer, msec );
1952    return timer;
1953}
1954
1955static void
1956ensureMgrTimersExist( struct tr_peerMgr * m )
1957{
1958    if( m->atomTimer == NULL )
1959        m->atomTimer = createTimer( ATOM_PERIOD_MSEC, atomPulse, m );
1960
1961    if( m->bandwidthTimer == NULL )
1962        m->bandwidthTimer = createTimer( BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
1963
1964    if( m->rechokeTimer == NULL )
1965        m->rechokeTimer = createTimer( RECHOKE_PERIOD_MSEC, rechokePulse, m );
1966
1967    if( m->reconnectTimer == NULL )
1968        m->reconnectTimer = createTimer( RECONNECT_PERIOD_MSEC, reconnectPulse, m );
1969
1970    if( m->refillUpkeepTimer == NULL )
1971        m->refillUpkeepTimer = createTimer( REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
1972}
1973
1974void
1975tr_peerMgrStartTorrent( tr_torrent * tor )
1976{
1977    Torrent * t = tor->torrentPeers;
1978
1979    assert( t != NULL );
1980    managerLock( t->manager );
1981    ensureMgrTimersExist( t->manager );
1982
1983    t->isRunning = TRUE;
1984
1985    rechokePulse( 0, 0, t->manager );
1986    managerUnlock( t->manager );
1987}
1988
1989static void
1990stopTorrent( Torrent * t )
1991{
1992    int i, n;
1993
1994    assert( torrentIsLocked( t ) );
1995
1996    t->isRunning = FALSE;
1997
1998    /* disconnect the peers. */
1999    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2000        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2001    tr_ptrArrayClear( &t->peers );
2002
2003    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
2004     * which removes the handshake from t->outgoingHandshakes... */
2005    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2006        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2007}
2008
2009void
2010tr_peerMgrStopTorrent( tr_torrent * tor )
2011{
2012    Torrent * t = tor->torrentPeers;
2013
2014    managerLock( t->manager );
2015
2016    stopTorrent( t );
2017
2018    managerUnlock( t->manager );
2019}
2020
2021void
2022tr_peerMgrAddTorrent( tr_peerMgr * manager,
2023                      tr_torrent * tor )
2024{
2025    managerLock( manager );
2026
2027    assert( tor );
2028    assert( tor->torrentPeers == NULL );
2029
2030    tor->torrentPeers = torrentConstructor( manager, tor );
2031
2032    managerUnlock( manager );
2033}
2034
2035void
2036tr_peerMgrRemoveTorrent( tr_torrent * tor )
2037{
2038    tr_torrentLock( tor );
2039
2040    stopTorrent( tor->torrentPeers );
2041    torrentDestructor( tor->torrentPeers );
2042
2043    tr_torrentUnlock( tor );
2044}
2045
2046void
2047tr_peerMgrTorrentAvailability( const tr_torrent * tor,
2048                               int8_t           * tab,
2049                               unsigned int       tabCount )
2050{
2051    tr_piece_index_t   i;
2052    const Torrent *    t;
2053    float              interval;
2054    tr_bool            isSeed;
2055    int                peerCount;
2056    const tr_peer **   peers;
2057    tr_torrentLock( tor );
2058
2059    t = tor->torrentPeers;
2060    tor = t->tor;
2061    interval = tor->info.pieceCount / (float)tabCount;
2062    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
2063    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2064    peerCount = tr_ptrArraySize( &t->peers );
2065
2066    memset( tab, 0, tabCount );
2067
2068    for( i = 0; tor && i < tabCount; ++i )
2069    {
2070        const int piece = i * interval;
2071
2072        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2073            tab[i] = -1;
2074        else if( peerCount ) {
2075            int j;
2076            for( j = 0; j < peerCount; ++j )
2077                if( tr_bitsetHas( &peers[j]->have, i ) )
2078                    ++tab[i];
2079        }
2080    }
2081
2082    tr_torrentUnlock( tor );
2083}
2084
2085/* Returns the pieces that are available from peers */
2086tr_bitfield*
2087tr_peerMgrGetAvailable( const tr_torrent * tor )
2088{
2089    int i;
2090    int peerCount;
2091    Torrent * t = tor->torrentPeers;
2092    const tr_peer ** peers;
2093    tr_bitfield * pieces;
2094    managerLock( t->manager );
2095
2096    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2097    peerCount = tr_ptrArraySize( &t->peers );
2098    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2099    for( i=0; i<peerCount; ++i )
2100        tr_bitsetOr( pieces, &peers[i]->have );
2101
2102    managerUnlock( t->manager );
2103    return pieces;
2104}
2105
2106void
2107tr_peerMgrTorrentStats( tr_torrent       * tor,
2108                        int              * setmePeersKnown,
2109                        int              * setmePeersConnected,
2110                        int              * setmeSeedsConnected,
2111                        int              * setmeWebseedsSendingToUs,
2112                        int              * setmePeersSendingToUs,
2113                        int              * setmePeersGettingFromUs,
2114                        int              * setmePeersFrom )
2115{
2116    int i, size;
2117    const Torrent * t = tor->torrentPeers;
2118    const tr_peer ** peers;
2119    const tr_webseed ** webseeds;
2120
2121    managerLock( t->manager );
2122
2123    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2124    size = tr_ptrArraySize( &t->peers );
2125
2126    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2127    *setmePeersConnected       = 0;
2128    *setmeSeedsConnected       = 0;
2129    *setmePeersGettingFromUs   = 0;
2130    *setmePeersSendingToUs     = 0;
2131    *setmeWebseedsSendingToUs  = 0;
2132
2133    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2134        setmePeersFrom[i] = 0;
2135
2136    for( i=0; i<size; ++i )
2137    {
2138        const tr_peer * peer = peers[i];
2139        const struct peer_atom * atom = peer->atom;
2140
2141        if( peer->io == NULL ) /* not connected */
2142            continue;
2143
2144        ++*setmePeersConnected;
2145
2146        ++setmePeersFrom[atom->from];
2147
2148        if( clientIsDownloadingFrom( tor, peer ) )
2149            ++*setmePeersSendingToUs;
2150
2151        if( clientIsUploadingTo( peer ) )
2152            ++*setmePeersGettingFromUs;
2153
2154        if( atom->flags & ADDED_F_SEED_FLAG )
2155            ++*setmeSeedsConnected;
2156    }
2157
2158    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2159    size = tr_ptrArraySize( &t->webseeds );
2160    for( i=0; i<size; ++i )
2161        if( tr_webseedIsActive( webseeds[i] ) )
2162            ++*setmeWebseedsSendingToUs;
2163
2164    managerUnlock( t->manager );
2165}
2166
2167float
2168tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2169{
2170    int i;
2171    float tmp;
2172    float ret = 0;
2173
2174    const Torrent * t = tor->torrentPeers;
2175    const int n = tr_ptrArraySize( &t->webseeds );
2176    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2177
2178    for( i=0; i<n; ++i )
2179        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2180            ret += tmp;
2181
2182    return ret;
2183}
2184
2185
2186float*
2187tr_peerMgrWebSpeeds( const tr_torrent * tor )
2188{
2189    const Torrent * t = tor->torrentPeers;
2190    const tr_webseed ** webseeds;
2191    int i;
2192    int webseedCount;
2193    float * ret;
2194    uint64_t now;
2195
2196    assert( t->manager );
2197    managerLock( t->manager );
2198
2199    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2200    webseedCount = tr_ptrArraySize( &t->webseeds );
2201    assert( webseedCount == tor->info.webseedCount );
2202    ret = tr_new0( float, webseedCount );
2203    now = tr_date( );
2204
2205    for( i=0; i<webseedCount; ++i )
2206        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2207            ret[i] = -1.0;
2208
2209    managerUnlock( t->manager );
2210    return ret;
2211}
2212
2213double
2214tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2215{
2216    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2217}
2218
2219
2220struct tr_peer_stat *
2221tr_peerMgrPeerStats( const tr_torrent    * tor,
2222                     int                 * setmeCount )
2223{
2224    int i, size;
2225    const Torrent * t = tor->torrentPeers;
2226    const tr_peer ** peers;
2227    tr_peer_stat * ret;
2228    uint64_t now;
2229
2230    assert( t->manager );
2231    managerLock( t->manager );
2232
2233    size = tr_ptrArraySize( &t->peers );
2234    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2235    ret = tr_new0( tr_peer_stat, size );
2236    now = tr_date( );
2237
2238    for( i=0; i<size; ++i )
2239    {
2240        char *                   pch;
2241        const tr_peer *          peer = peers[i];
2242        const struct peer_atom * atom = peer->atom;
2243        tr_peer_stat *           stat = ret + i;
2244
2245        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2246        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2247                   sizeof( stat->client ) );
2248        stat->port                = ntohs( peer->atom->port );
2249        stat->from                = atom->from;
2250        stat->progress            = peer->progress;
2251        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2252        stat->rateToPeer          = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2253        stat->rateToClient        = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2254        stat->peerIsChoked        = peer->peerIsChoked;
2255        stat->peerIsInterested    = peer->peerIsInterested;
2256        stat->clientIsChoked      = peer->clientIsChoked;
2257        stat->clientIsInterested  = peer->clientIsInterested;
2258        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2259        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2260        stat->isUploadingTo       = clientIsUploadingTo( peer );
2261        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2262        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2263        stat->pendingReqsToClient = peer->pendingReqsToClient;
2264
2265        pch = stat->flagStr;
2266        if( t->optimistic == peer ) *pch++ = 'O';
2267        if( stat->isDownloadingFrom ) *pch++ = 'D';
2268        else if( stat->clientIsInterested ) *pch++ = 'd';
2269        if( stat->isUploadingTo ) *pch++ = 'U';
2270        else if( stat->peerIsInterested ) *pch++ = 'u';
2271        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2272        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2273        if( stat->isEncrypted ) *pch++ = 'E';
2274        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2275        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2276        if( stat->isIncoming ) *pch++ = 'I';
2277        *pch = '\0';
2278    }
2279
2280    *setmeCount = size;
2281
2282    managerUnlock( t->manager );
2283    return ret;
2284}
2285
2286/**
2287***
2288**/
2289
2290struct ChokeData
2291{
2292    tr_bool         doUnchoke;
2293    tr_bool         isInterested;
2294    tr_bool         isChoked;
2295    int             rate;
2296    tr_peer *       peer;
2297};
2298
2299static int
2300compareChoke( const void * va,
2301              const void * vb )
2302{
2303    const struct ChokeData * a = va;
2304    const struct ChokeData * b = vb;
2305
2306    if( a->rate != b->rate ) /* prefer higher overall speeds */
2307        return a->rate > b->rate ? -1 : 1;
2308
2309    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2310        return a->isChoked ? 1 : -1;
2311
2312    return 0;
2313}
2314
2315static int
2316isNew( const tr_peer * peer )
2317{
2318    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2319}
2320
2321static int
2322isSame( const tr_peer * peer )
2323{
2324    return peer && peer->client && strstr( peer->client, "Transmission" );
2325}
2326
2327/**
2328***
2329**/
2330
2331static void
2332rechokeTorrent( Torrent * t, const uint64_t now )
2333{
2334    int i, size, unchokedInterested;
2335    const int peerCount = tr_ptrArraySize( &t->peers );
2336    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2337    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2338    const tr_session * session = t->manager->session;
2339    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2340
2341    assert( torrentIsLocked( t ) );
2342
2343    /* sort the peers by preference and rate */
2344    for( i = 0, size = 0; i < peerCount; ++i )
2345    {
2346        tr_peer * peer = peers[i];
2347        struct peer_atom * atom = peer->atom;
2348
2349        if( peer->progress >= 1.0 ) /* choke all seeds */
2350        {
2351            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2352        }
2353        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2354        {
2355            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2356        }
2357        else if( chokeAll ) /* choke everyone if we're not uploading */
2358        {
2359            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2360        }
2361        else
2362        {
2363            struct ChokeData * n = &choke[size++];
2364            n->peer         = peer;
2365            n->isInterested = peer->peerIsInterested;
2366            n->isChoked     = peer->peerIsChoked;
2367            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2368        }
2369    }
2370
2371    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2372
2373    /**
2374     * Reciprocation and number of uploads capping is managed by unchoking
2375     * the N peers which have the best upload rate and are interested.
2376     * This maximizes the client's download rate. These N peers are
2377     * referred to as downloaders, because they are interested in downloading
2378     * from the client.
2379     *
2380     * Peers which have a better upload rate (as compared to the downloaders)
2381     * but aren't interested get unchoked. If they become interested, the
2382     * downloader with the worst upload rate gets choked. If a client has
2383     * a complete file, it uses its upload rate rather than its download
2384     * rate to decide which peers to unchoke.
2385     */
2386    unchokedInterested = 0;
2387    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2388        choke[i].doUnchoke = 1;
2389        if( choke[i].isInterested )
2390            ++unchokedInterested;
2391    }
2392
2393    /* optimistic unchoke */
2394    if( i < size )
2395    {
2396        int n;
2397        struct ChokeData * c;
2398        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2399
2400        for( ; i<size; ++i )
2401        {
2402            if( choke[i].isInterested )
2403            {
2404                const tr_peer * peer = choke[i].peer;
2405                int x = 1, y;
2406                if( isNew( peer ) ) x *= 3;
2407                if( isSame( peer ) ) x *= 3;
2408                for( y=0; y<x; ++y )
2409                    tr_ptrArrayAppend( &randPool, &choke[i] );
2410            }
2411        }
2412
2413        if(( n = tr_ptrArraySize( &randPool )))
2414        {
2415            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2416            c->doUnchoke = 1;
2417            t->optimistic = c->peer;
2418        }
2419
2420        tr_ptrArrayDestruct( &randPool, NULL );
2421    }
2422
2423    for( i=0; i<size; ++i )
2424        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2425
2426    /* cleanup */
2427    tr_free( choke );
2428}
2429
2430static void
2431rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2432{
2433    uint64_t now;
2434    tr_torrent * tor = NULL;
2435    tr_peerMgr * mgr = vmgr;
2436    managerLock( mgr );
2437
2438    now = tr_date( );
2439    while(( tor = tr_torrentNext( mgr->session, tor )))
2440        if( tor->isRunning )
2441            rechokeTorrent( tor->torrentPeers, now );
2442
2443    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
2444    managerUnlock( mgr );
2445}
2446
2447/***
2448****
2449****  Life and Death
2450****
2451***/
2452
2453typedef enum
2454{
2455    TR_CAN_KEEP,
2456    TR_CAN_CLOSE,
2457    TR_MUST_CLOSE,
2458}
2459tr_close_type_t;
2460
2461static tr_close_type_t
2462shouldPeerBeClosed( const Torrent    * t,
2463                    const tr_peer    * peer,
2464                    int                peerCount,
2465                    const time_t       now )
2466{
2467    const tr_torrent *       tor = t->tor;
2468    const struct peer_atom * atom = peer->atom;
2469
2470    /* if it's marked for purging, close it */
2471    if( peer->doPurge )
2472    {
2473        tordbg( t, "purging peer %s because its doPurge flag is set",
2474                tr_atomAddrStr( atom ) );
2475        return TR_MUST_CLOSE;
2476    }
2477
2478    /* if we're seeding and the peer has everything we have,
2479     * and enough time has passed for a pex exchange, then disconnect */
2480    if( tr_torrentIsSeed( tor ) )
2481    {
2482        int peerHasEverything;
2483        if( atom->flags & ADDED_F_SEED_FLAG )
2484            peerHasEverything = TRUE;
2485        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2486            peerHasEverything = FALSE;
2487        else {
2488            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2489            tr_bitsetDifference( tmp, &peer->have );
2490            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2491            tr_bitfieldFree( tmp );
2492        }
2493
2494        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2495        {
2496            tordbg( t, "purging peer %s because we're both seeds",
2497                    tr_atomAddrStr( atom ) );
2498            return TR_MUST_CLOSE;
2499        }
2500    }
2501
2502    /* disconnect if it's been too long since piece data has been transferred.
2503     * this is on a sliding scale based on number of available peers... */
2504    {
2505        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2506        /* if we have >= relaxIfFewerThan, strictness is 100%.
2507         * if we have zero connections, strictness is 0% */
2508        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2509                               ? 1.0
2510                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2511        const int lo = MIN_UPLOAD_IDLE_SECS;
2512        const int hi = MAX_UPLOAD_IDLE_SECS;
2513        const int limit = hi - ( ( hi - lo ) * strictness );
2514        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2515/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2516        if( idleTime > limit ) {
2517            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2518                       tr_atomAddrStr( atom ), idleTime );
2519            return TR_CAN_CLOSE;
2520        }
2521    }
2522
2523    return TR_CAN_KEEP;
2524}
2525
2526static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2527
2528static tr_peer **
2529getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2530{
2531    int i, peerCount, outsize;
2532    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2533    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2534
2535    assert( torrentIsLocked( t ) );
2536
2537    for( i = outsize = 0; i < peerCount; ++i )
2538        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2539            ret[outsize++] = peers[i];
2540
2541    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2542
2543    *setmeSize = outsize;
2544    return ret;
2545}
2546
2547static int
2548compareCandidates( const void * va, const void * vb )
2549{
2550    const struct peer_atom * a = *(const struct peer_atom**) va;
2551    const struct peer_atom * b = *(const struct peer_atom**) vb;
2552
2553    /* <Charles> Here we would probably want to try reconnecting to
2554     * peers that had most recently given us data. Lots of users have
2555     * trouble with resets due to their routers and/or ISPs. This way we
2556     * can quickly recover from an unwanted reset. So we sort
2557     * piece_data_time in descending order.
2558     */
2559
2560    if( a->piece_data_time != b->piece_data_time )
2561        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2562
2563    if( a->numFails != b->numFails )
2564        return a->numFails < b->numFails ? -1 : 1;
2565
2566    if( a->time != b->time )
2567        return a->time < b->time ? -1 : 1;
2568
2569    /* In order to avoid fragmenting the swarm, peers from trackers and
2570     * from the DHT should be preferred to peers from PEX. */
2571    if( a->from != b->from )
2572        return a->from < b->from ? -1 : 1;
2573
2574    return 0;
2575}
2576
2577static int
2578getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2579{
2580    int sec;
2581
2582    /* if we were recently connected to this peer and transferring piece
2583     * data, try to reconnect to them sooner rather that later -- we don't
2584     * want network troubles to get in the way of a good peer. */
2585    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2586        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2587
2588    /* don't allow reconnects more often than our minimum */
2589    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2590        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2591
2592    /* otherwise, the interval depends on how many times we've tried
2593     * and failed to connect to the peer */
2594    else switch( atom->numFails ) {
2595        case 0: sec = 0; break;
2596        case 1: sec = 5; break;
2597        case 2: sec = 2 * 60; break;
2598        case 3: sec = 15 * 60; break;
2599        case 4: sec = 30 * 60; break;
2600        case 5: sec = 60 * 60; break;
2601        default: sec = 120 * 60; break;
2602    }
2603
2604    return sec;
2605}
2606
2607static struct peer_atom **
2608getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2609{
2610    int                 i, atomCount, retCount;
2611    struct peer_atom ** atoms;
2612    struct peer_atom ** ret;
2613    const int           seed = tr_torrentIsSeed( t->tor );
2614
2615    assert( torrentIsLocked( t ) );
2616
2617    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2618    ret = tr_new( struct peer_atom*, atomCount );
2619    for( i = retCount = 0; i < atomCount; ++i )
2620    {
2621        int                interval;
2622        struct peer_atom * atom = atoms[i];
2623
2624        /* peer fed us too much bad data ... we only keep it around
2625         * now to weed it out in case someone sends it to us via pex */
2626        if( atom->myflags & MYFLAG_BANNED )
2627            continue;
2628
2629        /* peer was unconnectable before, so we're not going to keep trying.
2630         * this is needs a separate flag from `banned', since if they try
2631         * to connect to us later, we'll let them in */
2632        if( atom->myflags & MYFLAG_UNREACHABLE )
2633            continue;
2634
2635        /* no need to connect if we're both seeds... */
2636        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2637                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2638            continue;
2639
2640        /* don't reconnect too often */
2641        interval = getReconnectIntervalSecs( atom, now );
2642        if( ( now - atom->time ) < interval )
2643        {
2644            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2645                    i, tr_atomAddrStr( atom ), interval );
2646            continue;
2647        }
2648
2649        /* Don't connect to peers in our blocklist */
2650        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2651            continue;
2652
2653        /* we don't need two connections to the same peer... */
2654        if( peerIsInUse( t, atom ) )
2655            continue;
2656
2657        ret[retCount++] = atom;
2658    }
2659
2660    if( retCount != 0 )
2661        qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2662    *setmeSize = retCount;
2663    return ret;
2664}
2665
2666static void
2667closePeer( Torrent * t, tr_peer * peer )
2668{
2669    struct peer_atom * atom;
2670
2671    assert( t != NULL );
2672    assert( peer != NULL );
2673
2674    atom = peer->atom;
2675
2676    /* if we transferred piece data, then they might be good peers,
2677       so reset their `numFails' weight to zero.  otherwise we connected
2678       to them fruitlessly, so mark it as another fail */
2679    if( atom->piece_data_time )
2680        atom->numFails = 0;
2681    else
2682        ++atom->numFails;
2683
2684    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2685    removePeer( t, peer );
2686}
2687
2688static void
2689reconnectTorrent( Torrent * t )
2690{
2691    static time_t prevTime = 0;
2692    static int    newConnectionsThisSecond = 0;
2693    const time_t  now = tr_time( );
2694
2695    if( prevTime != now )
2696    {
2697        prevTime = now;
2698        newConnectionsThisSecond = 0;
2699    }
2700
2701    if( !t->isRunning )
2702    {
2703        removeAllPeers( t );
2704    }
2705    else
2706    {
2707        int i;
2708        int mustCloseCount;
2709        int maxCandidates;
2710        struct tr_peer ** mustClose;
2711
2712        /* disconnect the really bad peers */
2713        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2714        for( i=0; i<mustCloseCount; ++i )
2715            closePeer( t, mustClose[i] );
2716        tr_free( mustClose );
2717
2718        /* decide how many peers can we try to add in this pass */
2719        maxCandidates = MAX_RECONNECTIONS_PER_PULSE;
2720        if( tr_announcerHasBacklog( t->manager->session->announcer ) )
2721            maxCandidates /= 2;
2722        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2723        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2724
2725        /* select the best candidates, if they are requested */
2726        if( maxCandidates == 0 )
2727        {
2728            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2729                       "NO connection candidates needed, %d atoms, "
2730                       "max per pulse is %d",
2731                       t->tor->info.name, mustCloseCount,
2732                       tr_ptrArraySize( &t->pool ),
2733                       MAX_RECONNECTIONS_PER_PULSE );
2734
2735            tordbg( t, "maxCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, "
2736                       "getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2737                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2738                       maxCandidates, MAX_RECONNECTIONS_PER_PULSE,
2739                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2740                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2741        }
2742        else
2743        {
2744            int canCloseCount = 0;
2745            int candidateCount;
2746            struct peer_atom ** candidates;
2747
2748            candidates = getPeerCandidates( t, now, &candidateCount );
2749            maxCandidates = MIN( maxCandidates, candidateCount );
2750
2751            /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2752            if( maxCandidates != 0 )
2753            {
2754                struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
2755                for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2756                   closePeer( t, canClose[i] );
2757                tr_free( canClose );
2758            }
2759
2760            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2761                       "%d can-close connections, %d connection candidates, "
2762                       "%d atoms, max per pulse is %d",
2763                       t->tor->info.name, mustCloseCount,
2764                       canCloseCount, candidateCount,
2765                       tr_ptrArraySize( &t->pool ), MAX_RECONNECTIONS_PER_PULSE );
2766
2767            tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2768                       " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2769                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2770                       candidateCount, MAX_RECONNECTIONS_PER_PULSE,
2771                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2772                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2773
2774            /* add some new ones */
2775            for( i=0; i<maxCandidates; ++i )
2776            {
2777                tr_peerMgr        * mgr = t->manager;
2778                struct peer_atom  * atom = candidates[i];
2779                tr_peerIo         * io;
2780
2781                tordbg( t, "Starting an OUTGOING connection with %s",
2782                        tr_atomAddrStr( atom ) );
2783
2784                io = tr_peerIoNewOutgoing( mgr->session,
2785                                           mgr->session->bandwidth,
2786                                           &atom->addr,
2787                                           atom->port,
2788                                           t->tor->info.hash,
2789                                           t->tor->completeness == TR_SEED );
2790
2791                if( io == NULL )
2792                {
2793                    tordbg( t, "peerIo not created; marking peer %s as unreachable",
2794                            tr_atomAddrStr( atom ) );
2795                    atom->myflags |= MYFLAG_UNREACHABLE;
2796                }
2797                else
2798                {
2799                    tr_handshake * handshake = tr_handshakeNew( io,
2800                                                                mgr->session->encryptionMode,
2801                                                                myHandshakeDoneCB,
2802                                                                mgr );
2803
2804                    assert( tr_peerIoGetTorrentHash( io ) );
2805
2806                    tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2807
2808                    ++newConnectionsThisSecond;
2809
2810                    tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2811                                             handshakeCompare );
2812                }
2813
2814                atom->time = now;
2815            }
2816            tr_free( candidates );
2817        }
2818    }
2819}
2820
2821struct peer_liveliness
2822{
2823    tr_peer * peer;
2824    void * clientData;
2825    time_t pieceDataTime;
2826    time_t time;
2827    int speed;
2828    tr_bool doPurge;
2829};
2830
2831static int
2832comparePeerLiveliness( const void * va, const void * vb )
2833{
2834    const struct peer_liveliness * a = va;
2835    const struct peer_liveliness * b = vb;
2836
2837    if( a->doPurge != b->doPurge )
2838        return a->doPurge ? 1 : -1;
2839
2840    if( a->speed != b->speed ) /* faster goes first */
2841        return a->speed > b->speed ? -1 : 1;
2842
2843    /* the one to give us data more recently goes first */
2844    if( a->pieceDataTime != b->pieceDataTime )
2845        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2846
2847    /* the one we connected to most recently goes first */
2848    if( a->time != b->time )
2849        return a->time > b->time ? -1 : 1;
2850
2851    return 0;
2852}
2853
2854static int
2855comparePeerLivelinessReverse( const void * va, const void * vb )
2856{
2857    return -comparePeerLiveliness (va, vb);
2858}
2859
2860static void
2861sortPeersByLivelinessImpl( tr_peer  ** peers,
2862                           void     ** clientData,
2863                           int         n,
2864                           uint64_t    now,
2865                           int (*compare) ( const void *va, const void *vb ) )
2866{
2867    int i;
2868    struct peer_liveliness *lives, *l;
2869
2870    /* build a sortable array of peer + extra info */
2871    lives = l = tr_new0( struct peer_liveliness, n );
2872    for( i=0; i<n; ++i, ++l )
2873    {
2874        tr_peer * p = peers[i];
2875        l->peer = p;
2876        l->doPurge = p->doPurge;
2877        l->pieceDataTime = p->atom->piece_data_time;
2878        l->time = p->atom->time;
2879        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2880                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2881        if( clientData )
2882            l->clientData = clientData[i];
2883    }
2884
2885    /* sort 'em */
2886    assert( n == ( l - lives ) );
2887    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2888
2889    /* build the peer array */
2890    for( i=0, l=lives; i<n; ++i, ++l ) {
2891        peers[i] = l->peer;
2892        if( clientData )
2893            clientData[i] = l->clientData;
2894    }
2895    assert( n == ( l - lives ) );
2896
2897    /* cleanup */
2898    tr_free( lives );
2899}
2900
2901static void
2902sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2903{
2904    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2905}
2906
2907static void
2908sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2909{
2910    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2911}
2912
2913
2914static void
2915enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2916{
2917    int n = tr_ptrArraySize( &t->peers );
2918    const int max = tr_torrentGetPeerLimit( t->tor );
2919    if( n > max )
2920    {
2921        void * base = tr_ptrArrayBase( &t->peers );
2922        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2923        sortPeersByLiveliness( peers, NULL, n, now );
2924        while( n > max )
2925            closePeer( t, peers[--n] );
2926        tr_free( peers );
2927    }
2928}
2929
2930static void
2931enforceSessionPeerLimit( tr_session * session, uint64_t now )
2932{
2933    int n = 0;
2934    tr_torrent * tor = NULL;
2935    const int max = tr_sessionGetPeerLimit( session );
2936
2937    /* count the total number of peers */
2938    while(( tor = tr_torrentNext( session, tor )))
2939        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2940
2941    /* if there are too many, prune out the worst */
2942    if( n > max )
2943    {
2944        tr_peer ** peers = tr_new( tr_peer*, n );
2945        Torrent ** torrents = tr_new( Torrent*, n );
2946
2947        /* populate the peer array */
2948        n = 0;
2949        tor = NULL;
2950        while(( tor = tr_torrentNext( session, tor ))) {
2951            int i;
2952            Torrent * t = tor->torrentPeers;
2953            const int tn = tr_ptrArraySize( &t->peers );
2954            for( i=0; i<tn; ++i, ++n ) {
2955                peers[n] = tr_ptrArrayNth( &t->peers, i );
2956                torrents[n] = t;
2957            }
2958        }
2959
2960        /* sort 'em */
2961        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2962
2963        /* cull out the crappiest */
2964        while( n-- > max )
2965            closePeer( torrents[n], peers[n] );
2966
2967        /* cleanup */
2968        tr_free( torrents );
2969        tr_free( peers );
2970    }
2971}
2972
2973struct reconnectTorrentStruct
2974{
2975    tr_torrent * torrent;
2976    int salt;
2977};
2978
2979static int
2980compareReconnectTorrents( const void * va, const void * vb )
2981{
2982    int ai, bi;
2983    const struct reconnectTorrentStruct * a = va;
2984    const struct reconnectTorrentStruct * b = vb;
2985
2986    /* primary key: higher priority goes first */
2987    ai = tr_torrentGetPriority( a->torrent );
2988    bi = tr_torrentGetPriority( b->torrent );
2989    if( ai != bi )
2990        return ai > bi ? -1 : 1;
2991
2992    /* secondary key: since users tend to stare at the screens
2993     * watching their downloads' progress, give downloads a
2994     * first shot at attempting outbound peer connections. */
2995    ai = tr_torrentIsSeed( a->torrent );
2996    bi = tr_torrentIsSeed( b->torrent );
2997    if( ai != bi )
2998        return bi ? -1 : 1;
2999
3000    /* tertiary key: random */
3001    if( a->salt != b->salt )
3002        return a->salt - b->salt;
3003
3004    return 0;
3005}
3006
3007static void
3008reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3009{
3010    tr_torrent * tor;
3011    tr_peerMgr * mgr = vmgr;
3012    struct reconnectTorrentStruct * torrents;
3013    int torrentCount;
3014    int i;
3015    uint64_t now;
3016    managerLock( mgr );
3017
3018    now = tr_date( );
3019
3020    /**
3021    ***  enforce the per-session and per-torrent peer limits
3022    **/
3023
3024    /* if we're over the per-torrent peer limits, cull some peers */
3025    tor = NULL;
3026    while(( tor = tr_torrentNext( mgr->session, tor )))
3027        if( tor->isRunning )
3028            enforceTorrentPeerLimit( tor->torrentPeers, now );
3029
3030    /* if we're over the per-session peer limits, cull some peers */
3031    enforceSessionPeerLimit( mgr->session, now );
3032
3033    /**
3034    ***  try to make new peer connections
3035    **/
3036
3037    torrentCount = 0;
3038    torrents = tr_new( struct reconnectTorrentStruct,
3039                       mgr->session->torrentCount );
3040    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3041        if( tor->isRunning ) {
3042            struct reconnectTorrentStruct * r = torrents + torrentCount++;
3043            r->torrent = tor;
3044            r->salt = tr_cryptoWeakRandInt( 1024 );
3045        }
3046    }
3047    qsort( torrents,
3048           torrentCount, sizeof( struct reconnectTorrentStruct ),
3049           compareReconnectTorrents );
3050    for( i=0; i<torrentCount; ++i )
3051        reconnectTorrent( torrents[i].torrent->torrentPeers );
3052
3053
3054    /* cleanup */
3055    tr_free( torrents );
3056    tr_timerAddMsec( mgr->reconnectTimer, RECONNECT_PERIOD_MSEC );
3057    managerUnlock( mgr );
3058}
3059
3060/****
3061*****
3062*****  BANDWIDTH ALLOCATION
3063*****
3064****/
3065
3066static void
3067pumpAllPeers( tr_peerMgr * mgr )
3068{
3069    tr_torrent * tor = NULL;
3070
3071    while(( tor = tr_torrentNext( mgr->session, tor )))
3072    {
3073        int j;
3074        Torrent * t = tor->torrentPeers;
3075
3076        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3077        {
3078            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3079            tr_peerMsgsPulse( peer->msgs );
3080        }
3081    }
3082}
3083
3084static void
3085bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3086{
3087    tr_torrent * tor = NULL;
3088    tr_peerMgr * mgr = vmgr;
3089    managerLock( mgr );
3090
3091    /* FIXME: this next line probably isn't necessary... */
3092    pumpAllPeers( mgr );
3093
3094    /* allocate bandwidth to the peers */
3095    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3096    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3097
3098    /* possibly stop torrents that have seeded enough */
3099    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3100        if( tor->needsSeedRatioCheck ) {
3101            tor->needsSeedRatioCheck = FALSE;
3102            tr_torrentCheckSeedRatio( tor );
3103        }
3104    }
3105
3106    /* run the completeness check for any torrents that need it */
3107    tor = NULL;
3108    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3109        if( tor->torrentPeers->needsCompletenessCheck ) {
3110            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3111            tr_torrentRecheckCompleteness( tor );
3112        }
3113    }
3114
3115    /* possibly stop torrents that have an error */
3116    tor = NULL;
3117    while(( tor = tr_torrentNext( mgr->session, tor )))
3118        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
3119            tr_torrentStop( tor );
3120
3121    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3122    managerUnlock( mgr );
3123}
3124
3125/***
3126****
3127***/
3128
3129static int
3130compareAtomPtrsByAddress( const void * va, const void *vb )
3131{
3132    const struct peer_atom * a = * (const struct peer_atom**) va;
3133    const struct peer_atom * b = * (const struct peer_atom**) vb;
3134
3135    assert( tr_isAtom( a ) );
3136    assert( tr_isAtom( b ) );
3137
3138    return tr_compareAddresses( &a->addr, &b->addr );
3139}
3140
3141/* best come first, worst go last */
3142static int
3143compareAtomPtrsByShelfDate( const void * va, const void *vb )
3144{
3145    time_t atime;
3146    time_t btime;
3147    const struct peer_atom * a = * (const struct peer_atom**) va;
3148    const struct peer_atom * b = * (const struct peer_atom**) vb;
3149    const int data_time_cutoff_secs = 60 * 60;
3150    const time_t tr_now = tr_time( );
3151
3152    assert( tr_isAtom( a ) );
3153    assert( tr_isAtom( b ) );
3154
3155    /* primary key: the last piece data time *if* it was within the last hour */
3156    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3157    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3158    if( atime != btime )
3159        return atime > btime ? -1 : 1;
3160
3161    /* secondary key: shelf date. */
3162    if( a->shelf_date != b->shelf_date )
3163        return a->shelf_date > b->shelf_date ? -1 : 1;
3164
3165    return 0;
3166}
3167
3168static int
3169getMaxAtomCount( const tr_torrent * tor )
3170{
3171    /* FIXME: this curve should be smoother... */
3172    const int n = tor->maxConnectedPeers;
3173    if( n >= 200 ) return n * 1.5;
3174    if( n >= 100 ) return n * 2;
3175    if( n >=  50 ) return n * 3;
3176    if( n >=  20 ) return n * 5;
3177    return n * 10;
3178}
3179
3180static void
3181atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3182{
3183    tr_torrent * tor = NULL;
3184    tr_peerMgr * mgr = vmgr;
3185    managerLock( mgr );
3186
3187    while(( tor = tr_torrentNext( mgr->session, tor )))
3188    {
3189        int atomCount;
3190        Torrent * t = tor->torrentPeers;
3191        const int maxAtomCount = getMaxAtomCount( tor );
3192        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3193
3194        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3195        {
3196            int i;
3197            int keepCount = 0;
3198            int testCount = 0;
3199            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3200            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3201
3202            /* keep the ones that are in use */
3203            for( i=0; i<atomCount; ++i ) {
3204                struct peer_atom * atom = atoms[i];
3205                if( peerIsInUse( t, atom ) )
3206                    keep[keepCount++] = atom;
3207                else
3208                    test[testCount++] = atom;
3209            }
3210
3211            /* if there's room, keep the best of what's left */
3212            i = 0;
3213            if( keepCount < maxAtomCount ) {
3214                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3215                while( i<testCount && keepCount<maxAtomCount )
3216                    keep[keepCount++] = test[i++];
3217            }
3218
3219            /* free the culled atoms */
3220            while( i<testCount )
3221                tr_free( test[i++] );
3222
3223            /* rebuild Torrent.pool with what's left */
3224            tr_ptrArrayDestruct( &t->pool, NULL );
3225            t->pool = TR_PTR_ARRAY_INIT;
3226            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3227            for( i=0; i<keepCount; ++i )
3228                tr_ptrArrayAppend( &t->pool, keep[i] );
3229
3230            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3231
3232            /* cleanup */
3233            tr_free( test );
3234            tr_free( keep );
3235        }
3236    }
3237
3238    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3239    managerUnlock( mgr );
3240}
Note: See TracBrowser for help on using the repository browser.