source: trunk/libtransmission/peer-msgs.c @ 13949

Last change on this file since 13949 was 13949, checked in by jordan, 8 years ago

(libT) reduce the scope of REQUEST_BUF_SECS from peer-common.h to peer-msgs.c

  • Property svn:keywords set to Date Rev Author Id
File size: 71.4 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 13949 2013-02-03 23:29:34Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "cache.h"
25#include "completion.h"
26#include "crypto.h" /* tr_sha1 () */
27#include "log.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "variant.h"
37#include "version.h"
38
39/**
40***
41**/
42
43enum
44{
45    BT_CHOKE                = 0,
46    BT_UNCHOKE              = 1,
47    BT_INTERESTED           = 2,
48    BT_NOT_INTERESTED       = 3,
49    BT_HAVE                 = 4,
50    BT_BITFIELD             = 5,
51    BT_REQUEST              = 6,
52    BT_PIECE                = 7,
53    BT_CANCEL               = 8,
54    BT_PORT                 = 9,
55
56    BT_FEXT_SUGGEST         = 13,
57    BT_FEXT_HAVE_ALL        = 14,
58    BT_FEXT_HAVE_NONE       = 15,
59    BT_FEXT_REJECT          = 16,
60    BT_FEXT_ALLOWED_FAST    = 17,
61
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    UT_PEX_ID               = 1,
67    UT_METADATA_ID          = 3,
68
69    MAX_PEX_PEER_COUNT      = 50,
70
71    MIN_CHOKE_PERIOD_SEC    = 10,
72
73    /* idle seconds before we send a keepalive */
74    KEEPALIVE_INTERVAL_SECS = 100,
75
76    PEX_INTERVAL_SECS       = 90, /* sec between sendPex () calls */
77
78    REQQ                    = 512,
79
80    METADATA_REQQ           = 64,
81
82    /* used in lowering the outMessages queue period */
83    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
84    HIGH_PRIORITY_INTERVAL_SECS = 2,
85    LOW_PRIORITY_INTERVAL_SECS = 10,
86
87    /* number of pieces we'll allow in our fast set */
88    MAX_FAST_SET_SIZE = 3,
89
90    /* how many blocks to keep prefetched per peer */
91    PREFETCH_SIZE = 18,
92
93    /* when we're making requests from another peer,
94       batch them together to send enough requests to
95       meet our bandwidth goals for the next N seconds */
96    REQUEST_BUF_SECS = 10,
97
98    /* defined in BEP #9 */
99    METADATA_MSG_TYPE_REQUEST = 0,
100    METADATA_MSG_TYPE_DATA = 1,
101    METADATA_MSG_TYPE_REJECT = 2
102};
103
104enum
105{
106    AWAITING_BT_LENGTH,
107    AWAITING_BT_ID,
108    AWAITING_BT_MESSAGE,
109    AWAITING_BT_PIECE
110};
111
112typedef enum
113{
114  ENCRYPTION_PREFERENCE_UNKNOWN,
115  ENCRYPTION_PREFERENCE_YES,
116  ENCRYPTION_PREFERENCE_NO
117}
118encryption_preference_t;
119
120
121
122/**
123***
124**/
125
126struct peer_request
127{
128    uint32_t    index;
129    uint32_t    offset;
130    uint32_t    length;
131};
132
133static void
134blockToReq (const tr_torrent     * tor,
135            tr_block_index_t       block,
136            struct peer_request  * setme)
137{
138    tr_torrentGetBlockLocation (tor, block, &setme->index,
139                                            &setme->offset,
140                                            &setme->length);
141}
142
143/**
144***
145**/
146
147/* this is raw, unchanged data from the peer regarding
148 * the current message that it's sending us. */
149struct tr_incoming
150{
151    uint8_t                id;
152    uint32_t               length; /* includes the +1 for id length */
153    struct peer_request    blockReq; /* metadata for incoming blocks */
154    struct evbuffer *      block; /* piece data for incoming blocks */
155};
156
157/**
158 * Low-level communication state information about a connected peer.
159 *
160 * This structure remembers the low-level protocol states that we're
161 * in with this peer, such as active requests, pex messages, and so on.
162 * Its fields are all private to peer-msgs.c.
163 *
164 * Data not directly involved with sending & receiving messages is
165 * stored in tr_peer, where it can be accessed by both peermsgs and
166 * the peer manager.
167 *
168 * @see struct peer_atom
169 * @see tr_peer
170 */
171struct tr_peermsgs
172{
173    bool            peerSupportsPex;
174    bool            peerSupportsMetadataXfer;
175    bool            clientSentLtepHandshake;
176    bool            peerSentLtepHandshake;
177
178    /*bool          haveFastSet;*/
179
180    int             desiredRequestCount;
181
182    int             prefetchCount;
183
184    /* how long the outMessages batch should be allowed to grow before
185     * it's flushed -- some messages (like requests >:) should be sent
186     * very quickly; others aren't as urgent. */
187    int8_t          outMessagesBatchPeriod;
188
189    uint8_t         state;
190    uint8_t         ut_pex_id;
191    uint8_t         ut_metadata_id;
192    uint16_t        pexCount;
193    uint16_t        pexCount6;
194
195    tr_port         dht_port;
196
197    encryption_preference_t  encryption_preference;
198
199    size_t                   metadata_size_hint;
200#if 0
201    size_t                 fastsetSize;
202    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
203#endif
204
205    tr_peer *              peer;
206
207    tr_torrent *           torrent;
208
209    tr_peer_callback      * callback;
210    void                  * callbackData;
211
212    struct evbuffer *      outMessages; /* all the non-piece messages */
213
214    struct peer_request    peerAskedFor[REQQ];
215
216    int                    peerAskedForMetadata[METADATA_REQQ];
217    int                    peerAskedForMetadataCount;
218
219    tr_pex               * pex;
220    tr_pex               * pex6;
221
222    /*time_t                 clientSentPexAt;*/
223    time_t                 clientSentAnythingAt;
224
225    /* when we started batching the outMessages */
226    time_t                outMessagesBatchedAt;
227
228    struct tr_incoming    incoming;
229
230    /* if the peer supports the Extension Protocol in BEP 10 and
231       supplied a reqq argument, it's stored here. Otherwise, the
232       value is zero and should be ignored. */
233    int64_t               reqq;
234
235    struct event        * pexTimer;
236};
237
238/**
239***
240**/
241
242static inline tr_session*
243getSession (struct tr_peermsgs * msgs)
244{
245    return msgs->torrent->session;
246}
247
248/**
249***
250**/
251
252static void
253myDebug (const char * file, int line,
254         const struct tr_peermsgs * msgs,
255         const char * fmt, ...)
256{
257    FILE * fp = tr_logGetFile ();
258
259    if (fp)
260    {
261        va_list           args;
262        char              timestr[64];
263        struct evbuffer * buf = evbuffer_new ();
264        char *            base = tr_basename (file);
265        char *            message;
266
267        evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ",
268                             tr_logGetTimeStr (timestr, sizeof (timestr)),
269                             tr_torrentName (msgs->torrent),
270                             tr_peerIoGetAddrStr (msgs->peer->io),
271                             tr_quark_get_string (msgs->peer->client, NULL));
272        va_start (args, fmt);
273        evbuffer_add_vprintf (buf, fmt, args);
274        va_end (args);
275        evbuffer_add_printf (buf, " (%s:%d)\n", base, line);
276
277        message = evbuffer_free_to_str (buf);
278        fputs (message, fp);
279
280        tr_free (base);
281        tr_free (message);
282    }
283}
284
285#define dbgmsg(msgs, ...) \
286  do \
287    { \
288      if (tr_logGetDeepEnabled ()) \
289        myDebug (__FILE__, __LINE__, msgs, __VA_ARGS__); \
290    } \
291  while (0)
292
293/**
294***
295**/
296
297static void
298pokeBatchPeriod (tr_peermsgs * msgs, int interval)
299{
300    if (msgs->outMessagesBatchPeriod > interval)
301    {
302        msgs->outMessagesBatchPeriod = interval;
303        dbgmsg (msgs, "lowering batch interval to %d seconds", interval);
304    }
305}
306
307static void
308dbgOutMessageLen (tr_peermsgs * msgs)
309{
310    dbgmsg (msgs, "outMessage size is now %zu", evbuffer_get_length (msgs->outMessages));
311}
312
313static void
314protocolSendReject (tr_peermsgs * msgs, const struct peer_request * req)
315{
316    struct evbuffer * out = msgs->outMessages;
317
318    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
319
320    evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
321    evbuffer_add_uint8 (out, BT_FEXT_REJECT);
322    evbuffer_add_uint32 (out, req->index);
323    evbuffer_add_uint32 (out, req->offset);
324    evbuffer_add_uint32 (out, req->length);
325
326    dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
327    dbgOutMessageLen (msgs);
328}
329
330static void
331protocolSendRequest (tr_peermsgs * msgs, const struct peer_request * req)
332{
333    struct evbuffer * out = msgs->outMessages;
334
335    evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
336    evbuffer_add_uint8 (out, BT_REQUEST);
337    evbuffer_add_uint32 (out, req->index);
338    evbuffer_add_uint32 (out, req->offset);
339    evbuffer_add_uint32 (out, req->length);
340
341    dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
342    dbgOutMessageLen (msgs);
343    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
344}
345
346static void
347protocolSendCancel (tr_peermsgs * msgs, const struct peer_request * req)
348{
349    struct evbuffer * out = msgs->outMessages;
350
351    evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
352    evbuffer_add_uint8 (out, BT_CANCEL);
353    evbuffer_add_uint32 (out, req->index);
354    evbuffer_add_uint32 (out, req->offset);
355    evbuffer_add_uint32 (out, req->length);
356
357    dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
358    dbgOutMessageLen (msgs);
359    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
360}
361
362static void
363protocolSendPort (tr_peermsgs *msgs, uint16_t port)
364{
365    struct evbuffer * out = msgs->outMessages;
366
367    dbgmsg (msgs, "sending Port %u", port);
368    evbuffer_add_uint32 (out, 3);
369    evbuffer_add_uint8 (out, BT_PORT);
370    evbuffer_add_uint16 (out, port);
371}
372
373static void
374protocolSendHave (tr_peermsgs * msgs, uint32_t index)
375{
376    struct evbuffer * out = msgs->outMessages;
377
378    evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t));
379    evbuffer_add_uint8 (out, BT_HAVE);
380    evbuffer_add_uint32 (out, index);
381
382    dbgmsg (msgs, "sending Have %u", index);
383    dbgOutMessageLen (msgs);
384    pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS);
385}
386
387#if 0
388static void
389protocolSendAllowedFast (tr_peermsgs * msgs, uint32_t pieceIndex)
390{
391    tr_peerIo       * io  = msgs->peer->io;
392    struct evbuffer * out = msgs->outMessages;
393
394    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
395
396    evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t));
397    evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST);
398    evbuffer_add_uint32 (io, out, pieceIndex);
399
400    dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex);
401    dbgOutMessageLen (msgs);
402}
403#endif
404
405static void
406protocolSendChoke (tr_peermsgs * msgs, int choke)
407{
408    struct evbuffer * out = msgs->outMessages;
409
410    evbuffer_add_uint32 (out, sizeof (uint8_t));
411    evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE);
412
413    dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke");
414    dbgOutMessageLen (msgs);
415    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
416}
417
418static void
419protocolSendHaveAll (tr_peermsgs * msgs)
420{
421    struct evbuffer * out = msgs->outMessages;
422
423    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
424
425    evbuffer_add_uint32 (out, sizeof (uint8_t));
426    evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL);
427
428    dbgmsg (msgs, "sending HAVE_ALL...");
429    dbgOutMessageLen (msgs);
430    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
431}
432
433static void
434protocolSendHaveNone (tr_peermsgs * msgs)
435{
436    struct evbuffer * out = msgs->outMessages;
437
438    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
439
440    evbuffer_add_uint32 (out, sizeof (uint8_t));
441    evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE);
442
443    dbgmsg (msgs, "sending HAVE_NONE...");
444    dbgOutMessageLen (msgs);
445    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
446}
447
448/**
449***  EVENTS
450**/
451
452static void
453publish (tr_peermsgs * msgs, tr_peer_event * e)
454{
455    assert (msgs->peer);
456    assert (msgs->peer->msgs == msgs);
457
458    if (msgs->callback != NULL)
459        msgs->callback (msgs->peer, e, msgs->callbackData);
460}
461
462static void
463fireError (tr_peermsgs * msgs, int err)
464{
465    tr_peer_event e = TR_PEER_EVENT_INIT;
466    e.eventType = TR_PEER_ERROR;
467    e.err = err;
468    publish (msgs, &e);
469}
470
471static void
472fireGotBlock (tr_peermsgs * msgs, const struct peer_request * req)
473{
474    tr_peer_event e = TR_PEER_EVENT_INIT;
475    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
476    e.pieceIndex = req->index;
477    e.offset = req->offset;
478    e.length = req->length;
479    publish (msgs, &e);
480}
481
482static void
483fireGotRej (tr_peermsgs * msgs, const struct peer_request * req)
484{
485    tr_peer_event e = TR_PEER_EVENT_INIT;
486    e.eventType = TR_PEER_CLIENT_GOT_REJ;
487    e.pieceIndex = req->index;
488    e.offset = req->offset;
489    e.length = req->length;
490    publish (msgs, &e);
491}
492
493static void
494fireGotChoke (tr_peermsgs * msgs)
495{
496    tr_peer_event e = TR_PEER_EVENT_INIT;
497    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
498    publish (msgs, &e);
499}
500
501static void
502fireClientGotHaveAll (tr_peermsgs * msgs)
503{
504    tr_peer_event e = TR_PEER_EVENT_INIT;
505    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
506    publish (msgs, &e);
507}
508
509static void
510fireClientGotHaveNone (tr_peermsgs * msgs)
511{
512    tr_peer_event e = TR_PEER_EVENT_INIT;
513    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
514    publish (msgs, &e);
515}
516
517static void
518fireClientGotPieceData (tr_peermsgs * msgs, uint32_t length)
519{
520    tr_peer_event e = TR_PEER_EVENT_INIT;
521    e.length = length;
522    e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
523    publish (msgs, &e);
524}
525
526static void
527firePeerGotPieceData (tr_peermsgs * msgs, uint32_t length)
528{
529    tr_peer_event e = TR_PEER_EVENT_INIT;
530    e.length = length;
531    e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
532    publish (msgs, &e);
533}
534
535static void
536fireClientGotSuggest (tr_peermsgs * msgs, uint32_t pieceIndex)
537{
538    tr_peer_event e = TR_PEER_EVENT_INIT;
539    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
540    e.pieceIndex = pieceIndex;
541    publish (msgs, &e);
542}
543
544static void
545fireClientGotPort (tr_peermsgs * msgs, tr_port port)
546{
547    tr_peer_event e = TR_PEER_EVENT_INIT;
548    e.eventType = TR_PEER_CLIENT_GOT_PORT;
549    e.port = port;
550    publish (msgs, &e);
551}
552
553static void
554fireClientGotAllowedFast (tr_peermsgs * msgs, uint32_t pieceIndex)
555{
556    tr_peer_event e = TR_PEER_EVENT_INIT;
557    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
558    e.pieceIndex = pieceIndex;
559    publish (msgs, &e);
560}
561
562static void
563fireClientGotBitfield (tr_peermsgs * msgs, tr_bitfield * bitfield)
564{
565    tr_peer_event e = TR_PEER_EVENT_INIT;
566    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
567    e.bitfield = bitfield;
568    publish (msgs, &e);
569}
570
571static void
572fireClientGotHave (tr_peermsgs * msgs, tr_piece_index_t index)
573{
574    tr_peer_event e = TR_PEER_EVENT_INIT;
575    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
576    e.pieceIndex = index;
577    publish (msgs, &e);
578}
579
580
581/**
582***  ALLOWED FAST SET
583***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
584**/
585
586#if 0
587size_t
588tr_generateAllowedSet (tr_piece_index_t * setmePieces,
589                       size_t             desiredSetSize,
590                       size_t             pieceCount,
591                       const uint8_t    * infohash,
592                       const tr_address * addr)
593{
594    size_t setSize = 0;
595
596    assert (setmePieces);
597    assert (desiredSetSize <= pieceCount);
598    assert (desiredSetSize);
599    assert (pieceCount);
600    assert (infohash);
601    assert (addr);
602
603    if (addr->type == TR_AF_INET)
604    {
605        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
606        uint8_t x[SHA_DIGEST_LENGTH];
607
608        uint32_t ui32 = ntohl (htonl (addr->addr.addr4.s_addr) & 0xffffff00);   /* (1) */
609        memcpy (w, &ui32, sizeof (uint32_t));
610        walk += sizeof (uint32_t);
611        memcpy (walk, infohash, SHA_DIGEST_LENGTH);                 /* (2) */
612        walk += SHA_DIGEST_LENGTH;
613        tr_sha1 (x, w, walk-w, NULL);                               /* (3) */
614        assert (sizeof (w) == walk-w);
615
616        while (setSize<desiredSetSize)
617        {
618            int i;
619            for (i=0; i<5 && setSize<desiredSetSize; ++i)           /* (4) */
620            {
621                size_t k;
622                uint32_t j = i * 4;                                  /* (5) */
623                uint32_t y = ntohl (* (uint32_t*)(x + j));       /* (6) */
624                uint32_t index = y % pieceCount;                     /* (7) */
625
626                for (k=0; k<setSize; ++k)                           /* (8) */
627                    if (setmePieces[k] == index)
628                        break;
629
630                if (k == setSize)
631                    setmePieces[setSize++] = index;                  /* (9) */
632            }
633
634            tr_sha1 (x, x, sizeof (x), NULL);                      /* (3) */
635        }
636    }
637
638    return setSize;
639}
640
641static void
642updateFastSet (tr_peermsgs * msgs UNUSED)
643{
644    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
645    const int peerIsNeedy = msgs->peer->progress < 0.10;
646
647    if (fext && peerIsNeedy && !msgs->haveFastSet)
648    {
649        size_t i;
650        const struct tr_address * addr = tr_peerIoGetAddress (msgs->peer->io, NULL);
651        const tr_info * inf = &msgs->torrent->info;
652        const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount);
653
654        /* build the fast set */
655        msgs->fastsetSize = tr_generateAllowedSet (msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
656        msgs->haveFastSet = 1;
657
658        /* send it to the peer */
659        for (i=0; i<msgs->fastsetSize; ++i)
660            protocolSendAllowedFast (msgs, msgs->fastset[i]);
661    }
662}
663#endif
664
665/**
666***  INTEREST
667**/
668
669static void
670sendInterest (tr_peermsgs * msgs, bool clientIsInterested)
671{
672    struct evbuffer * out = msgs->outMessages;
673
674    assert (msgs);
675    assert (tr_isBool (clientIsInterested));
676
677    msgs->peer->clientIsInterested = clientIsInterested;
678    dbgmsg (msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested");
679    evbuffer_add_uint32 (out, sizeof (uint8_t));
680    evbuffer_add_uint8 (out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED);
681
682    pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
683    dbgOutMessageLen (msgs);
684}
685
686static void
687updateInterest (tr_peermsgs * msgs UNUSED)
688{
689    /* FIXME -- might need to poke the mgr on startup */
690}
691
692void
693tr_peerMsgsSetInterested (tr_peermsgs * msgs, bool clientIsInterested)
694{
695    assert (tr_isBool (clientIsInterested));
696
697    if (clientIsInterested != msgs->peer->clientIsInterested)
698        sendInterest (msgs, clientIsInterested);
699}
700
701static bool
702popNextMetadataRequest (tr_peermsgs * msgs, int * piece)
703{
704    if (msgs->peerAskedForMetadataCount == 0)
705        return false;
706
707    *piece = msgs->peerAskedForMetadata[0];
708
709    tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int),
710                               msgs->peerAskedForMetadataCount--);
711
712    return true;
713}
714
715static bool
716popNextRequest (tr_peermsgs * msgs, struct peer_request * setme)
717{
718    if (msgs->peer->pendingReqsToClient == 0)
719        return false;
720
721    *setme = msgs->peerAskedFor[0];
722
723    tr_removeElementFromArray (msgs->peerAskedFor, 0, sizeof (struct peer_request),
724                               msgs->peer->pendingReqsToClient--);
725
726    return true;
727}
728
729static void
730cancelAllRequestsToClient (tr_peermsgs * msgs)
731{
732    struct peer_request req;
733    const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->peer->io);
734
735    while (popNextRequest (msgs, &req))
736        if (mustSendCancel)
737            protocolSendReject (msgs, &req);
738}
739
740void
741tr_peerMsgsSetChoke (tr_peermsgs * msgs, bool peerIsChoked)
742{
743    const time_t now = tr_time ();
744    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
745
746    assert (msgs);
747    assert (msgs->peer);
748    assert (tr_isBool (peerIsChoked));
749
750    if (msgs->peer->chokeChangedAt > fibrillationTime)
751    {
752        dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peerIsChoked);
753    }
754    else if (msgs->peer->peerIsChoked != peerIsChoked)
755    {
756        msgs->peer->peerIsChoked = peerIsChoked;
757        if (peerIsChoked)
758            cancelAllRequestsToClient (msgs);
759        protocolSendChoke (msgs, peerIsChoked);
760        msgs->peer->chokeChangedAt = now;
761    }
762}
763
764/**
765***
766**/
767
768void
769tr_peerMsgsHave (tr_peermsgs * msgs, uint32_t index)
770{
771    protocolSendHave (msgs, index);
772
773    /* since we have more pieces now, we might not be interested in this peer */
774    updateInterest (msgs);
775}
776
777/**
778***
779**/
780
781static bool
782reqIsValid (const tr_peermsgs * peer,
783            uint32_t            index,
784            uint32_t            offset,
785            uint32_t            length)
786{
787    return tr_torrentReqIsValid (peer->torrent, index, offset, length);
788}
789
790static bool
791requestIsValid (const tr_peermsgs * msgs, const struct peer_request * req)
792{
793    return reqIsValid (msgs, req->index, req->offset, req->length);
794}
795
796void
797tr_peerMsgsCancel (tr_peermsgs * msgs, tr_block_index_t block)
798{
799    struct peer_request req;
800/*fprintf (stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer);*/
801    blockToReq (msgs->torrent, block, &req);
802    protocolSendCancel (msgs, &req);
803}
804
805/**
806***
807**/
808
809static void
810sendLtepHandshake (tr_peermsgs * msgs)
811{
812    tr_variant val;
813    bool allow_pex;
814    bool allow_metadata_xfer;
815    struct evbuffer * payload;
816    struct evbuffer * out = msgs->outMessages;
817    const unsigned char * ipv6 = tr_globalIPv6 ();
818    static tr_quark version_quark = 0;
819
820    if (msgs->clientSentLtepHandshake)
821        return;
822
823    if (!version_quark)
824      version_quark = tr_quark_new (TR_NAME " " USERAGENT_PREFIX, -1);
825
826    dbgmsg (msgs, "sending an ltep handshake");
827    msgs->clientSentLtepHandshake = 1;
828
829    /* decide if we want to advertise metadata xfer support (BEP 9) */
830    if (tr_torrentIsPrivate (msgs->torrent))
831        allow_metadata_xfer = 0;
832    else
833        allow_metadata_xfer = 1;
834
835    /* decide if we want to advertise pex support */
836    if (!tr_torrentAllowsPex (msgs->torrent))
837        allow_pex = 0;
838    else if (msgs->peerSentLtepHandshake)
839        allow_pex = msgs->peerSupportsPex ? 1 : 0;
840    else
841        allow_pex = 1;
842
843    tr_variantInitDict (&val, 8);
844    tr_variantDictAddInt (&val, TR_KEY_e, getSession (msgs)->encryptionMode != TR_CLEAR_PREFERRED);
845    if (ipv6 != NULL)
846        tr_variantDictAddRaw (&val, TR_KEY_ipv6, ipv6, 16);
847    if (allow_metadata_xfer && tr_torrentHasMetadata (msgs->torrent)
848                            && (msgs->torrent->infoDictLength > 0))
849        tr_variantDictAddInt (&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
850    tr_variantDictAddInt (&val, TR_KEY_p, tr_sessionGetPublicPeerPort (getSession (msgs)));
851    tr_variantDictAddInt (&val, TR_KEY_reqq, REQQ);
852    tr_variantDictAddInt (&val, TR_KEY_upload_only, tr_torrentIsSeed (msgs->torrent));
853    tr_variantDictAddQuark (&val, TR_KEY_v, version_quark);
854    if (allow_metadata_xfer || allow_pex) {
855        tr_variant * m  = tr_variantDictAddDict (&val, TR_KEY_m, 2);
856        if (allow_metadata_xfer)
857            tr_variantDictAddInt (m, TR_KEY_ut_metadata, UT_METADATA_ID);
858        if (allow_pex)
859            tr_variantDictAddInt (m, TR_KEY_ut_pex, UT_PEX_ID);
860    }
861
862    payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
863
864    evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
865    evbuffer_add_uint8 (out, BT_LTEP);
866    evbuffer_add_uint8 (out, LTEP_HANDSHAKE);
867    evbuffer_add_buffer (out, payload);
868    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
869    dbgOutMessageLen (msgs);
870
871    /* cleanup */
872    evbuffer_free (payload);
873    tr_variantFree (&val);
874}
875
876static void
877parseLtepHandshake (tr_peermsgs * msgs, int len, struct evbuffer * inbuf)
878{
879    int64_t   i;
880    tr_variant   val, * sub;
881    uint8_t * tmp = tr_new (uint8_t, len);
882    const uint8_t *addr;
883    size_t addr_len;
884    tr_pex pex;
885    int8_t seedProbability = -1;
886
887    memset (&pex, 0, sizeof (tr_pex));
888
889    tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, len);
890    msgs->peerSentLtepHandshake = 1;
891
892    if (tr_variantFromBenc (&val, tmp, len) || !tr_variantIsDict (&val))
893    {
894        dbgmsg (msgs, "GET  extended-handshake, couldn't get dictionary");
895        tr_free (tmp);
896        return;
897    }
898
899    dbgmsg (msgs, "here is the handshake: [%*.*s]", len, len,  tmp);
900
901    /* does the peer prefer encrypted connections? */
902    if (tr_variantDictFindInt (&val, TR_KEY_e, &i)) {
903        msgs->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
904                                        : ENCRYPTION_PREFERENCE_NO;
905        if (i)
906            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
907    }
908
909    /* check supported messages for utorrent pex */
910    msgs->peerSupportsPex = 0;
911    msgs->peerSupportsMetadataXfer = 0;
912
913    if (tr_variantDictFindDict (&val, TR_KEY_m, &sub)) {
914        if (tr_variantDictFindInt (sub, TR_KEY_ut_pex, &i)) {
915            msgs->peerSupportsPex = i != 0;
916            msgs->ut_pex_id = (uint8_t) i;
917            dbgmsg (msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
918        }
919        if (tr_variantDictFindInt (sub, TR_KEY_ut_metadata, &i)) {
920            msgs->peerSupportsMetadataXfer = i != 0;
921            msgs->ut_metadata_id = (uint8_t) i;
922            dbgmsg (msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
923        }
924        if (tr_variantDictFindInt (sub, TR_KEY_ut_holepunch, &i)) {
925            /* Mysterious µTorrent extension that we don't grok.  However,
926               it implies support for µTP, so use it to indicate that. */
927            tr_peerMgrSetUtpFailed (msgs->torrent,
928                                    tr_peerIoGetAddress (msgs->peer->io, NULL),
929                                    false);
930        }
931    }
932
933    /* look for metainfo size (BEP 9) */
934    if (tr_variantDictFindInt (&val, TR_KEY_metadata_size, &i)) {
935        tr_torrentSetMetadataSizeHint (msgs->torrent, i);
936        msgs->metadata_size_hint = (size_t) i;
937    }
938
939    /* look for upload_only (BEP 21) */
940    if (tr_variantDictFindInt (&val, TR_KEY_upload_only, &i))
941        seedProbability = i==0 ? 0 : 100;
942
943    /* get peer's listening port */
944    if (tr_variantDictFindInt (&val, TR_KEY_p, &i)) {
945        pex.port = htons ((uint16_t)i);
946        fireClientGotPort (msgs, pex.port);
947        dbgmsg (msgs, "peer's port is now %d", (int)i);
948    }
949
950    if (tr_peerIoIsIncoming (msgs->peer->io)
951        && tr_variantDictFindRaw (&val, TR_KEY_ipv4, &addr, &addr_len)
952        && (addr_len == 4))
953    {
954        pex.addr.type = TR_AF_INET;
955        memcpy (&pex.addr.addr.addr4, addr, 4);
956        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
957    }
958
959    if (tr_peerIoIsIncoming (msgs->peer->io)
960        && tr_variantDictFindRaw (&val, TR_KEY_ipv6, &addr, &addr_len)
961        && (addr_len == 16))
962    {
963        pex.addr.type = TR_AF_INET6;
964        memcpy (&pex.addr.addr.addr6, addr, 16);
965        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
966    }
967
968    /* get peer's maximum request queue size */
969    if (tr_variantDictFindInt (&val, TR_KEY_reqq, &i))
970        msgs->reqq = i;
971
972    tr_variantFree (&val);
973    tr_free (tmp);
974}
975
976static void
977parseUtMetadata (tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf)
978{
979    tr_variant dict;
980    char * msg_end;
981    const char * benc_end;
982    int64_t msg_type = -1;
983    int64_t piece = -1;
984    int64_t total_size = 0;
985    uint8_t * tmp = tr_new (uint8_t, msglen);
986
987    tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, msglen);
988    msg_end = (char*)tmp + msglen;
989
990    if (!tr_variantFromBencFull (&dict, tmp, msglen, NULL, &benc_end))
991    {
992        tr_variantDictFindInt (&dict, TR_KEY_msg_type, &msg_type);
993        tr_variantDictFindInt (&dict, TR_KEY_piece, &piece);
994        tr_variantDictFindInt (&dict, TR_KEY_total_size, &total_size);
995        tr_variantFree (&dict);
996    }
997
998    dbgmsg (msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
999          (int)msg_type, (int)piece, (int)total_size);
1000
1001    if (msg_type == METADATA_MSG_TYPE_REJECT)
1002    {
1003        /* NOOP */
1004    }
1005
1006    if ((msg_type == METADATA_MSG_TYPE_DATA)
1007        && (!tr_torrentHasMetadata (msgs->torrent))
1008        && (msg_end - benc_end <= METADATA_PIECE_SIZE)
1009        && (piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size))
1010    {
1011        const int pieceLen = msg_end - benc_end;
1012        tr_torrentSetMetadataPiece (msgs->torrent, piece, benc_end, pieceLen);
1013    }
1014
1015    if (msg_type == METADATA_MSG_TYPE_REQUEST)
1016    {
1017        if ((piece >= 0)
1018            && tr_torrentHasMetadata (msgs->torrent)
1019            && !tr_torrentIsPrivate (msgs->torrent)
1020            && (msgs->peerAskedForMetadataCount < METADATA_REQQ))
1021        {
1022            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1023        }
1024        else
1025        {
1026            tr_variant tmp;
1027            struct evbuffer * payload;
1028            struct evbuffer * out = msgs->outMessages;
1029
1030            /* build the rejection message */
1031            tr_variantInitDict (&tmp, 2);
1032            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1033            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1034            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1035
1036            /* write it out as a LTEP message to our outMessages buffer */
1037            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1038            evbuffer_add_uint8 (out, BT_LTEP);
1039            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1040            evbuffer_add_buffer (out, payload);
1041            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1042            dbgOutMessageLen (msgs);
1043
1044            /* cleanup */
1045            evbuffer_free (payload);
1046            tr_variantFree (&tmp);
1047        }
1048    }
1049
1050    tr_free (tmp);
1051}
1052
1053static void
1054parseUtPex (tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf)
1055{
1056    int loaded = 0;
1057    uint8_t * tmp = tr_new (uint8_t, msglen);
1058    tr_variant val;
1059    tr_torrent * tor = msgs->torrent;
1060    const uint8_t * added;
1061    size_t added_len;
1062
1063    tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, msglen);
1064
1065    if (tr_torrentAllowsPex (tor)
1066      && ((loaded = !tr_variantFromBenc (&val, tmp, msglen))))
1067    {
1068        if (tr_variantDictFindRaw (&val, TR_KEY_added, &added, &added_len))
1069        {
1070            tr_pex * pex;
1071            size_t i, n;
1072            size_t added_f_len = 0;
1073            const uint8_t * added_f = NULL;
1074
1075            tr_variantDictFindRaw (&val, TR_KEY_added_f, &added_f, &added_f_len);
1076            pex = tr_peerMgrCompactToPex (added, added_len, added_f, added_f_len, &n);
1077
1078            n = MIN (n, MAX_PEX_PEER_COUNT);
1079            for (i=0; i<n; ++i)
1080            {
1081                int seedProbability = -1;
1082                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1083                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1084            }
1085
1086            tr_free (pex);
1087        }
1088
1089        if (tr_variantDictFindRaw (&val, TR_KEY_added6, &added, &added_len))
1090        {
1091            tr_pex * pex;
1092            size_t i, n;
1093            size_t added_f_len = 0;
1094            const uint8_t * added_f = NULL;
1095
1096            tr_variantDictFindRaw (&val, TR_KEY_added6_f, &added_f, &added_f_len);
1097            pex = tr_peerMgrCompact6ToPex (added, added_len, added_f, added_f_len, &n);
1098
1099            n = MIN (n, MAX_PEX_PEER_COUNT);
1100            for (i=0; i<n; ++i)
1101            {
1102                int seedProbability = -1;
1103                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1104                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1105            }
1106
1107            tr_free (pex);
1108        }
1109    }
1110
1111    if (loaded)
1112        tr_variantFree (&val);
1113    tr_free (tmp);
1114}
1115
1116static void sendPex (tr_peermsgs * msgs);
1117
1118static void
1119parseLtep (tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf)
1120{
1121    uint8_t ltep_msgid;
1122
1123    tr_peerIoReadUint8 (msgs->peer->io, inbuf, &ltep_msgid);
1124    msglen--;
1125
1126    if (ltep_msgid == LTEP_HANDSHAKE)
1127    {
1128        dbgmsg (msgs, "got ltep handshake");
1129        parseLtepHandshake (msgs, msglen, inbuf);
1130        if (tr_peerIoSupportsLTEP (msgs->peer->io))
1131        {
1132            sendLtepHandshake (msgs);
1133            sendPex (msgs);
1134        }
1135    }
1136    else if (ltep_msgid == UT_PEX_ID)
1137    {
1138        dbgmsg (msgs, "got ut pex");
1139        msgs->peerSupportsPex = 1;
1140        parseUtPex (msgs, msglen, inbuf);
1141    }
1142    else if (ltep_msgid == UT_METADATA_ID)
1143    {
1144        dbgmsg (msgs, "got ut metadata");
1145        msgs->peerSupportsMetadataXfer = 1;
1146        parseUtMetadata (msgs, msglen, inbuf);
1147    }
1148    else
1149    {
1150        dbgmsg (msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1151        evbuffer_drain (inbuf, msglen);
1152    }
1153}
1154
1155static int
1156readBtLength (tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1157{
1158    uint32_t len;
1159
1160    if (inlen < sizeof (len))
1161        return READ_LATER;
1162
1163    tr_peerIoReadUint32 (msgs->peer->io, inbuf, &len);
1164
1165    if (len == 0) /* peer sent us a keepalive message */
1166        dbgmsg (msgs, "got KeepAlive");
1167    else
1168    {
1169        msgs->incoming.length = len;
1170        msgs->state = AWAITING_BT_ID;
1171    }
1172
1173    return READ_NOW;
1174}
1175
1176static int readBtMessage (tr_peermsgs *, struct evbuffer *, size_t);
1177
1178static int
1179readBtId (tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1180{
1181    uint8_t id;
1182
1183    if (inlen < sizeof (uint8_t))
1184        return READ_LATER;
1185
1186    tr_peerIoReadUint8 (msgs->peer->io, inbuf, &id);
1187    msgs->incoming.id = id;
1188    dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);
1189
1190    if (id == BT_PIECE)
1191    {
1192        msgs->state = AWAITING_BT_PIECE;
1193        return READ_NOW;
1194    }
1195    else if (msgs->incoming.length != 1)
1196    {
1197        msgs->state = AWAITING_BT_MESSAGE;
1198        return READ_NOW;
1199    }
1200    else return readBtMessage (msgs, inbuf, inlen - 1);
1201}
1202
1203static void
1204updatePeerProgress (tr_peermsgs * msgs)
1205{
1206    tr_peerUpdateProgress (msgs->torrent, msgs->peer);
1207
1208    /*updateFastSet (msgs);*/
1209    updateInterest (msgs);
1210}
1211
1212static void
1213prefetchPieces (tr_peermsgs *msgs)
1214{
1215    int i;
1216
1217    if (!getSession (msgs)->isPrefetchEnabled)
1218        return;
1219
1220    for (i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<PREFETCH_SIZE; ++i)
1221    {
1222        const struct peer_request * req = msgs->peerAskedFor + i;
1223        if (requestIsValid (msgs, req))
1224        {
1225            tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1226            ++msgs->prefetchCount;
1227        }
1228    }
1229}
1230
1231static void
1232peerMadeRequest (tr_peermsgs * msgs, const struct peer_request * req)
1233{
1234    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
1235    const int reqIsValid = requestIsValid (msgs, req);
1236    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete (&msgs->torrent->completion, req->index);
1237    const int peerIsChoked = msgs->peer->peerIsChoked;
1238
1239    int allow = false;
1240
1241    if (!reqIsValid)
1242        dbgmsg (msgs, "rejecting an invalid request.");
1243    else if (!clientHasPiece)
1244        dbgmsg (msgs, "rejecting request for a piece we don't have.");
1245    else if (peerIsChoked)
1246        dbgmsg (msgs, "rejecting request from choked peer");
1247    else if (msgs->peer->pendingReqsToClient + 1 >= REQQ)
1248        dbgmsg (msgs, "rejecting request ... reqq is full");
1249    else
1250        allow = true;
1251
1252    if (allow) {
1253        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1254        prefetchPieces (msgs);
1255    } else if (fext) {
1256        protocolSendReject (msgs, req);
1257    }
1258}
1259
1260static bool
1261messageLengthIsCorrect (const tr_peermsgs * msg, uint8_t id, uint32_t len)
1262{
1263    switch (id)
1264    {
1265        case BT_CHOKE:
1266        case BT_UNCHOKE:
1267        case BT_INTERESTED:
1268        case BT_NOT_INTERESTED:
1269        case BT_FEXT_HAVE_ALL:
1270        case BT_FEXT_HAVE_NONE:
1271            return len == 1;
1272
1273        case BT_HAVE:
1274        case BT_FEXT_SUGGEST:
1275        case BT_FEXT_ALLOWED_FAST:
1276            return len == 5;
1277
1278        case BT_BITFIELD:
1279            if (tr_torrentHasMetadata (msg->torrent))
1280                return len == (msg->torrent->info.pieceCount + 7u) / 8u + 1u;
1281            /* we don't know the piece count yet,
1282               so we can only guess whether to send true or false */
1283            if (msg->metadata_size_hint > 0)
1284                return len <= msg->metadata_size_hint;
1285            return true;
1286
1287        case BT_REQUEST:
1288        case BT_CANCEL:
1289        case BT_FEXT_REJECT:
1290            return len == 13;
1291
1292        case BT_PIECE:
1293            return len > 9 && len <= 16393;
1294
1295        case BT_PORT:
1296            return len == 3;
1297
1298        case BT_LTEP:
1299            return len >= 2;
1300
1301        default:
1302            return false;
1303    }
1304}
1305
1306static int clientGotBlock (tr_peermsgs *               msgs,
1307                           struct evbuffer *           block,
1308                           const struct peer_request * req);
1309
1310static int
1311readBtPiece (tr_peermsgs      * msgs,
1312             struct evbuffer  * inbuf,
1313             size_t             inlen,
1314             size_t           * setme_piece_bytes_read)
1315{
1316    struct peer_request * req = &msgs->incoming.blockReq;
1317
1318    assert (evbuffer_get_length (inbuf) >= inlen);
1319    dbgmsg (msgs, "In readBtPiece");
1320
1321    if (!req->length)
1322    {
1323        if (inlen < 8)
1324            return READ_LATER;
1325
1326        tr_peerIoReadUint32 (msgs->peer->io, inbuf, &req->index);
1327        tr_peerIoReadUint32 (msgs->peer->io, inbuf, &req->offset);
1328        req->length = msgs->incoming.length - 9;
1329        dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1330        return READ_NOW;
1331    }
1332    else
1333    {
1334        int err;
1335        size_t n;
1336        size_t nLeft;
1337        struct evbuffer * block_buffer;
1338
1339        if (msgs->incoming.block == NULL)
1340            msgs->incoming.block = evbuffer_new ();
1341        block_buffer = msgs->incoming.block;
1342
1343        /* read in another chunk of data */
1344        nLeft = req->length - evbuffer_get_length (block_buffer);
1345        n = MIN (nLeft, inlen);
1346
1347        tr_peerIoReadBytesToBuf (msgs->peer->io, inbuf, block_buffer, n);
1348
1349        fireClientGotPieceData (msgs, n);
1350        *setme_piece_bytes_read += n;
1351        dbgmsg (msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1352               n, req->index, req->offset, req->length,
1353             (int)(req->length - evbuffer_get_length (block_buffer)));
1354        if (evbuffer_get_length (block_buffer) < req->length)
1355            return READ_LATER;
1356
1357        /* pass the block along... */
1358        err = clientGotBlock (msgs, block_buffer, req);
1359        evbuffer_drain (block_buffer, evbuffer_get_length (block_buffer));
1360
1361        /* cleanup */
1362        req->length = 0;
1363        msgs->state = AWAITING_BT_LENGTH;
1364        return err ? READ_ERR : READ_NOW;
1365    }
1366}
1367
1368static void updateDesiredRequestCount (tr_peermsgs * msgs);
1369
1370static int
1371readBtMessage (tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1372{
1373    uint32_t      ui32;
1374    uint32_t      msglen = msgs->incoming.length;
1375    const uint8_t id = msgs->incoming.id;
1376#ifndef NDEBUG
1377    const size_t  startBufLen = evbuffer_get_length (inbuf);
1378#endif
1379    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
1380
1381    --msglen; /* id length */
1382
1383    dbgmsg (msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);
1384
1385    if (inlen < msglen)
1386        return READ_LATER;
1387
1388    if (!messageLengthIsCorrect (msgs, id, msglen + 1))
1389    {
1390        dbgmsg (msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1391        fireError (msgs, EMSGSIZE);
1392        return READ_ERR;
1393    }
1394
1395    switch (id)
1396    {
1397        case BT_CHOKE:
1398            dbgmsg (msgs, "got Choke");
1399            msgs->peer->clientIsChoked = 1;
1400            if (!fext)
1401                fireGotChoke (msgs);
1402            break;
1403
1404        case BT_UNCHOKE:
1405            dbgmsg (msgs, "got Unchoke");
1406            msgs->peer->clientIsChoked = 0;
1407            updateDesiredRequestCount (msgs);
1408            break;
1409
1410        case BT_INTERESTED:
1411            dbgmsg (msgs, "got Interested");
1412            msgs->peer->peerIsInterested = 1;
1413            break;
1414
1415        case BT_NOT_INTERESTED:
1416            dbgmsg (msgs, "got Not Interested");
1417            msgs->peer->peerIsInterested = 0;
1418            break;
1419
1420        case BT_HAVE:
1421            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &ui32);
1422            dbgmsg (msgs, "got Have: %u", ui32);
1423            if (tr_torrentHasMetadata (msgs->torrent)
1424                    && (ui32 >= msgs->torrent->info.pieceCount))
1425            {
1426                fireError (msgs, ERANGE);
1427                return READ_ERR;
1428            }
1429
1430            /* a peer can send the same HAVE message twice... */
1431            if (!tr_bitfieldHas (&msgs->peer->have, ui32)) {
1432                tr_bitfieldAdd (&msgs->peer->have, ui32);
1433                fireClientGotHave (msgs, ui32);
1434            }
1435            updatePeerProgress (msgs);
1436            break;
1437
1438        case BT_BITFIELD: {
1439            uint8_t * tmp = tr_new (uint8_t, msglen);
1440            dbgmsg (msgs, "got a bitfield");
1441            tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, msglen);
1442            tr_bitfieldSetRaw (&msgs->peer->have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));
1443            fireClientGotBitfield (msgs, &msgs->peer->have);
1444            updatePeerProgress (msgs);
1445            tr_free (tmp);
1446            break;
1447        }
1448
1449        case BT_REQUEST:
1450        {
1451            struct peer_request r;
1452            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.index);
1453            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.offset);
1454            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.length);
1455            dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1456            peerMadeRequest (msgs, &r);
1457            break;
1458        }
1459
1460        case BT_CANCEL:
1461        {
1462            int i;
1463            struct peer_request r;
1464            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.index);
1465            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.offset);
1466            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.length);
1467            tr_historyAdd (&msgs->peer->cancelsSentToClient, tr_time (), 1);
1468            dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1469
1470            for (i=0; i<msgs->peer->pendingReqsToClient; ++i) {
1471                const struct peer_request * req = msgs->peerAskedFor + i;
1472                if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length))
1473                    break;
1474            }
1475
1476            if (i < msgs->peer->pendingReqsToClient)
1477                tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request),
1478                                           msgs->peer->pendingReqsToClient--);
1479            break;
1480        }
1481
1482        case BT_PIECE:
1483            assert (0); /* handled elsewhere! */
1484            break;
1485
1486        case BT_PORT:
1487            dbgmsg (msgs, "Got a BT_PORT");
1488            tr_peerIoReadUint16 (msgs->peer->io, inbuf, &msgs->dht_port);
1489            if (msgs->dht_port > 0)
1490                tr_dhtAddNode (getSession (msgs),
1491                               tr_peerAddress (msgs->peer),
1492                               msgs->dht_port, 0);
1493            break;
1494
1495        case BT_FEXT_SUGGEST:
1496            dbgmsg (msgs, "Got a BT_FEXT_SUGGEST");
1497            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &ui32);
1498            if (fext)
1499                fireClientGotSuggest (msgs, ui32);
1500            else {
1501                fireError (msgs, EMSGSIZE);
1502                return READ_ERR;
1503            }
1504            break;
1505
1506        case BT_FEXT_ALLOWED_FAST:
1507            dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST");
1508            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &ui32);
1509            if (fext)
1510                fireClientGotAllowedFast (msgs, ui32);
1511            else {
1512                fireError (msgs, EMSGSIZE);
1513                return READ_ERR;
1514            }
1515            break;
1516
1517        case BT_FEXT_HAVE_ALL:
1518            dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL");
1519            if (fext) {
1520                tr_bitfieldSetHasAll (&msgs->peer->have);
1521assert (tr_bitfieldHasAll (&msgs->peer->have));
1522                fireClientGotHaveAll (msgs);
1523                updatePeerProgress (msgs);
1524            } else {
1525                fireError (msgs, EMSGSIZE);
1526                return READ_ERR;
1527            }
1528            break;
1529
1530        case BT_FEXT_HAVE_NONE:
1531            dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE");
1532            if (fext) {
1533                tr_bitfieldSetHasNone (&msgs->peer->have);
1534                fireClientGotHaveNone (msgs);
1535                updatePeerProgress (msgs);
1536            } else {
1537                fireError (msgs, EMSGSIZE);
1538                return READ_ERR;
1539            }
1540            break;
1541
1542        case BT_FEXT_REJECT:
1543        {
1544            struct peer_request r;
1545            dbgmsg (msgs, "Got a BT_FEXT_REJECT");
1546            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.index);
1547            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.offset);
1548            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.length);
1549            if (fext)
1550                fireGotRej (msgs, &r);
1551            else {
1552                fireError (msgs, EMSGSIZE);
1553                return READ_ERR;
1554            }
1555            break;
1556        }
1557
1558        case BT_LTEP:
1559            dbgmsg (msgs, "Got a BT_LTEP");
1560            parseLtep (msgs, msglen, inbuf);
1561            break;
1562
1563        default:
1564            dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id);
1565            tr_peerIoDrain (msgs->peer->io, inbuf, msglen);
1566            break;
1567    }
1568
1569    assert (msglen + 1 == msgs->incoming.length);
1570    assert (evbuffer_get_length (inbuf) == startBufLen - msglen);
1571
1572    msgs->state = AWAITING_BT_LENGTH;
1573    return READ_NOW;
1574}
1575
1576/* returns 0 on success, or an errno on failure */
1577static int
1578clientGotBlock (tr_peermsgs                * msgs,
1579                struct evbuffer            * data,
1580                const struct peer_request  * req)
1581{
1582    int err;
1583    tr_torrent * tor = msgs->torrent;
1584    const tr_block_index_t block = _tr_block (tor, req->index, req->offset);
1585
1586    assert (msgs);
1587    assert (req);
1588
1589    if (req->length != tr_torBlockCountBytes (msgs->torrent, block)) {
1590        dbgmsg (msgs, "wrong block size -- expected %u, got %d",
1591                tr_torBlockCountBytes (msgs->torrent, block), req->length);
1592        return EMSGSIZE;
1593    }
1594
1595    dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1596
1597    if (!tr_peerMgrDidPeerRequest (msgs->torrent, msgs->peer, block)) {
1598        dbgmsg (msgs, "we didn't ask for this message...");
1599        return 0;
1600    }
1601    if (tr_cpPieceIsComplete (&msgs->torrent->completion, req->index)) {
1602        dbgmsg (msgs, "we did ask for this message, but the piece is already complete...");
1603        return 0;
1604    }
1605
1606    /**
1607    ***  Save the block
1608    **/
1609
1610    if ((err = tr_cacheWriteBlock (getSession (msgs)->cache, tor, req->index, req->offset, req->length, data)))
1611        return err;
1612
1613    tr_bitfieldAdd (&msgs->peer->blame, req->index);
1614    fireGotBlock (msgs, req);
1615    return 0;
1616}
1617
1618static int peerPulse (void * vmsgs);
1619
1620static void
1621didWrite (tr_peerIo * io UNUSED, size_t bytesWritten, bool wasPieceData, void * vmsgs)
1622{
1623    tr_peermsgs * msgs = vmsgs;
1624
1625    if (wasPieceData)
1626      firePeerGotPieceData (msgs, bytesWritten);
1627
1628    if (tr_isPeerIo (io) && io->userData)
1629        peerPulse (msgs);
1630}
1631
1632static ReadState
1633canRead (tr_peerIo * io, void * vmsgs, size_t * piece)
1634{
1635    ReadState         ret;
1636    tr_peermsgs *     msgs = vmsgs;
1637    struct evbuffer * in = tr_peerIoGetReadBuffer (io);
1638    const size_t      inlen = evbuffer_get_length (in);
1639
1640    dbgmsg (msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);
1641
1642    if (!inlen)
1643    {
1644        ret = READ_LATER;
1645    }
1646    else if (msgs->state == AWAITING_BT_PIECE)
1647    {
1648        ret = readBtPiece (msgs, in, inlen, piece);
1649    }
1650    else switch (msgs->state)
1651    {
1652        case AWAITING_BT_LENGTH:
1653            ret = readBtLength (msgs, in, inlen); break;
1654
1655        case AWAITING_BT_ID:
1656            ret = readBtId   (msgs, in, inlen); break;
1657
1658        case AWAITING_BT_MESSAGE:
1659            ret = readBtMessage (msgs, in, inlen); break;
1660
1661        default:
1662            ret = READ_ERR;
1663            assert (0);
1664    }
1665
1666    dbgmsg (msgs, "canRead: ret is %d", (int)ret);
1667
1668    return ret;
1669}
1670
1671int
1672tr_peerMsgsIsReadingBlock (const tr_peermsgs * msgs, tr_block_index_t block)
1673{
1674    if (msgs->state != AWAITING_BT_PIECE)
1675        return false;
1676
1677    return block == _tr_block (msgs->torrent,
1678                               msgs->incoming.blockReq.index,
1679                               msgs->incoming.blockReq.offset);
1680}
1681
1682/**
1683***
1684**/
1685
1686static void
1687updateDesiredRequestCount (tr_peermsgs * msgs)
1688{
1689    tr_torrent * const torrent = msgs->torrent;
1690
1691    /* there are lots of reasons we might not want to request any blocks... */
1692    if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent)
1693                                    || msgs->peer->clientIsChoked
1694                                    || !msgs->peer->clientIsInterested)
1695    {
1696        msgs->desiredRequestCount = 0;
1697    }
1698    else
1699    {
1700        int estimatedBlocksInPeriod;
1701        unsigned int rate_Bps;
1702        unsigned int irate_Bps;
1703        const int floor = 4;
1704        const int seconds = REQUEST_BUF_SECS;
1705        const uint64_t now = tr_time_msec ();
1706
1707        /* Get the rate limit we should use.
1708         * FIXME: this needs to consider all the other peers as well... */
1709        rate_Bps = tr_peerGetPieceSpeed_Bps (msgs->peer, now, TR_PEER_TO_CLIENT);
1710        if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT))
1711            rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT));
1712
1713        /* honor the session limits, if enabled */
1714        if (tr_torrentUsesSessionLimits (torrent) &&
1715            tr_sessionGetActiveSpeedLimit_Bps (torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1716                rate_Bps = MIN (rate_Bps, irate_Bps);
1717
1718        /* use this desired rate to figure out how
1719         * many requests we should send to this peer */
1720        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1721        msgs->desiredRequestCount = MAX (floor, estimatedBlocksInPeriod);
1722
1723        /* honor the peer's maximum request count, if specified */
1724        if (msgs->reqq > 0)
1725            if (msgs->desiredRequestCount > msgs->reqq)
1726                msgs->desiredRequestCount = msgs->reqq;
1727    }
1728}
1729
1730static void
1731updateMetadataRequests (tr_peermsgs * msgs, time_t now)
1732{
1733    int piece;
1734
1735    if (msgs->peerSupportsMetadataXfer
1736        && tr_torrentGetNextMetadataRequest (msgs->torrent, now, &piece))
1737    {
1738        tr_variant tmp;
1739        struct evbuffer * payload;
1740        struct evbuffer * out = msgs->outMessages;
1741
1742        /* build the data message */
1743        tr_variantInitDict (&tmp, 3);
1744        tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
1745        tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1746        payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1747
1748        dbgmsg (msgs, "requesting metadata piece #%d", piece);
1749
1750        /* write it out as a LTEP message to our outMessages buffer */
1751        evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1752        evbuffer_add_uint8 (out, BT_LTEP);
1753        evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1754        evbuffer_add_buffer (out, payload);
1755        pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1756        dbgOutMessageLen (msgs);
1757
1758        /* cleanup */
1759        evbuffer_free (payload);
1760        tr_variantFree (&tmp);
1761    }
1762}
1763
1764static void
1765updateBlockRequests (tr_peermsgs * msgs)
1766{
1767    if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT)
1768        && (msgs->desiredRequestCount > 0)
1769        && (msgs->peer->pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))
1770    {
1771        int i;
1772        int n;
1773        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1774        tr_block_index_t * blocks = tr_new (tr_block_index_t, numwant);
1775
1776        tr_peerMgrGetNextRequests (msgs->torrent, msgs->peer, numwant, blocks, &n, false);
1777
1778        for (i=0; i<n; ++i)
1779        {
1780            struct peer_request req;
1781            blockToReq (msgs->torrent, blocks[i], &req);
1782            protocolSendRequest (msgs, &req);
1783        }
1784
1785        tr_free (blocks);
1786    }
1787}
1788
1789static size_t
1790fillOutputBuffer (tr_peermsgs * msgs, time_t now)
1791{
1792    int piece;
1793    size_t bytesWritten = 0;
1794    struct peer_request req;
1795    const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0;
1796    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
1797
1798    /**
1799    ***  Protocol messages
1800    **/
1801
1802    if (haveMessages && !msgs->outMessagesBatchedAt) /* fresh batch */
1803    {
1804        dbgmsg (msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length (msgs->outMessages));
1805        msgs->outMessagesBatchedAt = now;
1806    }
1807    else if (haveMessages && ((now - msgs->outMessagesBatchedAt) >= msgs->outMessagesBatchPeriod))
1808    {
1809        const size_t len = evbuffer_get_length (msgs->outMessages);
1810        /* flush the protocol messages */
1811        dbgmsg (msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len);
1812        tr_peerIoWriteBuf (msgs->peer->io, msgs->outMessages, false);
1813        msgs->clientSentAnythingAt = now;
1814        msgs->outMessagesBatchedAt = 0;
1815        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1816        bytesWritten +=  len;
1817    }
1818
1819    /**
1820    ***  Metadata Pieces
1821    **/
1822
1823    if ((tr_peerIoGetWriteBufferSpace (msgs->peer->io, now) >= METADATA_PIECE_SIZE)
1824        && popNextMetadataRequest (msgs, &piece))
1825    {
1826        char * data;
1827        int dataLen;
1828        bool ok = false;
1829
1830        data = tr_torrentGetMetadataPiece (msgs->torrent, piece, &dataLen);
1831        if ((dataLen > 0) && (data != NULL))
1832        {
1833            tr_variant tmp;
1834            struct evbuffer * payload;
1835            struct evbuffer * out = msgs->outMessages;
1836
1837            /* build the data message */
1838            tr_variantInitDict (&tmp, 3);
1839            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
1840            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1841            tr_variantDictAddInt (&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
1842            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1843
1844            /* write it out as a LTEP message to our outMessages buffer */
1845            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload) + dataLen);
1846            evbuffer_add_uint8 (out, BT_LTEP);
1847            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1848            evbuffer_add_buffer (out, payload);
1849            evbuffer_add     (out, data, dataLen);
1850            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1851            dbgOutMessageLen (msgs);
1852
1853            evbuffer_free (payload);
1854            tr_variantFree (&tmp);
1855            tr_free (data);
1856
1857            ok = true;
1858        }
1859
1860        if (!ok) /* send a rejection message */
1861        {
1862            tr_variant tmp;
1863            struct evbuffer * payload;
1864            struct evbuffer * out = msgs->outMessages;
1865
1866            /* build the rejection message */
1867            tr_variantInitDict (&tmp, 2);
1868            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1869            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1870            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1871
1872            /* write it out as a LTEP message to our outMessages buffer */
1873            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1874            evbuffer_add_uint8 (out, BT_LTEP);
1875            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1876            evbuffer_add_buffer (out, payload);
1877            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1878            dbgOutMessageLen (msgs);
1879
1880            evbuffer_free (payload);
1881            tr_variantFree (&tmp);
1882        }
1883    }
1884
1885    /**
1886    ***  Data Blocks
1887    **/
1888
1889    if ((tr_peerIoGetWriteBufferSpace (msgs->peer->io, now) >= msgs->torrent->blockSize)
1890        && popNextRequest (msgs, &req))
1891    {
1892        --msgs->prefetchCount;
1893
1894        if (requestIsValid (msgs, &req)
1895            && tr_cpPieceIsComplete (&msgs->torrent->completion, req.index))
1896        {
1897            int err;
1898            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1899            struct evbuffer * out;
1900            struct evbuffer_iovec iovec[1];
1901
1902            out = evbuffer_new ();
1903            evbuffer_expand (out, msglen);
1904
1905            evbuffer_add_uint32 (out, sizeof (uint8_t) + 2 * sizeof (uint32_t) + req.length);
1906            evbuffer_add_uint8 (out, BT_PIECE);
1907            evbuffer_add_uint32 (out, req.index);
1908            evbuffer_add_uint32 (out, req.offset);
1909
1910            evbuffer_reserve_space (out, req.length, iovec, 1);
1911            err = tr_cacheReadBlock (getSession (msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base);
1912            iovec[0].iov_len = req.length;
1913            evbuffer_commit_space (out, iovec, 1);
1914
1915            /* check the piece if it needs checking... */
1916            if (!err && tr_torrentPieceNeedsCheck (msgs->torrent, req.index))
1917                if ((err = !tr_torrentCheckPiece (msgs->torrent, req.index)))
1918                    tr_torrentSetLocalError (msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."), (size_t)req.index);
1919
1920            if (err)
1921            {
1922                if (fext)
1923                    protocolSendReject (msgs, &req);
1924            }
1925            else
1926            {
1927                const size_t n = evbuffer_get_length (out);
1928                dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
1929                assert (n == msglen);
1930                tr_peerIoWriteBuf (msgs->peer->io, out, true);
1931                bytesWritten += n;
1932                msgs->clientSentAnythingAt = now;
1933                tr_historyAdd (&msgs->peer->blocksSentToPeer, tr_time (), 1);
1934            }
1935
1936            evbuffer_free (out);
1937
1938            if (err)
1939            {
1940                bytesWritten = 0;
1941                msgs = NULL;
1942            }
1943        }
1944        else if (fext) /* peer needs a reject message */
1945        {
1946            protocolSendReject (msgs, &req);
1947        }
1948
1949        if (msgs != NULL)
1950            prefetchPieces (msgs);
1951    }
1952
1953    /**
1954    ***  Keepalive
1955    **/
1956
1957    if ((msgs != NULL)
1958        && (msgs->clientSentAnythingAt != 0)
1959        && ((now - msgs->clientSentAnythingAt) > KEEPALIVE_INTERVAL_SECS))
1960    {
1961        dbgmsg (msgs, "sending a keepalive message");
1962        evbuffer_add_uint32 (msgs->outMessages, 0);
1963        pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
1964    }
1965
1966    return bytesWritten;
1967}
1968
1969static int
1970peerPulse (void * vmsgs)
1971{
1972    tr_peermsgs * msgs = vmsgs;
1973    const time_t  now = tr_time ();
1974
1975    if (tr_isPeerIo (msgs->peer->io)) {
1976        updateDesiredRequestCount (msgs);
1977        updateBlockRequests (msgs);
1978        updateMetadataRequests (msgs, now);
1979    }
1980
1981    for (;;)
1982        if (fillOutputBuffer (msgs, now) < 1)
1983            break;
1984
1985    return true; /* loop forever */
1986}
1987
1988void
1989tr_peerMsgsPulse (tr_peermsgs * msgs)
1990{
1991    if (msgs != NULL)
1992        peerPulse (msgs);
1993}
1994
1995static void
1996gotError (tr_peerIo * io UNUSED, short what, void * vmsgs)
1997{
1998    if (what & BEV_EVENT_TIMEOUT)
1999        dbgmsg (vmsgs, "libevent got a timeout, what=%hd", what);
2000    if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
2001        dbgmsg (vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2002               what, errno, tr_strerror (errno));
2003    fireError (vmsgs, ENOTCONN);
2004}
2005
2006static void
2007sendBitfield (tr_peermsgs * msgs)
2008{
2009    void * bytes;
2010    size_t byte_count = 0;
2011    struct evbuffer * out = msgs->outMessages;
2012
2013    assert (tr_torrentHasMetadata (msgs->torrent));
2014
2015    bytes = tr_cpCreatePieceBitfield (&msgs->torrent->completion, &byte_count);
2016    evbuffer_add_uint32 (out, sizeof (uint8_t) + byte_count);
2017    evbuffer_add_uint8 (out, BT_BITFIELD);
2018    evbuffer_add     (out, bytes, byte_count);
2019    dbgmsg (msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length (out));
2020    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2021
2022    tr_free (bytes);
2023}
2024
2025static void
2026tellPeerWhatWeHave (tr_peermsgs * msgs)
2027{
2028    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
2029
2030    if (fext && tr_cpHasAll (&msgs->torrent->completion))
2031    {
2032        protocolSendHaveAll (msgs);
2033    }
2034    else if (fext && tr_cpHasNone (&msgs->torrent->completion))
2035    {
2036        protocolSendHaveNone (msgs);
2037    }
2038    else if (!tr_cpHasNone (&msgs->torrent->completion))
2039    {
2040        sendBitfield (msgs);
2041    }
2042}
2043
2044/**
2045***
2046**/
2047
2048/* some peers give us error messages if we send
2049   more than this many peers in a single pex message
2050   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2051#define MAX_PEX_ADDED 50
2052#define MAX_PEX_DROPPED 50
2053
2054typedef struct
2055{
2056    tr_pex *  added;
2057    tr_pex *  dropped;
2058    tr_pex *  elements;
2059    int       addedCount;
2060    int       droppedCount;
2061    int       elementCount;
2062}
2063PexDiffs;
2064
2065static void
2066pexAddedCb (void * vpex, void * userData)
2067{
2068    PexDiffs * diffs = userData;
2069    tr_pex *   pex = vpex;
2070
2071    if (diffs->addedCount < MAX_PEX_ADDED)
2072    {
2073        diffs->added[diffs->addedCount++] = *pex;
2074        diffs->elements[diffs->elementCount++] = *pex;
2075    }
2076}
2077
2078static inline void
2079pexDroppedCb (void * vpex, void * userData)
2080{
2081    PexDiffs * diffs = userData;
2082    tr_pex *   pex = vpex;
2083
2084    if (diffs->droppedCount < MAX_PEX_DROPPED)
2085    {
2086        diffs->dropped[diffs->droppedCount++] = *pex;
2087    }
2088}
2089
2090static inline void
2091pexElementCb (void * vpex, void * userData)
2092{
2093    PexDiffs * diffs = userData;
2094    tr_pex * pex = vpex;
2095
2096    diffs->elements[diffs->elementCount++] = *pex;
2097}
2098
2099typedef void (tr_set_func)(void * element, void * userData);
2100
2101/**
2102 * @brief find the differences and commonalities in two sorted sets
2103 * @param a the first set
2104 * @param aCount the number of elements in the set 'a'
2105 * @param b the second set
2106 * @param bCount the number of elements in the set 'b'
2107 * @param compare the sorting method for both sets
2108 * @param elementSize the sizeof the element in the two sorted sets
2109 * @param in_a called for items in set 'a' but not set 'b'
2110 * @param in_b called for items in set 'b' but not set 'a'
2111 * @param in_both called for items that are in both sets
2112 * @param userData user data passed along to in_a, in_b, and in_both
2113 */
2114static void
2115tr_set_compare (const void * va, size_t aCount,
2116                const void * vb, size_t bCount,
2117                int compare (const void * a, const void * b),
2118                size_t elementSize,
2119                tr_set_func in_a_cb,
2120                tr_set_func in_b_cb,
2121                tr_set_func in_both_cb,
2122                void * userData)
2123{
2124    const uint8_t * a = va;
2125    const uint8_t * b = vb;
2126    const uint8_t * aend = a + elementSize * aCount;
2127    const uint8_t * bend = b + elementSize * bCount;
2128
2129    while (a != aend || b != bend)
2130    {
2131        if (a == aend)
2132        {
2133          (*in_b_cb)((void*)b, userData);
2134            b += elementSize;
2135        }
2136        else if (b == bend)
2137        {
2138          (*in_a_cb)((void*)a, userData);
2139            a += elementSize;
2140        }
2141        else
2142        {
2143            const int val = (*compare)(a, b);
2144
2145            if (!val)
2146            {
2147              (*in_both_cb)((void*)a, userData);
2148                a += elementSize;
2149                b += elementSize;
2150            }
2151            else if (val < 0)
2152            {
2153              (*in_a_cb)((void*)a, userData);
2154                a += elementSize;
2155            }
2156            else if (val > 0)
2157            {
2158              (*in_b_cb)((void*)b, userData);
2159                b += elementSize;
2160            }
2161        }
2162    }
2163}
2164
2165
2166static void
2167sendPex (tr_peermsgs * msgs)
2168{
2169    if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent))
2170    {
2171        PexDiffs diffs;
2172        PexDiffs diffs6;
2173        tr_pex * newPex = NULL;
2174        tr_pex * newPex6 = NULL;
2175        const int newCount = tr_peerMgrGetPeers (msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2176        const int newCount6 = tr_peerMgrGetPeers (msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2177
2178        /* build the diffs */
2179        diffs.added = tr_new (tr_pex, newCount);
2180        diffs.addedCount = 0;
2181        diffs.dropped = tr_new (tr_pex, msgs->pexCount);
2182        diffs.droppedCount = 0;
2183        diffs.elements = tr_new (tr_pex, newCount + msgs->pexCount);
2184        diffs.elementCount = 0;
2185        tr_set_compare (msgs->pex, msgs->pexCount,
2186                        newPex, newCount,
2187                        tr_pexCompare, sizeof (tr_pex),
2188                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs);
2189        diffs6.added = tr_new (tr_pex, newCount6);
2190        diffs6.addedCount = 0;
2191        diffs6.dropped = tr_new (tr_pex, msgs->pexCount6);
2192        diffs6.droppedCount = 0;
2193        diffs6.elements = tr_new (tr_pex, newCount6 + msgs->pexCount6);
2194        diffs6.elementCount = 0;
2195        tr_set_compare (msgs->pex6, msgs->pexCount6,
2196                        newPex6, newCount6,
2197                        tr_pexCompare, sizeof (tr_pex),
2198                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6);
2199        dbgmsg (
2200            msgs,
2201            "pex: old peer count %d+%d, new peer count %d+%d, "
2202            "added %d+%d, removed %d+%d",
2203            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2204            diffs.addedCount, diffs6.addedCount,
2205            diffs.droppedCount, diffs6.droppedCount);
2206
2207        if (!diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2208            !diffs6.droppedCount)
2209        {
2210            tr_free (diffs.elements);
2211            tr_free (diffs6.elements);
2212        }
2213        else
2214        {
2215            int  i;
2216            tr_variant val;
2217            uint8_t * tmp, *walk;
2218            struct evbuffer * payload;
2219            struct evbuffer * out = msgs->outMessages;
2220
2221            /* update peer */
2222            tr_free (msgs->pex);
2223            msgs->pex = diffs.elements;
2224            msgs->pexCount = diffs.elementCount;
2225            tr_free (msgs->pex6);
2226            msgs->pex6 = diffs6.elements;
2227            msgs->pexCount6 = diffs6.elementCount;
2228
2229            /* build the pex payload */
2230            tr_variantInitDict (&val, 3); /* ipv6 support: left as 3:
2231                                         * speed vs. likelihood? */
2232
2233            if (diffs.addedCount > 0)
2234            {
2235                /* "added" */
2236                tmp = walk = tr_new (uint8_t, diffs.addedCount * 6);
2237                for (i = 0; i < diffs.addedCount; ++i) {
2238                    memcpy (walk, &diffs.added[i].addr.addr, 4); walk += 4;
2239                    memcpy (walk, &diffs.added[i].port, 2); walk += 2;
2240                }
2241                assert ((walk - tmp) == diffs.addedCount * 6);
2242                tr_variantDictAddRaw (&val, TR_KEY_added, tmp, walk - tmp);
2243                tr_free (tmp);
2244
2245                /* "added.f"
2246                 * unset each holepunch flag because we don't support it. */
2247                tmp = walk = tr_new (uint8_t, diffs.addedCount);
2248                for (i = 0; i < diffs.addedCount; ++i)
2249                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2250                assert ((walk - tmp) == diffs.addedCount);
2251                tr_variantDictAddRaw (&val, TR_KEY_added_f, tmp, walk - tmp);
2252                tr_free (tmp);
2253            }
2254
2255            if (diffs.droppedCount > 0)
2256            {
2257                /* "dropped" */
2258                tmp = walk = tr_new (uint8_t, diffs.droppedCount * 6);
2259                for (i = 0; i < diffs.droppedCount; ++i) {
2260                    memcpy (walk, &diffs.dropped[i].addr.addr, 4); walk += 4;
2261                    memcpy (walk, &diffs.dropped[i].port, 2); walk += 2;
2262                }
2263                assert ((walk - tmp) == diffs.droppedCount * 6);
2264                tr_variantDictAddRaw (&val, TR_KEY_dropped, tmp, walk - tmp);
2265                tr_free (tmp);
2266            }
2267
2268            if (diffs6.addedCount > 0)
2269            {
2270                /* "added6" */
2271                tmp = walk = tr_new (uint8_t, diffs6.addedCount * 18);
2272                for (i = 0; i < diffs6.addedCount; ++i) {
2273                    memcpy (walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2274                    walk += 16;
2275                    memcpy (walk, &diffs6.added[i].port, 2);
2276                    walk += 2;
2277                }
2278                assert ((walk - tmp) == diffs6.addedCount * 18);
2279                tr_variantDictAddRaw (&val, TR_KEY_added6, tmp, walk - tmp);
2280                tr_free (tmp);
2281
2282                /* "added6.f"
2283                 * unset each holepunch flag because we don't support it. */
2284                tmp = walk = tr_new (uint8_t, diffs6.addedCount);
2285                for (i = 0; i < diffs6.addedCount; ++i)
2286                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2287                assert ((walk - tmp) == diffs6.addedCount);
2288                tr_variantDictAddRaw (&val, TR_KEY_added6_f, tmp, walk - tmp);
2289                tr_free (tmp);
2290            }
2291
2292            if (diffs6.droppedCount > 0)
2293            {
2294                /* "dropped6" */
2295                tmp = walk = tr_new (uint8_t, diffs6.droppedCount * 18);
2296                for (i = 0; i < diffs6.droppedCount; ++i) {
2297                    memcpy (walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2298                    walk += 16;
2299                    memcpy (walk, &diffs6.dropped[i].port, 2);
2300                    walk += 2;
2301                }
2302                assert ((walk - tmp) == diffs6.droppedCount * 18);
2303                tr_variantDictAddRaw (&val, TR_KEY_dropped6, tmp, walk - tmp);
2304                tr_free (tmp);
2305            }
2306
2307            /* write the pex message */
2308            payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
2309            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2310            evbuffer_add_uint8 (out, BT_LTEP);
2311            evbuffer_add_uint8 (out, msgs->ut_pex_id);
2312            evbuffer_add_buffer (out, payload);
2313            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2314            dbgmsg (msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length (out));
2315            dbgOutMessageLen (msgs);
2316
2317            evbuffer_free (payload);
2318            tr_variantFree (&val);
2319        }
2320
2321        /* cleanup */
2322        tr_free (diffs.added);
2323        tr_free (diffs.dropped);
2324        tr_free (newPex);
2325        tr_free (diffs6.added);
2326        tr_free (diffs6.dropped);
2327        tr_free (newPex6);
2328
2329        /*msgs->clientSentPexAt = tr_time ();*/
2330    }
2331}
2332
2333static void
2334pexPulse (int foo UNUSED, short bar UNUSED, void * vmsgs)
2335{
2336    struct tr_peermsgs * msgs = vmsgs;
2337
2338    sendPex (msgs);
2339
2340    assert (msgs->pexTimer != NULL);
2341    tr_timerAdd (msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2342}
2343
2344/**
2345***
2346**/
2347
2348tr_peermsgs*
2349tr_peerMsgsNew (struct tr_torrent    * torrent,
2350                struct tr_peer       * peer,
2351                tr_peer_callback     * callback,
2352                void                 * callbackData)
2353{
2354    tr_peermsgs * m;
2355
2356    assert (peer);
2357    assert (peer->io);
2358
2359    m = tr_new0 (tr_peermsgs, 1);
2360    m->callback = callback;
2361    m->callbackData = callbackData;
2362    m->peer = peer;
2363    m->torrent = torrent;
2364    m->peer->clientIsChoked = 1;
2365    m->peer->peerIsChoked = 1;
2366    m->peer->clientIsInterested = 0;
2367    m->peer->peerIsInterested = 0;
2368    m->state = AWAITING_BT_LENGTH;
2369    m->outMessages = evbuffer_new ();
2370    m->outMessagesBatchedAt = 0;
2371    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2372    peer->msgs = m;
2373
2374    if (tr_torrentAllowsPex (torrent)) {
2375        m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m);
2376        tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0);
2377    }
2378
2379    if (tr_peerIoSupportsUTP (peer->io)) {
2380        const tr_address * addr = tr_peerIoGetAddress (peer->io, NULL);
2381        tr_peerMgrSetUtpSupported (torrent, addr);
2382        tr_peerMgrSetUtpFailed (torrent, addr, false);
2383    }
2384
2385    if (tr_peerIoSupportsLTEP (peer->io))
2386        sendLtepHandshake (m);
2387
2388    tellPeerWhatWeHave (m);
2389
2390    if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (peer->io))
2391    {
2392        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2393        const struct tr_address *addr = tr_peerIoGetAddress (peer->io, NULL);
2394        if (addr->type == TR_AF_INET || tr_globalIPv6 ()) {
2395            protocolSendPort (m, tr_dhtPort (torrent->session));
2396        }
2397    }
2398
2399    tr_peerIoSetIOFuncs (m->peer->io, canRead, didWrite, gotError, m);
2400    updateDesiredRequestCount (m);
2401
2402    return m;
2403}
2404
2405void
2406tr_peerMsgsFree (tr_peermsgs* msgs)
2407{
2408    if (msgs)
2409    {
2410        if (msgs->pexTimer != NULL)
2411            event_free (msgs->pexTimer);
2412
2413        if (msgs->incoming.block != NULL)
2414            evbuffer_free (msgs->incoming.block);
2415
2416        evbuffer_free (msgs->outMessages);
2417        tr_free (msgs->pex6);
2418        tr_free (msgs->pex);
2419
2420        memset (msgs, ~0, sizeof (tr_peermsgs));
2421        tr_free (msgs);
2422    }
2423}
Note: See TracBrowser for help on using the repository browser.