source: trunk/libtransmission/peer-mgr.c @ 9793

Last change on this file since 9793 was 9793, checked in by charles, 12 years ago

(trunk) #2548 "T's request queue can send out too many duplicate requests" -- (1) fix r9465 implementation bug that caused some peers to get starved of requests if they rejected a request or choked, then unchoked us. (2) increase a block request's TTL by 15 seconds to reduce cancel/req cycles between two or more blocks (3) add a debug mode to the GTK+ client's peer tab to watch the pending requests counts both up & down

  • Property svn:keywords set to Date Rev Author Id
File size: 92.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 9793 2009-12-16 18:20:01Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h> /* memcpy, memcmp, strstr */
16#include <stdlib.h> /* qsort */
17#include <limits.h> /* INT_MAX */
18
19#include <event.h>
20
21#include "transmission.h"
22#include "announcer.h"
23#include "bandwidth.h"
24#include "bencode.h"
25#include "blocklist.h"
26#include "clients.h"
27#include "completion.h"
28#include "crypto.h"
29#include "handshake.h"
30#include "inout.h" /* tr_ioTestPiece */
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to cull old atoms */
45    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
46
47    /* how frequently to change which peers are choked */
48    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
49
50    /* how frequently to reallocate bandwidth */
51    BANDWIDTH_PERIOD_MSEC = 500,
52
53    /* how frequently to age out old piece request lists */
54    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
55
56    /* how frequently to decide which peers live and die */
57    RECONNECT_PERIOD_MSEC = 500,
58
59    /* when many peers are available, keep idle ones this long */
60    MIN_UPLOAD_IDLE_SECS = ( 60 ),
61
62    /* when few peers are available, keep idle ones this long */
63    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
64
65    /* max # of peers to ask fer per torrent per reconnect pulse */
66    MAX_RECONNECTIONS_PER_PULSE = 8,
67
68    /* max number of peers to ask for per second overall.
69    * this throttle is to avoid overloading the router */
70    MAX_CONNECTIONS_PER_SECOND = 16,
71
72    /* number of bad pieces a peer is allowed to send before we ban them */
73    MAX_BAD_PIECES_PER_PEER = 5,
74
75    /* amount of time to keep a list of request pieces lying around
76       before it's considered too old and needs to be rebuilt */
77    PIECE_LIST_SHELF_LIFE_SECS = 60,
78
79    /* use for bitwise operations w/peer_atom.myflags */
80    MYFLAG_BANNED = 1,
81
82    /* use for bitwise operations w/peer_atom.myflags */
83    /* unreachable for now... but not banned.
84     * if they try to connect to us it's okay */
85    MYFLAG_UNREACHABLE = 2,
86
87    /* the minimum we'll wait before attempting to reconnect to a peer */
88    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
89
90    /** how long we'll let requests we've made linger before we cancel them */
91    REQUEST_TTL_SECS = 45
92};
93
94
95/**
96***
97**/
98
99enum
100{
101    UPLOAD_ONLY_UKNOWN,
102    UPLOAD_ONLY_YES,
103    UPLOAD_ONLY_NO
104};
105
106/**
107 * Peer information that should be kept even before we've connected and
108 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
109 * which ones would make good candidates for connecting to, and to watch out
110 * for banned peers.
111 *
112 * @see tr_peer
113 * @see tr_peermsgs
114 */
115struct peer_atom
116{
117    tr_peer   * peer;        /* will be NULL if not connected */
118    uint8_t     from;
119    uint8_t     flags;       /* these match the added_f flags */
120    uint8_t     myflags;     /* flags that aren't defined in added_f */
121    uint8_t     uploadOnly;  /* UPLOAD_ONLY_ */
122    tr_port     port;
123    uint16_t    numFails;
124    tr_address  addr;
125    time_t      time;        /* when the peer's connection status last changed */
126    time_t      piece_data_time;
127
128    /* similar to a TTL field, but less rigid --
129     * if the swarm is small, the atom will be kept past this date. */
130    time_t      shelf_date;
131};
132
133static tr_bool
134tr_isAtom( const struct peer_atom * atom )
135{
136    return ( atom != NULL )
137        && ( atom->from < TR_PEER_FROM__MAX )
138        && ( tr_isAddress( &atom->addr ) );
139}
140
141static const char*
142tr_atomAddrStr( const struct peer_atom * atom )
143{
144    return tr_peerIoAddrStr( &atom->addr, atom->port );
145}
146
147struct block_request
148{
149    tr_block_index_t block;
150    tr_peer * peer;
151    time_t sentAt;
152};
153
154struct weighted_piece
155{
156    tr_piece_index_t index;
157    int16_t salt;
158    int16_t requestCount;
159};
160
161typedef struct tr_torrent_peers
162{
163    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
164    tr_ptrArray                pool; /* struct peer_atom */
165    tr_ptrArray                peers; /* tr_peer */
166    tr_ptrArray                webseeds; /* tr_webseed */
167
168    tr_torrent               * tor;
169    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
170    struct tr_peerMgr        * manager;
171
172    tr_bool                    isRunning;
173    tr_bool                    needsCompletenessCheck;
174
175    struct block_request     * requests;
176    int                        requestsSort;
177    int                        requestCount;
178    int                        requestAlloc;
179
180    struct weighted_piece    * pieces;
181    int                        piecesSort;
182    int                        pieceCount;
183
184    tr_bool                    isInEndgame;
185}
186Torrent;
187
188struct tr_peerMgr
189{
190    tr_session    * session;
191    tr_ptrArray     incomingHandshakes; /* tr_handshake */
192    struct event  * bandwidthTimer;
193    struct event  * rechokeTimer;
194    struct event  * reconnectTimer;
195    struct event  * refillUpkeepTimer;
196    struct event  * atomTimer;
197};
198
199#define tordbg( t, ... ) \
200    do { \
201        if( tr_deepLoggingIsActive( ) ) \
202            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
203    } while( 0 )
204
205#define dbgmsg( ... ) \
206    do { \
207        if( tr_deepLoggingIsActive( ) ) \
208            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
209    } while( 0 )
210
211/**
212***
213**/
214
215static TR_INLINE void
216managerLock( const struct tr_peerMgr * manager )
217{
218    tr_globalLock( manager->session );
219}
220
221static TR_INLINE void
222managerUnlock( const struct tr_peerMgr * manager )
223{
224    tr_globalUnlock( manager->session );
225}
226
227static TR_INLINE void
228torrentLock( Torrent * torrent )
229{
230    managerLock( torrent->manager );
231}
232
233static TR_INLINE void
234torrentUnlock( Torrent * torrent )
235{
236    managerUnlock( torrent->manager );
237}
238
239static TR_INLINE int
240torrentIsLocked( const Torrent * t )
241{
242    return tr_globalIsLocked( t->manager->session );
243}
244
245/**
246***
247**/
248
249static int
250handshakeCompareToAddr( const void * va, const void * vb )
251{
252    const tr_handshake * a = va;
253
254    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
255}
256
257static int
258handshakeCompare( const void * a, const void * b )
259{
260    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
261}
262
263static tr_handshake*
264getExistingHandshake( tr_ptrArray      * handshakes,
265                      const tr_address * addr )
266{
267    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
268}
269
270static int
271comparePeerAtomToAddress( const void * va, const void * vb )
272{
273    const struct peer_atom * a = va;
274
275    return tr_compareAddresses( &a->addr, vb );
276}
277
278static int
279compareAtomsByAddress( const void * va, const void * vb )
280{
281    const struct peer_atom * b = vb;
282
283    assert( tr_isAtom( b ) );
284
285    return comparePeerAtomToAddress( va, &b->addr );
286}
287
288/**
289***
290**/
291
292const tr_address *
293tr_peerAddress( const tr_peer * peer )
294{
295    return &peer->atom->addr;
296}
297
298static Torrent*
299getExistingTorrent( tr_peerMgr *    manager,
300                    const uint8_t * hash )
301{
302    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
303
304    return tor == NULL ? NULL : tor->torrentPeers;
305}
306
307static int
308peerCompare( const void * a, const void * b )
309{
310    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
311}
312
313static struct peer_atom*
314getExistingAtom( const Torrent    * t,
315                 const tr_address * addr )
316{
317    Torrent * tt = (Torrent*)t;
318    assert( torrentIsLocked( t ) );
319    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
320}
321
322static tr_bool
323peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
324{
325    Torrent * t = (Torrent*) ct;
326
327    assert( torrentIsLocked ( t ) );
328
329    return ( atom->peer != NULL )
330        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
331        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
332}
333
334static tr_peer*
335peerConstructor( struct peer_atom * atom )
336{
337    tr_peer * peer = tr_new0( tr_peer, 1 );
338    tr_bitsetConstructor( &peer->have, 0 );
339    peer->atom = atom;
340    atom->peer = peer;
341    return peer;
342}
343
344static tr_peer*
345getPeer( Torrent * torrent, struct peer_atom * atom )
346{
347    tr_peer * peer;
348
349    assert( torrentIsLocked( torrent ) );
350
351    peer = atom->peer;
352
353    if( peer == NULL )
354    {
355        peer = peerConstructor( atom );
356        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
357    }
358
359    return peer;
360}
361
362static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
363
364static void
365peerDestructor( Torrent * t, tr_peer * peer )
366{
367    assert( peer != NULL );
368
369    peerDeclinedAllRequests( t, peer );
370
371    if( peer->msgs != NULL )
372    {
373        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
374        tr_peerMsgsFree( peer->msgs );
375    }
376
377    tr_peerIoClear( peer->io );
378    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
379
380    tr_bitsetDestructor( &peer->have );
381    tr_bitfieldFree( peer->blame );
382    tr_free( peer->client );
383    peer->atom->peer = NULL;
384
385    tr_free( peer );
386}
387
388static void
389removePeer( Torrent * t, tr_peer * peer )
390{
391    tr_peer * removed;
392    struct peer_atom * atom = peer->atom;
393
394    assert( torrentIsLocked( t ) );
395    assert( atom );
396
397    atom->time = tr_time( );
398
399    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
400    assert( removed == peer );
401    peerDestructor( t, removed );
402}
403
404static void
405removeAllPeers( Torrent * t )
406{
407    while( !tr_ptrArrayEmpty( &t->peers ) )
408        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
409}
410
411static void
412torrentDestructor( void * vt )
413{
414    Torrent * t = vt;
415
416    assert( t );
417    assert( !t->isRunning );
418    assert( torrentIsLocked( t ) );
419    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
420    assert( tr_ptrArrayEmpty( &t->peers ) );
421
422    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
423    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
424    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
425    tr_ptrArrayDestruct( &t->peers, NULL );
426
427    tr_free( t->requests );
428    tr_free( t->pieces );
429    tr_free( t );
430}
431
432static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
433
434static Torrent*
435torrentConstructor( tr_peerMgr * manager,
436                    tr_torrent * tor )
437{
438    int       i;
439    Torrent * t;
440
441    t = tr_new0( Torrent, 1 );
442    t->manager = manager;
443    t->tor = tor;
444    t->pool = TR_PTR_ARRAY_INIT;
445    t->peers = TR_PTR_ARRAY_INIT;
446    t->webseeds = TR_PTR_ARRAY_INIT;
447    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
448    t->requests = 0;
449
450    for( i = 0; i < tor->info.webseedCount; ++i )
451    {
452        tr_webseed * w =
453            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
454        tr_ptrArrayAppend( &t->webseeds, w );
455    }
456
457    return t;
458}
459
460tr_peerMgr*
461tr_peerMgrNew( tr_session * session )
462{
463    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
464    m->session = session;
465    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
466    return m;
467}
468
469static void
470deleteTimer( struct event ** t )
471{
472    if( *t != NULL )
473    {
474        evtimer_del( *t );
475        tr_free( *t );
476        *t = NULL;
477    }
478}
479
480static void
481deleteTimers( struct tr_peerMgr * m )
482{
483    deleteTimer( &m->atomTimer );
484    deleteTimer( &m->bandwidthTimer );
485    deleteTimer( &m->rechokeTimer );
486    deleteTimer( &m->reconnectTimer );
487    deleteTimer( &m->refillUpkeepTimer );
488}
489
490void
491tr_peerMgrFree( tr_peerMgr * manager )
492{
493    managerLock( manager );
494
495    deleteTimers( manager );
496
497    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
498     * the item from manager->handshakes, so this is a little roundabout... */
499    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
500        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
501
502    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
503
504    managerUnlock( manager );
505    tr_free( manager );
506}
507
508static int
509clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
510{
511    if( !tr_torrentHasMetadata( tor ) )
512        return TRUE;
513
514    return peer->clientIsInterested && !peer->clientIsChoked;
515}
516
517static int
518clientIsUploadingTo( const tr_peer * peer )
519{
520    return peer->peerIsInterested && !peer->peerIsChoked;
521}
522
523/***
524****
525***/
526
527tr_bool
528tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
529                      const tr_address  * addr )
530{
531    tr_bool isSeed = FALSE;
532    const Torrent * t = tor->torrentPeers;
533    const struct peer_atom * atom = getExistingAtom( t, addr );
534
535    if( atom )
536        isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
537
538    return isSeed;
539}
540
541/**
542***  REQUESTS
543***
544*** There are two data structures associated with managing block requests:
545***
546*** 1. Torrent::requests, an array of "struct block_request" which keeps
547***    track of which blocks have been requested, and when, and by which peers.
548***    This is list is used for (a) cancelling requests that have been pending
549***    for too long and (b) avoiding duplicate requests before endgame.
550***
551*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
552***    pieces that we want to request.  It's used to decide which pieces to
553***    return next when tr_peerMgrGetBlockRequests() is called.
554**/
555
556/**
557*** struct block_request
558**/
559
560enum
561{
562    REQ_UNSORTED,
563    REQ_SORTED_BY_BLOCK,
564    REQ_SORTED_BY_TIME
565};
566
567static int
568compareReqByBlock( const void * va, const void * vb )
569{
570    const struct block_request * a = va;
571    const struct block_request * b = vb;
572
573    /* primary key: block */
574    if( a->block < b->block ) return -1;
575    if( a->block > b->block ) return 1;
576
577    /* secondary key: peer */
578    if( a->peer < b->peer ) return -1;
579    if( a->peer > b->peer ) return 1;
580
581    return 0;
582}
583
584static int
585compareReqByTime( const void * va, const void * vb )
586{
587    const struct block_request * a = va;
588    const struct block_request * b = vb;
589
590    /* primary key: time */
591    if( a->sentAt < b->sentAt ) return -1;
592    if( a->sentAt > b->sentAt ) return 1;
593
594    /* secondary key: peer */
595    if( a->peer < b->peer ) return -1;
596    if( a->peer > b->peer ) return 1;
597
598    return 0;
599}
600
601static void
602requestListSort( Torrent * t, int mode )
603{
604    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
605
606    if( t->requestsSort != mode )
607    {
608        int(*compar)(const void *, const void *);
609
610        t->requestsSort = mode;
611
612        switch( mode ) {
613            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
614            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
615            default: assert( 0 && "unhandled" );
616        }
617
618        qsort( t->requests, t->requestCount,
619               sizeof( struct block_request ), compar );
620    }
621}
622
623static void
624requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
625{
626    struct block_request key;
627
628    /* ensure enough room is available... */
629    if( t->requestCount + 1 >= t->requestAlloc )
630    {
631        const int CHUNK_SIZE = 128;
632        t->requestAlloc += CHUNK_SIZE;
633        t->requests = tr_renew( struct block_request,
634                                t->requests, t->requestAlloc );
635    }
636
637    /* populate the record we're inserting */
638    key.block = block;
639    key.peer = peer;
640    key.sentAt = tr_time( );
641
642    /* insert the request to our array... */
643    switch( t->requestsSort )
644    {
645        case REQ_UNSORTED:
646        case REQ_SORTED_BY_TIME:
647            t->requests[t->requestCount++] = key;
648            break;
649
650        case REQ_SORTED_BY_BLOCK: {
651            tr_bool exact;
652            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
653                                           sizeof( struct block_request ),
654                                           compareReqByBlock, &exact );
655            assert( !exact );
656            memmove( t->requests + pos + 1,
657                     t->requests + pos,
658                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
659            t->requests[pos] = key;
660            break;
661        }
662    }
663
664    if( peer != NULL )
665    {
666        ++peer->pendingReqsToPeer;
667        assert( peer->pendingReqsToPeer >= 0 );
668    }
669
670    /*fprintf( stderr, "added request of block %lu from peer %s... "
671                       "there are now %d block\n",
672                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
673}
674
675static struct block_request *
676requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
677{
678    struct block_request key;
679    key.block = block;
680    key.peer = (tr_peer*) peer;
681
682    requestListSort( t, REQ_SORTED_BY_BLOCK );
683
684    return bsearch( &key, t->requests, t->requestCount,
685                    sizeof( struct block_request ),
686                    compareReqByBlock );
687}
688
689/* how many peers are we currently requesting this block from... */
690static int
691countBlockRequests( Torrent * t, tr_block_index_t block )
692{
693    tr_bool exact;
694    int i, n, pos;
695    struct block_request key;
696
697    requestListSort( t, REQ_SORTED_BY_BLOCK );
698    key.block = block;
699    key.peer = NULL;
700    pos = tr_lowerBound( &key, t->requests, t->requestCount,
701                         sizeof( struct block_request ),
702                         compareReqByBlock, &exact );
703
704    assert( !exact ); /* shouldn't have a request with .peer == NULL */
705
706    n = 0;
707    for( i=pos; i<t->requestCount; ++i ) {
708        if( t->requests[i].block == block )
709            ++n;
710        else
711            break;
712    }
713
714    return n;
715}
716
717static void
718requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
719{
720    const struct block_request * b = requestListLookup( t, block, peer );
721    if( b != NULL )
722    {
723        const int pos = b - t->requests;
724        assert( pos < t->requestCount );
725
726        if( b->peer != NULL )
727        {
728            --b->peer->pendingReqsToPeer;
729            assert( b->peer->pendingReqsToPeer >= 0 );
730        }
731
732        memmove( t->requests + pos,
733                 t->requests + pos + 1,
734                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
735        /*fprintf( stderr, "removing request of block %lu from peer %s... "
736                           "there are now %d block requests left\n",
737                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
738    }
739}
740
741/**
742*** struct weighted_piece
743**/
744
745enum
746{
747    PIECES_UNSORTED,
748    PIECES_SORTED_BY_INDEX,
749    PIECES_SORTED_BY_WEIGHT
750};
751
752const tr_torrent * weightTorrent;
753
754/* we try to create a "weight" s.t. high-priority pieces come before others,
755 * and that partially-complete pieces come before empty ones. */
756static int
757comparePieceByWeight( const void * va, const void * vb )
758{
759    const struct weighted_piece * a = va;
760    const struct weighted_piece * b = vb;
761    int ia, ib, missing, pending;
762    const tr_torrent * tor = weightTorrent;
763
764    /* primary key: weight */
765    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
766    pending = a->requestCount;
767    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
768    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
769    pending = b->requestCount;
770    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
771    if( ia < ib ) return -1;
772    if( ia > ib ) return 1;
773
774    /* secondary key: higher priorities go first */
775    ia = tor->info.pieces[a->index].priority;
776    ib = tor->info.pieces[b->index].priority;
777    if( ia > ib ) return -1;
778    if( ia < ib ) return 1;
779
780    /* tertiary key: random */
781    return a->salt - b->salt;
782}
783
784static int
785comparePieceByIndex( const void * va, const void * vb )
786{
787    const struct weighted_piece * a = va;
788    const struct weighted_piece * b = vb;
789    if( a->index < b->index ) return -1;
790    if( a->index > b->index ) return 1;
791    return 0;
792}
793
794static void
795pieceListSort( Torrent * t, int mode )
796{
797    int(*compar)(const void *, const void *);
798
799    assert( mode==PIECES_SORTED_BY_INDEX
800         || mode==PIECES_SORTED_BY_WEIGHT );
801
802    if( t->piecesSort != mode )
803    {
804        t->piecesSort = mode;
805
806        switch( mode ) {
807            case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
808            case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
809            default: assert( 0 && "unhandled" );  break;
810        }
811
812        weightTorrent = t->tor;
813        qsort( t->pieces, t->pieceCount,
814               sizeof( struct weighted_piece ), compar );
815    }
816
817    /* Also, as long as we've got the pieces sorted by weight,
818     * let's also update t.isInEndgame */
819    if( t->piecesSort == PIECES_SORTED_BY_WEIGHT )
820    {
821        tr_bool endgame = TRUE;
822
823        if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
824        {
825            const tr_completion * cp = &t->tor->completion;
826            const struct weighted_piece * p = t->pieces;
827            const int pending = p->requestCount;
828            const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
829            endgame = pending >= missing;
830        }
831
832        t->isInEndgame = endgame;
833        /*if( t->isInEndgame ) fprintf( stderr, "ENDGAME reached\n" );*/
834    }
835}
836
837static struct weighted_piece *
838pieceListLookup( Torrent * t, tr_piece_index_t index )
839{
840    struct weighted_piece key;
841    key.index = index;
842
843    pieceListSort( t, PIECES_SORTED_BY_INDEX );
844
845    return bsearch( &key, t->pieces, t->pieceCount,
846                    sizeof( struct weighted_piece ),
847                    comparePieceByIndex );
848}
849
850static void
851pieceListRebuild( Torrent * t )
852{
853    if( !tr_torrentIsSeed( t->tor ) )
854    {
855        tr_piece_index_t i;
856        tr_piece_index_t * pool;
857        tr_piece_index_t poolCount = 0;
858        const tr_torrent * tor = t->tor;
859        const tr_info * inf = tr_torrentInfo( tor );
860        struct weighted_piece * pieces;
861        int pieceCount;
862
863        /* build the new list */
864        pool = tr_new( tr_piece_index_t, inf->pieceCount );
865        for( i=0; i<inf->pieceCount; ++i )
866            if( !inf->pieces[i].dnd )
867                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
868                    pool[poolCount++] = i;
869        pieceCount = poolCount;
870        pieces = tr_new0( struct weighted_piece, pieceCount );
871        for( i=0; i<poolCount; ++i ) {
872            struct weighted_piece * piece = pieces + i;
873            piece->index = pool[i];
874            piece->requestCount = 0;
875            piece->salt = tr_cryptoWeakRandInt( 255 );
876        }
877
878        /* if we already had a list of pieces, merge it into
879         * the new list so we don't lose its requestCounts */
880        if( t->pieces != NULL )
881        {
882            struct weighted_piece * o = t->pieces;
883            struct weighted_piece * oend = o + t->pieceCount;
884            struct weighted_piece * n = pieces;
885            struct weighted_piece * nend = n + pieceCount;
886
887            pieceListSort( t, PIECES_SORTED_BY_INDEX );
888
889            while( o!=oend && n!=nend ) {
890                if( o->index < n->index )
891                    ++o;
892                else if( o->index > n->index )
893                    ++n;
894                else
895                    *n++ = *o++;
896            }
897
898            tr_free( t->pieces );
899        }
900
901        t->pieces = pieces;
902        t->pieceCount = pieceCount;
903        t->piecesSort = PIECES_SORTED_BY_INDEX;
904
905        /* cleanup */
906        tr_free( pool );
907    }
908}
909
910static void
911pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
912{
913    struct weighted_piece * p = pieceListLookup( t, piece );
914
915    if( p != NULL )
916    {
917        const int pos = p - t->pieces;
918
919        memmove( t->pieces + pos,
920                 t->pieces + pos + 1,
921                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
922
923        if( t->pieceCount == 0 )
924        {
925            tr_free( t->pieces );
926            t->pieces = NULL;
927        }
928    }
929}
930
931static void
932pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
933{
934    struct weighted_piece * p;
935    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
936
937    if(( p = pieceListLookup( t, index )))
938        if( p->requestCount > 0 )
939            --p->requestCount;
940
941    /* note: this invalidates the weighted.piece.weight field,
942     * but that's OK since the call to pieceListLookup ensured
943     * that we were sorted by index anyway.. next time we resort
944     * by weight, pieceListSort() will update the weights */
945}
946
947/**
948***
949**/
950
951void
952tr_peerMgrRebuildRequests( tr_torrent * tor )
953{
954    assert( tr_isTorrent( tor ) );
955
956    pieceListRebuild( tor->torrentPeers );
957}
958
959void
960tr_peerMgrGetNextRequests( tr_torrent           * tor,
961                           tr_peer              * peer,
962                           int                    numwant,
963                           tr_block_index_t     * setme,
964                           int                  * numgot )
965{
966    int i;
967    int got;
968    Torrent * t;
969    struct weighted_piece * pieces;
970    const tr_bitset * have = &peer->have;
971
972    /* sanity clause */
973    assert( tr_isTorrent( tor ) );
974    assert( numwant > 0 );
975
976    /* walk through the pieces and find blocks that should be requested */
977    got = 0;
978    t = tor->torrentPeers;
979
980    /* prep the pieces list */
981    if( t->pieces == NULL )
982        pieceListRebuild( t );
983    pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
984
985    pieces = t->pieces;
986    for( i=0; i<t->pieceCount && got<numwant; ++i )
987    {
988        struct weighted_piece * p = pieces + i;
989        const int missing = tr_cpMissingBlocksInPiece( &tor->completion, p->index );
990        const int maxDuplicatesPerBlock = t->isInEndgame ? 3 : 1;
991
992        if( p->requestCount > ( missing * maxDuplicatesPerBlock ) )
993            continue;
994
995        /* if the peer has this piece that we want... */
996        if( tr_bitsetHasFast( have, p->index ) )
997        {
998            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
999            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1000
1001            for( ; b!=e && got<numwant; ++b )
1002            {
1003                /* don't request blocks we've already got */
1004                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1005                    continue;
1006
1007                /* don't send the same request to the same peer twice */
1008                if( tr_peerMgrDidPeerRequest( tor, peer, b ) )
1009                    continue;
1010
1011                /* don't send the same request to any peer too many times */
1012                if( countBlockRequests( t, b ) >= maxDuplicatesPerBlock )
1013                    continue;
1014
1015                /* update the caller's table */
1016                setme[got++] = b;
1017
1018                /* update our own tables */
1019                requestListAdd( t, b, peer );
1020                ++p->requestCount;
1021            }
1022        }
1023    }
1024
1025    /* In most cases we've just changed the weights of a small number of pieces.
1026     * So rather than qsort()ing the entire array, it's faster to sort only the
1027     * changed ones, then do a standard merge-two-sorted-arrays pass on the
1028     * changed and unchanged pieces. */
1029    if( got > 0 )
1030    {
1031        struct weighted_piece * p;
1032        struct weighted_piece * pieces;
1033        struct weighted_piece * a = t->pieces;
1034        struct weighted_piece * a_end = t->pieces + i;
1035        struct weighted_piece * b = a_end;
1036        struct weighted_piece * b_end = t->pieces + t->pieceCount;
1037
1038        /* resort the pieces that we changed */
1039        weightTorrent = t->tor;
1040        qsort( a, a_end-a, sizeof( struct weighted_piece ), comparePieceByWeight );
1041
1042        /* allocate a new array */
1043        p = pieces = tr_new( struct weighted_piece, t->pieceCount );
1044
1045        /* merge the two sorted arrays into this new array */
1046        weightTorrent = t->tor;
1047        while( a!=a_end && b!=b_end )
1048            *p++ = comparePieceByWeight( a, b ) < 0 ? *a++ : *b++;
1049        while( a!=a_end ) *p++ = *a++;
1050        while( b!=b_end ) *p++ = *b++;
1051
1052#if 0
1053        /* make sure we did it right */
1054        assert( p - pieces == t->pieceCount );
1055        for( it=pieces; it+1<p; ++it )
1056            assert( it->weight <= it[1].weight );
1057#endif
1058
1059        /* update */
1060        tr_free( t->pieces );
1061        t->pieces = pieces;
1062    }
1063
1064    *numgot = got;
1065}
1066
1067tr_bool
1068tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1069                          const tr_peer     * peer,
1070                          tr_block_index_t    block )
1071{
1072    const Torrent * t = tor->torrentPeers;
1073    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1074}
1075
1076/* cancel requests that are too old */
1077static void
1078refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1079{
1080    time_t now;
1081    time_t too_old;
1082    tr_torrent * tor;
1083    tr_peerMgr * mgr = vmgr;
1084    managerLock( mgr );
1085
1086    now = tr_time( );
1087    too_old = now - REQUEST_TTL_SECS;
1088
1089    tor = NULL;
1090    while(( tor = tr_torrentNext( mgr->session, tor )))
1091    {
1092        Torrent * t = tor->torrentPeers;
1093        const int n = t->requestCount;
1094        if( n > 0 )
1095        {
1096            int keepCount = 0;
1097            int cancelCount = 0;
1098            struct block_request * keep = tr_new( struct block_request, n );
1099            struct block_request * cancel = tr_new( struct block_request, n );
1100            const struct block_request * it;
1101            const struct block_request * end;
1102
1103            for( it=t->requests, end=it+n; it!=end; ++it )
1104                if( it->sentAt <= too_old )
1105                    cancel[cancelCount++] = *it;
1106                else
1107                    keep[keepCount++] = *it;
1108
1109            /* prune out the ones we aren't keeping */
1110            tr_free( t->requests );
1111            t->requests = keep;
1112            t->requestCount = keepCount;
1113            t->requestAlloc = n;
1114
1115            /* send cancel messages for all the "cancel" ones */
1116            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1117                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) )
1118                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1119
1120            /* decrement the pending request counts for the timed-out blocks */
1121            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1122                pieceListRemoveRequest( t, it->block );
1123
1124            /* cleanup loop */
1125            tr_free( cancel );
1126        }
1127    }
1128
1129    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1130    managerUnlock( mgr );
1131}
1132
1133static void
1134addStrike( Torrent * t, tr_peer * peer )
1135{
1136    tordbg( t, "increasing peer %s strike count to %d",
1137            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1138
1139    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1140    {
1141        struct peer_atom * atom = peer->atom;
1142        atom->myflags |= MYFLAG_BANNED;
1143        peer->doPurge = 1;
1144        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1145    }
1146}
1147
1148static void
1149gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1150{
1151    tr_torrent *   tor = t->tor;
1152    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1153
1154    tor->corruptCur += byteCount;
1155    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1156}
1157
1158static void
1159peerSuggestedPiece( Torrent            * t UNUSED,
1160                    tr_peer            * peer UNUSED,
1161                    tr_piece_index_t     pieceIndex UNUSED,
1162                    int                  isFastAllowed UNUSED )
1163{
1164#if 0
1165    assert( t );
1166    assert( peer );
1167    assert( peer->msgs );
1168
1169    /* is this a valid piece? */
1170    if(  pieceIndex >= t->tor->info.pieceCount )
1171        return;
1172
1173    /* don't ask for it if we've already got it */
1174    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1175        return;
1176
1177    /* don't ask for it if they don't have it */
1178    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1179        return;
1180
1181    /* don't ask for it if we're choked and it's not fast */
1182    if( !isFastAllowed && peer->clientIsChoked )
1183        return;
1184
1185    /* request the blocks that we don't have in this piece */
1186    {
1187        tr_block_index_t block;
1188        const tr_torrent * tor = t->tor;
1189        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1190        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1191
1192        for( block=start; block<end; ++block )
1193        {
1194            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1195            {
1196                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1197                const uint32_t length = tr_torBlockCountBytes( tor, block );
1198                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1199                incrementPieceRequests( t, pieceIndex );
1200            }
1201        }
1202    }
1203#endif
1204}
1205
1206static void
1207decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
1208{
1209    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1210}
1211
1212static void
1213clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
1214{
1215    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
1216}
1217
1218static void
1219removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1220{
1221    requestListRemove( t, block, peer );
1222    pieceListRemoveRequest( t, block );
1223}
1224
1225/* peer choked us, or maybe it disconnected.
1226   either way we need to remove all its requests */
1227static void
1228peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1229{
1230    int i, n;
1231    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1232
1233    for( i=n=0; i<t->requestCount; ++i )
1234        if( peer == t->requests[i].peer )
1235            blocks[n++] = t->requests[i].block;
1236
1237    for( i=0; i<n; ++i )
1238        removeRequestFromTables( t, blocks[i], peer );
1239
1240    tr_free( blocks );
1241}
1242
1243static void
1244peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1245{
1246    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1247    Torrent * t = vt;
1248    const tr_peer_event * e = vevent;
1249
1250    torrentLock( t );
1251
1252    switch( e->eventType )
1253    {
1254        case TR_PEER_UPLOAD_ONLY:
1255            /* update our atom */
1256            if( peer ) {
1257                if( e->uploadOnly ) {
1258                    peer->atom->uploadOnly = UPLOAD_ONLY_YES;
1259                    peer->atom->flags |= ADDED_F_SEED_FLAG;
1260                } else {
1261                    peer->atom->uploadOnly = UPLOAD_ONLY_NO;
1262                    peer->atom->flags &= ~ADDED_F_SEED_FLAG;
1263                }
1264            }
1265            break;
1266
1267        case TR_PEER_PEER_GOT_DATA:
1268        {
1269            const time_t now = tr_time( );
1270            tr_torrent * tor = t->tor;
1271
1272            tr_torrentSetActivityDate( tor, now );
1273
1274            if( e->wasPieceData ) {
1275                tor->uploadedCur += e->length;
1276                tr_torrentSetDirty( tor );
1277            }
1278
1279            /* update the stats */
1280            if( e->wasPieceData )
1281                tr_statsAddUploaded( tor->session, e->length );
1282
1283            /* update our atom */
1284            if( peer && e->wasPieceData )
1285                peer->atom->piece_data_time = now;
1286
1287            tor->needsSeedRatioCheck = TRUE;
1288
1289            break;
1290        }
1291
1292        case TR_PEER_CLIENT_GOT_REJ:
1293            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1294            break;
1295
1296        case TR_PEER_CLIENT_GOT_CHOKE:
1297            peerDeclinedAllRequests( t, peer );
1298            break;
1299
1300        case TR_PEER_CLIENT_GOT_PORT:
1301            if( peer )
1302                peer->atom->port = e->port;
1303            break;
1304
1305        case TR_PEER_CLIENT_GOT_SUGGEST:
1306            if( peer )
1307                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1308            break;
1309
1310        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1311            if( peer )
1312                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1313            break;
1314
1315        case TR_PEER_CLIENT_GOT_DATA:
1316        {
1317            const time_t now = tr_time( );
1318            tr_torrent * tor = t->tor;
1319
1320            tr_torrentSetActivityDate( tor, now );
1321
1322            /* only add this to downloadedCur if we got it from a peer --
1323             * webseeds shouldn't count against our ratio.  As one tracker
1324             * admin put it, "Those pieces are downloaded directly from the
1325             * content distributor, not the peers, it is the tracker's job
1326             * to manage the swarms, not the web server and does not fit
1327             * into the jurisdiction of the tracker." */
1328            if( peer && e->wasPieceData ) {
1329                tor->downloadedCur += e->length;
1330                tr_torrentSetDirty( tor );
1331            }
1332
1333            /* update the stats */
1334            if( e->wasPieceData )
1335                tr_statsAddDownloaded( tor->session, e->length );
1336
1337            /* update our atom */
1338            if( peer && e->wasPieceData )
1339                peer->atom->piece_data_time = now;
1340
1341            break;
1342        }
1343
1344        case TR_PEER_PEER_PROGRESS:
1345        {
1346            if( peer )
1347            {
1348                struct peer_atom * atom = peer->atom;
1349                if( e->progress >= 1.0 ) {
1350                    tordbg( t, "marking peer %s as a seed",
1351                            tr_atomAddrStr( atom ) );
1352                    atom->flags |= ADDED_F_SEED_FLAG;
1353                }
1354            }
1355            break;
1356        }
1357
1358        case TR_PEER_CLIENT_GOT_BLOCK:
1359        {
1360            tr_torrent * tor = t->tor;
1361            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1362
1363            requestListRemove( t, block, peer );
1364            pieceListRemoveRequest( t, block );
1365
1366            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1367            {
1368                tordbg( t, "we have this block already..." );
1369                clientGotUnwantedBlock( tor, block );
1370            }
1371            else
1372            {
1373                tr_cpBlockAdd( &tor->completion, block );
1374                tr_torrentSetDirty( tor );
1375
1376                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1377                {
1378                    const tr_piece_index_t p = e->pieceIndex;
1379                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
1380
1381                    if( !ok )
1382                    {
1383                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1384                                   (unsigned long)p );
1385                    }
1386
1387                    tr_torrentSetHasPiece( tor, p, ok );
1388                    tr_torrentSetPieceChecked( tor, p, TRUE );
1389                    tr_peerMgrSetBlame( tor, p, ok );
1390
1391                    if( !ok )
1392                    {
1393                        gotBadPiece( t, p );
1394                    }
1395                    else
1396                    {
1397                        int i;
1398                        int peerCount;
1399                        tr_peer ** peers;
1400                        tr_file_index_t fileIndex;
1401
1402                        peerCount = tr_ptrArraySize( &t->peers );
1403                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1404                        for( i=0; i<peerCount; ++i )
1405                            tr_peerMsgsHave( peers[i]->msgs, p );
1406
1407                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1408                            const tr_file * file = &tor->info.files[fileIndex];
1409                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1410                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1411                                    tr_torrentFileCompleted( tor, fileIndex );
1412                        }
1413
1414                        pieceListRemovePiece( t, p );
1415                    }
1416                }
1417
1418                t->needsCompletenessCheck = TRUE;
1419            }
1420            break;
1421        }
1422
1423        case TR_PEER_ERROR:
1424            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1425            {
1426                /* some protocol error from the peer */
1427                peer->doPurge = 1;
1428                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1429                        tr_atomAddrStr( peer->atom ) );
1430            }
1431            else
1432            {
1433                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1434            }
1435            break;
1436
1437        default:
1438            assert( 0 );
1439    }
1440
1441    torrentUnlock( t );
1442}
1443
1444static int
1445getDefaultShelfLife( uint8_t from )
1446{
1447    /* in general, peers obtained from firsthand contact
1448     * are better than those from secondhand, etc etc */
1449    switch( from )
1450    {
1451        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1452        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1453        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1454        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1455        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1456        case TR_PEER_FROM_RESUME   : return 60 * 60;
1457        default                    : return 60 * 60;
1458    }
1459}
1460
1461
1462static void
1463ensureAtomExists( Torrent           * t,
1464                  const tr_address  * addr,
1465                  const tr_port       port,
1466                  const uint8_t       flags,
1467                  const uint8_t       from )
1468{
1469    assert( tr_isAddress( addr ) );
1470    assert( from < TR_PEER_FROM__MAX );
1471
1472    if( getExistingAtom( t, addr ) == NULL )
1473    {
1474        struct peer_atom * a;
1475        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1476
1477        a = tr_new0( struct peer_atom, 1 );
1478        a->addr = *addr;
1479        a->port = port;
1480        a->flags = flags;
1481        a->from = from;
1482        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1483        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1484
1485        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1486    }
1487}
1488
1489static int
1490getMaxPeerCount( const tr_torrent * tor )
1491{
1492    return tor->maxConnectedPeers;
1493}
1494
1495static int
1496getPeerCount( const Torrent * t )
1497{
1498    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1499}
1500
1501/* FIXME: this is kind of a mess. */
1502static tr_bool
1503myHandshakeDoneCB( tr_handshake  * handshake,
1504                   tr_peerIo     * io,
1505                   tr_bool         isConnected,
1506                   const uint8_t * peer_id,
1507                   void          * vmanager )
1508{
1509    tr_bool            ok = isConnected;
1510    tr_bool            success = FALSE;
1511    tr_port            port;
1512    const tr_address * addr;
1513    tr_peerMgr       * manager = vmanager;
1514    Torrent          * t;
1515    tr_handshake     * ours;
1516
1517    assert( io );
1518    assert( tr_isBool( ok ) );
1519
1520    t = tr_peerIoHasTorrentHash( io )
1521        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1522        : NULL;
1523
1524    if( tr_peerIoIsIncoming ( io ) )
1525        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1526                                        handshake, handshakeCompare );
1527    else if( t )
1528        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1529                                        handshake, handshakeCompare );
1530    else
1531        ours = handshake;
1532
1533    assert( ours );
1534    assert( ours == handshake );
1535
1536    if( t )
1537        torrentLock( t );
1538
1539    addr = tr_peerIoGetAddress( io, &port );
1540
1541    if( !ok || !t || !t->isRunning )
1542    {
1543        if( t )
1544        {
1545            struct peer_atom * atom = getExistingAtom( t, addr );
1546            if( atom )
1547                ++atom->numFails;
1548        }
1549    }
1550    else /* looking good */
1551    {
1552        struct peer_atom * atom;
1553
1554        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
1555        atom = getExistingAtom( t, addr );
1556        atom->time = tr_time( );
1557        atom->piece_data_time = 0;
1558
1559        if( atom->myflags & MYFLAG_BANNED )
1560        {
1561            tordbg( t, "banned peer %s tried to reconnect",
1562                    tr_atomAddrStr( atom ) );
1563        }
1564        else if( tr_peerIoIsIncoming( io )
1565               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1566
1567        {
1568        }
1569        else
1570        {
1571            tr_peer * peer = atom->peer;
1572
1573            if( peer ) /* we already have this peer */
1574            {
1575            }
1576            else
1577            {
1578                peer = getPeer( t, atom );
1579                tr_free( peer->client );
1580
1581                if( !peer_id )
1582                    peer->client = NULL;
1583                else {
1584                    char client[128];
1585                    tr_clientForId( client, sizeof( client ), peer_id );
1586                    peer->client = tr_strdup( client );
1587                }
1588
1589                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1590                                                                balanced by our unref in peerDestructor()  */
1591                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1592                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1593
1594                success = TRUE;
1595            }
1596        }
1597    }
1598
1599    if( t )
1600        torrentUnlock( t );
1601
1602    return success;
1603}
1604
1605void
1606tr_peerMgrAddIncoming( tr_peerMgr * manager,
1607                       tr_address * addr,
1608                       tr_port      port,
1609                       int          socket )
1610{
1611    tr_session * session;
1612
1613    managerLock( manager );
1614
1615    assert( tr_isSession( manager->session ) );
1616    session = manager->session;
1617
1618    if( tr_sessionIsAddressBlocked( session, addr ) )
1619    {
1620        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1621        tr_netClose( session, socket );
1622    }
1623    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1624    {
1625        tr_netClose( session, socket );
1626    }
1627    else /* we don't have a connection to them yet... */
1628    {
1629        tr_peerIo *    io;
1630        tr_handshake * handshake;
1631
1632        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1633
1634        handshake = tr_handshakeNew( io,
1635                                     session->encryptionMode,
1636                                     myHandshakeDoneCB,
1637                                     manager );
1638
1639        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1640
1641        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1642                                 handshakeCompare );
1643    }
1644
1645    managerUnlock( manager );
1646}
1647
1648static tr_bool
1649tr_isPex( const tr_pex * pex )
1650{
1651    return pex && tr_isAddress( &pex->addr );
1652}
1653
1654void
1655tr_peerMgrAddPex( tr_torrent   *  tor,
1656                  uint8_t         from,
1657                  const tr_pex *  pex )
1658{
1659    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1660    {
1661        Torrent * t = tor->torrentPeers;
1662        managerLock( t->manager );
1663
1664        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1665            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1666                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
1667
1668        managerUnlock( t->manager );
1669    }
1670}
1671
1672tr_pex *
1673tr_peerMgrCompactToPex( const void *    compact,
1674                        size_t          compactLen,
1675                        const uint8_t * added_f,
1676                        size_t          added_f_len,
1677                        size_t *        pexCount )
1678{
1679    size_t          i;
1680    size_t          n = compactLen / 6;
1681    const uint8_t * walk = compact;
1682    tr_pex *        pex = tr_new0( tr_pex, n );
1683
1684    for( i = 0; i < n; ++i )
1685    {
1686        pex[i].addr.type = TR_AF_INET;
1687        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1688        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1689        if( added_f && ( n == added_f_len ) )
1690            pex[i].flags = added_f[i];
1691    }
1692
1693    *pexCount = n;
1694    return pex;
1695}
1696
1697tr_pex *
1698tr_peerMgrCompact6ToPex( const void    * compact,
1699                         size_t          compactLen,
1700                         const uint8_t * added_f,
1701                         size_t          added_f_len,
1702                         size_t        * pexCount )
1703{
1704    size_t          i;
1705    size_t          n = compactLen / 18;
1706    const uint8_t * walk = compact;
1707    tr_pex *        pex = tr_new0( tr_pex, n );
1708
1709    for( i = 0; i < n; ++i )
1710    {
1711        pex[i].addr.type = TR_AF_INET6;
1712        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1713        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1714        if( added_f && ( n == added_f_len ) )
1715            pex[i].flags = added_f[i];
1716    }
1717
1718    *pexCount = n;
1719    return pex;
1720}
1721
1722tr_pex *
1723tr_peerMgrArrayToPex( const void * array,
1724                      size_t       arrayLen,
1725                      size_t      * pexCount )
1726{
1727    size_t          i;
1728    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1729    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1730    const uint8_t * walk = array;
1731    tr_pex        * pex = tr_new0( tr_pex, n );
1732
1733    for( i = 0 ; i < n ; i++ ) {
1734        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1735        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1736        pex[i].flags = 0x00;
1737        walk += sizeof( tr_address ) + 2;
1738    }
1739
1740    *pexCount = n;
1741    return pex;
1742}
1743
1744/**
1745***
1746**/
1747
1748void
1749tr_peerMgrSetBlame( tr_torrent     * tor,
1750                    tr_piece_index_t pieceIndex,
1751                    int              success )
1752{
1753    if( !success )
1754    {
1755        int        peerCount, i;
1756        Torrent *  t = tor->torrentPeers;
1757        tr_peer ** peers;
1758
1759        assert( torrentIsLocked( t ) );
1760
1761        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1762        for( i = 0; i < peerCount; ++i )
1763        {
1764            tr_peer * peer = peers[i];
1765            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1766            {
1767                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1768                        tr_atomAddrStr( peer->atom ),
1769                        pieceIndex, (int)peer->strikes + 1 );
1770                addStrike( t, peer );
1771            }
1772        }
1773    }
1774}
1775
1776int
1777tr_pexCompare( const void * va, const void * vb )
1778{
1779    const tr_pex * a = va;
1780    const tr_pex * b = vb;
1781    int i;
1782
1783    assert( tr_isPex( a ) );
1784    assert( tr_isPex( b ) );
1785
1786    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1787        return i;
1788
1789    if( a->port != b->port )
1790        return a->port < b->port ? -1 : 1;
1791
1792    return 0;
1793}
1794
1795#if 0
1796static int
1797peerPrefersCrypto( const tr_peer * peer )
1798{
1799    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1800        return TRUE;
1801
1802    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1803        return FALSE;
1804
1805    return tr_peerIoIsEncrypted( peer->io );
1806}
1807#endif
1808
1809/* better goes first */
1810static int
1811compareAtomsByUsefulness( const void * va, const void *vb )
1812{
1813    const struct peer_atom * a = * (const struct peer_atom**) va;
1814    const struct peer_atom * b = * (const struct peer_atom**) vb;
1815
1816    assert( tr_isAtom( a ) );
1817    assert( tr_isAtom( b ) );
1818
1819    if( a->piece_data_time != b->piece_data_time )
1820        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1821    if( a->from != b->from )
1822        return a->from < b->from ? -1 : 1;
1823    if( a->numFails != b->numFails )
1824        return a->numFails < b->numFails ? -1 : 1;
1825
1826    return 0;
1827}
1828
1829int
1830tr_peerMgrGetPeers( tr_torrent   * tor,
1831                    tr_pex      ** setme_pex,
1832                    uint8_t        af,
1833                    uint8_t        list_mode,
1834                    int            maxCount )
1835{
1836    int i;
1837    int n;
1838    int count = 0;
1839    int atomCount = 0;
1840    const Torrent * t = tor->torrentPeers;
1841    struct peer_atom ** atoms = NULL;
1842    tr_pex * pex;
1843    tr_pex * walk;
1844
1845    assert( tr_isTorrent( tor ) );
1846    assert( setme_pex != NULL );
1847    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1848    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1849
1850    managerLock( t->manager );
1851
1852    /**
1853    ***  build a list of atoms
1854    **/
1855
1856    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1857    {
1858        int i;
1859        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1860        atomCount = tr_ptrArraySize( &t->peers );
1861        atoms = tr_new( struct peer_atom *, atomCount );
1862        for( i=0; i<atomCount; ++i )
1863            atoms[i] = peers[i]->atom;
1864    }
1865    else /* TR_PEERS_ALL */
1866    {
1867        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1868        atomCount = tr_ptrArraySize( &t->pool );
1869        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1870    }
1871
1872    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1873
1874    /**
1875    ***  add the first N of them into our return list
1876    **/
1877
1878    n = MIN( atomCount, maxCount );
1879    pex = walk = tr_new0( tr_pex, n );
1880
1881    for( i=0; i<atomCount && count<n; ++i )
1882    {
1883        const struct peer_atom * atom = atoms[i];
1884        if( atom->addr.type == af )
1885        {
1886            assert( tr_isAddress( &atom->addr ) );
1887            walk->addr = atom->addr;
1888            walk->port = atom->port;
1889            walk->flags = atom->flags;
1890            ++count;
1891            ++walk;
1892        }
1893    }
1894
1895    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1896
1897    assert( ( walk - pex ) == count );
1898    *setme_pex = pex;
1899
1900    /* cleanup */
1901    tr_free( atoms );
1902    managerUnlock( t->manager );
1903    return count;
1904}
1905
1906static void atomPulse      ( int, short, void * );
1907static void bandwidthPulse ( int, short, void * );
1908static void rechokePulse   ( int, short, void * );
1909static void reconnectPulse ( int, short, void * );
1910
1911static struct event *
1912createTimer( int msec, void (*callback)(int, short, void *), void * cbdata )
1913{
1914    struct event * timer = tr_new0( struct event, 1 );
1915    evtimer_set( timer, callback, cbdata );
1916    tr_timerAddMsec( timer, msec );
1917    return timer;
1918}
1919
1920static void
1921ensureMgrTimersExist( struct tr_peerMgr * m )
1922{
1923    if( m->atomTimer == NULL )
1924        m->atomTimer = createTimer( ATOM_PERIOD_MSEC, atomPulse, m );
1925
1926    if( m->bandwidthTimer == NULL )
1927        m->bandwidthTimer = createTimer( BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
1928
1929    if( m->rechokeTimer == NULL )
1930        m->rechokeTimer = createTimer( RECHOKE_PERIOD_MSEC, rechokePulse, m );
1931
1932    if( m->reconnectTimer == NULL )
1933        m->reconnectTimer = createTimer( RECONNECT_PERIOD_MSEC, reconnectPulse, m );
1934
1935    if( m->refillUpkeepTimer == NULL )
1936        m->refillUpkeepTimer = createTimer( REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
1937}
1938
1939void
1940tr_peerMgrStartTorrent( tr_torrent * tor )
1941{
1942    Torrent * t = tor->torrentPeers;
1943
1944    assert( t != NULL );
1945    managerLock( t->manager );
1946    ensureMgrTimersExist( t->manager );
1947
1948    t->isRunning = TRUE;
1949
1950    rechokePulse( 0, 0, t->manager );
1951    managerUnlock( t->manager );
1952}
1953
1954static void
1955stopTorrent( Torrent * t )
1956{
1957    int i, n;
1958
1959    assert( torrentIsLocked( t ) );
1960
1961    t->isRunning = FALSE;
1962
1963    /* disconnect the peers. */
1964    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
1965        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
1966    tr_ptrArrayClear( &t->peers );
1967
1968    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
1969     * which removes the handshake from t->outgoingHandshakes... */
1970    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
1971        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
1972}
1973
1974void
1975tr_peerMgrStopTorrent( tr_torrent * tor )
1976{
1977    Torrent * t = tor->torrentPeers;
1978
1979    managerLock( t->manager );
1980
1981    stopTorrent( t );
1982
1983    managerUnlock( t->manager );
1984}
1985
1986void
1987tr_peerMgrAddTorrent( tr_peerMgr * manager,
1988                      tr_torrent * tor )
1989{
1990    managerLock( manager );
1991
1992    assert( tor );
1993    assert( tor->torrentPeers == NULL );
1994
1995    tor->torrentPeers = torrentConstructor( manager, tor );
1996
1997    managerUnlock( manager );
1998}
1999
2000void
2001tr_peerMgrRemoveTorrent( tr_torrent * tor )
2002{
2003    tr_torrentLock( tor );
2004
2005    stopTorrent( tor->torrentPeers );
2006    torrentDestructor( tor->torrentPeers );
2007
2008    tr_torrentUnlock( tor );
2009}
2010
2011void
2012tr_peerMgrTorrentAvailability( const tr_torrent * tor,
2013                               int8_t           * tab,
2014                               unsigned int       tabCount )
2015{
2016    tr_piece_index_t   i;
2017    const Torrent *    t;
2018    float              interval;
2019    tr_bool            isSeed;
2020    int                peerCount;
2021    const tr_peer **   peers;
2022    tr_torrentLock( tor );
2023
2024    t = tor->torrentPeers;
2025    tor = t->tor;
2026    interval = tor->info.pieceCount / (float)tabCount;
2027    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
2028    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2029    peerCount = tr_ptrArraySize( &t->peers );
2030
2031    memset( tab, 0, tabCount );
2032
2033    for( i = 0; tor && i < tabCount; ++i )
2034    {
2035        const int piece = i * interval;
2036
2037        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2038            tab[i] = -1;
2039        else if( peerCount ) {
2040            int j;
2041            for( j = 0; j < peerCount; ++j )
2042                if( tr_bitsetHas( &peers[j]->have, i ) )
2043                    ++tab[i];
2044        }
2045    }
2046
2047    tr_torrentUnlock( tor );
2048}
2049
2050/* Returns the pieces that are available from peers */
2051tr_bitfield*
2052tr_peerMgrGetAvailable( const tr_torrent * tor )
2053{
2054    int i;
2055    int peerCount;
2056    Torrent * t = tor->torrentPeers;
2057    const tr_peer ** peers;
2058    tr_bitfield * pieces;
2059    managerLock( t->manager );
2060
2061    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2062    peerCount = tr_ptrArraySize( &t->peers );
2063    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2064    for( i=0; i<peerCount; ++i )
2065        tr_bitsetOr( pieces, &peers[i]->have );
2066
2067    managerUnlock( t->manager );
2068    return pieces;
2069}
2070
2071void
2072tr_peerMgrTorrentStats( tr_torrent       * tor,
2073                        int              * setmePeersKnown,
2074                        int              * setmePeersConnected,
2075                        int              * setmeSeedsConnected,
2076                        int              * setmeWebseedsSendingToUs,
2077                        int              * setmePeersSendingToUs,
2078                        int              * setmePeersGettingFromUs,
2079                        int              * setmePeersFrom )
2080{
2081    int i, size;
2082    const Torrent * t = tor->torrentPeers;
2083    const tr_peer ** peers;
2084    const tr_webseed ** webseeds;
2085
2086    managerLock( t->manager );
2087
2088    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2089    size = tr_ptrArraySize( &t->peers );
2090
2091    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2092    *setmePeersConnected       = 0;
2093    *setmeSeedsConnected       = 0;
2094    *setmePeersGettingFromUs   = 0;
2095    *setmePeersSendingToUs     = 0;
2096    *setmeWebseedsSendingToUs  = 0;
2097
2098    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2099        setmePeersFrom[i] = 0;
2100
2101    for( i=0; i<size; ++i )
2102    {
2103        const tr_peer * peer = peers[i];
2104        const struct peer_atom * atom = peer->atom;
2105
2106        if( peer->io == NULL ) /* not connected */
2107            continue;
2108
2109        ++*setmePeersConnected;
2110
2111        ++setmePeersFrom[atom->from];
2112
2113        if( clientIsDownloadingFrom( tor, peer ) )
2114            ++*setmePeersSendingToUs;
2115
2116        if( clientIsUploadingTo( peer ) )
2117            ++*setmePeersGettingFromUs;
2118
2119        if( atom->flags & ADDED_F_SEED_FLAG )
2120            ++*setmeSeedsConnected;
2121    }
2122
2123    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2124    size = tr_ptrArraySize( &t->webseeds );
2125    for( i=0; i<size; ++i )
2126        if( tr_webseedIsActive( webseeds[i] ) )
2127            ++*setmeWebseedsSendingToUs;
2128
2129    managerUnlock( t->manager );
2130}
2131
2132float
2133tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2134{
2135    int i;
2136    float tmp;
2137    float ret = 0;
2138
2139    const Torrent * t = tor->torrentPeers;
2140    const int n = tr_ptrArraySize( &t->webseeds );
2141    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2142
2143    for( i=0; i<n; ++i )
2144        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2145            ret += tmp;
2146
2147    return ret;
2148}
2149
2150
2151float*
2152tr_peerMgrWebSpeeds( const tr_torrent * tor )
2153{
2154    const Torrent * t = tor->torrentPeers;
2155    const tr_webseed ** webseeds;
2156    int i;
2157    int webseedCount;
2158    float * ret;
2159    uint64_t now;
2160
2161    assert( t->manager );
2162    managerLock( t->manager );
2163
2164    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2165    webseedCount = tr_ptrArraySize( &t->webseeds );
2166    assert( webseedCount == tor->info.webseedCount );
2167    ret = tr_new0( float, webseedCount );
2168    now = tr_date( );
2169
2170    for( i=0; i<webseedCount; ++i )
2171        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2172            ret[i] = -1.0;
2173
2174    managerUnlock( t->manager );
2175    return ret;
2176}
2177
2178double
2179tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2180{
2181    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2182}
2183
2184
2185struct tr_peer_stat *
2186tr_peerMgrPeerStats( const tr_torrent    * tor,
2187                     int                 * setmeCount )
2188{
2189    int i, size;
2190    const Torrent * t = tor->torrentPeers;
2191    const tr_peer ** peers;
2192    tr_peer_stat * ret;
2193    uint64_t now;
2194
2195    assert( t->manager );
2196    managerLock( t->manager );
2197
2198    size = tr_ptrArraySize( &t->peers );
2199    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2200    ret = tr_new0( tr_peer_stat, size );
2201    now = tr_date( );
2202
2203    for( i=0; i<size; ++i )
2204    {
2205        char *                   pch;
2206        const tr_peer *          peer = peers[i];
2207        const struct peer_atom * atom = peer->atom;
2208        tr_peer_stat *           stat = ret + i;
2209
2210        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2211        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2212                   sizeof( stat->client ) );
2213        stat->port                = ntohs( peer->atom->port );
2214        stat->from                = atom->from;
2215        stat->progress            = peer->progress;
2216        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2217        stat->rateToPeer          = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2218        stat->rateToClient        = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2219        stat->peerIsChoked        = peer->peerIsChoked;
2220        stat->peerIsInterested    = peer->peerIsInterested;
2221        stat->clientIsChoked      = peer->clientIsChoked;
2222        stat->clientIsInterested  = peer->clientIsInterested;
2223        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2224        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2225        stat->isUploadingTo       = clientIsUploadingTo( peer );
2226        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2227        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2228        stat->pendingReqsToClient = peer->pendingReqsToClient;
2229
2230        pch = stat->flagStr;
2231        if( t->optimistic == peer ) *pch++ = 'O';
2232        if( stat->isDownloadingFrom ) *pch++ = 'D';
2233        else if( stat->clientIsInterested ) *pch++ = 'd';
2234        if( stat->isUploadingTo ) *pch++ = 'U';
2235        else if( stat->peerIsInterested ) *pch++ = 'u';
2236        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2237        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2238        if( stat->isEncrypted ) *pch++ = 'E';
2239        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2240        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2241        if( stat->isIncoming ) *pch++ = 'I';
2242        *pch = '\0';
2243    }
2244
2245    *setmeCount = size;
2246
2247    managerUnlock( t->manager );
2248    return ret;
2249}
2250
2251/**
2252***
2253**/
2254
2255struct ChokeData
2256{
2257    tr_bool         doUnchoke;
2258    tr_bool         isInterested;
2259    tr_bool         isChoked;
2260    int             rate;
2261    tr_peer *       peer;
2262};
2263
2264static int
2265compareChoke( const void * va,
2266              const void * vb )
2267{
2268    const struct ChokeData * a = va;
2269    const struct ChokeData * b = vb;
2270
2271    if( a->rate != b->rate ) /* prefer higher overall speeds */
2272        return a->rate > b->rate ? -1 : 1;
2273
2274    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2275        return a->isChoked ? 1 : -1;
2276
2277    return 0;
2278}
2279
2280static int
2281isNew( const tr_peer * peer )
2282{
2283    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2284}
2285
2286static int
2287isSame( const tr_peer * peer )
2288{
2289    return peer && peer->client && strstr( peer->client, "Transmission" );
2290}
2291
2292/**
2293***
2294**/
2295
2296static void
2297rechokeTorrent( Torrent * t, const uint64_t now )
2298{
2299    int i, size, unchokedInterested;
2300    const int peerCount = tr_ptrArraySize( &t->peers );
2301    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2302    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2303    const tr_session * session = t->manager->session;
2304    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2305
2306    assert( torrentIsLocked( t ) );
2307
2308    /* sort the peers by preference and rate */
2309    for( i = 0, size = 0; i < peerCount; ++i )
2310    {
2311        tr_peer * peer = peers[i];
2312        struct peer_atom * atom = peer->atom;
2313
2314        if( peer->progress >= 1.0 ) /* choke all seeds */
2315        {
2316            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2317        }
2318        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2319        {
2320            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2321        }
2322        else if( chokeAll ) /* choke everyone if we're not uploading */
2323        {
2324            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2325        }
2326        else
2327        {
2328            struct ChokeData * n = &choke[size++];
2329            n->peer         = peer;
2330            n->isInterested = peer->peerIsInterested;
2331            n->isChoked     = peer->peerIsChoked;
2332            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2333        }
2334    }
2335
2336    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2337
2338    /**
2339     * Reciprocation and number of uploads capping is managed by unchoking
2340     * the N peers which have the best upload rate and are interested.
2341     * This maximizes the client's download rate. These N peers are
2342     * referred to as downloaders, because they are interested in downloading
2343     * from the client.
2344     *
2345     * Peers which have a better upload rate (as compared to the downloaders)
2346     * but aren't interested get unchoked. If they become interested, the
2347     * downloader with the worst upload rate gets choked. If a client has
2348     * a complete file, it uses its upload rate rather than its download
2349     * rate to decide which peers to unchoke.
2350     */
2351    unchokedInterested = 0;
2352    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2353        choke[i].doUnchoke = 1;
2354        if( choke[i].isInterested )
2355            ++unchokedInterested;
2356    }
2357
2358    /* optimistic unchoke */
2359    if( i < size )
2360    {
2361        int n;
2362        struct ChokeData * c;
2363        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2364
2365        for( ; i<size; ++i )
2366        {
2367            if( choke[i].isInterested )
2368            {
2369                const tr_peer * peer = choke[i].peer;
2370                int x = 1, y;
2371                if( isNew( peer ) ) x *= 3;
2372                if( isSame( peer ) ) x *= 3;
2373                for( y=0; y<x; ++y )
2374                    tr_ptrArrayAppend( &randPool, &choke[i] );
2375            }
2376        }
2377
2378        if(( n = tr_ptrArraySize( &randPool )))
2379        {
2380            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2381            c->doUnchoke = 1;
2382            t->optimistic = c->peer;
2383        }
2384
2385        tr_ptrArrayDestruct( &randPool, NULL );
2386    }
2387
2388    for( i=0; i<size; ++i )
2389        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2390
2391    /* cleanup */
2392    tr_free( choke );
2393}
2394
2395static void
2396rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2397{
2398    uint64_t now;
2399    tr_torrent * tor = NULL;
2400    tr_peerMgr * mgr = vmgr;
2401    managerLock( mgr );
2402
2403    now = tr_date( );
2404    while(( tor = tr_torrentNext( mgr->session, tor )))
2405        if( tor->isRunning )
2406            rechokeTorrent( tor->torrentPeers, now );
2407
2408    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
2409    managerUnlock( mgr );
2410}
2411
2412/***
2413****
2414****  Life and Death
2415****
2416***/
2417
2418typedef enum
2419{
2420    TR_CAN_KEEP,
2421    TR_CAN_CLOSE,
2422    TR_MUST_CLOSE,
2423}
2424tr_close_type_t;
2425
2426static tr_close_type_t
2427shouldPeerBeClosed( const Torrent    * t,
2428                    const tr_peer    * peer,
2429                    int                peerCount,
2430                    const time_t       now )
2431{
2432    const tr_torrent *       tor = t->tor;
2433    const struct peer_atom * atom = peer->atom;
2434
2435    /* if it's marked for purging, close it */
2436    if( peer->doPurge )
2437    {
2438        tordbg( t, "purging peer %s because its doPurge flag is set",
2439                tr_atomAddrStr( atom ) );
2440        return TR_MUST_CLOSE;
2441    }
2442
2443    /* if we're seeding and the peer has everything we have,
2444     * and enough time has passed for a pex exchange, then disconnect */
2445    if( tr_torrentIsSeed( tor ) )
2446    {
2447        int peerHasEverything;
2448        if( atom->flags & ADDED_F_SEED_FLAG )
2449            peerHasEverything = TRUE;
2450        else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
2451            peerHasEverything = FALSE;
2452        else {
2453            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2454            tr_bitsetDifference( tmp, &peer->have );
2455            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2456            tr_bitfieldFree( tmp );
2457        }
2458
2459        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2460        {
2461            tordbg( t, "purging peer %s because we're both seeds",
2462                    tr_atomAddrStr( atom ) );
2463            return TR_MUST_CLOSE;
2464        }
2465    }
2466
2467    /* disconnect if it's been too long since piece data has been transferred.
2468     * this is on a sliding scale based on number of available peers... */
2469    {
2470        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2471        /* if we have >= relaxIfFewerThan, strictness is 100%.
2472         * if we have zero connections, strictness is 0% */
2473        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2474                               ? 1.0
2475                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2476        const int lo = MIN_UPLOAD_IDLE_SECS;
2477        const int hi = MAX_UPLOAD_IDLE_SECS;
2478        const int limit = hi - ( ( hi - lo ) * strictness );
2479        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2480/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2481        if( idleTime > limit ) {
2482            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2483                       tr_atomAddrStr( atom ), idleTime );
2484            return TR_CAN_CLOSE;
2485        }
2486    }
2487
2488    return TR_CAN_KEEP;
2489}
2490
2491static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2492
2493static tr_peer **
2494getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2495{
2496    int i, peerCount, outsize;
2497    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2498    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2499
2500    assert( torrentIsLocked( t ) );
2501
2502    for( i = outsize = 0; i < peerCount; ++i )
2503        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2504            ret[outsize++] = peers[i];
2505
2506    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2507
2508    *setmeSize = outsize;
2509    return ret;
2510}
2511
2512static int
2513compareCandidates( const void * va, const void * vb )
2514{
2515    const struct peer_atom * a = *(const struct peer_atom**) va;
2516    const struct peer_atom * b = *(const struct peer_atom**) vb;
2517
2518    /* <Charles> Here we would probably want to try reconnecting to
2519     * peers that had most recently given us data. Lots of users have
2520     * trouble with resets due to their routers and/or ISPs. This way we
2521     * can quickly recover from an unwanted reset. So we sort
2522     * piece_data_time in descending order.
2523     */
2524
2525    if( a->piece_data_time != b->piece_data_time )
2526        return a->piece_data_time < b->piece_data_time ? 1 : -1;
2527
2528    if( a->numFails != b->numFails )
2529        return a->numFails < b->numFails ? -1 : 1;
2530
2531    if( a->time != b->time )
2532        return a->time < b->time ? -1 : 1;
2533
2534    /* In order to avoid fragmenting the swarm, peers from trackers and
2535     * from the DHT should be preferred to peers from PEX. */
2536    if( a->from != b->from )
2537        return a->from < b->from ? -1 : 1;
2538
2539    return 0;
2540}
2541
2542static int
2543getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2544{
2545    int sec;
2546
2547    /* if we were recently connected to this peer and transferring piece
2548     * data, try to reconnect to them sooner rather that later -- we don't
2549     * want network troubles to get in the way of a good peer. */
2550    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2551        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2552
2553    /* don't allow reconnects more often than our minimum */
2554    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2555        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2556
2557    /* otherwise, the interval depends on how many times we've tried
2558     * and failed to connect to the peer */
2559    else switch( atom->numFails ) {
2560        case 0: sec = 0; break;
2561        case 1: sec = 5; break;
2562        case 2: sec = 2 * 60; break;
2563        case 3: sec = 15 * 60; break;
2564        case 4: sec = 30 * 60; break;
2565        case 5: sec = 60 * 60; break;
2566        default: sec = 120 * 60; break;
2567    }
2568
2569    return sec;
2570}
2571
2572static struct peer_atom **
2573getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
2574{
2575    int                 i, atomCount, retCount;
2576    struct peer_atom ** atoms;
2577    struct peer_atom ** ret;
2578    const int           seed = tr_torrentIsSeed( t->tor );
2579
2580    assert( torrentIsLocked( t ) );
2581
2582    atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
2583    ret = tr_new( struct peer_atom*, atomCount );
2584    for( i = retCount = 0; i < atomCount; ++i )
2585    {
2586        int                interval;
2587        struct peer_atom * atom = atoms[i];
2588
2589        /* peer fed us too much bad data ... we only keep it around
2590         * now to weed it out in case someone sends it to us via pex */
2591        if( atom->myflags & MYFLAG_BANNED )
2592            continue;
2593
2594        /* peer was unconnectable before, so we're not going to keep trying.
2595         * this is needs a separate flag from `banned', since if they try
2596         * to connect to us later, we'll let them in */
2597        if( atom->myflags & MYFLAG_UNREACHABLE )
2598            continue;
2599
2600        /* no need to connect if we're both seeds... */
2601        if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
2602                      ( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
2603            continue;
2604
2605        /* don't reconnect too often */
2606        interval = getReconnectIntervalSecs( atom, now );
2607        if( ( now - atom->time ) < interval )
2608        {
2609            tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
2610                    i, tr_atomAddrStr( atom ), interval );
2611            continue;
2612        }
2613
2614        /* Don't connect to peers in our blocklist */
2615        if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
2616            continue;
2617
2618        /* we don't need two connections to the same peer... */
2619        if( peerIsInUse( t, atom ) )
2620            continue;
2621
2622        ret[retCount++] = atom;
2623    }
2624
2625    if( retCount != 0 )
2626        qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
2627    *setmeSize = retCount;
2628    return ret;
2629}
2630
2631static void
2632closePeer( Torrent * t, tr_peer * peer )
2633{
2634    struct peer_atom * atom;
2635
2636    assert( t != NULL );
2637    assert( peer != NULL );
2638
2639    atom = peer->atom;
2640
2641    /* if we transferred piece data, then they might be good peers,
2642       so reset their `numFails' weight to zero.  otherwise we connected
2643       to them fruitlessly, so mark it as another fail */
2644    if( atom->piece_data_time )
2645        atom->numFails = 0;
2646    else
2647        ++atom->numFails;
2648
2649    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2650    removePeer( t, peer );
2651}
2652
2653static void
2654reconnectTorrent( Torrent * t )
2655{
2656    static time_t prevTime = 0;
2657    static int    newConnectionsThisSecond = 0;
2658    const time_t  now = tr_time( );
2659
2660    if( prevTime != now )
2661    {
2662        prevTime = now;
2663        newConnectionsThisSecond = 0;
2664    }
2665
2666    if( !t->isRunning )
2667    {
2668        removeAllPeers( t );
2669    }
2670    else
2671    {
2672        int i;
2673        int mustCloseCount;
2674        int maxCandidates;
2675        struct tr_peer ** mustClose;
2676
2677        /* disconnect the really bad peers */
2678        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2679        for( i=0; i<mustCloseCount; ++i )
2680            closePeer( t, mustClose[i] );
2681        tr_free( mustClose );
2682
2683        /* decide how many peers can we try to add in this pass */
2684        maxCandidates = MAX_RECONNECTIONS_PER_PULSE;
2685        if( tr_announcerHasBacklog( t->manager->session->announcer ) )
2686            maxCandidates /= 2;
2687        maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
2688        maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
2689
2690        /* select the best candidates, if they are requested */
2691        if( maxCandidates == 0 )
2692        {
2693            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2694                       "NO connection candidates needed, %d atoms, "
2695                       "max per pulse is %d",
2696                       t->tor->info.name, mustCloseCount,
2697                       tr_ptrArraySize( &t->pool ),
2698                       MAX_RECONNECTIONS_PER_PULSE );
2699
2700            tordbg( t, "maxCandidates is %d, MAX_RECONNECTIONS_PER_PULSE is %d, "
2701                       "getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2702                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2703                       maxCandidates, MAX_RECONNECTIONS_PER_PULSE,
2704                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2705                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2706        }
2707        else
2708        {
2709            int canCloseCount = 0;
2710            int candidateCount;
2711            struct peer_atom ** candidates;
2712
2713            candidates = getPeerCandidates( t, now, &candidateCount );
2714            maxCandidates = MIN( maxCandidates, candidateCount );
2715
2716            /* maybe disconnect some lesser peers, if we have candidates to replace them with */
2717            if( maxCandidates != 0 )
2718            {
2719                struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
2720                for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
2721                   closePeer( t, canClose[i] );
2722                tr_free( canClose );
2723            }
2724
2725            tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
2726                       "%d can-close connections, %d connection candidates, "
2727                       "%d atoms, max per pulse is %d",
2728                       t->tor->info.name, mustCloseCount,
2729                       canCloseCount, candidateCount,
2730                       tr_ptrArraySize( &t->pool ), MAX_RECONNECTIONS_PER_PULSE );
2731
2732            tordbg( t, "candidateCount is %d, MAX_RECONNECTIONS_PER_PULSE is %d,"
2733                       " getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
2734                       "newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
2735                       candidateCount, MAX_RECONNECTIONS_PER_PULSE,
2736                       getPeerCount( t ), getMaxPeerCount( t->tor ),
2737                       newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
2738
2739            /* add some new ones */
2740            for( i=0; i<maxCandidates; ++i )
2741            {
2742                tr_peerMgr        * mgr = t->manager;
2743                struct peer_atom  * atom = candidates[i];
2744                tr_peerIo         * io;
2745
2746                tordbg( t, "Starting an OUTGOING connection with %s",
2747                        tr_atomAddrStr( atom ) );
2748
2749                io = tr_peerIoNewOutgoing( mgr->session,
2750                                           mgr->session->bandwidth,
2751                                           &atom->addr,
2752                                           atom->port,
2753                                           t->tor->info.hash,
2754                                           t->tor->completeness == TR_SEED );
2755
2756                if( io == NULL )
2757                {
2758                    tordbg( t, "peerIo not created; marking peer %s as unreachable",
2759                            tr_atomAddrStr( atom ) );
2760                    atom->myflags |= MYFLAG_UNREACHABLE;
2761                }
2762                else
2763                {
2764                    tr_handshake * handshake = tr_handshakeNew( io,
2765                                                                mgr->session->encryptionMode,
2766                                                                myHandshakeDoneCB,
2767                                                                mgr );
2768
2769                    assert( tr_peerIoGetTorrentHash( io ) );
2770
2771                    tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
2772
2773                    ++newConnectionsThisSecond;
2774
2775                    tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
2776                                             handshakeCompare );
2777                }
2778
2779                atom->time = now;
2780            }
2781            tr_free( candidates );
2782        }
2783    }
2784}
2785
2786struct peer_liveliness
2787{
2788    tr_peer * peer;
2789    void * clientData;
2790    time_t pieceDataTime;
2791    time_t time;
2792    int speed;
2793    tr_bool doPurge;
2794};
2795
2796static int
2797comparePeerLiveliness( const void * va, const void * vb )
2798{
2799    const struct peer_liveliness * a = va;
2800    const struct peer_liveliness * b = vb;
2801
2802    if( a->doPurge != b->doPurge )
2803        return a->doPurge ? 1 : -1;
2804
2805    if( a->speed != b->speed ) /* faster goes first */
2806        return a->speed > b->speed ? -1 : 1;
2807
2808    /* the one to give us data more recently goes first */
2809    if( a->pieceDataTime != b->pieceDataTime )
2810        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2811
2812    /* the one we connected to most recently goes first */
2813    if( a->time != b->time )
2814        return a->time > b->time ? -1 : 1;
2815
2816    return 0;
2817}
2818
2819static int
2820comparePeerLivelinessReverse( const void * va, const void * vb )
2821{
2822    return -comparePeerLiveliness (va, vb);
2823}
2824
2825static void
2826sortPeersByLivelinessImpl( tr_peer  ** peers,
2827                           void     ** clientData,
2828                           int         n,
2829                           uint64_t    now,
2830                           int (*compare) ( const void *va, const void *vb ) )
2831{
2832    int i;
2833    struct peer_liveliness *lives, *l;
2834
2835    /* build a sortable array of peer + extra info */
2836    lives = l = tr_new0( struct peer_liveliness, n );
2837    for( i=0; i<n; ++i, ++l )
2838    {
2839        tr_peer * p = peers[i];
2840        l->peer = p;
2841        l->doPurge = p->doPurge;
2842        l->pieceDataTime = p->atom->piece_data_time;
2843        l->time = p->atom->time;
2844        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2845                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2846        if( clientData )
2847            l->clientData = clientData[i];
2848    }
2849
2850    /* sort 'em */
2851    assert( n == ( l - lives ) );
2852    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2853
2854    /* build the peer array */
2855    for( i=0, l=lives; i<n; ++i, ++l ) {
2856        peers[i] = l->peer;
2857        if( clientData )
2858            clientData[i] = l->clientData;
2859    }
2860    assert( n == ( l - lives ) );
2861
2862    /* cleanup */
2863    tr_free( lives );
2864}
2865
2866static void
2867sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2868{
2869    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2870}
2871
2872static void
2873sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2874{
2875    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2876}
2877
2878
2879static void
2880enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2881{
2882    int n = tr_ptrArraySize( &t->peers );
2883    const int max = tr_torrentGetPeerLimit( t->tor );
2884    if( n > max )
2885    {
2886        void * base = tr_ptrArrayBase( &t->peers );
2887        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2888        sortPeersByLiveliness( peers, NULL, n, now );
2889        while( n > max )
2890            closePeer( t, peers[--n] );
2891        tr_free( peers );
2892    }
2893}
2894
2895static void
2896enforceSessionPeerLimit( tr_session * session, uint64_t now )
2897{
2898    int n = 0;
2899    tr_torrent * tor = NULL;
2900    const int max = tr_sessionGetPeerLimit( session );
2901
2902    /* count the total number of peers */
2903    while(( tor = tr_torrentNext( session, tor )))
2904        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2905
2906    /* if there are too many, prune out the worst */
2907    if( n > max )
2908    {
2909        tr_peer ** peers = tr_new( tr_peer*, n );
2910        Torrent ** torrents = tr_new( Torrent*, n );
2911
2912        /* populate the peer array */
2913        n = 0;
2914        tor = NULL;
2915        while(( tor = tr_torrentNext( session, tor ))) {
2916            int i;
2917            Torrent * t = tor->torrentPeers;
2918            const int tn = tr_ptrArraySize( &t->peers );
2919            for( i=0; i<tn; ++i, ++n ) {
2920                peers[n] = tr_ptrArrayNth( &t->peers, i );
2921                torrents[n] = t;
2922            }
2923        }
2924
2925        /* sort 'em */
2926        sortPeersByLiveliness( peers, (void**)torrents, n, now );
2927
2928        /* cull out the crappiest */
2929        while( n-- > max )
2930            closePeer( torrents[n], peers[n] );
2931
2932        /* cleanup */
2933        tr_free( torrents );
2934        tr_free( peers );
2935    }
2936}
2937
2938struct reconnectTorrentStruct
2939{
2940    tr_torrent * torrent;
2941    int salt;
2942};
2943
2944static int
2945compareReconnectTorrents( const void * va, const void * vb )
2946{
2947    int ai, bi;
2948    const struct reconnectTorrentStruct * a = va;
2949    const struct reconnectTorrentStruct * b = vb;
2950
2951    /* primary key: higher priority goes first */
2952    ai = tr_torrentGetPriority( a->torrent );
2953    bi = tr_torrentGetPriority( b->torrent );
2954    if( ai != bi )
2955        return ai > bi ? -1 : 1;
2956
2957    /* secondary key: since users tend to stare at the screens
2958     * watching their downloads' progress, give downloads a
2959     * first shot at attempting outbound peer connections. */
2960    ai = tr_torrentIsSeed( a->torrent );
2961    bi = tr_torrentIsSeed( b->torrent );
2962    if( ai != bi )
2963        return bi ? -1 : 1;
2964
2965    /* tertiary key: random */
2966    if( a->salt != b->salt )
2967        return a->salt - b->salt;
2968
2969    return 0;
2970}
2971   
2972static void
2973reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2974{
2975    tr_torrent * tor;
2976    tr_peerMgr * mgr = vmgr;
2977    struct reconnectTorrentStruct * torrents;
2978    int torrentCount;
2979    int i;
2980    uint64_t now;
2981    managerLock( mgr );
2982
2983    now = tr_date( );
2984
2985    /**
2986    ***  enforce the per-session and per-torrent peer limits
2987    **/
2988
2989    /* if we're over the per-torrent peer limits, cull some peers */
2990    tor = NULL;
2991    while(( tor = tr_torrentNext( mgr->session, tor )))
2992        if( tor->isRunning )
2993            enforceTorrentPeerLimit( tor->torrentPeers, now );
2994
2995    /* if we're over the per-session peer limits, cull some peers */
2996    enforceSessionPeerLimit( mgr->session, now );
2997
2998    /**
2999    ***  try to make new peer connections
3000    **/
3001
3002    torrentCount = 0;
3003    torrents = tr_new( struct reconnectTorrentStruct,
3004                       mgr->session->torrentCount );
3005    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3006        if( tor->isRunning ) {
3007            struct reconnectTorrentStruct * r = torrents + torrentCount++;
3008            r->torrent = tor;
3009            r->salt = tr_cryptoWeakRandInt( 1024 );
3010        }
3011    }
3012    qsort( torrents,
3013           torrentCount, sizeof( struct reconnectTorrentStruct ),
3014           compareReconnectTorrents );
3015    for( i=0; i<torrentCount; ++i )
3016        reconnectTorrent( torrents[i].torrent->torrentPeers );
3017
3018
3019    /* cleanup */
3020    tr_free( torrents );
3021    tr_timerAddMsec( mgr->reconnectTimer, RECONNECT_PERIOD_MSEC );
3022    managerUnlock( mgr );
3023}
3024
3025/****
3026*****
3027*****  BANDWIDTH ALLOCATION
3028*****
3029****/
3030
3031static void
3032pumpAllPeers( tr_peerMgr * mgr )
3033{
3034    tr_torrent * tor = NULL;
3035
3036    while(( tor = tr_torrentNext( mgr->session, tor )))
3037    {
3038        int j;
3039        Torrent * t = tor->torrentPeers;
3040
3041        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3042        {
3043            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3044            tr_peerMsgsPulse( peer->msgs );
3045        }
3046    }
3047}
3048
3049static void
3050bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3051{
3052    tr_torrent * tor = NULL;
3053    tr_peerMgr * mgr = vmgr;
3054    managerLock( mgr );
3055
3056    /* FIXME: this next line probably isn't necessary... */
3057    pumpAllPeers( mgr );
3058
3059    /* allocate bandwidth to the peers */
3060    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3061    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3062
3063    /* possibly stop torrents that have seeded enough */
3064    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3065        if( tor->needsSeedRatioCheck ) {
3066            tor->needsSeedRatioCheck = FALSE;
3067            tr_torrentCheckSeedRatio( tor );
3068        }
3069    }
3070
3071    /* run the completeness check for any torrents that need it */
3072    tor = NULL;
3073    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3074        if( tor->torrentPeers->needsCompletenessCheck ) {
3075            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3076            tr_torrentRecheckCompleteness( tor );
3077        }
3078    }
3079
3080    /* possibly stop torrents that have an error */
3081    tor = NULL;
3082    while(( tor = tr_torrentNext( mgr->session, tor )))
3083        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
3084            tr_torrentStop( tor );
3085
3086    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3087    managerUnlock( mgr );
3088}
3089
3090/***
3091****
3092***/
3093
3094static int
3095compareAtomPtrsByAddress( const void * va, const void *vb )
3096{
3097    const struct peer_atom * a = * (const struct peer_atom**) va;
3098    const struct peer_atom * b = * (const struct peer_atom**) vb;
3099
3100    assert( tr_isAtom( a ) );
3101    assert( tr_isAtom( b ) );
3102
3103    return tr_compareAddresses( &a->addr, &b->addr );
3104}
3105
3106/* best come first, worst go last */
3107static int
3108compareAtomPtrsByShelfDate( const void * va, const void *vb )
3109{
3110    time_t atime;
3111    time_t btime;
3112    const struct peer_atom * a = * (const struct peer_atom**) va;
3113    const struct peer_atom * b = * (const struct peer_atom**) vb;
3114    const int data_time_cutoff_secs = 60 * 60;
3115    const time_t tr_now = tr_time( );
3116
3117    assert( tr_isAtom( a ) );
3118    assert( tr_isAtom( b ) );
3119
3120    /* primary key: the last piece data time *if* it was within the last hour */
3121    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3122    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3123    if( atime != btime )
3124        return atime > btime ? -1 : 1;
3125
3126    /* secondary key: shelf date. */
3127    if( a->shelf_date != b->shelf_date )
3128        return a->shelf_date > b->shelf_date ? -1 : 1;
3129
3130    return 0;
3131}
3132
3133static int
3134getMaxAtomCount( const tr_torrent * tor )
3135{
3136    /* FIXME: this curve should be smoother... */
3137    const int n = tor->maxConnectedPeers;
3138    if( n >= 200 ) return n * 1.5;
3139    if( n >= 100 ) return n * 2;
3140    if( n >=  50 ) return n * 3;
3141    if( n >=  20 ) return n * 5;
3142    return n * 10;
3143}
3144
3145static void
3146atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3147{
3148    tr_torrent * tor = NULL;
3149    tr_peerMgr * mgr = vmgr;
3150    managerLock( mgr );
3151
3152    while(( tor = tr_torrentNext( mgr->session, tor )))
3153    {
3154        int atomCount;
3155        Torrent * t = tor->torrentPeers;
3156        const int maxAtomCount = getMaxAtomCount( tor );
3157        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3158
3159        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3160        {
3161            int i;
3162            int keepCount = 0;
3163            int testCount = 0;
3164            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3165            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3166
3167            /* keep the ones that are in use */
3168            for( i=0; i<atomCount; ++i ) {
3169                struct peer_atom * atom = atoms[i];
3170                if( peerIsInUse( t, atom ) )
3171                    keep[keepCount++] = atom;
3172                else
3173                    test[testCount++] = atom;
3174            }
3175
3176            /* if there's room, keep the best of what's left */
3177            i = 0;
3178            if( keepCount < maxAtomCount ) {
3179                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3180                while( i<testCount && keepCount<maxAtomCount )
3181                    keep[keepCount++] = test[i++];
3182            }
3183
3184            /* free the culled atoms */
3185            while( i<testCount )
3186                tr_free( test[i++] );
3187
3188            /* rebuild Torrent.pool with what's left */
3189            tr_ptrArrayDestruct( &t->pool, NULL );
3190            t->pool = TR_PTR_ARRAY_INIT;
3191            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3192            for( i=0; i<keepCount; ++i )
3193                tr_ptrArrayAppend( &t->pool, keep[i] );
3194
3195            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3196
3197            /* cleanup */
3198            tr_free( test );
3199            tr_free( keep );
3200        }
3201    }
3202
3203    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3204    managerUnlock( mgr );
3205}
Note: See TracBrowser for help on using the repository browser.