source: trunk/libtransmission/peer-msgs.c @ 13631

Last change on this file since 13631 was 13631, checked in by jordan, 9 years ago

(trunk, libT) #5165: fix r13625 oops

  • Property svn:keywords set to Date Rev Author Id
File size: 70.5 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 13631 2012-12-07 01:53:31Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "cache.h"
26#include "completion.h"
27#include "crypto.h" /* tr_sha1 () */
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "version.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54
55    BT_FEXT_SUGGEST         = 13,
56    BT_FEXT_HAVE_ALL        = 14,
57    BT_FEXT_HAVE_NONE       = 15,
58    BT_FEXT_REJECT          = 16,
59    BT_FEXT_ALLOWED_FAST    = 17,
60
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    UT_PEX_ID               = 1,
66    UT_METADATA_ID          = 3,
67
68    MAX_PEX_PEER_COUNT      = 50,
69
70    MIN_CHOKE_PERIOD_SEC    = 10,
71
72    /* idle seconds before we send a keepalive */
73    KEEPALIVE_INTERVAL_SECS = 100,
74
75    PEX_INTERVAL_SECS       = 90, /* sec between sendPex () calls */
76
77    REQQ                    = 512,
78
79    METADATA_REQQ           = 64,
80
81    /* used in lowering the outMessages queue period */
82    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
83    HIGH_PRIORITY_INTERVAL_SECS = 2,
84    LOW_PRIORITY_INTERVAL_SECS = 10,
85
86    /* number of pieces we'll allow in our fast set */
87    MAX_FAST_SET_SIZE = 3,
88
89    /* defined in BEP #9 */
90    METADATA_MSG_TYPE_REQUEST = 0,
91    METADATA_MSG_TYPE_DATA = 1,
92    METADATA_MSG_TYPE_REJECT = 2
93};
94
95enum
96{
97    AWAITING_BT_LENGTH,
98    AWAITING_BT_ID,
99    AWAITING_BT_MESSAGE,
100    AWAITING_BT_PIECE
101};
102
103/**
104***
105**/
106
107struct peer_request
108{
109    uint32_t    index;
110    uint32_t    offset;
111    uint32_t    length;
112};
113
114static void
115blockToReq (const tr_torrent     * tor,
116            tr_block_index_t       block,
117            struct peer_request  * setme)
118{
119    tr_torrentGetBlockLocation (tor, block, &setme->index,
120                                            &setme->offset,
121                                            &setme->length);
122}
123
124/**
125***
126**/
127
128/* this is raw, unchanged data from the peer regarding
129 * the current message that it's sending us. */
130struct tr_incoming
131{
132    uint8_t                id;
133    uint32_t               length; /* includes the +1 for id length */
134    struct peer_request    blockReq; /* metadata for incoming blocks */
135    struct evbuffer *      block; /* piece data for incoming blocks */
136};
137
138/**
139 * Low-level communication state information about a connected peer.
140 *
141 * This structure remembers the low-level protocol states that we're
142 * in with this peer, such as active requests, pex messages, and so on.
143 * Its fields are all private to peer-msgs.c.
144 *
145 * Data not directly involved with sending & receiving messages is
146 * stored in tr_peer, where it can be accessed by both peermsgs and
147 * the peer manager.
148 *
149 * @see struct peer_atom
150 * @see tr_peer
151 */
152struct tr_peermsgs
153{
154    bool            peerSupportsPex;
155    bool            peerSupportsMetadataXfer;
156    bool            clientSentLtepHandshake;
157    bool            peerSentLtepHandshake;
158
159    /*bool          haveFastSet;*/
160
161    int             desiredRequestCount;
162
163    int             prefetchCount;
164
165    /* how long the outMessages batch should be allowed to grow before
166     * it's flushed -- some messages (like requests >:) should be sent
167     * very quickly; others aren't as urgent. */
168    int8_t          outMessagesBatchPeriod;
169
170    uint8_t         state;
171    uint8_t         ut_pex_id;
172    uint8_t         ut_metadata_id;
173    uint16_t        pexCount;
174    uint16_t        pexCount6;
175
176    size_t          metadata_size_hint;
177#if 0
178    size_t                 fastsetSize;
179    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
180#endif
181
182    tr_peer *              peer;
183
184    tr_torrent *           torrent;
185
186    tr_peer_callback      * callback;
187    void                  * callbackData;
188
189    struct evbuffer *      outMessages; /* all the non-piece messages */
190
191    struct peer_request    peerAskedFor[REQQ];
192
193    int                    peerAskedForMetadata[METADATA_REQQ];
194    int                    peerAskedForMetadataCount;
195
196    tr_pex               * pex;
197    tr_pex               * pex6;
198
199    /*time_t                 clientSentPexAt;*/
200    time_t                 clientSentAnythingAt;
201
202    /* when we started batching the outMessages */
203    time_t                outMessagesBatchedAt;
204
205    struct tr_incoming    incoming;
206
207    /* if the peer supports the Extension Protocol in BEP 10 and
208       supplied a reqq argument, it's stored here. Otherwise, the
209       value is zero and should be ignored. */
210    int64_t               reqq;
211
212    struct event        * pexTimer;
213};
214
215/**
216***
217**/
218
219static inline tr_session*
220getSession (struct tr_peermsgs * msgs)
221{
222    return msgs->torrent->session;
223}
224
225/**
226***
227**/
228
229static void
230myDebug (const char * file, int line,
231         const struct tr_peermsgs * msgs,
232         const char * fmt, ...)
233{
234    FILE * fp = tr_getLog ();
235
236    if (fp)
237    {
238        va_list           args;
239        char              timestr[64];
240        struct evbuffer * buf = evbuffer_new ();
241        char *            base = tr_basename (file);
242        char *            message;
243
244        evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ",
245                             tr_getLogTimeStr (timestr, sizeof (timestr)),
246                             tr_torrentName (msgs->torrent),
247                             tr_peerIoGetAddrStr (msgs->peer->io),
248                             msgs->peer->client);
249        va_start (args, fmt);
250        evbuffer_add_vprintf (buf, fmt, args);
251        va_end (args);
252        evbuffer_add_printf (buf, " (%s:%d)\n", base, line);
253
254        message = evbuffer_free_to_str (buf);
255        fputs (message, fp);
256
257        tr_free (base);
258        tr_free (message);
259    }
260}
261
262#define dbgmsg(msgs, ...) \
263  do \
264    { \
265      if (tr_deepLoggingIsActive ()) \
266        myDebug (__FILE__, __LINE__, msgs, __VA_ARGS__); \
267    } \
268  while (0)
269
270/**
271***
272**/
273
274static void
275pokeBatchPeriod (tr_peermsgs * msgs, int interval)
276{
277    if (msgs->outMessagesBatchPeriod > interval)
278    {
279        msgs->outMessagesBatchPeriod = interval;
280        dbgmsg (msgs, "lowering batch interval to %d seconds", interval);
281    }
282}
283
284static void
285dbgOutMessageLen (tr_peermsgs * msgs)
286{
287    dbgmsg (msgs, "outMessage size is now %zu", evbuffer_get_length (msgs->outMessages));
288}
289
290static void
291protocolSendReject (tr_peermsgs * msgs, const struct peer_request * req)
292{
293    struct evbuffer * out = msgs->outMessages;
294
295    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
296
297    evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
298    evbuffer_add_uint8 (out, BT_FEXT_REJECT);
299    evbuffer_add_uint32 (out, req->index);
300    evbuffer_add_uint32 (out, req->offset);
301    evbuffer_add_uint32 (out, req->length);
302
303    dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
304    dbgOutMessageLen (msgs);
305}
306
307static void
308protocolSendRequest (tr_peermsgs * msgs, const struct peer_request * req)
309{
310    struct evbuffer * out = msgs->outMessages;
311
312    evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
313    evbuffer_add_uint8 (out, BT_REQUEST);
314    evbuffer_add_uint32 (out, req->index);
315    evbuffer_add_uint32 (out, req->offset);
316    evbuffer_add_uint32 (out, req->length);
317
318    dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
319    dbgOutMessageLen (msgs);
320    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
321}
322
323static void
324protocolSendCancel (tr_peermsgs * msgs, const struct peer_request * req)
325{
326    struct evbuffer * out = msgs->outMessages;
327
328    evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
329    evbuffer_add_uint8 (out, BT_CANCEL);
330    evbuffer_add_uint32 (out, req->index);
331    evbuffer_add_uint32 (out, req->offset);
332    evbuffer_add_uint32 (out, req->length);
333
334    dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
335    dbgOutMessageLen (msgs);
336    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
337}
338
339static void
340protocolSendPort (tr_peermsgs *msgs, uint16_t port)
341{
342    struct evbuffer * out = msgs->outMessages;
343
344    dbgmsg (msgs, "sending Port %u", port);
345    evbuffer_add_uint32 (out, 3);
346    evbuffer_add_uint8 (out, BT_PORT);
347    evbuffer_add_uint16 (out, port);
348}
349
350static void
351protocolSendHave (tr_peermsgs * msgs, uint32_t index)
352{
353    struct evbuffer * out = msgs->outMessages;
354
355    evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t));
356    evbuffer_add_uint8 (out, BT_HAVE);
357    evbuffer_add_uint32 (out, index);
358
359    dbgmsg (msgs, "sending Have %u", index);
360    dbgOutMessageLen (msgs);
361    pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS);
362}
363
364#if 0
365static void
366protocolSendAllowedFast (tr_peermsgs * msgs, uint32_t pieceIndex)
367{
368    tr_peerIo       * io  = msgs->peer->io;
369    struct evbuffer * out = msgs->outMessages;
370
371    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
372
373    evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t));
374    evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST);
375    evbuffer_add_uint32 (io, out, pieceIndex);
376
377    dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex);
378    dbgOutMessageLen (msgs);
379}
380#endif
381
382static void
383protocolSendChoke (tr_peermsgs * msgs, int choke)
384{
385    struct evbuffer * out = msgs->outMessages;
386
387    evbuffer_add_uint32 (out, sizeof (uint8_t));
388    evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE);
389
390    dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke");
391    dbgOutMessageLen (msgs);
392    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
393}
394
395static void
396protocolSendHaveAll (tr_peermsgs * msgs)
397{
398    struct evbuffer * out = msgs->outMessages;
399
400    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
401
402    evbuffer_add_uint32 (out, sizeof (uint8_t));
403    evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL);
404
405    dbgmsg (msgs, "sending HAVE_ALL...");
406    dbgOutMessageLen (msgs);
407    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
408}
409
410static void
411protocolSendHaveNone (tr_peermsgs * msgs)
412{
413    struct evbuffer * out = msgs->outMessages;
414
415    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
416
417    evbuffer_add_uint32 (out, sizeof (uint8_t));
418    evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE);
419
420    dbgmsg (msgs, "sending HAVE_NONE...");
421    dbgOutMessageLen (msgs);
422    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
423}
424
425/**
426***  EVENTS
427**/
428
429static void
430publish (tr_peermsgs * msgs, tr_peer_event * e)
431{
432    assert (msgs->peer);
433    assert (msgs->peer->msgs == msgs);
434
435    if (msgs->callback != NULL)
436        msgs->callback (msgs->peer, e, msgs->callbackData);
437}
438
439static void
440fireError (tr_peermsgs * msgs, int err)
441{
442    tr_peer_event e = TR_PEER_EVENT_INIT;
443    e.eventType = TR_PEER_ERROR;
444    e.err = err;
445    publish (msgs, &e);
446}
447
448static void
449fireGotBlock (tr_peermsgs * msgs, const struct peer_request * req)
450{
451    tr_peer_event e = TR_PEER_EVENT_INIT;
452    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
453    e.pieceIndex = req->index;
454    e.offset = req->offset;
455    e.length = req->length;
456    publish (msgs, &e);
457}
458
459static void
460fireGotRej (tr_peermsgs * msgs, const struct peer_request * req)
461{
462    tr_peer_event e = TR_PEER_EVENT_INIT;
463    e.eventType = TR_PEER_CLIENT_GOT_REJ;
464    e.pieceIndex = req->index;
465    e.offset = req->offset;
466    e.length = req->length;
467    publish (msgs, &e);
468}
469
470static void
471fireGotChoke (tr_peermsgs * msgs)
472{
473    tr_peer_event e = TR_PEER_EVENT_INIT;
474    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
475    publish (msgs, &e);
476}
477
478static void
479fireClientGotHaveAll (tr_peermsgs * msgs)
480{
481    tr_peer_event e = TR_PEER_EVENT_INIT;
482    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
483    publish (msgs, &e);
484}
485
486static void
487fireClientGotHaveNone (tr_peermsgs * msgs)
488{
489    tr_peer_event e = TR_PEER_EVENT_INIT;
490    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
491    publish (msgs, &e);
492}
493
494static void
495fireClientGotData (tr_peermsgs * msgs, uint32_t length, int wasPieceData)
496{
497    tr_peer_event e = TR_PEER_EVENT_INIT;
498
499    e.length = length;
500    e.eventType = TR_PEER_CLIENT_GOT_DATA;
501    e.wasPieceData = wasPieceData;
502    publish (msgs, &e);
503}
504
505static void
506fireClientGotSuggest (tr_peermsgs * msgs, uint32_t pieceIndex)
507{
508    tr_peer_event e = TR_PEER_EVENT_INIT;
509    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
510    e.pieceIndex = pieceIndex;
511    publish (msgs, &e);
512}
513
514static void
515fireClientGotPort (tr_peermsgs * msgs, tr_port port)
516{
517    tr_peer_event e = TR_PEER_EVENT_INIT;
518    e.eventType = TR_PEER_CLIENT_GOT_PORT;
519    e.port = port;
520    publish (msgs, &e);
521}
522
523static void
524fireClientGotAllowedFast (tr_peermsgs * msgs, uint32_t pieceIndex)
525{
526    tr_peer_event e = TR_PEER_EVENT_INIT;
527    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
528    e.pieceIndex = pieceIndex;
529    publish (msgs, &e);
530}
531
532static void
533fireClientGotBitfield (tr_peermsgs * msgs, tr_bitfield * bitfield)
534{
535    tr_peer_event e = TR_PEER_EVENT_INIT;
536    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
537    e.bitfield = bitfield;
538    publish (msgs, &e);
539}
540
541static void
542fireClientGotHave (tr_peermsgs * msgs, tr_piece_index_t index)
543{
544    tr_peer_event e = TR_PEER_EVENT_INIT;
545    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
546    e.pieceIndex = index;
547    publish (msgs, &e);
548}
549
550static void
551firePeerGotData (tr_peermsgs * msgs, uint32_t length, bool wasPieceData)
552{
553    tr_peer_event e = TR_PEER_EVENT_INIT;
554
555    e.length = length;
556    e.eventType = TR_PEER_PEER_GOT_DATA;
557    e.wasPieceData = wasPieceData;
558
559    publish (msgs, &e);
560}
561
562/**
563***  ALLOWED FAST SET
564***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
565**/
566
567#if 0
568size_t
569tr_generateAllowedSet (tr_piece_index_t * setmePieces,
570                       size_t             desiredSetSize,
571                       size_t             pieceCount,
572                       const uint8_t    * infohash,
573                       const tr_address * addr)
574{
575    size_t setSize = 0;
576
577    assert (setmePieces);
578    assert (desiredSetSize <= pieceCount);
579    assert (desiredSetSize);
580    assert (pieceCount);
581    assert (infohash);
582    assert (addr);
583
584    if (addr->type == TR_AF_INET)
585    {
586        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
587        uint8_t x[SHA_DIGEST_LENGTH];
588
589        uint32_t ui32 = ntohl (htonl (addr->addr.addr4.s_addr) & 0xffffff00);   /* (1) */
590        memcpy (w, &ui32, sizeof (uint32_t));
591        walk += sizeof (uint32_t);
592        memcpy (walk, infohash, SHA_DIGEST_LENGTH);                 /* (2) */
593        walk += SHA_DIGEST_LENGTH;
594        tr_sha1 (x, w, walk-w, NULL);                               /* (3) */
595        assert (sizeof (w) == walk-w);
596
597        while (setSize<desiredSetSize)
598        {
599            int i;
600            for (i=0; i<5 && setSize<desiredSetSize; ++i)           /* (4) */
601            {
602                size_t k;
603                uint32_t j = i * 4;                                  /* (5) */
604                uint32_t y = ntohl (* (uint32_t*)(x + j));       /* (6) */
605                uint32_t index = y % pieceCount;                     /* (7) */
606
607                for (k=0; k<setSize; ++k)                           /* (8) */
608                    if (setmePieces[k] == index)
609                        break;
610
611                if (k == setSize)
612                    setmePieces[setSize++] = index;                  /* (9) */
613            }
614
615            tr_sha1 (x, x, sizeof (x), NULL);                      /* (3) */
616        }
617    }
618
619    return setSize;
620}
621
622static void
623updateFastSet (tr_peermsgs * msgs UNUSED)
624{
625    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
626    const int peerIsNeedy = msgs->peer->progress < 0.10;
627
628    if (fext && peerIsNeedy && !msgs->haveFastSet)
629    {
630        size_t i;
631        const struct tr_address * addr = tr_peerIoGetAddress (msgs->peer->io, NULL);
632        const tr_info * inf = &msgs->torrent->info;
633        const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount);
634
635        /* build the fast set */
636        msgs->fastsetSize = tr_generateAllowedSet (msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
637        msgs->haveFastSet = 1;
638
639        /* send it to the peer */
640        for (i=0; i<msgs->fastsetSize; ++i)
641            protocolSendAllowedFast (msgs, msgs->fastset[i]);
642    }
643}
644#endif
645
646/**
647***  INTEREST
648**/
649
650static void
651sendInterest (tr_peermsgs * msgs, bool clientIsInterested)
652{
653    struct evbuffer * out = msgs->outMessages;
654
655    assert (msgs);
656    assert (tr_isBool (clientIsInterested));
657
658    msgs->peer->clientIsInterested = clientIsInterested;
659    dbgmsg (msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested");
660    evbuffer_add_uint32 (out, sizeof (uint8_t));
661    evbuffer_add_uint8 (out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED);
662
663    pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
664    dbgOutMessageLen (msgs);
665}
666
667static void
668updateInterest (tr_peermsgs * msgs UNUSED)
669{
670    /* FIXME -- might need to poke the mgr on startup */
671}
672
673void
674tr_peerMsgsSetInterested (tr_peermsgs * msgs, bool clientIsInterested)
675{
676    assert (tr_isBool (clientIsInterested));
677
678    if (clientIsInterested != msgs->peer->clientIsInterested)
679        sendInterest (msgs, clientIsInterested);
680}
681
682static bool
683popNextMetadataRequest (tr_peermsgs * msgs, int * piece)
684{
685    if (msgs->peerAskedForMetadataCount == 0)
686        return false;
687
688    *piece = msgs->peerAskedForMetadata[0];
689
690    tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int),
691                               msgs->peerAskedForMetadataCount--);
692
693    return true;
694}
695
696static bool
697popNextRequest (tr_peermsgs * msgs, struct peer_request * setme)
698{
699    if (msgs->peer->pendingReqsToClient == 0)
700        return false;
701
702    *setme = msgs->peerAskedFor[0];
703
704    tr_removeElementFromArray (msgs->peerAskedFor, 0, sizeof (struct peer_request),
705                               msgs->peer->pendingReqsToClient--);
706
707    return true;
708}
709
710static void
711cancelAllRequestsToClient (tr_peermsgs * msgs)
712{
713    struct peer_request req;
714    const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->peer->io);
715
716    while (popNextRequest (msgs, &req))
717        if (mustSendCancel)
718            protocolSendReject (msgs, &req);
719}
720
721void
722tr_peerMsgsSetChoke (tr_peermsgs * msgs, bool peerIsChoked)
723{
724    const time_t now = tr_time ();
725    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
726
727    assert (msgs);
728    assert (msgs->peer);
729    assert (tr_isBool (peerIsChoked));
730
731    if (msgs->peer->chokeChangedAt > fibrillationTime)
732    {
733        dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peerIsChoked);
734    }
735    else if (msgs->peer->peerIsChoked != peerIsChoked)
736    {
737        msgs->peer->peerIsChoked = peerIsChoked;
738        if (peerIsChoked)
739            cancelAllRequestsToClient (msgs);
740        protocolSendChoke (msgs, peerIsChoked);
741        msgs->peer->chokeChangedAt = now;
742    }
743}
744
745/**
746***
747**/
748
749void
750tr_peerMsgsHave (tr_peermsgs * msgs, uint32_t index)
751{
752    protocolSendHave (msgs, index);
753
754    /* since we have more pieces now, we might not be interested in this peer */
755    updateInterest (msgs);
756}
757
758/**
759***
760**/
761
762static bool
763reqIsValid (const tr_peermsgs * peer,
764            uint32_t            index,
765            uint32_t            offset,
766            uint32_t            length)
767{
768    return tr_torrentReqIsValid (peer->torrent, index, offset, length);
769}
770
771static bool
772requestIsValid (const tr_peermsgs * msgs, const struct peer_request * req)
773{
774    return reqIsValid (msgs, req->index, req->offset, req->length);
775}
776
777void
778tr_peerMsgsCancel (tr_peermsgs * msgs, tr_block_index_t block)
779{
780    struct peer_request req;
781/*fprintf (stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer);*/
782    blockToReq (msgs->torrent, block, &req);
783    protocolSendCancel (msgs, &req);
784}
785
786/**
787***
788**/
789
790static void
791sendLtepHandshake (tr_peermsgs * msgs)
792{
793    tr_benc val;
794    bool allow_pex;
795    bool allow_metadata_xfer;
796    struct evbuffer * payload;
797    struct evbuffer * out = msgs->outMessages;
798    const unsigned char * ipv6 = tr_globalIPv6 ();
799
800    if (msgs->clientSentLtepHandshake)
801        return;
802
803    dbgmsg (msgs, "sending an ltep handshake");
804    msgs->clientSentLtepHandshake = 1;
805
806    /* decide if we want to advertise metadata xfer support (BEP 9) */
807    if (tr_torrentIsPrivate (msgs->torrent))
808        allow_metadata_xfer = 0;
809    else
810        allow_metadata_xfer = 1;
811
812    /* decide if we want to advertise pex support */
813    if (!tr_torrentAllowsPex (msgs->torrent))
814        allow_pex = 0;
815    else if (msgs->peerSentLtepHandshake)
816        allow_pex = msgs->peerSupportsPex ? 1 : 0;
817    else
818        allow_pex = 1;
819
820    tr_bencInitDict (&val, 8);
821    tr_bencDictAddInt (&val, "e", getSession (msgs)->encryptionMode != TR_CLEAR_PREFERRED);
822    if (ipv6 != NULL)
823        tr_bencDictAddRaw (&val, "ipv6", ipv6, 16);
824    if (allow_metadata_xfer && tr_torrentHasMetadata (msgs->torrent)
825                            && (msgs->torrent->infoDictLength > 0))
826        tr_bencDictAddInt (&val, "metadata_size", msgs->torrent->infoDictLength);
827    tr_bencDictAddInt (&val, "p", tr_sessionGetPublicPeerPort (getSession (msgs)));
828    tr_bencDictAddInt (&val, "reqq", REQQ);
829    tr_bencDictAddInt (&val, "upload_only", tr_torrentIsSeed (msgs->torrent));
830    tr_bencDictAddStr (&val, "v", TR_NAME " " USERAGENT_PREFIX);
831    if (allow_metadata_xfer || allow_pex) {
832        tr_benc * m  = tr_bencDictAddDict (&val, "m", 2);
833        if (allow_metadata_xfer)
834            tr_bencDictAddInt (m, "ut_metadata", UT_METADATA_ID);
835        if (allow_pex)
836            tr_bencDictAddInt (m, "ut_pex", UT_PEX_ID);
837    }
838
839    payload = tr_bencToBuf (&val, TR_FMT_BENC);
840
841    evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
842    evbuffer_add_uint8 (out, BT_LTEP);
843    evbuffer_add_uint8 (out, LTEP_HANDSHAKE);
844    evbuffer_add_buffer (out, payload);
845    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
846    dbgOutMessageLen (msgs);
847
848    /* cleanup */
849    evbuffer_free (payload);
850    tr_bencFree (&val);
851}
852
853static void
854parseLtepHandshake (tr_peermsgs * msgs, int len, struct evbuffer * inbuf)
855{
856    int64_t   i;
857    tr_benc   val, * sub;
858    uint8_t * tmp = tr_new (uint8_t, len);
859    const uint8_t *addr;
860    size_t addr_len;
861    tr_pex pex;
862    int8_t seedProbability = -1;
863
864    memset (&pex, 0, sizeof (tr_pex));
865
866    tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, len);
867    msgs->peerSentLtepHandshake = 1;
868
869    if (tr_bencLoad (tmp, len, &val, NULL) || !tr_bencIsDict (&val))
870    {
871        dbgmsg (msgs, "GET  extended-handshake, couldn't get dictionary");
872        tr_free (tmp);
873        return;
874    }
875
876    dbgmsg (msgs, "here is the handshake: [%*.*s]", len, len,  tmp);
877
878    /* does the peer prefer encrypted connections? */
879    if (tr_bencDictFindInt (&val, "e", &i)) {
880        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
881                                              : ENCRYPTION_PREFERENCE_NO;
882        if (i)
883            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
884    }
885
886    /* check supported messages for utorrent pex */
887    msgs->peerSupportsPex = 0;
888    msgs->peerSupportsMetadataXfer = 0;
889
890    if (tr_bencDictFindDict (&val, "m", &sub)) {
891        if (tr_bencDictFindInt (sub, "ut_pex", &i)) {
892            msgs->peerSupportsPex = i != 0;
893            msgs->ut_pex_id = (uint8_t) i;
894            dbgmsg (msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
895        }
896        if (tr_bencDictFindInt (sub, "ut_metadata", &i)) {
897            msgs->peerSupportsMetadataXfer = i != 0;
898            msgs->ut_metadata_id = (uint8_t) i;
899            dbgmsg (msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
900        }
901        if (tr_bencDictFindInt (sub, "ut_holepunch", &i)) {
902            /* Mysterious µTorrent extension that we don't grok.  However,
903               it implies support for µTP, so use it to indicate that. */
904            tr_peerMgrSetUtpFailed (msgs->torrent,
905                                    tr_peerIoGetAddress (msgs->peer->io, NULL),
906                                    false);
907        }
908    }
909
910    /* look for metainfo size (BEP 9) */
911    if (tr_bencDictFindInt (&val, "metadata_size", &i)) {
912        tr_torrentSetMetadataSizeHint (msgs->torrent, i);
913        msgs->metadata_size_hint = (size_t) i;
914    }
915
916    /* look for upload_only (BEP 21) */
917    if (tr_bencDictFindInt (&val, "upload_only", &i))
918        seedProbability = i==0 ? 0 : 100;
919
920    /* get peer's listening port */
921    if (tr_bencDictFindInt (&val, "p", &i)) {
922        pex.port = htons ((uint16_t)i);
923        fireClientGotPort (msgs, pex.port);
924        dbgmsg (msgs, "peer's port is now %d", (int)i);
925    }
926
927    if (tr_peerIoIsIncoming (msgs->peer->io)
928        && tr_bencDictFindRaw (&val, "ipv4", &addr, &addr_len)
929        && (addr_len == 4))
930    {
931        pex.addr.type = TR_AF_INET;
932        memcpy (&pex.addr.addr.addr4, addr, 4);
933        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
934    }
935
936    if (tr_peerIoIsIncoming (msgs->peer->io)
937        && tr_bencDictFindRaw (&val, "ipv6", &addr, &addr_len)
938        && (addr_len == 16))
939    {
940        pex.addr.type = TR_AF_INET6;
941        memcpy (&pex.addr.addr.addr6, addr, 16);
942        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
943    }
944
945    /* get peer's maximum request queue size */
946    if (tr_bencDictFindInt (&val, "reqq", &i))
947        msgs->reqq = i;
948
949    tr_bencFree (&val);
950    tr_free (tmp);
951}
952
953static void
954parseUtMetadata (tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf)
955{
956    tr_benc dict;
957    char * msg_end;
958    char * benc_end;
959    int64_t msg_type = -1;
960    int64_t piece = -1;
961    int64_t total_size = 0;
962    uint8_t * tmp = tr_new (uint8_t, msglen);
963
964    tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, msglen);
965    msg_end = (char*)tmp + msglen;
966
967    if (!tr_bencLoad (tmp, msglen, &dict, &benc_end))
968    {
969        tr_bencDictFindInt (&dict, "msg_type", &msg_type);
970        tr_bencDictFindInt (&dict, "piece", &piece);
971        tr_bencDictFindInt (&dict, "total_size", &total_size);
972        tr_bencFree (&dict);
973    }
974
975    dbgmsg (msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
976          (int)msg_type, (int)piece, (int)total_size);
977
978    if (msg_type == METADATA_MSG_TYPE_REJECT)
979    {
980        /* NOOP */
981    }
982
983    if ((msg_type == METADATA_MSG_TYPE_DATA)
984        && (!tr_torrentHasMetadata (msgs->torrent))
985        && (msg_end - benc_end <= METADATA_PIECE_SIZE)
986        && (piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size))
987    {
988        const int pieceLen = msg_end - benc_end;
989        tr_torrentSetMetadataPiece (msgs->torrent, piece, benc_end, pieceLen);
990    }
991
992    if (msg_type == METADATA_MSG_TYPE_REQUEST)
993    {
994        if ((piece >= 0)
995            && tr_torrentHasMetadata (msgs->torrent)
996            && !tr_torrentIsPrivate (msgs->torrent)
997            && (msgs->peerAskedForMetadataCount < METADATA_REQQ))
998        {
999            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1000        }
1001        else
1002        {
1003            tr_benc tmp;
1004            struct evbuffer * payload;
1005            struct evbuffer * out = msgs->outMessages;
1006
1007            /* build the rejection message */
1008            tr_bencInitDict (&tmp, 2);
1009            tr_bencDictAddInt (&tmp, "msg_type", METADATA_MSG_TYPE_REJECT);
1010            tr_bencDictAddInt (&tmp, "piece", piece);
1011            payload = tr_bencToBuf (&tmp, TR_FMT_BENC);
1012
1013            /* write it out as a LTEP message to our outMessages buffer */
1014            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1015            evbuffer_add_uint8 (out, BT_LTEP);
1016            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1017            evbuffer_add_buffer (out, payload);
1018            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1019            dbgOutMessageLen (msgs);
1020
1021            /* cleanup */
1022            evbuffer_free (payload);
1023            tr_bencFree (&tmp);
1024        }
1025    }
1026
1027    tr_free (tmp);
1028}
1029
1030static void
1031parseUtPex (tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf)
1032{
1033    int loaded = 0;
1034    uint8_t * tmp = tr_new (uint8_t, msglen);
1035    tr_benc val;
1036    tr_torrent * tor = msgs->torrent;
1037    const uint8_t * added;
1038    size_t added_len;
1039
1040    tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, msglen);
1041
1042    if (tr_torrentAllowsPex (tor)
1043      && ((loaded = !tr_bencLoad (tmp, msglen, &val, NULL))))
1044    {
1045        if (tr_bencDictFindRaw (&val, "added", &added, &added_len))
1046        {
1047            tr_pex * pex;
1048            size_t i, n;
1049            size_t added_f_len = 0;
1050            const uint8_t * added_f = NULL;
1051
1052            tr_bencDictFindRaw (&val, "added.f", &added_f, &added_f_len);
1053            pex = tr_peerMgrCompactToPex (added, added_len, added_f, added_f_len, &n);
1054
1055            n = MIN (n, MAX_PEX_PEER_COUNT);
1056            for (i=0; i<n; ++i)
1057            {
1058                int seedProbability = -1;
1059                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1060                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1061            }
1062
1063            tr_free (pex);
1064        }
1065
1066        if (tr_bencDictFindRaw (&val, "added6", &added, &added_len))
1067        {
1068            tr_pex * pex;
1069            size_t i, n;
1070            size_t added_f_len = 0;
1071            const uint8_t * added_f = NULL;
1072
1073            tr_bencDictFindRaw (&val, "added6.f", &added_f, &added_f_len);
1074            pex = tr_peerMgrCompact6ToPex (added, added_len, added_f, added_f_len, &n);
1075
1076            n = MIN (n, MAX_PEX_PEER_COUNT);
1077            for (i=0; i<n; ++i)
1078            {
1079                int seedProbability = -1;
1080                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1081                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1082            }
1083
1084            tr_free (pex);
1085        }
1086    }
1087
1088    if (loaded)
1089        tr_bencFree (&val);
1090    tr_free (tmp);
1091}
1092
1093static void sendPex (tr_peermsgs * msgs);
1094
1095static void
1096parseLtep (tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf)
1097{
1098    uint8_t ltep_msgid;
1099
1100    tr_peerIoReadUint8 (msgs->peer->io, inbuf, &ltep_msgid);
1101    msglen--;
1102
1103    if (ltep_msgid == LTEP_HANDSHAKE)
1104    {
1105        dbgmsg (msgs, "got ltep handshake");
1106        parseLtepHandshake (msgs, msglen, inbuf);
1107        if (tr_peerIoSupportsLTEP (msgs->peer->io))
1108        {
1109            sendLtepHandshake (msgs);
1110            sendPex (msgs);
1111        }
1112    }
1113    else if (ltep_msgid == UT_PEX_ID)
1114    {
1115        dbgmsg (msgs, "got ut pex");
1116        msgs->peerSupportsPex = 1;
1117        parseUtPex (msgs, msglen, inbuf);
1118    }
1119    else if (ltep_msgid == UT_METADATA_ID)
1120    {
1121        dbgmsg (msgs, "got ut metadata");
1122        msgs->peerSupportsMetadataXfer = 1;
1123        parseUtMetadata (msgs, msglen, inbuf);
1124    }
1125    else
1126    {
1127        dbgmsg (msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1128        evbuffer_drain (inbuf, msglen);
1129    }
1130}
1131
1132static int
1133readBtLength (tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1134{
1135    uint32_t len;
1136
1137    if (inlen < sizeof (len))
1138        return READ_LATER;
1139
1140    tr_peerIoReadUint32 (msgs->peer->io, inbuf, &len);
1141
1142    if (len == 0) /* peer sent us a keepalive message */
1143        dbgmsg (msgs, "got KeepAlive");
1144    else
1145    {
1146        msgs->incoming.length = len;
1147        msgs->state = AWAITING_BT_ID;
1148    }
1149
1150    return READ_NOW;
1151}
1152
1153static int readBtMessage (tr_peermsgs *, struct evbuffer *, size_t);
1154
1155static int
1156readBtId (tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1157{
1158    uint8_t id;
1159
1160    if (inlen < sizeof (uint8_t))
1161        return READ_LATER;
1162
1163    tr_peerIoReadUint8 (msgs->peer->io, inbuf, &id);
1164    msgs->incoming.id = id;
1165    dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);
1166
1167    if (id == BT_PIECE)
1168    {
1169        msgs->state = AWAITING_BT_PIECE;
1170        return READ_NOW;
1171    }
1172    else if (msgs->incoming.length != 1)
1173    {
1174        msgs->state = AWAITING_BT_MESSAGE;
1175        return READ_NOW;
1176    }
1177    else return readBtMessage (msgs, inbuf, inlen - 1);
1178}
1179
1180static void
1181updatePeerProgress (tr_peermsgs * msgs)
1182{
1183    tr_peerUpdateProgress (msgs->torrent, msgs->peer);
1184
1185    /*updateFastSet (msgs);*/
1186    updateInterest (msgs);
1187}
1188
1189static void
1190prefetchPieces (tr_peermsgs *msgs)
1191{
1192    int i;
1193
1194    if (!getSession (msgs)->isPrefetchEnabled)
1195        return;
1196
1197    /* Maintain 12 prefetched blocks per unchoked peer */
1198    for (i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i)
1199    {
1200        const struct peer_request * req = msgs->peerAskedFor + i;
1201        if (requestIsValid (msgs, req))
1202        {
1203            tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1204            ++msgs->prefetchCount;
1205        }
1206    }
1207}
1208
1209static void
1210peerMadeRequest (tr_peermsgs * msgs, const struct peer_request * req)
1211{
1212    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
1213    const int reqIsValid = requestIsValid (msgs, req);
1214    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete (&msgs->torrent->completion, req->index);
1215    const int peerIsChoked = msgs->peer->peerIsChoked;
1216
1217    int allow = false;
1218
1219    if (!reqIsValid)
1220        dbgmsg (msgs, "rejecting an invalid request.");
1221    else if (!clientHasPiece)
1222        dbgmsg (msgs, "rejecting request for a piece we don't have.");
1223    else if (peerIsChoked)
1224        dbgmsg (msgs, "rejecting request from choked peer");
1225    else if (msgs->peer->pendingReqsToClient + 1 >= REQQ)
1226        dbgmsg (msgs, "rejecting request ... reqq is full");
1227    else
1228        allow = true;
1229
1230    if (allow) {
1231        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1232        prefetchPieces (msgs);
1233    } else if (fext) {
1234        protocolSendReject (msgs, req);
1235    }
1236}
1237
1238static bool
1239messageLengthIsCorrect (const tr_peermsgs * msg, uint8_t id, uint32_t len)
1240{
1241    switch (id)
1242    {
1243        case BT_CHOKE:
1244        case BT_UNCHOKE:
1245        case BT_INTERESTED:
1246        case BT_NOT_INTERESTED:
1247        case BT_FEXT_HAVE_ALL:
1248        case BT_FEXT_HAVE_NONE:
1249            return len == 1;
1250
1251        case BT_HAVE:
1252        case BT_FEXT_SUGGEST:
1253        case BT_FEXT_ALLOWED_FAST:
1254            return len == 5;
1255
1256        case BT_BITFIELD:
1257            if (tr_torrentHasMetadata (msg->torrent))
1258                return len == (msg->torrent->info.pieceCount + 7u) / 8u + 1u;
1259            /* we don't know the piece count yet,
1260               so we can only guess whether to send true or false */
1261            if (msg->metadata_size_hint > 0)
1262                return len <= msg->metadata_size_hint;
1263            return true;
1264
1265        case BT_REQUEST:
1266        case BT_CANCEL:
1267        case BT_FEXT_REJECT:
1268            return len == 13;
1269
1270        case BT_PIECE:
1271            return len > 9 && len <= 16393;
1272
1273        case BT_PORT:
1274            return len == 3;
1275
1276        case BT_LTEP:
1277            return len >= 2;
1278
1279        default:
1280            return false;
1281    }
1282}
1283
1284static int clientGotBlock (tr_peermsgs *               msgs,
1285                           struct evbuffer *           block,
1286                           const struct peer_request * req);
1287
1288static int
1289readBtPiece (tr_peermsgs      * msgs,
1290             struct evbuffer  * inbuf,
1291             size_t             inlen,
1292             size_t           * setme_piece_bytes_read)
1293{
1294    struct peer_request * req = &msgs->incoming.blockReq;
1295
1296    assert (evbuffer_get_length (inbuf) >= inlen);
1297    dbgmsg (msgs, "In readBtPiece");
1298
1299    if (!req->length)
1300    {
1301        if (inlen < 8)
1302            return READ_LATER;
1303
1304        tr_peerIoReadUint32 (msgs->peer->io, inbuf, &req->index);
1305        tr_peerIoReadUint32 (msgs->peer->io, inbuf, &req->offset);
1306        req->length = msgs->incoming.length - 9;
1307        dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1308        return READ_NOW;
1309    }
1310    else
1311    {
1312        int err;
1313        size_t n;
1314        size_t nLeft;
1315        struct evbuffer * block_buffer;
1316
1317        if (msgs->incoming.block == NULL)
1318            msgs->incoming.block = evbuffer_new ();
1319        block_buffer = msgs->incoming.block;
1320
1321        /* read in another chunk of data */
1322        nLeft = req->length - evbuffer_get_length (block_buffer);
1323        n = MIN (nLeft, inlen);
1324
1325        tr_peerIoReadBytesToBuf (msgs->peer->io, inbuf, block_buffer, n);
1326
1327        fireClientGotData (msgs, n, true);
1328        *setme_piece_bytes_read += n;
1329        dbgmsg (msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1330               n, req->index, req->offset, req->length,
1331             (int)(req->length - evbuffer_get_length (block_buffer)));
1332        if (evbuffer_get_length (block_buffer) < req->length)
1333            return READ_LATER;
1334
1335        /* pass the block along... */
1336        err = clientGotBlock (msgs, block_buffer, req);
1337        evbuffer_drain (block_buffer, evbuffer_get_length (block_buffer));
1338
1339        /* cleanup */
1340        req->length = 0;
1341        msgs->state = AWAITING_BT_LENGTH;
1342        return err ? READ_ERR : READ_NOW;
1343    }
1344}
1345
1346static void updateDesiredRequestCount (tr_peermsgs * msgs);
1347
1348static int
1349readBtMessage (tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1350{
1351    uint32_t      ui32;
1352    uint32_t      msglen = msgs->incoming.length;
1353    const uint8_t id = msgs->incoming.id;
1354#ifndef NDEBUG
1355    const size_t  startBufLen = evbuffer_get_length (inbuf);
1356#endif
1357    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
1358
1359    --msglen; /* id length */
1360
1361    dbgmsg (msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);
1362
1363    if (inlen < msglen)
1364        return READ_LATER;
1365
1366    if (!messageLengthIsCorrect (msgs, id, msglen + 1))
1367    {
1368        dbgmsg (msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1369        fireError (msgs, EMSGSIZE);
1370        return READ_ERR;
1371    }
1372
1373    switch (id)
1374    {
1375        case BT_CHOKE:
1376            dbgmsg (msgs, "got Choke");
1377            msgs->peer->clientIsChoked = 1;
1378            if (!fext)
1379                fireGotChoke (msgs);
1380            break;
1381
1382        case BT_UNCHOKE:
1383            dbgmsg (msgs, "got Unchoke");
1384            msgs->peer->clientIsChoked = 0;
1385            updateDesiredRequestCount (msgs);
1386            break;
1387
1388        case BT_INTERESTED:
1389            dbgmsg (msgs, "got Interested");
1390            msgs->peer->peerIsInterested = 1;
1391            break;
1392
1393        case BT_NOT_INTERESTED:
1394            dbgmsg (msgs, "got Not Interested");
1395            msgs->peer->peerIsInterested = 0;
1396            break;
1397
1398        case BT_HAVE:
1399            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &ui32);
1400            dbgmsg (msgs, "got Have: %u", ui32);
1401            if (tr_torrentHasMetadata (msgs->torrent)
1402                    && (ui32 >= msgs->torrent->info.pieceCount))
1403            {
1404                fireError (msgs, ERANGE);
1405                return READ_ERR;
1406            }
1407
1408            /* a peer can send the same HAVE message twice... */
1409            if (!tr_bitfieldHas (&msgs->peer->have, ui32)) {
1410                tr_bitfieldAdd (&msgs->peer->have, ui32);
1411                fireClientGotHave (msgs, ui32);
1412            }
1413            updatePeerProgress (msgs);
1414            break;
1415
1416        case BT_BITFIELD: {
1417            uint8_t * tmp = tr_new (uint8_t, msglen);
1418            dbgmsg (msgs, "got a bitfield");
1419            tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, msglen);
1420            tr_bitfieldSetRaw (&msgs->peer->have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));
1421            fireClientGotBitfield (msgs, &msgs->peer->have);
1422            updatePeerProgress (msgs);
1423            tr_free (tmp);
1424            break;
1425        }
1426
1427        case BT_REQUEST:
1428        {
1429            struct peer_request r;
1430            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.index);
1431            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.offset);
1432            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.length);
1433            dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1434            peerMadeRequest (msgs, &r);
1435            break;
1436        }
1437
1438        case BT_CANCEL:
1439        {
1440            int i;
1441            struct peer_request r;
1442            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.index);
1443            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.offset);
1444            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.length);
1445            tr_historyAdd (&msgs->peer->cancelsSentToClient, tr_time (), 1);
1446            dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1447
1448            for (i=0; i<msgs->peer->pendingReqsToClient; ++i) {
1449                const struct peer_request * req = msgs->peerAskedFor + i;
1450                if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length))
1451                    break;
1452            }
1453
1454            if (i < msgs->peer->pendingReqsToClient)
1455                tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request),
1456                                           msgs->peer->pendingReqsToClient--);
1457            break;
1458        }
1459
1460        case BT_PIECE:
1461            assert (0); /* handled elsewhere! */
1462            break;
1463
1464        case BT_PORT:
1465            dbgmsg (msgs, "Got a BT_PORT");
1466            tr_peerIoReadUint16 (msgs->peer->io, inbuf, &msgs->peer->dht_port);
1467            if (msgs->peer->dht_port > 0)
1468                tr_dhtAddNode (getSession (msgs),
1469                               tr_peerAddress (msgs->peer),
1470                               msgs->peer->dht_port, 0);
1471            break;
1472
1473        case BT_FEXT_SUGGEST:
1474            dbgmsg (msgs, "Got a BT_FEXT_SUGGEST");
1475            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &ui32);
1476            if (fext)
1477                fireClientGotSuggest (msgs, ui32);
1478            else {
1479                fireError (msgs, EMSGSIZE);
1480                return READ_ERR;
1481            }
1482            break;
1483
1484        case BT_FEXT_ALLOWED_FAST:
1485            dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST");
1486            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &ui32);
1487            if (fext)
1488                fireClientGotAllowedFast (msgs, ui32);
1489            else {
1490                fireError (msgs, EMSGSIZE);
1491                return READ_ERR;
1492            }
1493            break;
1494
1495        case BT_FEXT_HAVE_ALL:
1496            dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL");
1497            if (fext) {
1498                tr_bitfieldSetHasAll (&msgs->peer->have);
1499assert (tr_bitfieldHasAll (&msgs->peer->have));
1500                fireClientGotHaveAll (msgs);
1501                updatePeerProgress (msgs);
1502            } else {
1503                fireError (msgs, EMSGSIZE);
1504                return READ_ERR;
1505            }
1506            break;
1507
1508        case BT_FEXT_HAVE_NONE:
1509            dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE");
1510            if (fext) {
1511                tr_bitfieldSetHasNone (&msgs->peer->have);
1512                fireClientGotHaveNone (msgs);
1513                updatePeerProgress (msgs);
1514            } else {
1515                fireError (msgs, EMSGSIZE);
1516                return READ_ERR;
1517            }
1518            break;
1519
1520        case BT_FEXT_REJECT:
1521        {
1522            struct peer_request r;
1523            dbgmsg (msgs, "Got a BT_FEXT_REJECT");
1524            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.index);
1525            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.offset);
1526            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.length);
1527            if (fext)
1528                fireGotRej (msgs, &r);
1529            else {
1530                fireError (msgs, EMSGSIZE);
1531                return READ_ERR;
1532            }
1533            break;
1534        }
1535
1536        case BT_LTEP:
1537            dbgmsg (msgs, "Got a BT_LTEP");
1538            parseLtep (msgs, msglen, inbuf);
1539            break;
1540
1541        default:
1542            dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id);
1543            tr_peerIoDrain (msgs->peer->io, inbuf, msglen);
1544            break;
1545    }
1546
1547    assert (msglen + 1 == msgs->incoming.length);
1548    assert (evbuffer_get_length (inbuf) == startBufLen - msglen);
1549
1550    msgs->state = AWAITING_BT_LENGTH;
1551    return READ_NOW;
1552}
1553
1554/* returns 0 on success, or an errno on failure */
1555static int
1556clientGotBlock (tr_peermsgs                * msgs,
1557                struct evbuffer            * data,
1558                const struct peer_request  * req)
1559{
1560    int err;
1561    tr_torrent * tor = msgs->torrent;
1562    const tr_block_index_t block = _tr_block (tor, req->index, req->offset);
1563
1564    assert (msgs);
1565    assert (req);
1566
1567    if (req->length != tr_torBlockCountBytes (msgs->torrent, block)) {
1568        dbgmsg (msgs, "wrong block size -- expected %u, got %d",
1569                tr_torBlockCountBytes (msgs->torrent, block), req->length);
1570        return EMSGSIZE;
1571    }
1572
1573    dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1574
1575    if (!tr_peerMgrDidPeerRequest (msgs->torrent, msgs->peer, block)) {
1576        dbgmsg (msgs, "we didn't ask for this message...");
1577        return 0;
1578    }
1579    if (tr_cpPieceIsComplete (&msgs->torrent->completion, req->index)) {
1580        dbgmsg (msgs, "we did ask for this message, but the piece is already complete...");
1581        return 0;
1582    }
1583
1584    /**
1585    ***  Save the block
1586    **/
1587
1588    if ((err = tr_cacheWriteBlock (getSession (msgs)->cache, tor, req->index, req->offset, req->length, data)))
1589        return err;
1590
1591    tr_bitfieldAdd (&msgs->peer->blame, req->index);
1592    fireGotBlock (msgs, req);
1593    return 0;
1594}
1595
1596static int peerPulse (void * vmsgs);
1597
1598static void
1599didWrite (tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs)
1600{
1601    tr_peermsgs * msgs = vmsgs;
1602    firePeerGotData (msgs, bytesWritten, wasPieceData);
1603
1604    if (tr_isPeerIo (io) && io->userData)
1605        peerPulse (msgs);
1606}
1607
1608static ReadState
1609canRead (tr_peerIo * io, void * vmsgs, size_t * piece)
1610{
1611    ReadState         ret;
1612    tr_peermsgs *     msgs = vmsgs;
1613    struct evbuffer * in = tr_peerIoGetReadBuffer (io);
1614    const size_t      inlen = evbuffer_get_length (in);
1615
1616    dbgmsg (msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);
1617
1618    if (!inlen)
1619    {
1620        ret = READ_LATER;
1621    }
1622    else if (msgs->state == AWAITING_BT_PIECE)
1623    {
1624        ret = readBtPiece (msgs, in, inlen, piece);
1625    }
1626    else switch (msgs->state)
1627    {
1628        case AWAITING_BT_LENGTH:
1629            ret = readBtLength (msgs, in, inlen); break;
1630
1631        case AWAITING_BT_ID:
1632            ret = readBtId   (msgs, in, inlen); break;
1633
1634        case AWAITING_BT_MESSAGE:
1635            ret = readBtMessage (msgs, in, inlen); break;
1636
1637        default:
1638            ret = READ_ERR;
1639            assert (0);
1640    }
1641
1642    dbgmsg (msgs, "canRead: ret is %d", (int)ret);
1643
1644    /* log the raw data that was read */
1645    if ((ret != READ_ERR) && (evbuffer_get_length (in) != inlen))
1646        fireClientGotData (msgs, inlen - evbuffer_get_length (in), false);
1647
1648    return ret;
1649}
1650
1651int
1652tr_peerMsgsIsReadingBlock (const tr_peermsgs * msgs, tr_block_index_t block)
1653{
1654    if (msgs->state != AWAITING_BT_PIECE)
1655        return false;
1656
1657    return block == _tr_block (msgs->torrent,
1658                               msgs->incoming.blockReq.index,
1659                               msgs->incoming.blockReq.offset);
1660}
1661
1662/**
1663***
1664**/
1665
1666static void
1667updateDesiredRequestCount (tr_peermsgs * msgs)
1668{
1669    const tr_torrent * const torrent = msgs->torrent;
1670
1671    /* there are lots of reasons we might not want to request any blocks... */
1672    if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent)
1673                                    || msgs->peer->clientIsChoked
1674                                    || !msgs->peer->clientIsInterested)
1675    {
1676        msgs->desiredRequestCount = 0;
1677    }
1678    else
1679    {
1680        int estimatedBlocksInPeriod;
1681        unsigned int rate_Bps;
1682        unsigned int irate_Bps;
1683        const int floor = 4;
1684        const int seconds = REQUEST_BUF_SECS;
1685        const uint64_t now = tr_time_msec ();
1686
1687        /* Get the rate limit we should use.
1688         * FIXME: this needs to consider all the other peers as well... */
1689        rate_Bps = tr_peerGetPieceSpeed_Bps (msgs->peer, now, TR_PEER_TO_CLIENT);
1690        if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT))
1691            rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT));
1692
1693        /* honor the session limits, if enabled */
1694        if (tr_torrentUsesSessionLimits (torrent) &&
1695            tr_sessionGetActiveSpeedLimit_Bps (torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1696                rate_Bps = MIN (rate_Bps, irate_Bps);
1697
1698        /* use this desired rate to figure out how
1699         * many requests we should send to this peer */
1700        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1701        msgs->desiredRequestCount = MAX (floor, estimatedBlocksInPeriod);
1702
1703        /* honor the peer's maximum request count, if specified */
1704        if (msgs->reqq > 0)
1705            if (msgs->desiredRequestCount > msgs->reqq)
1706                msgs->desiredRequestCount = msgs->reqq;
1707    }
1708}
1709
1710static void
1711updateMetadataRequests (tr_peermsgs * msgs, time_t now)
1712{
1713    int piece;
1714
1715    if (msgs->peerSupportsMetadataXfer
1716        && tr_torrentGetNextMetadataRequest (msgs->torrent, now, &piece))
1717    {
1718        tr_benc tmp;
1719        struct evbuffer * payload;
1720        struct evbuffer * out = msgs->outMessages;
1721
1722        /* build the data message */
1723        tr_bencInitDict (&tmp, 3);
1724        tr_bencDictAddInt (&tmp, "msg_type", METADATA_MSG_TYPE_REQUEST);
1725        tr_bencDictAddInt (&tmp, "piece", piece);
1726        payload = tr_bencToBuf (&tmp, TR_FMT_BENC);
1727
1728        dbgmsg (msgs, "requesting metadata piece #%d", piece);
1729
1730        /* write it out as a LTEP message to our outMessages buffer */
1731        evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1732        evbuffer_add_uint8 (out, BT_LTEP);
1733        evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1734        evbuffer_add_buffer (out, payload);
1735        pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1736        dbgOutMessageLen (msgs);
1737
1738        /* cleanup */
1739        evbuffer_free (payload);
1740        tr_bencFree (&tmp);
1741    }
1742}
1743
1744static void
1745updateBlockRequests (tr_peermsgs * msgs)
1746{
1747    if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT)
1748        && (msgs->desiredRequestCount > 0)
1749        && (msgs->peer->pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))
1750    {
1751        int i;
1752        int n;
1753        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1754        tr_block_index_t * blocks = tr_new (tr_block_index_t, numwant);
1755
1756        tr_peerMgrGetNextRequests (msgs->torrent, msgs->peer, numwant, blocks, &n, false);
1757
1758        for (i=0; i<n; ++i)
1759        {
1760            struct peer_request req;
1761            blockToReq (msgs->torrent, blocks[i], &req);
1762            protocolSendRequest (msgs, &req);
1763        }
1764
1765        tr_free (blocks);
1766    }
1767}
1768
1769static size_t
1770fillOutputBuffer (tr_peermsgs * msgs, time_t now)
1771{
1772    int piece;
1773    size_t bytesWritten = 0;
1774    struct peer_request req;
1775    const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0;
1776    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
1777
1778    /**
1779    ***  Protocol messages
1780    **/
1781
1782    if (haveMessages && !msgs->outMessagesBatchedAt) /* fresh batch */
1783    {
1784        dbgmsg (msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length (msgs->outMessages));
1785        msgs->outMessagesBatchedAt = now;
1786    }
1787    else if (haveMessages && ((now - msgs->outMessagesBatchedAt) >= msgs->outMessagesBatchPeriod))
1788    {
1789        const size_t len = evbuffer_get_length (msgs->outMessages);
1790        /* flush the protocol messages */
1791        dbgmsg (msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len);
1792        tr_peerIoWriteBuf (msgs->peer->io, msgs->outMessages, false);
1793        msgs->clientSentAnythingAt = now;
1794        msgs->outMessagesBatchedAt = 0;
1795        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1796        bytesWritten +=  len;
1797    }
1798
1799    /**
1800    ***  Metadata Pieces
1801    **/
1802
1803    if ((tr_peerIoGetWriteBufferSpace (msgs->peer->io, now) >= METADATA_PIECE_SIZE)
1804        && popNextMetadataRequest (msgs, &piece))
1805    {
1806        char * data;
1807        int dataLen;
1808        bool ok = false;
1809
1810        data = tr_torrentGetMetadataPiece (msgs->torrent, piece, &dataLen);
1811        if ((dataLen > 0) && (data != NULL))
1812        {
1813            tr_benc tmp;
1814            struct evbuffer * payload;
1815            struct evbuffer * out = msgs->outMessages;
1816
1817            /* build the data message */
1818            tr_bencInitDict (&tmp, 3);
1819            tr_bencDictAddInt (&tmp, "msg_type", METADATA_MSG_TYPE_DATA);
1820            tr_bencDictAddInt (&tmp, "piece", piece);
1821            tr_bencDictAddInt (&tmp, "total_size", msgs->torrent->infoDictLength);
1822            payload = tr_bencToBuf (&tmp, TR_FMT_BENC);
1823
1824            /* write it out as a LTEP message to our outMessages buffer */
1825            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload) + dataLen);
1826            evbuffer_add_uint8 (out, BT_LTEP);
1827            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1828            evbuffer_add_buffer (out, payload);
1829            evbuffer_add     (out, data, dataLen);
1830            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1831            dbgOutMessageLen (msgs);
1832
1833            evbuffer_free (payload);
1834            tr_bencFree (&tmp);
1835            tr_free (data);
1836
1837            ok = true;
1838        }
1839
1840        if (!ok) /* send a rejection message */
1841        {
1842            tr_benc tmp;
1843            struct evbuffer * payload;
1844            struct evbuffer * out = msgs->outMessages;
1845
1846            /* build the rejection message */
1847            tr_bencInitDict (&tmp, 2);
1848            tr_bencDictAddInt (&tmp, "msg_type", METADATA_MSG_TYPE_REJECT);
1849            tr_bencDictAddInt (&tmp, "piece", piece);
1850            payload = tr_bencToBuf (&tmp, TR_FMT_BENC);
1851
1852            /* write it out as a LTEP message to our outMessages buffer */
1853            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1854            evbuffer_add_uint8 (out, BT_LTEP);
1855            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1856            evbuffer_add_buffer (out, payload);
1857            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1858            dbgOutMessageLen (msgs);
1859
1860            evbuffer_free (payload);
1861            tr_bencFree (&tmp);
1862        }
1863    }
1864
1865    /**
1866    ***  Data Blocks
1867    **/
1868
1869    if ((tr_peerIoGetWriteBufferSpace (msgs->peer->io, now) >= msgs->torrent->blockSize)
1870        && popNextRequest (msgs, &req))
1871    {
1872        --msgs->prefetchCount;
1873
1874        if (requestIsValid (msgs, &req)
1875            && tr_cpPieceIsComplete (&msgs->torrent->completion, req.index))
1876        {
1877            int err;
1878            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1879            struct evbuffer * out;
1880            struct evbuffer_iovec iovec[1];
1881
1882            out = evbuffer_new ();
1883            evbuffer_expand (out, msglen);
1884
1885            evbuffer_add_uint32 (out, sizeof (uint8_t) + 2 * sizeof (uint32_t) + req.length);
1886            evbuffer_add_uint8 (out, BT_PIECE);
1887            evbuffer_add_uint32 (out, req.index);
1888            evbuffer_add_uint32 (out, req.offset);
1889
1890            evbuffer_reserve_space (out, req.length, iovec, 1);
1891            err = tr_cacheReadBlock (getSession (msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base);
1892            iovec[0].iov_len = req.length;
1893            evbuffer_commit_space (out, iovec, 1);
1894
1895            /* check the piece if it needs checking... */
1896            if (!err && tr_torrentPieceNeedsCheck (msgs->torrent, req.index))
1897                if ((err = !tr_torrentCheckPiece (msgs->torrent, req.index)))
1898                    tr_torrentSetLocalError (msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."), (size_t)req.index);
1899
1900            if (err)
1901            {
1902                if (fext)
1903                    protocolSendReject (msgs, &req);
1904            }
1905            else
1906            {
1907                const size_t n = evbuffer_get_length (out);
1908                dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
1909                assert (n == msglen);
1910                tr_peerIoWriteBuf (msgs->peer->io, out, true);
1911                bytesWritten += n;
1912                msgs->clientSentAnythingAt = now;
1913                tr_historyAdd (&msgs->peer->blocksSentToPeer, tr_time (), 1);
1914            }
1915
1916            evbuffer_free (out);
1917
1918            if (err)
1919            {
1920                bytesWritten = 0;
1921                msgs = NULL;
1922            }
1923        }
1924        else if (fext) /* peer needs a reject message */
1925        {
1926            protocolSendReject (msgs, &req);
1927        }
1928
1929        if (msgs != NULL)
1930            prefetchPieces (msgs);
1931    }
1932
1933    /**
1934    ***  Keepalive
1935    **/
1936
1937    if ((msgs != NULL)
1938        && (msgs->clientSentAnythingAt != 0)
1939        && ((now - msgs->clientSentAnythingAt) > KEEPALIVE_INTERVAL_SECS))
1940    {
1941        dbgmsg (msgs, "sending a keepalive message");
1942        evbuffer_add_uint32 (msgs->outMessages, 0);
1943        pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
1944    }
1945
1946    return bytesWritten;
1947}
1948
1949static int
1950peerPulse (void * vmsgs)
1951{
1952    tr_peermsgs * msgs = vmsgs;
1953    const time_t  now = tr_time ();
1954
1955    if (tr_isPeerIo (msgs->peer->io)) {
1956        updateDesiredRequestCount (msgs);
1957        updateBlockRequests (msgs);
1958        updateMetadataRequests (msgs, now);
1959    }
1960
1961    for (;;)
1962        if (fillOutputBuffer (msgs, now) < 1)
1963            break;
1964
1965    return true; /* loop forever */
1966}
1967
1968void
1969tr_peerMsgsPulse (tr_peermsgs * msgs)
1970{
1971    if (msgs != NULL)
1972        peerPulse (msgs);
1973}
1974
1975static void
1976gotError (tr_peerIo * io UNUSED, short what, void * vmsgs)
1977{
1978    if (what & BEV_EVENT_TIMEOUT)
1979        dbgmsg (vmsgs, "libevent got a timeout, what=%hd", what);
1980    if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
1981        dbgmsg (vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1982               what, errno, tr_strerror (errno));
1983    fireError (vmsgs, ENOTCONN);
1984}
1985
1986static void
1987sendBitfield (tr_peermsgs * msgs)
1988{
1989    void * bytes;
1990    size_t byte_count = 0;
1991    struct evbuffer * out = msgs->outMessages;
1992
1993    assert (tr_torrentHasMetadata (msgs->torrent));
1994
1995    bytes = tr_cpCreatePieceBitfield (&msgs->torrent->completion, &byte_count);
1996    evbuffer_add_uint32 (out, sizeof (uint8_t) + byte_count);
1997    evbuffer_add_uint8 (out, BT_BITFIELD);
1998    evbuffer_add     (out, bytes, byte_count);
1999    dbgmsg (msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length (out));
2000    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2001
2002    tr_free (bytes);
2003}
2004
2005static void
2006tellPeerWhatWeHave (tr_peermsgs * msgs)
2007{
2008    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
2009
2010    if (fext && tr_cpHasAll (&msgs->torrent->completion))
2011    {
2012        protocolSendHaveAll (msgs);
2013    }
2014    else if (fext && tr_cpHasNone (&msgs->torrent->completion))
2015    {
2016        protocolSendHaveNone (msgs);
2017    }
2018    else if (!tr_cpHasNone (&msgs->torrent->completion))
2019    {
2020        sendBitfield (msgs);
2021    }
2022}
2023
2024/**
2025***
2026**/
2027
2028/* some peers give us error messages if we send
2029   more than this many peers in a single pex message
2030   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2031#define MAX_PEX_ADDED 50
2032#define MAX_PEX_DROPPED 50
2033
2034typedef struct
2035{
2036    tr_pex *  added;
2037    tr_pex *  dropped;
2038    tr_pex *  elements;
2039    int       addedCount;
2040    int       droppedCount;
2041    int       elementCount;
2042}
2043PexDiffs;
2044
2045static void
2046pexAddedCb (void * vpex, void * userData)
2047{
2048    PexDiffs * diffs = userData;
2049    tr_pex *   pex = vpex;
2050
2051    if (diffs->addedCount < MAX_PEX_ADDED)
2052    {
2053        diffs->added[diffs->addedCount++] = *pex;
2054        diffs->elements[diffs->elementCount++] = *pex;
2055    }
2056}
2057
2058static inline void
2059pexDroppedCb (void * vpex, void * userData)
2060{
2061    PexDiffs * diffs = userData;
2062    tr_pex *   pex = vpex;
2063
2064    if (diffs->droppedCount < MAX_PEX_DROPPED)
2065    {
2066        diffs->dropped[diffs->droppedCount++] = *pex;
2067    }
2068}
2069
2070static inline void
2071pexElementCb (void * vpex, void * userData)
2072{
2073    PexDiffs * diffs = userData;
2074    tr_pex * pex = vpex;
2075
2076    diffs->elements[diffs->elementCount++] = *pex;
2077}
2078
2079typedef void (tr_set_func)(void * element, void * userData);
2080
2081/**
2082 * @brief find the differences and commonalities in two sorted sets
2083 * @param a the first set
2084 * @param aCount the number of elements in the set 'a'
2085 * @param b the second set
2086 * @param bCount the number of elements in the set 'b'
2087 * @param compare the sorting method for both sets
2088 * @param elementSize the sizeof the element in the two sorted sets
2089 * @param in_a called for items in set 'a' but not set 'b'
2090 * @param in_b called for items in set 'b' but not set 'a'
2091 * @param in_both called for items that are in both sets
2092 * @param userData user data passed along to in_a, in_b, and in_both
2093 */
2094static void
2095tr_set_compare (const void * va, size_t aCount,
2096                const void * vb, size_t bCount,
2097                int compare (const void * a, const void * b),
2098                size_t elementSize,
2099                tr_set_func in_a_cb,
2100                tr_set_func in_b_cb,
2101                tr_set_func in_both_cb,
2102                void * userData)
2103{
2104    const uint8_t * a = va;
2105    const uint8_t * b = vb;
2106    const uint8_t * aend = a + elementSize * aCount;
2107    const uint8_t * bend = b + elementSize * bCount;
2108
2109    while (a != aend || b != bend)
2110    {
2111        if (a == aend)
2112        {
2113          (*in_b_cb)((void*)b, userData);
2114            b += elementSize;
2115        }
2116        else if (b == bend)
2117        {
2118          (*in_a_cb)((void*)a, userData);
2119            a += elementSize;
2120        }
2121        else
2122        {
2123            const int val = (*compare)(a, b);
2124
2125            if (!val)
2126            {
2127              (*in_both_cb)((void*)a, userData);
2128                a += elementSize;
2129                b += elementSize;
2130            }
2131            else if (val < 0)
2132            {
2133              (*in_a_cb)((void*)a, userData);
2134                a += elementSize;
2135            }
2136            else if (val > 0)
2137            {
2138              (*in_b_cb)((void*)b, userData);
2139                b += elementSize;
2140            }
2141        }
2142    }
2143}
2144
2145
2146static void
2147sendPex (tr_peermsgs * msgs)
2148{
2149    if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent))
2150    {
2151        PexDiffs diffs;
2152        PexDiffs diffs6;
2153        tr_pex * newPex = NULL;
2154        tr_pex * newPex6 = NULL;
2155        const int newCount = tr_peerMgrGetPeers (msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2156        const int newCount6 = tr_peerMgrGetPeers (msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2157
2158        /* build the diffs */
2159        diffs.added = tr_new (tr_pex, newCount);
2160        diffs.addedCount = 0;
2161        diffs.dropped = tr_new (tr_pex, msgs->pexCount);
2162        diffs.droppedCount = 0;
2163        diffs.elements = tr_new (tr_pex, newCount + msgs->pexCount);
2164        diffs.elementCount = 0;
2165        tr_set_compare (msgs->pex, msgs->pexCount,
2166                        newPex, newCount,
2167                        tr_pexCompare, sizeof (tr_pex),
2168                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs);
2169        diffs6.added = tr_new (tr_pex, newCount6);
2170        diffs6.addedCount = 0;
2171        diffs6.dropped = tr_new (tr_pex, msgs->pexCount6);
2172        diffs6.droppedCount = 0;
2173        diffs6.elements = tr_new (tr_pex, newCount6 + msgs->pexCount6);
2174        diffs6.elementCount = 0;
2175        tr_set_compare (msgs->pex6, msgs->pexCount6,
2176                        newPex6, newCount6,
2177                        tr_pexCompare, sizeof (tr_pex),
2178                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6);
2179        dbgmsg (
2180            msgs,
2181            "pex: old peer count %d+%d, new peer count %d+%d, "
2182            "added %d+%d, removed %d+%d",
2183            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2184            diffs.addedCount, diffs6.addedCount,
2185            diffs.droppedCount, diffs6.droppedCount);
2186
2187        if (!diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2188            !diffs6.droppedCount)
2189        {
2190            tr_free (diffs.elements);
2191            tr_free (diffs6.elements);
2192        }
2193        else
2194        {
2195            int  i;
2196            tr_benc val;
2197            uint8_t * tmp, *walk;
2198            struct evbuffer * payload;
2199            struct evbuffer * out = msgs->outMessages;
2200
2201            /* update peer */
2202            tr_free (msgs->pex);
2203            msgs->pex = diffs.elements;
2204            msgs->pexCount = diffs.elementCount;
2205            tr_free (msgs->pex6);
2206            msgs->pex6 = diffs6.elements;
2207            msgs->pexCount6 = diffs6.elementCount;
2208
2209            /* build the pex payload */
2210            tr_bencInitDict (&val, 3); /* ipv6 support: left as 3:
2211                                         * speed vs. likelihood? */
2212
2213            if (diffs.addedCount > 0)
2214            {
2215                /* "added" */
2216                tmp = walk = tr_new (uint8_t, diffs.addedCount * 6);
2217                for (i = 0; i < diffs.addedCount; ++i) {
2218                    memcpy (walk, &diffs.added[i].addr.addr, 4); walk += 4;
2219                    memcpy (walk, &diffs.added[i].port, 2); walk += 2;
2220                }
2221                assert ((walk - tmp) == diffs.addedCount * 6);
2222                tr_bencDictAddRaw (&val, "added", tmp, walk - tmp);
2223                tr_free (tmp);
2224
2225                /* "added.f"
2226                 * unset each holepunch flag because we don't support it. */
2227                tmp = walk = tr_new (uint8_t, diffs.addedCount);
2228                for (i = 0; i < diffs.addedCount; ++i)
2229                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2230                assert ((walk - tmp) == diffs.addedCount);
2231                tr_bencDictAddRaw (&val, "added.f", tmp, walk - tmp);
2232                tr_free (tmp);
2233            }
2234
2235            if (diffs.droppedCount > 0)
2236            {
2237                /* "dropped" */
2238                tmp = walk = tr_new (uint8_t, diffs.droppedCount * 6);
2239                for (i = 0; i < diffs.droppedCount; ++i) {
2240                    memcpy (walk, &diffs.dropped[i].addr.addr, 4); walk += 4;
2241                    memcpy (walk, &diffs.dropped[i].port, 2); walk += 2;
2242                }
2243                assert ((walk - tmp) == diffs.droppedCount * 6);
2244                tr_bencDictAddRaw (&val, "dropped", tmp, walk - tmp);
2245                tr_free (tmp);
2246            }
2247
2248            if (diffs6.addedCount > 0)
2249            {
2250                /* "added6" */
2251                tmp = walk = tr_new (uint8_t, diffs6.addedCount * 18);
2252                for (i = 0; i < diffs6.addedCount; ++i) {
2253                    memcpy (walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2254                    walk += 16;
2255                    memcpy (walk, &diffs6.added[i].port, 2);
2256                    walk += 2;
2257                }
2258                assert ((walk - tmp) == diffs6.addedCount * 18);
2259                tr_bencDictAddRaw (&val, "added6", tmp, walk - tmp);
2260                tr_free (tmp);
2261
2262                /* "added6.f"
2263                 * unset each holepunch flag because we don't support it. */
2264                tmp = walk = tr_new (uint8_t, diffs6.addedCount);
2265                for (i = 0; i < diffs6.addedCount; ++i)
2266                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2267                assert ((walk - tmp) == diffs6.addedCount);
2268                tr_bencDictAddRaw (&val, "added6.f", tmp, walk - tmp);
2269                tr_free (tmp);
2270            }
2271
2272            if (diffs6.droppedCount > 0)
2273            {
2274                /* "dropped6" */
2275                tmp = walk = tr_new (uint8_t, diffs6.droppedCount * 18);
2276                for (i = 0; i < diffs6.droppedCount; ++i) {
2277                    memcpy (walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2278                    walk += 16;
2279                    memcpy (walk, &diffs6.dropped[i].port, 2);
2280                    walk += 2;
2281                }
2282                assert ((walk - tmp) == diffs6.droppedCount * 18);
2283                tr_bencDictAddRaw (&val, "dropped6", tmp, walk - tmp);
2284                tr_free (tmp);
2285            }
2286
2287            /* write the pex message */
2288            payload = tr_bencToBuf (&val, TR_FMT_BENC);
2289            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2290            evbuffer_add_uint8 (out, BT_LTEP);
2291            evbuffer_add_uint8 (out, msgs->ut_pex_id);
2292            evbuffer_add_buffer (out, payload);
2293            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2294            dbgmsg (msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length (out));
2295            dbgOutMessageLen (msgs);
2296
2297            evbuffer_free (payload);
2298            tr_bencFree (&val);
2299        }
2300
2301        /* cleanup */
2302        tr_free (diffs.added);
2303        tr_free (diffs.dropped);
2304        tr_free (newPex);
2305        tr_free (diffs6.added);
2306        tr_free (diffs6.dropped);
2307        tr_free (newPex6);
2308
2309        /*msgs->clientSentPexAt = tr_time ();*/
2310    }
2311}
2312
2313static void
2314pexPulse (int foo UNUSED, short bar UNUSED, void * vmsgs)
2315{
2316    struct tr_peermsgs * msgs = vmsgs;
2317
2318    sendPex (msgs);
2319
2320    assert (msgs->pexTimer != NULL);
2321    tr_timerAdd (msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2322}
2323
2324/**
2325***
2326**/
2327
2328tr_peermsgs*
2329tr_peerMsgsNew (struct tr_torrent    * torrent,
2330                struct tr_peer       * peer,
2331                tr_peer_callback     * callback,
2332                void                 * callbackData)
2333{
2334    tr_peermsgs * m;
2335
2336    assert (peer);
2337    assert (peer->io);
2338
2339    m = tr_new0 (tr_peermsgs, 1);
2340    m->callback = callback;
2341    m->callbackData = callbackData;
2342    m->peer = peer;
2343    m->torrent = torrent;
2344    m->peer->clientIsChoked = 1;
2345    m->peer->peerIsChoked = 1;
2346    m->peer->clientIsInterested = 0;
2347    m->peer->peerIsInterested = 0;
2348    m->state = AWAITING_BT_LENGTH;
2349    m->outMessages = evbuffer_new ();
2350    m->outMessagesBatchedAt = 0;
2351    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2352    peer->msgs = m;
2353
2354    if (tr_torrentAllowsPex (torrent)) {
2355        m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m);
2356        tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0);
2357    }
2358
2359    if (tr_peerIoSupportsUTP (peer->io)) {
2360        const tr_address * addr = tr_peerIoGetAddress (peer->io, NULL);
2361        tr_peerMgrSetUtpSupported (torrent, addr);
2362        tr_peerMgrSetUtpFailed (torrent, addr, false);
2363    }
2364
2365    if (tr_peerIoSupportsLTEP (peer->io))
2366        sendLtepHandshake (m);
2367
2368    tellPeerWhatWeHave (m);
2369
2370    if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (peer->io))
2371    {
2372        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2373        const struct tr_address *addr = tr_peerIoGetAddress (peer->io, NULL);
2374        if (addr->type == TR_AF_INET || tr_globalIPv6 ()) {
2375            protocolSendPort (m, tr_dhtPort (torrent->session));
2376        }
2377    }
2378
2379    tr_peerIoSetIOFuncs (m->peer->io, canRead, didWrite, gotError, m);
2380    updateDesiredRequestCount (m);
2381
2382    return m;
2383}
2384
2385void
2386tr_peerMsgsFree (tr_peermsgs* msgs)
2387{
2388    if (msgs)
2389    {
2390        if (msgs->pexTimer != NULL)
2391            event_free (msgs->pexTimer);
2392
2393        if (msgs->incoming.block != NULL)
2394            evbuffer_free (msgs->incoming.block);
2395
2396        evbuffer_free (msgs->outMessages);
2397        tr_free (msgs->pex6);
2398        tr_free (msgs->pex);
2399
2400        memset (msgs, ~0, sizeof (tr_peermsgs));
2401        tr_free (msgs);
2402    }
2403}
Note: See TracBrowser for help on using the repository browser.