source: trunk/libtransmission/peer-mgr.c @ 10746

Last change on this file since 10746 was 10746, checked in by livings124, 12 years ago

trivial improvement to compareSeedProbabilities()

  • Property svn:keywords set to Date Rev Author Id
File size: 100.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 10746 2010-06-13 20:54:03Z livings124 $
11 */
12
13#include <assert.h>
14#include <string.h> /* memcpy, memcmp, strstr */
15#include <stdlib.h> /* qsort */
16
17#include <event.h>
18
19#include "transmission.h"
20#include "announcer.h"
21#include "bandwidth.h"
22#include "bencode.h"
23#include "blocklist.h"
24#include "clients.h"
25#include "completion.h"
26#include "crypto.h"
27#include "handshake.h"
28#include "inout.h" /* tr_ioTestPiece */
29#include "net.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "ptrarray.h"
34#include "session.h"
35#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
36#include "torrent.h"
37#include "utils.h"
38#include "webseed.h"
39
40enum
41{
42    /* how frequently to cull old atoms */
43    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
44
45    /* how frequently to change which peers are choked */
46    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
47
48    /* how frequently to reallocate bandwidth */
49    BANDWIDTH_PERIOD_MSEC = 500,
50
51    /* how frequently to age out old piece request lists */
52    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
53
54    /* how frequently to decide which peers live and die */
55    RECONNECT_PERIOD_MSEC = 500,
56
57    /* when many peers are available, keep idle ones this long */
58    MIN_UPLOAD_IDLE_SECS = ( 60 ),
59
60    /* when few peers are available, keep idle ones this long */
61    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
62
63    /* max number of peers to ask for per second overall.
64    * this throttle is to avoid overloading the router */
65    MAX_CONNECTIONS_PER_SECOND = 8,
66
67    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
68
69    /* number of bad pieces a peer is allowed to send before we ban them */
70    MAX_BAD_PIECES_PER_PEER = 5,
71
72    /* amount of time to keep a list of request pieces lying around
73       before it's considered too old and needs to be rebuilt */
74    PIECE_LIST_SHELF_LIFE_SECS = 60,
75
76    /* use for bitwise operations w/peer_atom.myflags */
77    MYFLAG_BANNED = 1,
78
79    /* use for bitwise operations w/peer_atom.myflags */
80    /* unreachable for now... but not banned.
81     * if they try to connect to us it's okay */
82    MYFLAG_UNREACHABLE = 2,
83
84    /* the minimum we'll wait before attempting to reconnect to a peer */
85    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
86
87    /** how long we'll let requests we've made linger before we cancel them */
88    REQUEST_TTL_SECS = 120,
89
90    CANCEL_HISTORY_SEC = 120
91};
92
93
94/**
95***
96**/
97
98enum
99{
100    UPLOAD_ONLY_UKNOWN,
101    UPLOAD_ONLY_YES,
102    UPLOAD_ONLY_NO
103};
104
105/**
106 * Peer information that should be kept even before we've connected and
107 * after we've disconnected.  These are kept in a pool of peer_atoms to decide
108 * which ones would make good candidates for connecting to, and to watch out
109 * for banned peers.
110 *
111 * @see tr_peer
112 * @see tr_peermsgs
113 */
114struct peer_atom
115{
116    uint8_t     from;
117    uint8_t     flags;              /* these match the added_f flags */
118    uint8_t     myflags;            /* flags that aren't defined in added_f */
119    uint8_t     uploadOnly;         /* UPLOAD_ONLY_ */
120    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
121
122    tr_port     port;
123    uint16_t    numFails;
124    time_t      time;               /* when the peer's connection status last changed */
125    time_t      piece_data_time;
126
127    time_t      lastConnectionAttemptAt;
128    time_t      lastConnectionAt;
129
130    /* similar to a TTL field, but less rigid --
131     * if the swarm is small, the atom will be kept past this date. */
132    time_t      shelf_date;
133    tr_peer   * peer;               /* will be NULL if not connected */
134    tr_address  addr;
135};
136
137#ifdef NDEBUG
138#define tr_isAtom(a) (TRUE)
139#else
140static tr_bool
141tr_isAtom( const struct peer_atom * atom )
142{
143    return ( atom != NULL )
144        && ( atom->from < TR_PEER_FROM__MAX )
145        && ( tr_isAddress( &atom->addr ) );
146}
147#endif
148
149static const char*
150tr_atomAddrStr( const struct peer_atom * atom )
151{
152    return tr_peerIoAddrStr( &atom->addr, atom->port );
153}
154
155struct block_request
156{
157    tr_block_index_t block;
158    tr_peer * peer;
159    time_t sentAt;
160};
161
162struct weighted_piece
163{
164    tr_piece_index_t index;
165    int16_t salt;
166    int16_t requestCount;
167};
168
169/** @brief Opaque, per-torrent data structure for peer connection information */
170typedef struct tr_torrent_peers
171{
172    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
173    tr_ptrArray                pool; /* struct peer_atom */
174    tr_ptrArray                peers; /* tr_peer */
175    tr_ptrArray                webseeds; /* tr_webseed */
176
177    tr_torrent               * tor;
178    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
179    struct tr_peerMgr        * manager;
180
181    tr_bool                    isRunning;
182    tr_bool                    needsCompletenessCheck;
183
184    struct block_request     * requests;
185    int                        requestCount;
186    int                        requestAlloc;
187
188    struct weighted_piece    * pieces;
189    int                        pieceCount;
190
191    int                        interestedCount;
192
193    /* An arbitrary metric of how congested the downloads are.
194     * Based on how many of requests are cancelled and how many are completed.
195     * Lower values indicate less congestion. */
196    double                     cancelRate;
197}
198Torrent;
199
200struct tr_peerMgr
201{
202    tr_session    * session;
203    tr_ptrArray     incomingHandshakes; /* tr_handshake */
204    struct event  * bandwidthTimer;
205    struct event  * rechokeTimer;
206    struct event  * refillUpkeepTimer;
207    struct event  * atomTimer;
208};
209
210#define tordbg( t, ... ) \
211    do { \
212        if( tr_deepLoggingIsActive( ) ) \
213            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
214    } while( 0 )
215
216#define dbgmsg( ... ) \
217    do { \
218        if( tr_deepLoggingIsActive( ) ) \
219            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
220    } while( 0 )
221
222/**
223***
224**/
225
226static inline void
227managerLock( const struct tr_peerMgr * manager )
228{
229    tr_sessionLock( manager->session );
230}
231
232static inline void
233managerUnlock( const struct tr_peerMgr * manager )
234{
235    tr_sessionUnlock( manager->session );
236}
237
238static inline void
239torrentLock( Torrent * torrent )
240{
241    managerLock( torrent->manager );
242}
243
244static inline void
245torrentUnlock( Torrent * torrent )
246{
247    managerUnlock( torrent->manager );
248}
249
250static inline int
251torrentIsLocked( const Torrent * t )
252{
253    return tr_sessionIsLocked( t->manager->session );
254}
255
256/**
257***
258**/
259
260static int
261handshakeCompareToAddr( const void * va, const void * vb )
262{
263    const tr_handshake * a = va;
264
265    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
266}
267
268static int
269handshakeCompare( const void * a, const void * b )
270{
271    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
272}
273
274static tr_handshake*
275getExistingHandshake( tr_ptrArray      * handshakes,
276                      const tr_address * addr )
277{
278    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
279}
280
281static int
282comparePeerAtomToAddress( const void * va, const void * vb )
283{
284    const struct peer_atom * a = va;
285
286    return tr_compareAddresses( &a->addr, vb );
287}
288
289static int
290compareAtomsByAddress( const void * va, const void * vb )
291{
292    const struct peer_atom * b = vb;
293
294    assert( tr_isAtom( b ) );
295
296    return comparePeerAtomToAddress( va, &b->addr );
297}
298
299/**
300***
301**/
302
303const tr_address *
304tr_peerAddress( const tr_peer * peer )
305{
306    return &peer->atom->addr;
307}
308
309static Torrent*
310getExistingTorrent( tr_peerMgr *    manager,
311                    const uint8_t * hash )
312{
313    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
314
315    return tor == NULL ? NULL : tor->torrentPeers;
316}
317
318static int
319peerCompare( const void * a, const void * b )
320{
321    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
322}
323
324static struct peer_atom*
325getExistingAtom( const Torrent    * t,
326                 const tr_address * addr )
327{
328    Torrent * tt = (Torrent*)t;
329    assert( torrentIsLocked( t ) );
330    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
331}
332
333static tr_bool
334peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
335{
336    Torrent * t = (Torrent*) ct;
337
338    assert( torrentIsLocked ( t ) );
339
340    return ( atom->peer != NULL )
341        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
342        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
343}
344
345static tr_peer*
346peerConstructor( struct peer_atom * atom )
347{
348    tr_peer * peer = tr_new0( tr_peer, 1 );
349
350    tr_bitsetConstructor( &peer->have, 0 );
351
352    peer->atom = atom;
353    atom->peer = peer;
354
355    peer->blocksSentToClient  = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
356    peer->blocksSentToPeer    = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
357    peer->cancelsSentToClient = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
358    peer->cancelsSentToPeer   = tr_historyNew( CANCEL_HISTORY_SEC, 1 );
359
360    return peer;
361}
362
363static tr_peer*
364getPeer( Torrent * torrent, struct peer_atom * atom )
365{
366    tr_peer * peer;
367
368    assert( torrentIsLocked( torrent ) );
369
370    peer = atom->peer;
371
372    if( peer == NULL )
373    {
374        peer = peerConstructor( atom );
375        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
376    }
377
378    return peer;
379}
380
381static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
382
383static void
384peerDestructor( Torrent * t, tr_peer * peer )
385{
386    assert( peer != NULL );
387
388    peerDeclinedAllRequests( t, peer );
389
390    if( peer->msgs != NULL )
391    {
392        tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
393        tr_peerMsgsFree( peer->msgs );
394    }
395
396    tr_peerIoClear( peer->io );
397    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
398
399    tr_historyFree( peer->blocksSentToClient  );
400    tr_historyFree( peer->blocksSentToPeer    );
401    tr_historyFree( peer->cancelsSentToClient );
402    tr_historyFree( peer->cancelsSentToPeer   );
403
404    tr_bitsetDestructor( &peer->have );
405    tr_bitfieldFree( peer->blame );
406    tr_free( peer->client );
407    peer->atom->peer = NULL;
408
409    tr_free( peer );
410}
411
412static void
413removePeer( Torrent * t, tr_peer * peer )
414{
415    tr_peer * removed;
416    struct peer_atom * atom = peer->atom;
417
418    assert( torrentIsLocked( t ) );
419    assert( atom );
420
421    atom->time = tr_time( );
422
423    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
424    assert( removed == peer );
425    peerDestructor( t, removed );
426}
427
428static void
429removeAllPeers( Torrent * t )
430{
431    while( !tr_ptrArrayEmpty( &t->peers ) )
432        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
433}
434
435static void
436torrentDestructor( void * vt )
437{
438    Torrent * t = vt;
439
440    assert( t );
441    assert( !t->isRunning );
442    assert( torrentIsLocked( t ) );
443    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
444    assert( tr_ptrArrayEmpty( &t->peers ) );
445
446    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
447    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
448    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
449    tr_ptrArrayDestruct( &t->peers, NULL );
450
451    tr_free( t->requests );
452    tr_free( t->pieces );
453    tr_free( t );
454}
455
456static void peerCallbackFunc( void * vpeer, void * vevent, void * vt );
457
458static Torrent*
459torrentConstructor( tr_peerMgr * manager,
460                    tr_torrent * tor )
461{
462    int       i;
463    Torrent * t;
464
465    t = tr_new0( Torrent, 1 );
466    t->manager = manager;
467    t->tor = tor;
468    t->pool = TR_PTR_ARRAY_INIT;
469    t->peers = TR_PTR_ARRAY_INIT;
470    t->webseeds = TR_PTR_ARRAY_INIT;
471    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
472
473    for( i = 0; i < tor->info.webseedCount; ++i )
474    {
475        tr_webseed * w =
476            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
477        tr_ptrArrayAppend( &t->webseeds, w );
478    }
479
480    return t;
481}
482
483tr_peerMgr*
484tr_peerMgrNew( tr_session * session )
485{
486    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
487    m->session = session;
488    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
489    return m;
490}
491
492static void
493deleteTimer( struct event ** t )
494{
495    if( *t != NULL )
496    {
497        evtimer_del( *t );
498        tr_free( *t );
499        *t = NULL;
500    }
501}
502
503static void
504deleteTimers( struct tr_peerMgr * m )
505{
506    deleteTimer( &m->atomTimer );
507    deleteTimer( &m->bandwidthTimer );
508    deleteTimer( &m->rechokeTimer );
509    deleteTimer( &m->refillUpkeepTimer );
510}
511
512void
513tr_peerMgrFree( tr_peerMgr * manager )
514{
515    managerLock( manager );
516
517    deleteTimers( manager );
518
519    /* free the handshakes.  Abort invokes handshakeDoneCB(), which removes
520     * the item from manager->handshakes, so this is a little roundabout... */
521    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
522        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
523
524    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
525
526    managerUnlock( manager );
527    tr_free( manager );
528}
529
530static int
531clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
532{
533    if( !tr_torrentHasMetadata( tor ) )
534        return TRUE;
535
536    return peer->clientIsInterested && !peer->clientIsChoked;
537}
538
539static int
540clientIsUploadingTo( const tr_peer * peer )
541{
542    return peer->peerIsInterested && !peer->peerIsChoked;
543}
544
545/***
546****
547***/
548
549static void
550atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
551{
552    assert( atom != NULL );
553    assert( -1<=seedProbability && seedProbability<=100 );
554
555    atom->seedProbability = seedProbability;
556
557    if( seedProbability == 100 )
558        atom->flags |= ADDED_F_SEED_FLAG;
559    else if( seedProbability != -1 )
560        atom->flags &= ~ADDED_F_SEED_FLAG;
561}
562
563static void
564atomSetSeed( struct peer_atom * atom )
565{
566    atomSetSeedProbability( atom, 100 );
567}
568
569static tr_bool
570atomIsSeed( const struct peer_atom * atom )
571{
572    return atom->seedProbability == 100;
573}
574
575tr_bool
576tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
577                      const tr_address  * addr )
578{
579    tr_bool isSeed = FALSE;
580    const Torrent * t = tor->torrentPeers;
581    const struct peer_atom * atom = getExistingAtom( t, addr );
582
583    if( atom )
584        isSeed = atomIsSeed( atom );
585
586    return isSeed;
587}
588
589/**
590***  REQUESTS
591***
592*** There are two data structures associated with managing block requests:
593***
594*** 1. Torrent::requests, an array of "struct block_request" which keeps
595***    track of which blocks have been requested, and when, and by which peers.
596***    This is list is used for (a) cancelling requests that have been pending
597***    for too long and (b) avoiding duplicate requests before endgame.
598***
599*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
600***    pieces that we want to request.  It's used to decide which blocks to
601***    return next when tr_peerMgrGetBlockRequests() is called.
602**/
603
604/**
605*** struct block_request
606**/
607
608static int
609compareReqByBlock( const void * va, const void * vb )
610{
611    const struct block_request * a = va;
612    const struct block_request * b = vb;
613
614    /* primary key: block */
615    if( a->block < b->block ) return -1;
616    if( a->block > b->block ) return 1;
617
618    /* secondary key: peer */
619    if( a->peer < b->peer ) return -1;
620    if( a->peer > b->peer ) return 1;
621
622    return 0;
623}
624
625static void
626requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
627{
628    struct block_request key;
629
630    /* ensure enough room is available... */
631    if( t->requestCount + 1 >= t->requestAlloc )
632    {
633        const int CHUNK_SIZE = 128;
634        t->requestAlloc += CHUNK_SIZE;
635        t->requests = tr_renew( struct block_request,
636                                t->requests, t->requestAlloc );
637    }
638
639    /* populate the record we're inserting */
640    key.block = block;
641    key.peer = peer;
642    key.sentAt = tr_time( );
643
644    /* insert the request to our array... */
645    {
646        tr_bool exact;
647        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
648                                       sizeof( struct block_request ),
649                                       compareReqByBlock, &exact );
650        assert( !exact );
651        memmove( t->requests + pos + 1,
652                 t->requests + pos,
653                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
654        t->requests[pos] = key;
655    }
656
657    if( peer != NULL )
658    {
659        ++peer->pendingReqsToPeer;
660        assert( peer->pendingReqsToPeer >= 0 );
661    }
662
663    /*fprintf( stderr, "added request of block %lu from peer %s... "
664                       "there are now %d block\n",
665                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
666}
667
668static struct block_request *
669requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
670{
671    struct block_request key;
672    key.block = block;
673    key.peer = (tr_peer*) peer;
674
675    return bsearch( &key, t->requests, t->requestCount,
676                    sizeof( struct block_request ),
677                    compareReqByBlock );
678}
679
680/* how many peers are we currently requesting this block from... */
681static int
682countBlockRequests( Torrent * t, tr_block_index_t block )
683{
684    tr_bool exact;
685    int i, n, pos;
686    struct block_request key;
687
688    key.block = block;
689    key.peer = NULL;
690    pos = tr_lowerBound( &key, t->requests, t->requestCount,
691                         sizeof( struct block_request ),
692                         compareReqByBlock, &exact );
693
694    assert( !exact ); /* shouldn't have a request with .peer == NULL */
695
696    n = 0;
697    for( i=pos; i<t->requestCount; ++i ) {
698        if( t->requests[i].block == block )
699            ++n;
700        else
701            break;
702    }
703
704    return n;
705}
706
707static void
708decrementPendingReqCount( const struct block_request * b )
709{
710    if( b->peer != NULL )
711        if( b->peer->pendingReqsToPeer > 0 )
712            --b->peer->pendingReqsToPeer;
713}
714
715static void
716requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
717{
718    const struct block_request * b = requestListLookup( t, block, peer );
719    if( b != NULL )
720    {
721        const int pos = b - t->requests;
722        assert( pos < t->requestCount );
723
724        decrementPendingReqCount( b );
725
726        tr_removeElementFromArray( t->requests,
727                                   pos,
728                                   sizeof( struct block_request ),
729                                   t->requestCount-- );
730
731        /*fprintf( stderr, "removing request of block %lu from peer %s... "
732                           "there are now %d block requests left\n",
733                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
734    }
735}
736
737/**
738*** struct weighted_piece
739**/
740
741enum
742{
743    PIECES_UNSORTED,
744    PIECES_SORTED_BY_INDEX,
745    PIECES_SORTED_BY_WEIGHT
746};
747
748const tr_torrent * weightTorrent;
749
750/* we try to create a "weight" s.t. high-priority pieces come before others,
751 * and that partially-complete pieces come before empty ones. */
752static int
753comparePieceByWeight( const void * va, const void * vb )
754{
755    const struct weighted_piece * a = va;
756    const struct weighted_piece * b = vb;
757    int ia, ib, missing, pending;
758    const tr_torrent * tor = weightTorrent;
759
760    /* primary key: weight */
761    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
762    pending = a->requestCount;
763    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
764    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
765    pending = b->requestCount;
766    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
767    if( ia < ib ) return -1;
768    if( ia > ib ) return 1;
769
770    /* secondary key: higher priorities go first */
771    ia = tor->info.pieces[a->index].priority;
772    ib = tor->info.pieces[b->index].priority;
773    if( ia > ib ) return -1;
774    if( ia < ib ) return 1;
775
776    /* tertiary key: random */
777    if( a->salt < b->salt ) return -1;
778    if( a->salt > b->salt ) return 1;
779
780    /* okay, they're equal */
781    return 0;
782}
783
784static int
785comparePieceByIndex( const void * va, const void * vb )
786{
787    const struct weighted_piece * a = va;
788    const struct weighted_piece * b = vb;
789    if( a->index < b->index ) return -1;
790    if( a->index > b->index ) return 1;
791    return 0;
792}
793
794static void
795pieceListSort( Torrent * t, int mode )
796{
797    int(*compar)(const void *, const void *);
798
799    assert( mode==PIECES_SORTED_BY_INDEX
800         || mode==PIECES_SORTED_BY_WEIGHT );
801
802    switch( mode ) {
803        case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
804        case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
805        default: assert( 0 && "unhandled" );  break;
806    }
807
808    weightTorrent = t->tor;
809    qsort( t->pieces, t->pieceCount,
810           sizeof( struct weighted_piece ), compar );
811}
812
813static tr_bool
814isInEndgame( Torrent * t )
815{
816    tr_bool endgame = FALSE;
817
818    if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
819    {
820        const struct weighted_piece * p = t->pieces;
821        const int pending = p->requestCount;
822        const int missing = tr_cpMissingBlocksInPiece( &t->tor->completion, p->index );
823        endgame = pending >= missing;
824    }
825
826    /*if( endgame ) fprintf( stderr, "ENDGAME reached\n" );*/
827    return endgame;
828}
829
830/**
831 * This function is useful for sanity checking,
832 * but is too expensive even for nightly builds...
833 * let's leave it disabled but add an easy hook to compile it back in
834 */
835#if 0
836static void
837assertWeightedPiecesAreSorted( Torrent * t )
838{
839    if( !isInEndgame( t ) )
840    {
841        int i;
842        weightTorrent = t->tor;
843        for( i=0; i<t->pieceCount-1; ++i )
844            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
845    }
846}
847#else
848#define assertWeightedPiecesAreSorted(t)
849#endif
850
851static struct weighted_piece *
852pieceListLookup( Torrent * t, tr_piece_index_t index )
853{
854    int i;
855
856    for( i=0; i<t->pieceCount; ++i )
857        if( t->pieces[i].index == index )
858            return &t->pieces[i];
859
860    return NULL;
861}
862
863static void
864pieceListRebuild( Torrent * t )
865{
866    assertWeightedPiecesAreSorted( t );
867
868    if( !tr_torrentIsSeed( t->tor ) )
869    {
870        tr_piece_index_t i;
871        tr_piece_index_t * pool;
872        tr_piece_index_t poolCount = 0;
873        const tr_torrent * tor = t->tor;
874        const tr_info * inf = tr_torrentInfo( tor );
875        struct weighted_piece * pieces;
876        int pieceCount;
877
878        /* build the new list */
879        pool = tr_new( tr_piece_index_t, inf->pieceCount );
880        for( i=0; i<inf->pieceCount; ++i )
881            if( !inf->pieces[i].dnd )
882                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
883                    pool[poolCount++] = i;
884        pieceCount = poolCount;
885        pieces = tr_new0( struct weighted_piece, pieceCount );
886        for( i=0; i<poolCount; ++i ) {
887            struct weighted_piece * piece = pieces + i;
888            piece->index = pool[i];
889            piece->requestCount = 0;
890            piece->salt = tr_cryptoWeakRandInt( 4096 );
891        }
892
893        /* if we already had a list of pieces, merge it into
894         * the new list so we don't lose its requestCounts */
895        if( t->pieces != NULL )
896        {
897            struct weighted_piece * o = t->pieces;
898            struct weighted_piece * oend = o + t->pieceCount;
899            struct weighted_piece * n = pieces;
900            struct weighted_piece * nend = n + pieceCount;
901
902            pieceListSort( t, PIECES_SORTED_BY_INDEX );
903
904            while( o!=oend && n!=nend ) {
905                if( o->index < n->index )
906                    ++o;
907                else if( o->index > n->index )
908                    ++n;
909                else
910                    *n++ = *o++;
911            }
912
913            tr_free( t->pieces );
914        }
915
916        t->pieces = pieces;
917        t->pieceCount = pieceCount;
918
919        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
920
921        /* cleanup */
922        tr_free( pool );
923    }
924}
925
926static void
927pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
928{
929    struct weighted_piece * p;
930
931    assertWeightedPiecesAreSorted( t );
932
933    if(( p = pieceListLookup( t, piece )))
934    {
935        const int pos = p - t->pieces;
936
937        tr_removeElementFromArray( t->pieces,
938                                   pos,
939                                   sizeof( struct weighted_piece ),
940                                   t->pieceCount-- );
941
942        if( t->pieceCount == 0 )
943        {
944            tr_free( t->pieces );
945            t->pieces = NULL;
946        }
947    }
948
949    assertWeightedPiecesAreSorted( t );
950}
951
952static void
953pieceListResortPiece( Torrent * t, struct weighted_piece * p )
954{
955    int pos;
956    tr_bool isSorted = TRUE;
957
958    if( p == NULL )
959        return;
960
961    /* is the torrent already sorted? */
962    pos = p - t->pieces;
963    weightTorrent = t->tor;
964    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
965        isSorted = FALSE;
966    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
967        isSorted = FALSE;
968
969    /* if it's not sorted, move it around */
970    if( !isSorted )
971    {
972        tr_bool exact;
973        const struct weighted_piece tmp = *p;
974
975        tr_removeElementFromArray( t->pieces,
976                                   pos,
977                                   sizeof( struct weighted_piece ),
978                                   t->pieceCount-- );
979
980        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
981                             sizeof( struct weighted_piece ),
982                             comparePieceByWeight, &exact );
983
984        memmove( &t->pieces[pos + 1],
985                 &t->pieces[pos],
986                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
987
988        t->pieces[pos] = tmp;
989    }
990
991    assertWeightedPiecesAreSorted( t );
992}
993
994static void
995pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
996{
997    struct weighted_piece * p;
998    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
999
1000    assertWeightedPiecesAreSorted( t );
1001
1002    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1003    {
1004        --p->requestCount;
1005        pieceListResortPiece( t, p );
1006    }
1007
1008    assertWeightedPiecesAreSorted( t );
1009}
1010
1011/**
1012***
1013**/
1014
1015void
1016tr_peerMgrRebuildRequests( tr_torrent * tor )
1017{
1018    assert( tr_isTorrent( tor ) );
1019
1020    pieceListRebuild( tor->torrentPeers );
1021}
1022
1023void
1024tr_peerMgrGetNextRequests( tr_torrent           * tor,
1025                           tr_peer              * peer,
1026                           int                    numwant,
1027                           tr_block_index_t     * setme,
1028                           int                  * numgot )
1029{
1030    int i;
1031    int got;
1032    Torrent * t;
1033    tr_bool endgame;
1034    struct weighted_piece * pieces;
1035    const tr_bitset * have = &peer->have;
1036
1037    /* sanity clause */
1038    assert( tr_isTorrent( tor ) );
1039    assert( peer->clientIsInterested );
1040    assert( !peer->clientIsChoked );
1041    assert( numwant > 0 );
1042
1043    /* walk through the pieces and find blocks that should be requested */
1044    got = 0;
1045    t = tor->torrentPeers;
1046    assertWeightedPiecesAreSorted( t );
1047
1048    /* prep the pieces list */
1049    if( t->pieces == NULL )
1050        pieceListRebuild( t );
1051
1052    endgame = isInEndgame( t );
1053
1054    pieces = t->pieces;
1055    for( i=0; i<t->pieceCount && got<numwant; ++i )
1056    {
1057        struct weighted_piece * p = pieces + i;
1058        const int missing = tr_cpMissingBlocksInPiece( &tor->completion, p->index );
1059        const int maxDuplicatesPerBlock = endgame ? 3 : 1;
1060
1061        if( p->requestCount > ( missing * maxDuplicatesPerBlock ) )
1062            continue;
1063
1064        /* if the peer has this piece that we want... */
1065        if( tr_bitsetHasFast( have, p->index ) )
1066        {
1067            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1068            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1069
1070            for( ; b!=e && got<numwant; ++b )
1071            {
1072                /* don't request blocks we've already got */
1073                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1074                    continue;
1075
1076                /* don't send the same request to the same peer twice */
1077                if( tr_peerMgrDidPeerRequest( tor, peer, b ) )
1078                    continue;
1079
1080                /* don't send the same request to any peer too many times */
1081                if( countBlockRequests( t, b ) >= maxDuplicatesPerBlock )
1082                    continue;
1083
1084                /* update the caller's table */
1085                setme[got++] = b;
1086
1087                /* update our own tables */
1088                requestListAdd( t, b, peer );
1089                ++p->requestCount;
1090            }
1091        }
1092    }
1093
1094    /* In most cases we've just changed the weights of a small number of pieces.
1095     * So rather than qsort()ing the entire array, it's faster to apply an
1096     * adaptive insertion sort algorithm. */
1097    if( got > 0 )
1098    {
1099        /* not enough requests || last piece modified */
1100        if ( i == t->pieceCount ) --i;
1101
1102        weightTorrent = t->tor;
1103        while( --i >= 0 )
1104        {
1105            tr_bool exact;
1106
1107            /* relative position! */
1108            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1109                                              t->pieceCount - (i + 1),
1110                                              sizeof( struct weighted_piece ),
1111                                              comparePieceByWeight, &exact );
1112            if( newpos > 0 )
1113            {
1114                const struct weighted_piece piece = t->pieces[i];
1115                memmove( &t->pieces[i],
1116                         &t->pieces[i + 1],
1117                         sizeof( struct weighted_piece ) * ( newpos ) );
1118                t->pieces[i + newpos] = piece;
1119            }
1120        }
1121    }
1122
1123    assertWeightedPiecesAreSorted( t );
1124    *numgot = got;
1125}
1126
1127tr_bool
1128tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1129                          const tr_peer     * peer,
1130                          tr_block_index_t    block )
1131{
1132    const Torrent * t = tor->torrentPeers;
1133    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1134}
1135
1136/* cancel requests that are too old */
1137static void
1138refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1139{
1140    time_t now;
1141    uint64_t now_msec;
1142    time_t too_old;
1143    tr_torrent * tor;
1144    tr_peerMgr * mgr = vmgr;
1145    managerLock( mgr );
1146
1147    now = tr_time( );
1148    now_msec = tr_date( );
1149    too_old = now - REQUEST_TTL_SECS;
1150
1151    tor = NULL;
1152    while(( tor = tr_torrentNext( mgr->session, tor )))
1153    {
1154        Torrent * t = tor->torrentPeers;
1155        const int n = t->requestCount;
1156        if( n > 0 )
1157        {
1158            int keepCount = 0;
1159            int cancelCount = 0;
1160            struct block_request * cancel = tr_new( struct block_request, n );
1161            const struct block_request * it;
1162            const struct block_request * end;
1163
1164            for( it=t->requests, end=it+n; it!=end; ++it )
1165            {
1166                if( ( it->sentAt <= too_old ) && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1167                    cancel[cancelCount++] = *it;
1168                else
1169                {
1170                    if( it != &t->requests[keepCount] )
1171                        t->requests[keepCount] = *it;
1172                    keepCount++;
1173                }
1174            }
1175
1176            /* prune out the ones we aren't keeping */
1177            t->requestCount = keepCount;
1178
1179            /* send cancel messages for all the "cancel" ones */
1180            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1181                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1182                    tr_historyAdd( it->peer->cancelsSentToPeer, now_msec, 1 );
1183                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1184                    decrementPendingReqCount( it );
1185                }
1186            }
1187
1188            /* decrement the pending request counts for the timed-out blocks */
1189            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1190                pieceListRemoveRequest( t, it->block );
1191
1192            /* cleanup loop */
1193            tr_free( cancel );
1194        }
1195    }
1196
1197    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1198    managerUnlock( mgr );
1199}
1200
1201static void
1202addStrike( Torrent * t, tr_peer * peer )
1203{
1204    tordbg( t, "increasing peer %s strike count to %d",
1205            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1206
1207    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1208    {
1209        struct peer_atom * atom = peer->atom;
1210        atom->myflags |= MYFLAG_BANNED;
1211        peer->doPurge = 1;
1212        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1213    }
1214}
1215
1216static void
1217gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1218{
1219    tr_torrent *   tor = t->tor;
1220    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1221
1222    tor->corruptCur += byteCount;
1223    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1224
1225    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1226}
1227
1228static void
1229peerSuggestedPiece( Torrent            * t UNUSED,
1230                    tr_peer            * peer UNUSED,
1231                    tr_piece_index_t     pieceIndex UNUSED,
1232                    int                  isFastAllowed UNUSED )
1233{
1234#if 0
1235    assert( t );
1236    assert( peer );
1237    assert( peer->msgs );
1238
1239    /* is this a valid piece? */
1240    if(  pieceIndex >= t->tor->info.pieceCount )
1241        return;
1242
1243    /* don't ask for it if we've already got it */
1244    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1245        return;
1246
1247    /* don't ask for it if they don't have it */
1248    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1249        return;
1250
1251    /* don't ask for it if we're choked and it's not fast */
1252    if( !isFastAllowed && peer->clientIsChoked )
1253        return;
1254
1255    /* request the blocks that we don't have in this piece */
1256    {
1257        tr_block_index_t block;
1258        const tr_torrent * tor = t->tor;
1259        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1260        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1261
1262        for( block=start; block<end; ++block )
1263        {
1264            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1265            {
1266                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1267                const uint32_t length = tr_torBlockCountBytes( tor, block );
1268                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1269                incrementPieceRequests( t, pieceIndex );
1270            }
1271        }
1272    }
1273#endif
1274}
1275
1276static void
1277removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1278{
1279    requestListRemove( t, block, peer );
1280    pieceListRemoveRequest( t, block );
1281}
1282
1283/* peer choked us, or maybe it disconnected.
1284   either way we need to remove all its requests */
1285static void
1286peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1287{
1288    int i, n;
1289    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1290
1291    for( i=n=0; i<t->requestCount; ++i )
1292        if( peer == t->requests[i].peer )
1293            blocks[n++] = t->requests[i].block;
1294
1295    for( i=0; i<n; ++i )
1296        removeRequestFromTables( t, blocks[i], peer );
1297
1298    tr_free( blocks );
1299}
1300
1301static void
1302peerCallbackFunc( void * vpeer, void * vevent, void * vt )
1303{
1304    tr_peer * peer = vpeer; /* may be NULL if peer is a webseed */
1305    Torrent * t = vt;
1306    const tr_peer_event * e = vevent;
1307
1308    torrentLock( t );
1309
1310    switch( e->eventType )
1311    {
1312        case TR_PEER_PEER_GOT_DATA:
1313        {
1314            const time_t now = tr_time( );
1315            tr_torrent * tor = t->tor;
1316
1317            tr_torrentSetActivityDate( tor, now );
1318
1319            if( e->wasPieceData ) {
1320                tor->uploadedCur += e->length;
1321                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1322                tr_torrentSetDirty( tor );
1323            }
1324
1325            /* update the stats */
1326            if( e->wasPieceData )
1327                tr_statsAddUploaded( tor->session, e->length );
1328
1329            /* update our atom */
1330            if( peer && e->wasPieceData )
1331                peer->atom->piece_data_time = now;
1332
1333            break;
1334        }
1335
1336        case TR_PEER_CLIENT_GOT_REJ:
1337            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1338            break;
1339
1340        case TR_PEER_CLIENT_GOT_CHOKE:
1341            peerDeclinedAllRequests( t, peer );
1342            break;
1343
1344        case TR_PEER_CLIENT_GOT_PORT:
1345            if( peer )
1346                peer->atom->port = e->port;
1347            break;
1348
1349        case TR_PEER_CLIENT_GOT_SUGGEST:
1350            if( peer )
1351                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1352            break;
1353
1354        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1355            if( peer )
1356                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1357            break;
1358
1359        case TR_PEER_CLIENT_GOT_DATA:
1360        {
1361            const time_t now = tr_time( );
1362            tr_torrent * tor = t->tor;
1363
1364            tr_torrentSetActivityDate( tor, now );
1365
1366            if( e->wasPieceData ) {
1367                tor->downloadedCur += e->length;
1368                tr_torrentSetDirty( tor );
1369            }
1370
1371            /* update the stats */
1372            if( e->wasPieceData )
1373                tr_statsAddDownloaded( tor->session, e->length );
1374
1375            /* update our atom */
1376            if( peer && e->wasPieceData )
1377                peer->atom->piece_data_time = now;
1378
1379            break;
1380        }
1381
1382        case TR_PEER_PEER_PROGRESS:
1383        {
1384            if( peer )
1385            {
1386                struct peer_atom * atom = peer->atom;
1387                if( e->progress >= 1.0 ) {
1388                    tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
1389                    atomSetSeed( atom );
1390                }
1391            }
1392            break;
1393        }
1394
1395        case TR_PEER_CLIENT_GOT_BLOCK:
1396        {
1397            tr_torrent * tor = t->tor;
1398            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1399
1400            requestListRemove( t, block, peer );
1401            pieceListRemoveRequest( t, block );
1402
1403            if( peer != NULL )
1404                tr_historyAdd( peer->blocksSentToClient, tr_date( ), 1 );
1405
1406            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1407            {
1408                /* we already have this block... */
1409                const uint32_t n = tr_torBlockCountBytes( tor, block );
1410                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1411                tordbg( t, "we have this block already..." );
1412            }
1413            else
1414            {
1415                tr_cpBlockAdd( &tor->completion, block );
1416                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1417                tr_torrentSetDirty( tor );
1418
1419                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1420                {
1421                    const tr_piece_index_t p = e->pieceIndex;
1422                    const tr_bool ok = tr_ioTestPiece( tor, p );
1423
1424                    if( !ok )
1425                    {
1426                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1427                                   (unsigned long)p );
1428                    }
1429
1430                    tr_torrentSetHasPiece( tor, p, ok );
1431                    tr_torrentSetPieceChecked( tor, p, TRUE );
1432                    tr_peerMgrSetBlame( tor, p, ok );
1433
1434                    if( !ok )
1435                    {
1436                        gotBadPiece( t, p );
1437                    }
1438                    else
1439                    {
1440                        int i;
1441                        int peerCount;
1442                        tr_peer ** peers;
1443                        tr_file_index_t fileIndex;
1444
1445                        /* only add this to downloadedCur if we got it from a peer --
1446                         * webseeds shouldn't count against our ratio.  As one tracker
1447                         * admin put it, "Those pieces are downloaded directly from the
1448                         * content distributor, not the peers, it is the tracker's job
1449                         * to manage the swarms, not the web server and does not fit
1450                         * into the jurisdiction of the tracker." */
1451                        if( peer != NULL ) {
1452                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1453                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1454                        }
1455
1456                        peerCount = tr_ptrArraySize( &t->peers );
1457                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1458                        for( i=0; i<peerCount; ++i )
1459                            tr_peerMsgsHave( peers[i]->msgs, p );
1460
1461                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1462                            const tr_file * file = &tor->info.files[fileIndex];
1463                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
1464                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
1465                                    tr_torrentFileCompleted( tor, fileIndex );
1466                        }
1467
1468                        pieceListRemovePiece( t, p );
1469                    }
1470                }
1471
1472                t->needsCompletenessCheck = TRUE;
1473            }
1474            break;
1475        }
1476
1477        case TR_PEER_ERROR:
1478            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1479            {
1480                /* some protocol error from the peer */
1481                peer->doPurge = 1;
1482                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1483                        tr_atomAddrStr( peer->atom ) );
1484            }
1485            else
1486            {
1487                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1488            }
1489            break;
1490
1491        default:
1492            assert( 0 );
1493    }
1494
1495    torrentUnlock( t );
1496}
1497
1498static int
1499getDefaultShelfLife( uint8_t from )
1500{
1501    /* in general, peers obtained from firsthand contact
1502     * are better than those from secondhand, etc etc */
1503    switch( from )
1504    {
1505        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1506        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1507        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1508        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1509        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1510        case TR_PEER_FROM_RESUME   : return 60 * 60;
1511        case TR_PEER_FROM_LPD      : return 10 * 60;
1512        default                    : return 60 * 60;
1513    }
1514}
1515
1516static void
1517ensureAtomExists( Torrent           * t,
1518                  const tr_address  * addr,
1519                  const tr_port       port,
1520                  const uint8_t       flags,
1521                  const int8_t        seedProbability,
1522                  const uint8_t       from )
1523{
1524    struct peer_atom * a;
1525
1526    assert( tr_isAddress( addr ) );
1527    assert( from < TR_PEER_FROM__MAX );
1528
1529    a = getExistingAtom( t, addr );
1530
1531    if( a == NULL )
1532    {
1533        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1534        a = tr_new0( struct peer_atom, 1 );
1535        a->addr = *addr;
1536        a->port = port;
1537        a->flags = flags;
1538        a->from = from;
1539        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1540        atomSetSeedProbability( a, seedProbability );
1541        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1542
1543        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1544    }
1545    else if( a->seedProbability == -1 )
1546    {
1547        atomSetSeedProbability( a, seedProbability );
1548    }
1549}
1550
1551static int
1552getMaxPeerCount( const tr_torrent * tor )
1553{
1554    return tor->maxConnectedPeers;
1555}
1556
1557static int
1558getPeerCount( const Torrent * t )
1559{
1560    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1561}
1562
1563/* FIXME: this is kind of a mess. */
1564static tr_bool
1565myHandshakeDoneCB( tr_handshake  * handshake,
1566                   tr_peerIo     * io,
1567                   tr_bool         readAnythingFromPeer,
1568                   tr_bool         isConnected,
1569                   const uint8_t * peer_id,
1570                   void          * vmanager )
1571{
1572    tr_bool            ok = isConnected;
1573    tr_bool            success = FALSE;
1574    tr_port            port;
1575    const tr_address * addr;
1576    tr_peerMgr       * manager = vmanager;
1577    Torrent          * t;
1578    tr_handshake     * ours;
1579
1580    assert( io );
1581    assert( tr_isBool( ok ) );
1582
1583    t = tr_peerIoHasTorrentHash( io )
1584        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1585        : NULL;
1586
1587    if( tr_peerIoIsIncoming ( io ) )
1588        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1589                                        handshake, handshakeCompare );
1590    else if( t )
1591        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1592                                        handshake, handshakeCompare );
1593    else
1594        ours = handshake;
1595
1596    assert( ours );
1597    assert( ours == handshake );
1598
1599    if( t )
1600        torrentLock( t );
1601
1602    addr = tr_peerIoGetAddress( io, &port );
1603
1604    if( !ok || !t || !t->isRunning )
1605    {
1606        if( t )
1607        {
1608            struct peer_atom * atom = getExistingAtom( t, addr );
1609            if( atom )
1610            {
1611                ++atom->numFails;
1612
1613                if( !readAnythingFromPeer )
1614                {
1615                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1616                    atom->myflags |= MYFLAG_UNREACHABLE;
1617                }
1618            }
1619        }
1620    }
1621    else /* looking good */
1622    {
1623        struct peer_atom * atom;
1624
1625        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1626        atom = getExistingAtom( t, addr );
1627        atom->time = tr_time( );
1628        atom->piece_data_time = 0;
1629        atom->lastConnectionAt = tr_time( );
1630        atom->myflags &= ~MYFLAG_UNREACHABLE;
1631
1632        if( atom->myflags & MYFLAG_BANNED )
1633        {
1634            tordbg( t, "banned peer %s tried to reconnect",
1635                    tr_atomAddrStr( atom ) );
1636        }
1637        else if( tr_peerIoIsIncoming( io )
1638               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1639
1640        {
1641        }
1642        else
1643        {
1644            tr_peer * peer = atom->peer;
1645
1646            if( peer ) /* we already have this peer */
1647            {
1648            }
1649            else
1650            {
1651                peer = getPeer( t, atom );
1652                tr_free( peer->client );
1653
1654                if( !peer_id )
1655                    peer->client = NULL;
1656                else {
1657                    char client[128];
1658                    tr_clientForId( client, sizeof( client ), peer_id );
1659                    peer->client = tr_strdup( client );
1660                }
1661
1662                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
1663                                                                balanced by our unref in peerDestructor()  */
1664                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
1665                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
1666
1667                success = TRUE;
1668            }
1669        }
1670    }
1671
1672    if( t )
1673        torrentUnlock( t );
1674
1675    return success;
1676}
1677
1678void
1679tr_peerMgrAddIncoming( tr_peerMgr * manager,
1680                       tr_address * addr,
1681                       tr_port      port,
1682                       int          socket )
1683{
1684    tr_session * session;
1685
1686    managerLock( manager );
1687
1688    assert( tr_isSession( manager->session ) );
1689    session = manager->session;
1690
1691    if( tr_sessionIsAddressBlocked( session, addr ) )
1692    {
1693        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
1694        tr_netClose( session, socket );
1695    }
1696    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
1697    {
1698        tr_netClose( session, socket );
1699    }
1700    else /* we don't have a connection to them yet... */
1701    {
1702        tr_peerIo *    io;
1703        tr_handshake * handshake;
1704
1705        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket );
1706
1707        handshake = tr_handshakeNew( io,
1708                                     session->encryptionMode,
1709                                     myHandshakeDoneCB,
1710                                     manager );
1711
1712        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
1713
1714        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
1715                                 handshakeCompare );
1716    }
1717
1718    managerUnlock( manager );
1719}
1720
1721static tr_bool
1722tr_isPex( const tr_pex * pex )
1723{
1724    return pex && tr_isAddress( &pex->addr );
1725}
1726
1727void
1728tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
1729                  const tr_pex * pex, int8_t seedProbability )
1730{
1731    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
1732    {
1733        Torrent * t = tor->torrentPeers;
1734        managerLock( t->manager );
1735
1736        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
1737            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
1738                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
1739
1740        managerUnlock( t->manager );
1741    }
1742}
1743
1744void
1745tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
1746{
1747    Torrent * t = tor->torrentPeers;
1748    const int n = tr_ptrArraySize( &t->pool );
1749    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
1750    struct peer_atom ** end = it + n;
1751
1752    while( it != end )
1753        atomSetSeed( *it++ );
1754}
1755
1756tr_pex *
1757tr_peerMgrCompactToPex( const void *    compact,
1758                        size_t          compactLen,
1759                        const uint8_t * added_f,
1760                        size_t          added_f_len,
1761                        size_t *        pexCount )
1762{
1763    size_t          i;
1764    size_t          n = compactLen / 6;
1765    const uint8_t * walk = compact;
1766    tr_pex *        pex = tr_new0( tr_pex, n );
1767
1768    for( i = 0; i < n; ++i )
1769    {
1770        pex[i].addr.type = TR_AF_INET;
1771        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
1772        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1773        if( added_f && ( n == added_f_len ) )
1774            pex[i].flags = added_f[i];
1775    }
1776
1777    *pexCount = n;
1778    return pex;
1779}
1780
1781tr_pex *
1782tr_peerMgrCompact6ToPex( const void    * compact,
1783                         size_t          compactLen,
1784                         const uint8_t * added_f,
1785                         size_t          added_f_len,
1786                         size_t        * pexCount )
1787{
1788    size_t          i;
1789    size_t          n = compactLen / 18;
1790    const uint8_t * walk = compact;
1791    tr_pex *        pex = tr_new0( tr_pex, n );
1792
1793    for( i = 0; i < n; ++i )
1794    {
1795        pex[i].addr.type = TR_AF_INET6;
1796        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
1797        memcpy( &pex[i].port, walk, 2 ); walk += 2;
1798        if( added_f && ( n == added_f_len ) )
1799            pex[i].flags = added_f[i];
1800    }
1801
1802    *pexCount = n;
1803    return pex;
1804}
1805
1806tr_pex *
1807tr_peerMgrArrayToPex( const void * array,
1808                      size_t       arrayLen,
1809                      size_t      * pexCount )
1810{
1811    size_t          i;
1812    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
1813    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
1814    const uint8_t * walk = array;
1815    tr_pex        * pex = tr_new0( tr_pex, n );
1816
1817    for( i = 0 ; i < n ; i++ ) {
1818        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
1819        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
1820        pex[i].flags = 0x00;
1821        walk += sizeof( tr_address ) + 2;
1822    }
1823
1824    *pexCount = n;
1825    return pex;
1826}
1827
1828/**
1829***
1830**/
1831
1832void
1833tr_peerMgrSetBlame( tr_torrent     * tor,
1834                    tr_piece_index_t pieceIndex,
1835                    int              success )
1836{
1837    if( !success )
1838    {
1839        int        peerCount, i;
1840        Torrent *  t = tor->torrentPeers;
1841        tr_peer ** peers;
1842
1843        assert( torrentIsLocked( t ) );
1844
1845        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
1846        for( i = 0; i < peerCount; ++i )
1847        {
1848            tr_peer * peer = peers[i];
1849            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
1850            {
1851                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
1852                        tr_atomAddrStr( peer->atom ),
1853                        pieceIndex, (int)peer->strikes + 1 );
1854                addStrike( t, peer );
1855            }
1856        }
1857    }
1858}
1859
1860int
1861tr_pexCompare( const void * va, const void * vb )
1862{
1863    const tr_pex * a = va;
1864    const tr_pex * b = vb;
1865    int i;
1866
1867    assert( tr_isPex( a ) );
1868    assert( tr_isPex( b ) );
1869
1870    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
1871        return i;
1872
1873    if( a->port != b->port )
1874        return a->port < b->port ? -1 : 1;
1875
1876    return 0;
1877}
1878
1879#if 0
1880static int
1881peerPrefersCrypto( const tr_peer * peer )
1882{
1883    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
1884        return TRUE;
1885
1886    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
1887        return FALSE;
1888
1889    return tr_peerIoIsEncrypted( peer->io );
1890}
1891#endif
1892
1893/* better goes first */
1894static int
1895compareAtomsByUsefulness( const void * va, const void *vb )
1896{
1897    const struct peer_atom * a = * (const struct peer_atom**) va;
1898    const struct peer_atom * b = * (const struct peer_atom**) vb;
1899
1900    assert( tr_isAtom( a ) );
1901    assert( tr_isAtom( b ) );
1902
1903    if( a->piece_data_time != b->piece_data_time )
1904        return a->piece_data_time > b->piece_data_time ? -1 : 1;
1905    if( a->from != b->from )
1906        return a->from < b->from ? -1 : 1;
1907    if( a->numFails != b->numFails )
1908        return a->numFails < b->numFails ? -1 : 1;
1909
1910    return 0;
1911}
1912
1913int
1914tr_peerMgrGetPeers( tr_torrent   * tor,
1915                    tr_pex      ** setme_pex,
1916                    uint8_t        af,
1917                    uint8_t        list_mode,
1918                    int            maxCount )
1919{
1920    int i;
1921    int n;
1922    int count = 0;
1923    int atomCount = 0;
1924    const Torrent * t = tor->torrentPeers;
1925    struct peer_atom ** atoms = NULL;
1926    tr_pex * pex;
1927    tr_pex * walk;
1928
1929    assert( tr_isTorrent( tor ) );
1930    assert( setme_pex != NULL );
1931    assert( af==TR_AF_INET || af==TR_AF_INET6 );
1932    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
1933
1934    managerLock( t->manager );
1935
1936    /**
1937    ***  build a list of atoms
1938    **/
1939
1940    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
1941    {
1942        int i;
1943        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
1944        atomCount = tr_ptrArraySize( &t->peers );
1945        atoms = tr_new( struct peer_atom *, atomCount );
1946        for( i=0; i<atomCount; ++i )
1947            atoms[i] = peers[i]->atom;
1948    }
1949    else /* TR_PEERS_ALL */
1950    {
1951        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
1952        atomCount = tr_ptrArraySize( &t->pool );
1953        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
1954    }
1955
1956    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
1957
1958    /**
1959    ***  add the first N of them into our return list
1960    **/
1961
1962    n = MIN( atomCount, maxCount );
1963    pex = walk = tr_new0( tr_pex, n );
1964
1965    for( i=0; i<atomCount && count<n; ++i )
1966    {
1967        const struct peer_atom * atom = atoms[i];
1968        if( atom->addr.type == af )
1969        {
1970            assert( tr_isAddress( &atom->addr ) );
1971            walk->addr = atom->addr;
1972            walk->port = atom->port;
1973            walk->flags = atom->flags;
1974            ++count;
1975            ++walk;
1976        }
1977    }
1978
1979    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
1980
1981    assert( ( walk - pex ) == count );
1982    *setme_pex = pex;
1983
1984    /* cleanup */
1985    tr_free( atoms );
1986    managerUnlock( t->manager );
1987    return count;
1988}
1989
1990static void atomPulse      ( int, short, void * );
1991static void bandwidthPulse ( int, short, void * );
1992static void rechokePulse   ( int, short, void * );
1993static void reconnectPulse ( int, short, void * );
1994
1995static struct event *
1996createTimer( int msec, void (*callback)(int, short, void *), void * cbdata )
1997{
1998    struct event * timer = tr_new0( struct event, 1 );
1999    evtimer_set( timer, callback, cbdata );
2000    tr_timerAddMsec( timer, msec );
2001    return timer;
2002}
2003
2004static void
2005ensureMgrTimersExist( struct tr_peerMgr * m )
2006{
2007    if( m->atomTimer == NULL )
2008        m->atomTimer = createTimer( ATOM_PERIOD_MSEC, atomPulse, m );
2009
2010    if( m->bandwidthTimer == NULL )
2011        m->bandwidthTimer = createTimer( BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2012
2013    if( m->rechokeTimer == NULL )
2014        m->rechokeTimer = createTimer( RECHOKE_PERIOD_MSEC, rechokePulse, m );
2015
2016   if( m->refillUpkeepTimer == NULL )
2017        m->refillUpkeepTimer = createTimer( REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2018}
2019
2020void
2021tr_peerMgrStartTorrent( tr_torrent * tor )
2022{
2023    Torrent * t = tor->torrentPeers;
2024
2025    assert( t != NULL );
2026    managerLock( t->manager );
2027    ensureMgrTimersExist( t->manager );
2028
2029    t->isRunning = TRUE;
2030
2031    rechokePulse( 0, 0, t->manager );
2032    managerUnlock( t->manager );
2033}
2034
2035static void
2036stopTorrent( Torrent * t )
2037{
2038    int i, n;
2039
2040    assert( torrentIsLocked( t ) );
2041
2042    t->isRunning = FALSE;
2043
2044    /* disconnect the peers. */
2045    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2046        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2047    tr_ptrArrayClear( &t->peers );
2048
2049    /* disconnect the handshakes.  handshakeAbort calls handshakeDoneCB(),
2050     * which removes the handshake from t->outgoingHandshakes... */
2051    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2052        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2053}
2054
2055void
2056tr_peerMgrStopTorrent( tr_torrent * tor )
2057{
2058    Torrent * t = tor->torrentPeers;
2059
2060    managerLock( t->manager );
2061
2062    stopTorrent( t );
2063
2064    managerUnlock( t->manager );
2065}
2066
2067void
2068tr_peerMgrAddTorrent( tr_peerMgr * manager,
2069                      tr_torrent * tor )
2070{
2071    managerLock( manager );
2072
2073    assert( tor );
2074    assert( tor->torrentPeers == NULL );
2075
2076    tor->torrentPeers = torrentConstructor( manager, tor );
2077
2078    managerUnlock( manager );
2079}
2080
2081void
2082tr_peerMgrRemoveTorrent( tr_torrent * tor )
2083{
2084    tr_torrentLock( tor );
2085
2086    stopTorrent( tor->torrentPeers );
2087    torrentDestructor( tor->torrentPeers );
2088
2089    tr_torrentUnlock( tor );
2090}
2091
2092void
2093tr_peerMgrTorrentAvailability( const tr_torrent * tor,
2094                               int8_t           * tab,
2095                               unsigned int       tabCount )
2096{
2097    tr_piece_index_t   i;
2098    const Torrent *    t;
2099    float              interval;
2100    tr_bool            isSeed;
2101    int                peerCount;
2102    const tr_peer **   peers;
2103    tr_torrentLock( tor );
2104
2105    t = tor->torrentPeers;
2106    tor = t->tor;
2107    interval = tor->info.pieceCount / (float)tabCount;
2108    isSeed = tor && ( tr_cpGetStatus ( &tor->completion ) == TR_SEED );
2109    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2110    peerCount = tr_ptrArraySize( &t->peers );
2111
2112    memset( tab, 0, tabCount );
2113
2114    for( i = 0; tor && i < tabCount; ++i )
2115    {
2116        const int piece = i * interval;
2117
2118        if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2119            tab[i] = -1;
2120        else if( peerCount ) {
2121            int j;
2122            for( j = 0; j < peerCount; ++j )
2123                if( tr_bitsetHas( &peers[j]->have, i ) )
2124                    ++tab[i];
2125        }
2126    }
2127
2128    tr_torrentUnlock( tor );
2129}
2130
2131/* Returns the pieces that are available from peers */
2132tr_bitfield*
2133tr_peerMgrGetAvailable( const tr_torrent * tor )
2134{
2135    int i;
2136    int peerCount;
2137    Torrent * t = tor->torrentPeers;
2138    const tr_peer ** peers;
2139    tr_bitfield * pieces;
2140    managerLock( t->manager );
2141
2142    pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2143    peerCount = tr_ptrArraySize( &t->peers );
2144    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2145    for( i=0; i<peerCount; ++i )
2146        tr_bitsetOr( pieces, &peers[i]->have );
2147
2148    managerUnlock( t->manager );
2149    return pieces;
2150}
2151
2152void
2153tr_peerMgrTorrentStats( tr_torrent       * tor,
2154                        int              * setmePeersKnown,
2155                        int              * setmePeersConnected,
2156                        int              * setmeSeedsConnected,
2157                        int              * setmeWebseedsSendingToUs,
2158                        int              * setmePeersSendingToUs,
2159                        int              * setmePeersGettingFromUs,
2160                        int              * setmePeersFrom )
2161{
2162    int i, size;
2163    const Torrent * t = tor->torrentPeers;
2164    const tr_peer ** peers;
2165    const tr_webseed ** webseeds;
2166
2167    managerLock( t->manager );
2168
2169    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2170    size = tr_ptrArraySize( &t->peers );
2171
2172    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2173    *setmePeersConnected       = 0;
2174    *setmeSeedsConnected       = 0;
2175    *setmePeersGettingFromUs   = 0;
2176    *setmePeersSendingToUs     = 0;
2177    *setmeWebseedsSendingToUs  = 0;
2178
2179    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2180        setmePeersFrom[i] = 0;
2181
2182    for( i=0; i<size; ++i )
2183    {
2184        const tr_peer * peer = peers[i];
2185        const struct peer_atom * atom = peer->atom;
2186
2187        if( peer->io == NULL ) /* not connected */
2188            continue;
2189
2190        ++*setmePeersConnected;
2191
2192        ++setmePeersFrom[atom->from];
2193
2194        if( clientIsDownloadingFrom( tor, peer ) )
2195            ++*setmePeersSendingToUs;
2196
2197        if( clientIsUploadingTo( peer ) )
2198            ++*setmePeersGettingFromUs;
2199
2200        if( atomIsSeed( atom ) )
2201            ++*setmeSeedsConnected;
2202    }
2203
2204    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2205    size = tr_ptrArraySize( &t->webseeds );
2206    for( i=0; i<size; ++i )
2207        if( tr_webseedIsActive( webseeds[i] ) )
2208            ++*setmeWebseedsSendingToUs;
2209
2210    managerUnlock( t->manager );
2211}
2212
2213float
2214tr_peerMgrGetWebseedSpeed( const tr_torrent * tor, uint64_t now )
2215{
2216    int i;
2217    float tmp;
2218    float ret = 0;
2219
2220    const Torrent * t = tor->torrentPeers;
2221    const int n = tr_ptrArraySize( &t->webseeds );
2222    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2223
2224    for( i=0; i<n; ++i )
2225        if( tr_webseedGetSpeed( webseeds[i], now, &tmp ) )
2226            ret += tmp;
2227
2228    return ret;
2229}
2230
2231
2232float*
2233tr_peerMgrWebSpeeds( const tr_torrent * tor )
2234{
2235    const Torrent * t = tor->torrentPeers;
2236    const tr_webseed ** webseeds;
2237    int i;
2238    int webseedCount;
2239    float * ret;
2240    uint64_t now;
2241
2242    assert( t->manager );
2243    managerLock( t->manager );
2244
2245    webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2246    webseedCount = tr_ptrArraySize( &t->webseeds );
2247    assert( webseedCount == tor->info.webseedCount );
2248    ret = tr_new0( float, webseedCount );
2249    now = tr_date( );
2250
2251    for( i=0; i<webseedCount; ++i )
2252        if( !tr_webseedGetSpeed( webseeds[i], now, &ret[i] ) )
2253            ret[i] = -1.0;
2254
2255    managerUnlock( t->manager );
2256    return ret;
2257}
2258
2259double
2260tr_peerGetPieceSpeed( const tr_peer * peer, uint64_t now, tr_direction direction )
2261{
2262    return peer->io ? tr_peerIoGetPieceSpeed( peer->io, now, direction ) : 0.0;
2263}
2264
2265
2266struct tr_peer_stat *
2267tr_peerMgrPeerStats( const tr_torrent    * tor,
2268                     int                 * setmeCount )
2269{
2270    int i, size;
2271    const Torrent * t = tor->torrentPeers;
2272    const tr_peer ** peers;
2273    tr_peer_stat * ret;
2274    uint64_t now;
2275
2276    assert( t->manager );
2277    managerLock( t->manager );
2278
2279    size = tr_ptrArraySize( &t->peers );
2280    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2281    ret = tr_new0( tr_peer_stat, size );
2282    now = tr_date( );
2283
2284    for( i=0; i<size; ++i )
2285    {
2286        char *                   pch;
2287        const tr_peer *          peer = peers[i];
2288        const struct peer_atom * atom = peer->atom;
2289        tr_peer_stat *           stat = ret + i;
2290
2291        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2292        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2293                   sizeof( stat->client ) );
2294        stat->port                = ntohs( peer->atom->port );
2295        stat->from                = atom->from;
2296        stat->progress            = peer->progress;
2297        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2298        stat->rateToPeer          = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER );
2299        stat->rateToClient        = tr_peerGetPieceSpeed( peer, now, TR_PEER_TO_CLIENT );
2300        stat->peerIsChoked        = peer->peerIsChoked;
2301        stat->peerIsInterested    = peer->peerIsInterested;
2302        stat->clientIsChoked      = peer->clientIsChoked;
2303        stat->clientIsInterested  = peer->clientIsInterested;
2304        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2305        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2306        stat->isUploadingTo       = clientIsUploadingTo( peer );
2307        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2308
2309        stat->blocksToPeer        = tr_historyGet( peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC*1000 );
2310        stat->blocksToClient      = tr_historyGet( peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC*1000 );
2311        stat->cancelsToPeer       = tr_historyGet( peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC*1000 );
2312        stat->cancelsToClient     = tr_historyGet( peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC*1000 );
2313
2314        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2315        stat->pendingReqsToClient = peer->pendingReqsToClient;
2316
2317        pch = stat->flagStr;
2318        if( t->optimistic == peer ) *pch++ = 'O';
2319        if( stat->isDownloadingFrom ) *pch++ = 'D';
2320        else if( stat->clientIsInterested ) *pch++ = 'd';
2321        if( stat->isUploadingTo ) *pch++ = 'U';
2322        else if( stat->peerIsInterested ) *pch++ = 'u';
2323        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2324        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2325        if( stat->isEncrypted ) *pch++ = 'E';
2326        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2327        if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2328        if( stat->isIncoming ) *pch++ = 'I';
2329        *pch = '\0';
2330    }
2331
2332    *setmeCount = size;
2333
2334    managerUnlock( t->manager );
2335    return ret;
2336}
2337
2338/**
2339***
2340**/
2341
2342/* do we still want this piece and does the peer have it? */
2343static tr_bool
2344isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2345{
2346    return ( !tor->info.pieces[index].dnd ) /* we want it */
2347        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2348        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2349}
2350
2351/* does this peer have any pieces that we want? */
2352static tr_bool
2353isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2354{
2355    tr_piece_index_t i, n;
2356
2357    if ( tr_torrentIsSeed( tor ) )
2358        return FALSE;
2359
2360    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2361        return FALSE;
2362
2363    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2364        if( isPieceInteresting( tor, peer, i ) )
2365            return TRUE;
2366
2367    return FALSE;
2368}
2369
2370/* determines who we send "interested" messages to */
2371static void
2372rechokeDownloads( Torrent * t )
2373{
2374    int i;
2375    const uint64_t now = tr_date( );
2376    const int msec = 60 * 1000;
2377    const int MIN_INTERESTING_PEERS = 5;
2378    const int peerCount = tr_ptrArraySize( &t->peers );
2379    int maxPeers;
2380
2381    int badCount         = 0;
2382    int goodCount        = 0;
2383    int untestedCount    = 0;
2384    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2385    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2386    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2387
2388    /* decide how many peers to be interested in */
2389    {
2390        int blocks = 0;
2391        int cancels = 0;
2392
2393        /* Count up how many blocks & cancels each peer has.
2394         *
2395         * There are two situations where we send out cancels --
2396         *
2397         * 1. We've got unresponsive peers, which is handled by deciding
2398         *    -which- peers to be interested in.
2399         *
2400         * 2. We've hit our bandwidth cap, which is handled by deciding
2401         *    -how many- peers to be interested in.
2402         *
2403         * We're working on 2. here, so we need to ignore unresponsive
2404         * peers in our calculations lest they confuse Transmission into
2405         * thinking it's hit its bandwidth cap.
2406         */
2407        for( i=0; i<peerCount; ++i )
2408        {
2409            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2410            const int b = tr_historyGet( peer->blocksSentToClient, now, msec );
2411            const int c = tr_historyGet( peer->cancelsSentToPeer, now, msec );
2412
2413            if( b == 0 ) /* ignore unresponsive peers, as described above */
2414                continue;
2415
2416            blocks += b;
2417            cancels += c;
2418        }
2419
2420        if( !t->interestedCount )
2421        {
2422            /* this is the torrent's first time to call this function...
2423             * start off optimistically by allowing interest in many peers */
2424            maxPeers = t->tor->maxConnectedPeers;
2425        }
2426        else if( !blocks )
2427        {
2428            /* we've gotten cancels but zero blocks...
2429             * something is seriously wrong.  throttle back sharply */
2430            maxPeers = t->interestedCount * 0.5;
2431        }
2432        else
2433        {
2434            const double cancelRate = cancels / (double)(cancels + blocks);
2435                 if( cancelRate >= 0.20 ) maxPeers = t->interestedCount * 0.7;
2436            else if( cancelRate >= 0.10 ) maxPeers = t->interestedCount * 0.8;
2437            else if( cancelRate >= 0.05 ) maxPeers = t->interestedCount * 0.9;
2438            else if( cancelRate >= 0.01 ) maxPeers = t->interestedCount;
2439            else                          maxPeers = t->interestedCount + 1;
2440
2441            /* if things are getting worse, don't add more peers */
2442            if( ( t->cancelRate > 0.01 ) && ( cancelRate > t->cancelRate ) )
2443                maxPeers = MIN( maxPeers, t->interestedCount );
2444
2445            t->cancelRate = cancelRate;
2446
2447            tordbg( t, "cancel rate is %.3f -- changing the "
2448                       "number of peers we're interested in from %d to %d",
2449                       cancelRate, t->interestedCount, maxPeers );
2450        }
2451    }
2452
2453    /* don't let the previous paragraph's number tweaking go too far... */
2454    if( maxPeers < MIN_INTERESTING_PEERS )
2455        maxPeers = MIN_INTERESTING_PEERS;
2456    if( maxPeers > t->tor->maxConnectedPeers )
2457        maxPeers = t->tor->maxConnectedPeers;
2458
2459    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2460     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2461     * That's the order in which we'll choose who to show interest in */
2462    for( i=0; i<peerCount; ++i )
2463    {
2464        tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2465
2466        if( !isPeerInteresting( t->tor, peer ) )
2467        {
2468            tr_peerMsgsSetInterested( peer->msgs, FALSE );
2469        }
2470        else
2471        {
2472            const int blocks = tr_historyGet( peer->blocksSentToClient, now, msec );
2473            const int cancels = tr_historyGet( peer->cancelsSentToPeer, now, msec );
2474
2475            if( !blocks && !cancels )
2476                untested[untestedCount++] = peer;
2477            else if( !cancels )
2478                good[goodCount++] = peer;
2479            else if( !blocks )
2480                bad[badCount++] = peer;
2481            else if( ( cancels * 10 ) < blocks )
2482                good[goodCount++] = peer;
2483            else
2484                bad[badCount++] = peer;
2485        }
2486    }
2487
2488    t->interestedCount = 0;
2489
2490    /* We've decided (1) how many peers to be interested in,
2491     * and (2) which peers are the best candidates,
2492     * Now it's time to update our `interest' flags. */
2493    for( i=0; i<goodCount; ++i ) {
2494        const tr_bool b = t->interestedCount < maxPeers;
2495        tr_peerMsgsSetInterested( good[i]->msgs, b );
2496        if( b )
2497            ++t->interestedCount;
2498    }
2499    for( i=0; i<untestedCount; ++i ) {
2500        const tr_bool b = t->interestedCount < maxPeers;
2501        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2502        if( b )
2503            ++t->interestedCount;
2504    }
2505    for( i=0; i<badCount; ++i ) {
2506        const tr_bool b = t->interestedCount < maxPeers;
2507        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2508        if( b )
2509            ++t->interestedCount;
2510    }
2511
2512/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2513
2514    /* cleanup */
2515    tr_free( untested );
2516    tr_free( good );
2517    tr_free( bad );
2518}
2519
2520/**
2521***
2522**/
2523
2524struct ChokeData
2525{
2526    tr_bool         doUnchoke;
2527    tr_bool         isInterested;
2528    tr_bool         isChoked;
2529    int             rate;
2530    tr_peer *       peer;
2531};
2532
2533static int
2534compareChoke( const void * va,
2535              const void * vb )
2536{
2537    const struct ChokeData * a = va;
2538    const struct ChokeData * b = vb;
2539
2540    if( a->rate != b->rate ) /* prefer higher overall speeds */
2541        return a->rate > b->rate ? -1 : 1;
2542
2543    if( a->isChoked != b->isChoked ) /* prefer unchoked */
2544        return a->isChoked ? 1 : -1;
2545
2546    return 0;
2547}
2548
2549/* is this a new connection? */
2550static int
2551isNew( const tr_peer * peer )
2552{
2553    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2554}
2555
2556static void
2557rechokeUploads( Torrent * t, const uint64_t now )
2558{
2559    int i, size, unchokedInterested;
2560    const int peerCount = tr_ptrArraySize( &t->peers );
2561    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2562    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2563    const tr_session * session = t->manager->session;
2564    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2565
2566    assert( torrentIsLocked( t ) );
2567
2568    /* sort the peers by preference and rate */
2569    for( i = 0, size = 0; i < peerCount; ++i )
2570    {
2571        tr_peer * peer = peers[i];
2572        struct peer_atom * atom = peer->atom;
2573
2574        if( peer->progress >= 1.0 ) /* choke all seeds */
2575        {
2576            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2577        }
2578        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2579        {
2580            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2581        }
2582        else if( chokeAll ) /* choke everyone if we're not uploading */
2583        {
2584            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2585        }
2586        else
2587        {
2588            struct ChokeData * n = &choke[size++];
2589            n->peer         = peer;
2590            n->isInterested = peer->peerIsInterested;
2591            n->isChoked     = peer->peerIsChoked;
2592            n->rate         = tr_peerGetPieceSpeed( peer, now, TR_CLIENT_TO_PEER ) * 1024;
2593        }
2594    }
2595
2596    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
2597
2598    /**
2599     * Reciprocation and number of uploads capping is managed by unchoking
2600     * the N peers which have the best upload rate and are interested.
2601     * This maximizes the client's download rate. These N peers are
2602     * referred to as downloaders, because they are interested in downloading
2603     * from the client.
2604     *
2605     * Peers which have a better upload rate (as compared to the downloaders)
2606     * but aren't interested get unchoked. If they become interested, the
2607     * downloader with the worst upload rate gets choked. If a client has
2608     * a complete file, it uses its upload rate rather than its download
2609     * rate to decide which peers to unchoke.
2610     */
2611    unchokedInterested = 0;
2612    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
2613        choke[i].doUnchoke = 1;
2614        if( choke[i].isInterested )
2615            ++unchokedInterested;
2616    }
2617
2618    /* optimistic unchoke */
2619    if( i < size )
2620    {
2621        int n;
2622        struct ChokeData * c;
2623        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
2624
2625        for( ; i<size; ++i )
2626        {
2627            if( choke[i].isInterested )
2628            {
2629                const tr_peer * peer = choke[i].peer;
2630                int x = 1, y;
2631                if( isNew( peer ) ) x *= 3;
2632                for( y=0; y<x; ++y )
2633                    tr_ptrArrayAppend( &randPool, &choke[i] );
2634            }
2635        }
2636
2637        if(( n = tr_ptrArraySize( &randPool )))
2638        {
2639            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
2640            c->doUnchoke = 1;
2641            t->optimistic = c->peer;
2642        }
2643
2644        tr_ptrArrayDestruct( &randPool, NULL );
2645    }
2646
2647    for( i=0; i<size; ++i )
2648        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
2649
2650    /* cleanup */
2651    tr_free( choke );
2652}
2653
2654static void
2655rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
2656{
2657    uint64_t now;
2658    tr_torrent * tor = NULL;
2659    tr_peerMgr * mgr = vmgr;
2660    managerLock( mgr );
2661
2662    now = tr_date( );
2663    while(( tor = tr_torrentNext( mgr->session, tor ))) {
2664        if( tor->isRunning ) {
2665            rechokeUploads( tor->torrentPeers, now );
2666            if( !tr_torrentIsSeed( tor ) )
2667                rechokeDownloads( tor->torrentPeers );
2668        }
2669    }
2670
2671    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
2672    managerUnlock( mgr );
2673}
2674
2675/***
2676****
2677****  Life and Death
2678****
2679***/
2680
2681typedef enum
2682{
2683    TR_CAN_KEEP,
2684    TR_CAN_CLOSE,
2685    TR_MUST_CLOSE,
2686}
2687tr_close_type_t;
2688
2689static tr_close_type_t
2690shouldPeerBeClosed( const Torrent    * t,
2691                    const tr_peer    * peer,
2692                    int                peerCount,
2693                    const time_t       now )
2694{
2695    const tr_torrent *       tor = t->tor;
2696    const struct peer_atom * atom = peer->atom;
2697
2698    /* if it's marked for purging, close it */
2699    if( peer->doPurge )
2700    {
2701        tordbg( t, "purging peer %s because its doPurge flag is set",
2702                tr_atomAddrStr( atom ) );
2703        return TR_MUST_CLOSE;
2704    }
2705
2706    /* if we're seeding and the peer has everything we have,
2707     * and enough time has passed for a pex exchange, then disconnect */
2708    if( tr_torrentIsSeed( tor ) )
2709    {
2710        tr_bool peerHasEverything;
2711
2712        if( atom->seedProbability != -1 )
2713        {
2714            peerHasEverything = atomIsSeed( atom );
2715        }
2716        else
2717        {
2718            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
2719            tr_bitsetDifference( tmp, &peer->have );
2720            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
2721            tr_bitfieldFree( tmp );
2722        }
2723
2724        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
2725        {
2726            tordbg( t, "purging peer %s because we're both seeds",
2727                    tr_atomAddrStr( atom ) );
2728            return TR_MUST_CLOSE;
2729        }
2730    }
2731
2732    /* disconnect if it's been too long since piece data has been transferred.
2733     * this is on a sliding scale based on number of available peers... */
2734    {
2735        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
2736        /* if we have >= relaxIfFewerThan, strictness is 100%.
2737         * if we have zero connections, strictness is 0% */
2738        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
2739                               ? 1.0
2740                               : peerCount / (float)relaxStrictnessIfFewerThanN;
2741        const int lo = MIN_UPLOAD_IDLE_SECS;
2742        const int hi = MAX_UPLOAD_IDLE_SECS;
2743        const int limit = hi - ( ( hi - lo ) * strictness );
2744        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
2745/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
2746        if( idleTime > limit ) {
2747            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
2748                       tr_atomAddrStr( atom ), idleTime );
2749            return TR_CAN_CLOSE;
2750        }
2751    }
2752
2753    return TR_CAN_KEEP;
2754}
2755
2756static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
2757
2758static tr_peer **
2759getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int * setmeSize )
2760{
2761    int i, peerCount, outsize;
2762    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
2763    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
2764
2765    assert( torrentIsLocked( t ) );
2766
2767    for( i = outsize = 0; i < peerCount; ++i )
2768        if( shouldPeerBeClosed( t, peers[i], peerCount, now ) == closeType )
2769            ret[outsize++] = peers[i];
2770
2771    sortPeersByLivelinessReverse ( ret, NULL, outsize, tr_date( ) );
2772
2773    *setmeSize = outsize;
2774    return ret;
2775}
2776
2777static int
2778getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
2779{
2780    int sec;
2781
2782    /* if we were recently connected to this peer and transferring piece
2783     * data, try to reconnect to them sooner rather that later -- we don't
2784     * want network troubles to get in the way of a good peer. */
2785    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
2786        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2787
2788    /* don't allow reconnects more often than our minimum */
2789    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
2790        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
2791
2792    /* otherwise, the interval depends on how many times we've tried
2793     * and failed to connect to the peer */
2794    else switch( atom->numFails ) {
2795        case 0: sec = 0; break;
2796        case 1: sec = 5; break;
2797        case 2: sec = 2 * 60; break;
2798        case 3: sec = 15 * 60; break;
2799        case 4: sec = 30 * 60; break;
2800        case 5: sec = 60 * 60; break;
2801        default: sec = 120 * 60; break;
2802    }
2803
2804    /* penalize peers that were unreachable the last time we tried */
2805    if( atom->myflags & MYFLAG_UNREACHABLE )
2806        sec += sec;
2807
2808    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
2809    return sec;
2810}
2811
2812static void
2813closePeer( Torrent * t, tr_peer * peer )
2814{
2815    struct peer_atom * atom;
2816
2817    assert( t != NULL );
2818    assert( peer != NULL );
2819
2820    atom = peer->atom;
2821
2822    /* if we transferred piece data, then they might be good peers,
2823       so reset their `numFails' weight to zero.  otherwise we connected
2824       to them fruitlessly, so mark it as another fail */
2825    if( atom->piece_data_time ) {
2826        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
2827        atom->numFails = 0;
2828    } else {
2829        ++atom->numFails;
2830        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
2831    }
2832
2833    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
2834    removePeer( t, peer );
2835}
2836
2837static void
2838closeBadPeers( Torrent * t )
2839{
2840    const time_t  now = tr_time( );
2841
2842    if( !t->isRunning )
2843    {
2844        removeAllPeers( t );
2845    }
2846    else
2847    {
2848        int i;
2849        int mustCloseCount;
2850        struct tr_peer ** mustClose;
2851
2852        /* disconnect the really bad peers */
2853        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
2854        for( i=0; i<mustCloseCount; ++i )
2855            closePeer( t, mustClose[i] );
2856        tr_free( mustClose );
2857    }
2858}
2859
2860struct peer_liveliness
2861{
2862    tr_peer * peer;
2863    void * clientData;
2864    time_t pieceDataTime;
2865    time_t time;
2866    int speed;
2867    tr_bool doPurge;
2868};
2869
2870static int
2871comparePeerLiveliness( const void * va, const void * vb )
2872{
2873    const struct peer_liveliness * a = va;
2874    const struct peer_liveliness * b = vb;
2875
2876    if( a->doPurge != b->doPurge )
2877        return a->doPurge ? 1 : -1;
2878
2879    if( a->speed != b->speed ) /* faster goes first */
2880        return a->speed > b->speed ? -1 : 1;
2881
2882    /* the one to give us data more recently goes first */
2883    if( a->pieceDataTime != b->pieceDataTime )
2884        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
2885
2886    /* the one we connected to most recently goes first */
2887    if( a->time != b->time )
2888        return a->time > b->time ? -1 : 1;
2889
2890    return 0;
2891}
2892
2893static int
2894comparePeerLivelinessReverse( const void * va, const void * vb )
2895{
2896    return -comparePeerLiveliness (va, vb);
2897}
2898
2899static void
2900sortPeersByLivelinessImpl( tr_peer  ** peers,
2901                           void     ** clientData,
2902                           int         n,
2903                           uint64_t    now,
2904                           int (*compare) ( const void *va, const void *vb ) )
2905{
2906    int i;
2907    struct peer_liveliness *lives, *l;
2908
2909    /* build a sortable array of peer + extra info */
2910    lives = l = tr_new0( struct peer_liveliness, n );
2911    for( i=0; i<n; ++i, ++l )
2912    {
2913        tr_peer * p = peers[i];
2914        l->peer = p;
2915        l->doPurge = p->doPurge;
2916        l->pieceDataTime = p->atom->piece_data_time;
2917        l->time = p->atom->time;
2918        l->speed = 1024.0 * (   tr_peerGetPieceSpeed( p, now, TR_UP )
2919                              + tr_peerGetPieceSpeed( p, now, TR_DOWN ) );
2920        if( clientData )
2921            l->clientData = clientData[i];
2922    }
2923
2924    /* sort 'em */
2925    assert( n == ( l - lives ) );
2926    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
2927
2928    /* build the peer array */
2929    for( i=0, l=lives; i<n; ++i, ++l ) {
2930        peers[i] = l->peer;
2931        if( clientData )
2932            clientData[i] = l->clientData;
2933    }
2934    assert( n == ( l - lives ) );
2935
2936    /* cleanup */
2937    tr_free( lives );
2938}
2939
2940static void
2941sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2942{
2943    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
2944}
2945
2946static void
2947sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
2948{
2949    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
2950}
2951
2952
2953static void
2954enforceTorrentPeerLimit( Torrent * t, uint64_t now )
2955{
2956    int n = tr_ptrArraySize( &t->peers );
2957    const int max = tr_torrentGetPeerLimit( t->tor );
2958    if( n > max )
2959    {
2960        void * base = tr_ptrArrayBase( &t->peers );
2961        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
2962        sortPeersByLiveliness( peers, NULL, n, now );
2963        while( n > max )
2964            closePeer( t, peers[--n] );
2965        tr_free( peers );
2966    }
2967}
2968
2969static void
2970enforceSessionPeerLimit( tr_session * session, uint64_t now )
2971{
2972    int n = 0;
2973    tr_torrent * tor = NULL;
2974    const int max = tr_sessionGetPeerLimit( session );
2975
2976    /* count the total number of peers */
2977    while(( tor = tr_torrentNext( session, tor )))
2978        n += tr_ptrArraySize( &tor->torrentPeers->peers );
2979
2980    /* if there are too many, prune out the worst */
2981    if( n > max )
2982    {
2983        tr_peer ** peers = tr_new( tr_peer*, n );
2984        Torrent ** torrents = tr_new( Torrent*, n );
2985
2986        /* populate the peer array */
2987        n = 0;
2988        tor = NULL;
2989        while(( tor = tr_torrentNext( session, tor ))) {
2990            int i;
2991            Torrent * t = tor->torrentPeers;
2992            const int tn = tr_ptrArraySize( &t->peers );
2993            for( i=0; i<tn; ++i, ++n ) {
2994                peers[n] = tr_ptrArrayNth( &t->peers, i );
2995                torrents[n] = t;
2996            }
2997        }
2998
2999        /* sort 'em */
3000        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3001
3002        /* cull out the crappiest */
3003        while( n-- > max )
3004            closePeer( torrents[n], peers[n] );
3005
3006        /* cleanup */
3007        tr_free( torrents );
3008        tr_free( peers );
3009    }
3010}
3011
3012static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3013
3014static void
3015reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3016{
3017    tr_torrent * tor;
3018    tr_peerMgr * mgr = vmgr;
3019    const uint64_t now = tr_date( );
3020
3021    /**
3022    ***  enforce the per-session and per-torrent peer limits
3023    **/
3024
3025    /* if we're over the per-torrent peer limits, cull some peers */
3026    tor = NULL;
3027    while(( tor = tr_torrentNext( mgr->session, tor )))
3028        if( tor->isRunning )
3029            enforceTorrentPeerLimit( tor->torrentPeers, now );
3030
3031    /* if we're over the per-session peer limits, cull some peers */
3032    enforceSessionPeerLimit( mgr->session, now );
3033
3034    /* remove crappy peers */
3035    tor = NULL;
3036    while(( tor = tr_torrentNext( mgr->session, tor )))
3037        closeBadPeers( tor->torrentPeers );
3038
3039    /* try to make new peer connections */
3040    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3041}
3042
3043/****
3044*****
3045*****  BANDWIDTH ALLOCATION
3046*****
3047****/
3048
3049static void
3050pumpAllPeers( tr_peerMgr * mgr )
3051{
3052    tr_torrent * tor = NULL;
3053
3054    while(( tor = tr_torrentNext( mgr->session, tor )))
3055    {
3056        int j;
3057        Torrent * t = tor->torrentPeers;
3058
3059        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3060        {
3061            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3062            tr_peerMsgsPulse( peer->msgs );
3063        }
3064    }
3065}
3066
3067static void
3068bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3069{
3070    tr_torrent * tor;
3071    tr_peerMgr * mgr = vmgr;
3072    managerLock( mgr );
3073
3074    /* FIXME: this next line probably isn't necessary... */
3075    pumpAllPeers( mgr );
3076
3077    /* allocate bandwidth to the peers */
3078    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3079    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3080
3081    /* possibly stop torrents that have seeded enough */
3082    tor = NULL;
3083    while(( tor = tr_torrentNext( mgr->session, tor )))
3084        tr_torrentCheckSeedRatio( tor );
3085
3086    /* run the completeness check for any torrents that need it */
3087    tor = NULL;
3088    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3089        if( tor->torrentPeers->needsCompletenessCheck ) {
3090            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3091            tr_torrentRecheckCompleteness( tor );
3092        }
3093    }
3094
3095    /* possibly stop torrents that have an error */
3096    tor = NULL;
3097    while(( tor = tr_torrentNext( mgr->session, tor )))
3098        if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
3099            tr_torrentStop( tor );
3100
3101    reconnectPulse( 0, 0, mgr );
3102
3103    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3104    managerUnlock( mgr );
3105}
3106
3107/***
3108****
3109***/
3110
3111static int
3112compareAtomPtrsByAddress( const void * va, const void *vb )
3113{
3114    const struct peer_atom * a = * (const struct peer_atom**) va;
3115    const struct peer_atom * b = * (const struct peer_atom**) vb;
3116
3117    assert( tr_isAtom( a ) );
3118    assert( tr_isAtom( b ) );
3119
3120    return tr_compareAddresses( &a->addr, &b->addr );
3121}
3122
3123/* best come first, worst go last */
3124static int
3125compareAtomPtrsByShelfDate( const void * va, const void *vb )
3126{
3127    time_t atime;
3128    time_t btime;
3129    const struct peer_atom * a = * (const struct peer_atom**) va;
3130    const struct peer_atom * b = * (const struct peer_atom**) vb;
3131    const int data_time_cutoff_secs = 60 * 60;
3132    const time_t tr_now = tr_time( );
3133
3134    assert( tr_isAtom( a ) );
3135    assert( tr_isAtom( b ) );
3136
3137    /* primary key: the last piece data time *if* it was within the last hour */
3138    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3139    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3140    if( atime != btime )
3141        return atime > btime ? -1 : 1;
3142
3143    /* secondary key: shelf date. */
3144    if( a->shelf_date != b->shelf_date )
3145        return a->shelf_date > b->shelf_date ? -1 : 1;
3146
3147    return 0;
3148}
3149
3150static int
3151getMaxAtomCount( const tr_torrent * tor )
3152{
3153    /* FIXME: this curve should be smoother... */
3154    const int n = tor->maxConnectedPeers;
3155    if( n >= 200 ) return n * 1.5;
3156    if( n >= 100 ) return n * 2;
3157    if( n >=  50 ) return n * 3;
3158    if( n >=  20 ) return n * 5;
3159    return n * 10;
3160}
3161
3162static void
3163atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3164{
3165    tr_torrent * tor = NULL;
3166    tr_peerMgr * mgr = vmgr;
3167    managerLock( mgr );
3168
3169    while(( tor = tr_torrentNext( mgr->session, tor )))
3170    {
3171        int atomCount;
3172        Torrent * t = tor->torrentPeers;
3173        const int maxAtomCount = getMaxAtomCount( tor );
3174        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3175
3176        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3177        {
3178            int i;
3179            int keepCount = 0;
3180            int testCount = 0;
3181            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3182            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3183
3184            /* keep the ones that are in use */
3185            for( i=0; i<atomCount; ++i ) {
3186                struct peer_atom * atom = atoms[i];
3187                if( peerIsInUse( t, atom ) )
3188                    keep[keepCount++] = atom;
3189                else
3190                    test[testCount++] = atom;
3191            }
3192
3193            /* if there's room, keep the best of what's left */
3194            i = 0;
3195            if( keepCount < maxAtomCount ) {
3196                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3197                while( i<testCount && keepCount<maxAtomCount )
3198                    keep[keepCount++] = test[i++];
3199            }
3200
3201            /* free the culled atoms */
3202            while( i<testCount )
3203                tr_free( test[i++] );
3204
3205            /* rebuild Torrent.pool with what's left */
3206            tr_ptrArrayDestruct( &t->pool, NULL );
3207            t->pool = TR_PTR_ARRAY_INIT;
3208            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3209            for( i=0; i<keepCount; ++i )
3210                tr_ptrArrayAppend( &t->pool, keep[i] );
3211
3212            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3213
3214            /* cleanup */
3215            tr_free( test );
3216            tr_free( keep );
3217        }
3218    }
3219
3220    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3221    managerUnlock( mgr );
3222}
3223
3224/***
3225****
3226****
3227****
3228***/
3229
3230static inline tr_bool
3231isBandwidthMaxedOut( const tr_bandwidth * b,
3232                     const uint64_t now_msec, tr_direction dir )
3233{
3234    if( !tr_bandwidthIsLimited( b, dir ) )
3235        return FALSE;
3236    else {
3237        const double got = tr_bandwidthGetPieceSpeed( b, now_msec, dir );
3238        const double want = tr_bandwidthGetDesiredSpeed( b, dir );
3239        return got >= want;
3240    }
3241}
3242
3243/* is this atom someone that we'd want to initiate a connection to? */
3244static tr_bool
3245isPeerCandidate( const tr_torrent * tor, const struct peer_atom * atom, const time_t now )
3246{
3247    /* not if we've already got a connection to them... */
3248    if( peerIsInUse( tor->torrentPeers, atom ) )
3249        return FALSE;
3250
3251    /* not if they're banned... */
3252    if( atom->myflags & MYFLAG_BANNED )
3253        return FALSE;
3254
3255    /* not if we just tried them already */
3256    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3257        return FALSE;
3258
3259    /* not if we're both seeds */
3260    if( tr_torrentIsSeed( tor ) )
3261        if( atomIsSeed( atom ) || ( atom->uploadOnly == UPLOAD_ONLY_YES ) )
3262            return FALSE;
3263 
3264    /* not if they're blocklisted */
3265    /* FIXME: maybe we should remove this atom altogether? */
3266    if( tr_sessionIsAddressBlocked( tor->session, &atom->addr ) )
3267        return FALSE;
3268
3269    return TRUE;
3270}
3271
3272struct peer_candidate
3273{
3274    int salt;
3275    tr_torrent * tor;
3276    struct peer_atom * atom;
3277    tr_priority_t priority;
3278    tr_bool wasRecentlyStarted;
3279};
3280
3281static int
3282compareSeedProbabilities( int a, int b )
3283{
3284    /* 1. smaller numbers are better
3285       2. prefer leechers to unknown
3286       3. prefer unknown to seeds (FIXME: this is a simplistic test) */
3287    if( a == 100 ) a = 101;
3288    else if( a == -1 ) a = 100;
3289
3290    if( b == 100 ) b = 101;
3291    else if( b == -1 ) b = 100;
3292
3293    return a > b ? 1 : -1;
3294}
3295
3296static tr_bool
3297torrentWasRecentlyStarted( const tr_torrent * tor )
3298{
3299    return difftime( tr_time( ), tor->startDate ) < 120;
3300}
3301
3302/* sort an array of peer candidates */
3303static int
3304comparePeerCandidates( const void * va, const void * vb )
3305{
3306    int i;
3307    tr_bool af, bf;
3308    const struct peer_candidate * a = va;
3309    const struct peer_candidate * b = vb;
3310
3311    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3312    af = a->atom->lastConnectionAt < a->atom->lastConnectionAttemptAt;
3313    bf = b->atom->lastConnectionAt < b->atom->lastConnectionAttemptAt;
3314    if( af != bf )
3315        return af ? 1 : -1;
3316
3317    /* prefer the one we attempted least recently (to cycle through all peers) */
3318    if( a->atom->lastConnectionAttemptAt != b->atom->lastConnectionAttemptAt )
3319        return a->atom->lastConnectionAttemptAt < b->atom->lastConnectionAttemptAt ? -1 : 1;
3320
3321    /* prefer peers belonging to a torrent of a higher priority */
3322    if( a->priority != b->priority )
3323        return a->priority > b->priority ? -1 : 1;
3324
3325    /* prefer recently-started torrents */
3326    if( a->wasRecentlyStarted != b->wasRecentlyStarted )
3327        return a->wasRecentlyStarted ? -1 : 1;
3328
3329    /* prefer peers that we might have a chance of uploading to */
3330    if(( i = compareSeedProbabilities( a->atom->seedProbability, b->atom->seedProbability )))
3331        return i;
3332
3333    /* prefer peers that we got from more trusted sources */
3334    if( a->atom->from != b->atom->from )
3335        return a->atom->from < b->atom->from ? -1 : 1;
3336
3337    /* salt */
3338    return a->salt - b->salt;
3339}
3340
3341/** @return an array of all the atoms we might want to connect to */
3342static struct peer_candidate*
3343getPeerCandidates( tr_session * session, int * candidateCount )
3344{
3345    int n;
3346    tr_torrent * tor;
3347    struct peer_candidate * candidates;
3348    struct peer_candidate * walk;
3349    const time_t now = tr_time( );
3350    const uint64_t now_msec = tr_date( );
3351    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3352    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3353
3354    /* don't start any new handshakes if we're full up */
3355    n = 0;
3356    tor= NULL;
3357    while(( tor = tr_torrentNext( session, tor )))
3358        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3359    if( maxCandidates <= n ) {
3360        *candidateCount = 0;
3361        return NULL;
3362    }
3363
3364    /* allocate an array of candidates */
3365    n = 0;
3366    tor= NULL;
3367    while(( tor = tr_torrentNext( session, tor )))
3368        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3369    walk = candidates = tr_new( struct peer_candidate, n );
3370
3371    /* populate the candidate array */
3372    tor = NULL;
3373    while(( tor = tr_torrentNext( session, tor )))
3374    {
3375        int i, nAtoms;
3376        struct peer_atom ** atoms;
3377
3378        if( !tor->torrentPeers->isRunning )
3379            continue;
3380
3381        /* if we've already got enough peers in this torrent... */
3382        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3383            continue;
3384
3385        /* if we've already got enough speed in this torrent... */
3386        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3387            continue;
3388
3389        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3390        for( i=0; i<nAtoms; ++i )
3391        {
3392            struct peer_atom * atom = atoms[i];
3393
3394            if( isPeerCandidate( tor, atom, now ) )
3395            {
3396                walk->tor = tor;
3397                walk->atom = atom;
3398                walk->salt = tr_cryptoWeakRandInt( 4096 );
3399                walk->priority = tr_torrentGetPriority( tor );
3400                walk->wasRecentlyStarted = torrentWasRecentlyStarted( tor );
3401                ++walk;
3402            }
3403        }
3404    }
3405
3406    *candidateCount = walk - candidates;
3407    if( *candidateCount > 1 )
3408        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3409    return candidates;
3410}
3411
3412static void
3413initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3414{
3415    tr_peerIo * io;
3416    const time_t now = tr_time( );
3417
3418    tordbg( t, "Starting an OUTGOING connection with %s", tr_atomAddrStr( atom ) );
3419
3420    io = tr_peerIoNewOutgoing( mgr->session,
3421                               mgr->session->bandwidth,
3422                               &atom->addr,
3423                               atom->port,
3424                               t->tor->info.hash,
3425                               t->tor->completeness == TR_SEED );
3426
3427    if( io == NULL )
3428    {
3429        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3430                tr_atomAddrStr( atom ) );
3431        atom->myflags |= MYFLAG_UNREACHABLE;
3432        atom->numFails++;
3433    }
3434    else
3435    {
3436        tr_handshake * handshake = tr_handshakeNew( io,
3437                                                    mgr->session->encryptionMode,
3438                                                    myHandshakeDoneCB,
3439                                                    mgr );
3440
3441        assert( tr_peerIoGetTorrentHash( io ) );
3442
3443        tr_peerIoUnref( io ); /* balanced by the initial ref
3444                                 in tr_peerIoNewOutgoing() */
3445
3446        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3447                                 handshakeCompare );
3448    }
3449
3450    atom->lastConnectionAttemptAt = now;
3451    atom->time = now;
3452}
3453
3454static void
3455initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3456{
3457#if 0
3458    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3459             tr_atomAddrStr( c->atom ),
3460             tr_torrentName( c->tor ),
3461             (int)c->atom->seedProbability,
3462             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3463             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3464#endif
3465
3466    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3467}
3468
3469static void
3470makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3471{
3472    int i, n;
3473    struct peer_candidate * candidates;
3474
3475    candidates = getPeerCandidates( mgr->session, &n );
3476
3477    for( i=0; i<n && i<max; ++i )
3478        initiateCandidateConnection( mgr, &candidates[i] );
3479
3480    tr_free( candidates );
3481}
Note: See TracBrowser for help on using the repository browser.