source: trunk/libtransmission/peer-msgs.c @ 13948

Last change on this file since 13948 was 13948, checked in by jordan, 8 years ago

move tr_peer.encryptionPreference to tr_peermsgs.encryptionPreference

  • Property svn:keywords set to Date Rev Author Id
File size: 71.2 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 13948 2013-02-03 22:45:32Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "cache.h"
25#include "completion.h"
26#include "crypto.h" /* tr_sha1 () */
27#include "log.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "variant.h"
37#include "version.h"
38
39/**
40***
41**/
42
43enum
44{
45    BT_CHOKE                = 0,
46    BT_UNCHOKE              = 1,
47    BT_INTERESTED           = 2,
48    BT_NOT_INTERESTED       = 3,
49    BT_HAVE                 = 4,
50    BT_BITFIELD             = 5,
51    BT_REQUEST              = 6,
52    BT_PIECE                = 7,
53    BT_CANCEL               = 8,
54    BT_PORT                 = 9,
55
56    BT_FEXT_SUGGEST         = 13,
57    BT_FEXT_HAVE_ALL        = 14,
58    BT_FEXT_HAVE_NONE       = 15,
59    BT_FEXT_REJECT          = 16,
60    BT_FEXT_ALLOWED_FAST    = 17,
61
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    UT_PEX_ID               = 1,
67    UT_METADATA_ID          = 3,
68
69    MAX_PEX_PEER_COUNT      = 50,
70
71    MIN_CHOKE_PERIOD_SEC    = 10,
72
73    /* idle seconds before we send a keepalive */
74    KEEPALIVE_INTERVAL_SECS = 100,
75
76    PEX_INTERVAL_SECS       = 90, /* sec between sendPex () calls */
77
78    REQQ                    = 512,
79
80    METADATA_REQQ           = 64,
81
82    /* used in lowering the outMessages queue period */
83    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
84    HIGH_PRIORITY_INTERVAL_SECS = 2,
85    LOW_PRIORITY_INTERVAL_SECS = 10,
86
87    /* number of pieces we'll allow in our fast set */
88    MAX_FAST_SET_SIZE = 3,
89
90    /* how many blocks to keep prefetched per peer */
91    PREFETCH_SIZE = 18,
92
93    /* defined in BEP #9 */
94    METADATA_MSG_TYPE_REQUEST = 0,
95    METADATA_MSG_TYPE_DATA = 1,
96    METADATA_MSG_TYPE_REJECT = 2
97};
98
99enum
100{
101    AWAITING_BT_LENGTH,
102    AWAITING_BT_ID,
103    AWAITING_BT_MESSAGE,
104    AWAITING_BT_PIECE
105};
106
107typedef enum
108{
109  ENCRYPTION_PREFERENCE_UNKNOWN,
110  ENCRYPTION_PREFERENCE_YES,
111  ENCRYPTION_PREFERENCE_NO
112}
113encryption_preference_t;
114
115
116
117/**
118***
119**/
120
121struct peer_request
122{
123    uint32_t    index;
124    uint32_t    offset;
125    uint32_t    length;
126};
127
128static void
129blockToReq (const tr_torrent     * tor,
130            tr_block_index_t       block,
131            struct peer_request  * setme)
132{
133    tr_torrentGetBlockLocation (tor, block, &setme->index,
134                                            &setme->offset,
135                                            &setme->length);
136}
137
138/**
139***
140**/
141
142/* this is raw, unchanged data from the peer regarding
143 * the current message that it's sending us. */
144struct tr_incoming
145{
146    uint8_t                id;
147    uint32_t               length; /* includes the +1 for id length */
148    struct peer_request    blockReq; /* metadata for incoming blocks */
149    struct evbuffer *      block; /* piece data for incoming blocks */
150};
151
152/**
153 * Low-level communication state information about a connected peer.
154 *
155 * This structure remembers the low-level protocol states that we're
156 * in with this peer, such as active requests, pex messages, and so on.
157 * Its fields are all private to peer-msgs.c.
158 *
159 * Data not directly involved with sending & receiving messages is
160 * stored in tr_peer, where it can be accessed by both peermsgs and
161 * the peer manager.
162 *
163 * @see struct peer_atom
164 * @see tr_peer
165 */
166struct tr_peermsgs
167{
168    bool            peerSupportsPex;
169    bool            peerSupportsMetadataXfer;
170    bool            clientSentLtepHandshake;
171    bool            peerSentLtepHandshake;
172
173    /*bool          haveFastSet;*/
174
175    int             desiredRequestCount;
176
177    int             prefetchCount;
178
179    /* how long the outMessages batch should be allowed to grow before
180     * it's flushed -- some messages (like requests >:) should be sent
181     * very quickly; others aren't as urgent. */
182    int8_t          outMessagesBatchPeriod;
183
184    uint8_t         state;
185    uint8_t         ut_pex_id;
186    uint8_t         ut_metadata_id;
187    uint16_t        pexCount;
188    uint16_t        pexCount6;
189
190    tr_port         dht_port;
191
192    encryption_preference_t  encryption_preference;
193
194    size_t                   metadata_size_hint;
195#if 0
196    size_t                 fastsetSize;
197    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
198#endif
199
200    tr_peer *              peer;
201
202    tr_torrent *           torrent;
203
204    tr_peer_callback      * callback;
205    void                  * callbackData;
206
207    struct evbuffer *      outMessages; /* all the non-piece messages */
208
209    struct peer_request    peerAskedFor[REQQ];
210
211    int                    peerAskedForMetadata[METADATA_REQQ];
212    int                    peerAskedForMetadataCount;
213
214    tr_pex               * pex;
215    tr_pex               * pex6;
216
217    /*time_t                 clientSentPexAt;*/
218    time_t                 clientSentAnythingAt;
219
220    /* when we started batching the outMessages */
221    time_t                outMessagesBatchedAt;
222
223    struct tr_incoming    incoming;
224
225    /* if the peer supports the Extension Protocol in BEP 10 and
226       supplied a reqq argument, it's stored here. Otherwise, the
227       value is zero and should be ignored. */
228    int64_t               reqq;
229
230    struct event        * pexTimer;
231};
232
233/**
234***
235**/
236
237static inline tr_session*
238getSession (struct tr_peermsgs * msgs)
239{
240    return msgs->torrent->session;
241}
242
243/**
244***
245**/
246
247static void
248myDebug (const char * file, int line,
249         const struct tr_peermsgs * msgs,
250         const char * fmt, ...)
251{
252    FILE * fp = tr_logGetFile ();
253
254    if (fp)
255    {
256        va_list           args;
257        char              timestr[64];
258        struct evbuffer * buf = evbuffer_new ();
259        char *            base = tr_basename (file);
260        char *            message;
261
262        evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ",
263                             tr_logGetTimeStr (timestr, sizeof (timestr)),
264                             tr_torrentName (msgs->torrent),
265                             tr_peerIoGetAddrStr (msgs->peer->io),
266                             tr_quark_get_string (msgs->peer->client, NULL));
267        va_start (args, fmt);
268        evbuffer_add_vprintf (buf, fmt, args);
269        va_end (args);
270        evbuffer_add_printf (buf, " (%s:%d)\n", base, line);
271
272        message = evbuffer_free_to_str (buf);
273        fputs (message, fp);
274
275        tr_free (base);
276        tr_free (message);
277    }
278}
279
280#define dbgmsg(msgs, ...) \
281  do \
282    { \
283      if (tr_logGetDeepEnabled ()) \
284        myDebug (__FILE__, __LINE__, msgs, __VA_ARGS__); \
285    } \
286  while (0)
287
288/**
289***
290**/
291
292static void
293pokeBatchPeriod (tr_peermsgs * msgs, int interval)
294{
295    if (msgs->outMessagesBatchPeriod > interval)
296    {
297        msgs->outMessagesBatchPeriod = interval;
298        dbgmsg (msgs, "lowering batch interval to %d seconds", interval);
299    }
300}
301
302static void
303dbgOutMessageLen (tr_peermsgs * msgs)
304{
305    dbgmsg (msgs, "outMessage size is now %zu", evbuffer_get_length (msgs->outMessages));
306}
307
308static void
309protocolSendReject (tr_peermsgs * msgs, const struct peer_request * req)
310{
311    struct evbuffer * out = msgs->outMessages;
312
313    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
314
315    evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
316    evbuffer_add_uint8 (out, BT_FEXT_REJECT);
317    evbuffer_add_uint32 (out, req->index);
318    evbuffer_add_uint32 (out, req->offset);
319    evbuffer_add_uint32 (out, req->length);
320
321    dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
322    dbgOutMessageLen (msgs);
323}
324
325static void
326protocolSendRequest (tr_peermsgs * msgs, const struct peer_request * req)
327{
328    struct evbuffer * out = msgs->outMessages;
329
330    evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
331    evbuffer_add_uint8 (out, BT_REQUEST);
332    evbuffer_add_uint32 (out, req->index);
333    evbuffer_add_uint32 (out, req->offset);
334    evbuffer_add_uint32 (out, req->length);
335
336    dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
337    dbgOutMessageLen (msgs);
338    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
339}
340
341static void
342protocolSendCancel (tr_peermsgs * msgs, const struct peer_request * req)
343{
344    struct evbuffer * out = msgs->outMessages;
345
346    evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
347    evbuffer_add_uint8 (out, BT_CANCEL);
348    evbuffer_add_uint32 (out, req->index);
349    evbuffer_add_uint32 (out, req->offset);
350    evbuffer_add_uint32 (out, req->length);
351
352    dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
353    dbgOutMessageLen (msgs);
354    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
355}
356
357static void
358protocolSendPort (tr_peermsgs *msgs, uint16_t port)
359{
360    struct evbuffer * out = msgs->outMessages;
361
362    dbgmsg (msgs, "sending Port %u", port);
363    evbuffer_add_uint32 (out, 3);
364    evbuffer_add_uint8 (out, BT_PORT);
365    evbuffer_add_uint16 (out, port);
366}
367
368static void
369protocolSendHave (tr_peermsgs * msgs, uint32_t index)
370{
371    struct evbuffer * out = msgs->outMessages;
372
373    evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t));
374    evbuffer_add_uint8 (out, BT_HAVE);
375    evbuffer_add_uint32 (out, index);
376
377    dbgmsg (msgs, "sending Have %u", index);
378    dbgOutMessageLen (msgs);
379    pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS);
380}
381
382#if 0
383static void
384protocolSendAllowedFast (tr_peermsgs * msgs, uint32_t pieceIndex)
385{
386    tr_peerIo       * io  = msgs->peer->io;
387    struct evbuffer * out = msgs->outMessages;
388
389    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
390
391    evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t));
392    evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST);
393    evbuffer_add_uint32 (io, out, pieceIndex);
394
395    dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex);
396    dbgOutMessageLen (msgs);
397}
398#endif
399
400static void
401protocolSendChoke (tr_peermsgs * msgs, int choke)
402{
403    struct evbuffer * out = msgs->outMessages;
404
405    evbuffer_add_uint32 (out, sizeof (uint8_t));
406    evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE);
407
408    dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke");
409    dbgOutMessageLen (msgs);
410    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
411}
412
413static void
414protocolSendHaveAll (tr_peermsgs * msgs)
415{
416    struct evbuffer * out = msgs->outMessages;
417
418    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
419
420    evbuffer_add_uint32 (out, sizeof (uint8_t));
421    evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL);
422
423    dbgmsg (msgs, "sending HAVE_ALL...");
424    dbgOutMessageLen (msgs);
425    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
426}
427
428static void
429protocolSendHaveNone (tr_peermsgs * msgs)
430{
431    struct evbuffer * out = msgs->outMessages;
432
433    assert (tr_peerIoSupportsFEXT (msgs->peer->io));
434
435    evbuffer_add_uint32 (out, sizeof (uint8_t));
436    evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE);
437
438    dbgmsg (msgs, "sending HAVE_NONE...");
439    dbgOutMessageLen (msgs);
440    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
441}
442
443/**
444***  EVENTS
445**/
446
447static void
448publish (tr_peermsgs * msgs, tr_peer_event * e)
449{
450    assert (msgs->peer);
451    assert (msgs->peer->msgs == msgs);
452
453    if (msgs->callback != NULL)
454        msgs->callback (msgs->peer, e, msgs->callbackData);
455}
456
457static void
458fireError (tr_peermsgs * msgs, int err)
459{
460    tr_peer_event e = TR_PEER_EVENT_INIT;
461    e.eventType = TR_PEER_ERROR;
462    e.err = err;
463    publish (msgs, &e);
464}
465
466static void
467fireGotBlock (tr_peermsgs * msgs, const struct peer_request * req)
468{
469    tr_peer_event e = TR_PEER_EVENT_INIT;
470    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
471    e.pieceIndex = req->index;
472    e.offset = req->offset;
473    e.length = req->length;
474    publish (msgs, &e);
475}
476
477static void
478fireGotRej (tr_peermsgs * msgs, const struct peer_request * req)
479{
480    tr_peer_event e = TR_PEER_EVENT_INIT;
481    e.eventType = TR_PEER_CLIENT_GOT_REJ;
482    e.pieceIndex = req->index;
483    e.offset = req->offset;
484    e.length = req->length;
485    publish (msgs, &e);
486}
487
488static void
489fireGotChoke (tr_peermsgs * msgs)
490{
491    tr_peer_event e = TR_PEER_EVENT_INIT;
492    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
493    publish (msgs, &e);
494}
495
496static void
497fireClientGotHaveAll (tr_peermsgs * msgs)
498{
499    tr_peer_event e = TR_PEER_EVENT_INIT;
500    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
501    publish (msgs, &e);
502}
503
504static void
505fireClientGotHaveNone (tr_peermsgs * msgs)
506{
507    tr_peer_event e = TR_PEER_EVENT_INIT;
508    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
509    publish (msgs, &e);
510}
511
512static void
513fireClientGotPieceData (tr_peermsgs * msgs, uint32_t length)
514{
515    tr_peer_event e = TR_PEER_EVENT_INIT;
516    e.length = length;
517    e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
518    publish (msgs, &e);
519}
520
521static void
522firePeerGotPieceData (tr_peermsgs * msgs, uint32_t length)
523{
524    tr_peer_event e = TR_PEER_EVENT_INIT;
525    e.length = length;
526    e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
527    publish (msgs, &e);
528}
529
530static void
531fireClientGotSuggest (tr_peermsgs * msgs, uint32_t pieceIndex)
532{
533    tr_peer_event e = TR_PEER_EVENT_INIT;
534    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
535    e.pieceIndex = pieceIndex;
536    publish (msgs, &e);
537}
538
539static void
540fireClientGotPort (tr_peermsgs * msgs, tr_port port)
541{
542    tr_peer_event e = TR_PEER_EVENT_INIT;
543    e.eventType = TR_PEER_CLIENT_GOT_PORT;
544    e.port = port;
545    publish (msgs, &e);
546}
547
548static void
549fireClientGotAllowedFast (tr_peermsgs * msgs, uint32_t pieceIndex)
550{
551    tr_peer_event e = TR_PEER_EVENT_INIT;
552    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
553    e.pieceIndex = pieceIndex;
554    publish (msgs, &e);
555}
556
557static void
558fireClientGotBitfield (tr_peermsgs * msgs, tr_bitfield * bitfield)
559{
560    tr_peer_event e = TR_PEER_EVENT_INIT;
561    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
562    e.bitfield = bitfield;
563    publish (msgs, &e);
564}
565
566static void
567fireClientGotHave (tr_peermsgs * msgs, tr_piece_index_t index)
568{
569    tr_peer_event e = TR_PEER_EVENT_INIT;
570    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
571    e.pieceIndex = index;
572    publish (msgs, &e);
573}
574
575
576/**
577***  ALLOWED FAST SET
578***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
579**/
580
581#if 0
582size_t
583tr_generateAllowedSet (tr_piece_index_t * setmePieces,
584                       size_t             desiredSetSize,
585                       size_t             pieceCount,
586                       const uint8_t    * infohash,
587                       const tr_address * addr)
588{
589    size_t setSize = 0;
590
591    assert (setmePieces);
592    assert (desiredSetSize <= pieceCount);
593    assert (desiredSetSize);
594    assert (pieceCount);
595    assert (infohash);
596    assert (addr);
597
598    if (addr->type == TR_AF_INET)
599    {
600        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
601        uint8_t x[SHA_DIGEST_LENGTH];
602
603        uint32_t ui32 = ntohl (htonl (addr->addr.addr4.s_addr) & 0xffffff00);   /* (1) */
604        memcpy (w, &ui32, sizeof (uint32_t));
605        walk += sizeof (uint32_t);
606        memcpy (walk, infohash, SHA_DIGEST_LENGTH);                 /* (2) */
607        walk += SHA_DIGEST_LENGTH;
608        tr_sha1 (x, w, walk-w, NULL);                               /* (3) */
609        assert (sizeof (w) == walk-w);
610
611        while (setSize<desiredSetSize)
612        {
613            int i;
614            for (i=0; i<5 && setSize<desiredSetSize; ++i)           /* (4) */
615            {
616                size_t k;
617                uint32_t j = i * 4;                                  /* (5) */
618                uint32_t y = ntohl (* (uint32_t*)(x + j));       /* (6) */
619                uint32_t index = y % pieceCount;                     /* (7) */
620
621                for (k=0; k<setSize; ++k)                           /* (8) */
622                    if (setmePieces[k] == index)
623                        break;
624
625                if (k == setSize)
626                    setmePieces[setSize++] = index;                  /* (9) */
627            }
628
629            tr_sha1 (x, x, sizeof (x), NULL);                      /* (3) */
630        }
631    }
632
633    return setSize;
634}
635
636static void
637updateFastSet (tr_peermsgs * msgs UNUSED)
638{
639    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
640    const int peerIsNeedy = msgs->peer->progress < 0.10;
641
642    if (fext && peerIsNeedy && !msgs->haveFastSet)
643    {
644        size_t i;
645        const struct tr_address * addr = tr_peerIoGetAddress (msgs->peer->io, NULL);
646        const tr_info * inf = &msgs->torrent->info;
647        const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount);
648
649        /* build the fast set */
650        msgs->fastsetSize = tr_generateAllowedSet (msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
651        msgs->haveFastSet = 1;
652
653        /* send it to the peer */
654        for (i=0; i<msgs->fastsetSize; ++i)
655            protocolSendAllowedFast (msgs, msgs->fastset[i]);
656    }
657}
658#endif
659
660/**
661***  INTEREST
662**/
663
664static void
665sendInterest (tr_peermsgs * msgs, bool clientIsInterested)
666{
667    struct evbuffer * out = msgs->outMessages;
668
669    assert (msgs);
670    assert (tr_isBool (clientIsInterested));
671
672    msgs->peer->clientIsInterested = clientIsInterested;
673    dbgmsg (msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested");
674    evbuffer_add_uint32 (out, sizeof (uint8_t));
675    evbuffer_add_uint8 (out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED);
676
677    pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
678    dbgOutMessageLen (msgs);
679}
680
681static void
682updateInterest (tr_peermsgs * msgs UNUSED)
683{
684    /* FIXME -- might need to poke the mgr on startup */
685}
686
687void
688tr_peerMsgsSetInterested (tr_peermsgs * msgs, bool clientIsInterested)
689{
690    assert (tr_isBool (clientIsInterested));
691
692    if (clientIsInterested != msgs->peer->clientIsInterested)
693        sendInterest (msgs, clientIsInterested);
694}
695
696static bool
697popNextMetadataRequest (tr_peermsgs * msgs, int * piece)
698{
699    if (msgs->peerAskedForMetadataCount == 0)
700        return false;
701
702    *piece = msgs->peerAskedForMetadata[0];
703
704    tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int),
705                               msgs->peerAskedForMetadataCount--);
706
707    return true;
708}
709
710static bool
711popNextRequest (tr_peermsgs * msgs, struct peer_request * setme)
712{
713    if (msgs->peer->pendingReqsToClient == 0)
714        return false;
715
716    *setme = msgs->peerAskedFor[0];
717
718    tr_removeElementFromArray (msgs->peerAskedFor, 0, sizeof (struct peer_request),
719                               msgs->peer->pendingReqsToClient--);
720
721    return true;
722}
723
724static void
725cancelAllRequestsToClient (tr_peermsgs * msgs)
726{
727    struct peer_request req;
728    const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->peer->io);
729
730    while (popNextRequest (msgs, &req))
731        if (mustSendCancel)
732            protocolSendReject (msgs, &req);
733}
734
735void
736tr_peerMsgsSetChoke (tr_peermsgs * msgs, bool peerIsChoked)
737{
738    const time_t now = tr_time ();
739    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
740
741    assert (msgs);
742    assert (msgs->peer);
743    assert (tr_isBool (peerIsChoked));
744
745    if (msgs->peer->chokeChangedAt > fibrillationTime)
746    {
747        dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peerIsChoked);
748    }
749    else if (msgs->peer->peerIsChoked != peerIsChoked)
750    {
751        msgs->peer->peerIsChoked = peerIsChoked;
752        if (peerIsChoked)
753            cancelAllRequestsToClient (msgs);
754        protocolSendChoke (msgs, peerIsChoked);
755        msgs->peer->chokeChangedAt = now;
756    }
757}
758
759/**
760***
761**/
762
763void
764tr_peerMsgsHave (tr_peermsgs * msgs, uint32_t index)
765{
766    protocolSendHave (msgs, index);
767
768    /* since we have more pieces now, we might not be interested in this peer */
769    updateInterest (msgs);
770}
771
772/**
773***
774**/
775
776static bool
777reqIsValid (const tr_peermsgs * peer,
778            uint32_t            index,
779            uint32_t            offset,
780            uint32_t            length)
781{
782    return tr_torrentReqIsValid (peer->torrent, index, offset, length);
783}
784
785static bool
786requestIsValid (const tr_peermsgs * msgs, const struct peer_request * req)
787{
788    return reqIsValid (msgs, req->index, req->offset, req->length);
789}
790
791void
792tr_peerMsgsCancel (tr_peermsgs * msgs, tr_block_index_t block)
793{
794    struct peer_request req;
795/*fprintf (stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer);*/
796    blockToReq (msgs->torrent, block, &req);
797    protocolSendCancel (msgs, &req);
798}
799
800/**
801***
802**/
803
804static void
805sendLtepHandshake (tr_peermsgs * msgs)
806{
807    tr_variant val;
808    bool allow_pex;
809    bool allow_metadata_xfer;
810    struct evbuffer * payload;
811    struct evbuffer * out = msgs->outMessages;
812    const unsigned char * ipv6 = tr_globalIPv6 ();
813    static tr_quark version_quark = 0;
814
815    if (msgs->clientSentLtepHandshake)
816        return;
817
818    if (!version_quark)
819      version_quark = tr_quark_new (TR_NAME " " USERAGENT_PREFIX, -1);
820
821    dbgmsg (msgs, "sending an ltep handshake");
822    msgs->clientSentLtepHandshake = 1;
823
824    /* decide if we want to advertise metadata xfer support (BEP 9) */
825    if (tr_torrentIsPrivate (msgs->torrent))
826        allow_metadata_xfer = 0;
827    else
828        allow_metadata_xfer = 1;
829
830    /* decide if we want to advertise pex support */
831    if (!tr_torrentAllowsPex (msgs->torrent))
832        allow_pex = 0;
833    else if (msgs->peerSentLtepHandshake)
834        allow_pex = msgs->peerSupportsPex ? 1 : 0;
835    else
836        allow_pex = 1;
837
838    tr_variantInitDict (&val, 8);
839    tr_variantDictAddInt (&val, TR_KEY_e, getSession (msgs)->encryptionMode != TR_CLEAR_PREFERRED);
840    if (ipv6 != NULL)
841        tr_variantDictAddRaw (&val, TR_KEY_ipv6, ipv6, 16);
842    if (allow_metadata_xfer && tr_torrentHasMetadata (msgs->torrent)
843                            && (msgs->torrent->infoDictLength > 0))
844        tr_variantDictAddInt (&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
845    tr_variantDictAddInt (&val, TR_KEY_p, tr_sessionGetPublicPeerPort (getSession (msgs)));
846    tr_variantDictAddInt (&val, TR_KEY_reqq, REQQ);
847    tr_variantDictAddInt (&val, TR_KEY_upload_only, tr_torrentIsSeed (msgs->torrent));
848    tr_variantDictAddQuark (&val, TR_KEY_v, version_quark);
849    if (allow_metadata_xfer || allow_pex) {
850        tr_variant * m  = tr_variantDictAddDict (&val, TR_KEY_m, 2);
851        if (allow_metadata_xfer)
852            tr_variantDictAddInt (m, TR_KEY_ut_metadata, UT_METADATA_ID);
853        if (allow_pex)
854            tr_variantDictAddInt (m, TR_KEY_ut_pex, UT_PEX_ID);
855    }
856
857    payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
858
859    evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
860    evbuffer_add_uint8 (out, BT_LTEP);
861    evbuffer_add_uint8 (out, LTEP_HANDSHAKE);
862    evbuffer_add_buffer (out, payload);
863    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
864    dbgOutMessageLen (msgs);
865
866    /* cleanup */
867    evbuffer_free (payload);
868    tr_variantFree (&val);
869}
870
871static void
872parseLtepHandshake (tr_peermsgs * msgs, int len, struct evbuffer * inbuf)
873{
874    int64_t   i;
875    tr_variant   val, * sub;
876    uint8_t * tmp = tr_new (uint8_t, len);
877    const uint8_t *addr;
878    size_t addr_len;
879    tr_pex pex;
880    int8_t seedProbability = -1;
881
882    memset (&pex, 0, sizeof (tr_pex));
883
884    tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, len);
885    msgs->peerSentLtepHandshake = 1;
886
887    if (tr_variantFromBenc (&val, tmp, len) || !tr_variantIsDict (&val))
888    {
889        dbgmsg (msgs, "GET  extended-handshake, couldn't get dictionary");
890        tr_free (tmp);
891        return;
892    }
893
894    dbgmsg (msgs, "here is the handshake: [%*.*s]", len, len,  tmp);
895
896    /* does the peer prefer encrypted connections? */
897    if (tr_variantDictFindInt (&val, TR_KEY_e, &i)) {
898        msgs->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
899                                        : ENCRYPTION_PREFERENCE_NO;
900        if (i)
901            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
902    }
903
904    /* check supported messages for utorrent pex */
905    msgs->peerSupportsPex = 0;
906    msgs->peerSupportsMetadataXfer = 0;
907
908    if (tr_variantDictFindDict (&val, TR_KEY_m, &sub)) {
909        if (tr_variantDictFindInt (sub, TR_KEY_ut_pex, &i)) {
910            msgs->peerSupportsPex = i != 0;
911            msgs->ut_pex_id = (uint8_t) i;
912            dbgmsg (msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
913        }
914        if (tr_variantDictFindInt (sub, TR_KEY_ut_metadata, &i)) {
915            msgs->peerSupportsMetadataXfer = i != 0;
916            msgs->ut_metadata_id = (uint8_t) i;
917            dbgmsg (msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
918        }
919        if (tr_variantDictFindInt (sub, TR_KEY_ut_holepunch, &i)) {
920            /* Mysterious µTorrent extension that we don't grok.  However,
921               it implies support for µTP, so use it to indicate that. */
922            tr_peerMgrSetUtpFailed (msgs->torrent,
923                                    tr_peerIoGetAddress (msgs->peer->io, NULL),
924                                    false);
925        }
926    }
927
928    /* look for metainfo size (BEP 9) */
929    if (tr_variantDictFindInt (&val, TR_KEY_metadata_size, &i)) {
930        tr_torrentSetMetadataSizeHint (msgs->torrent, i);
931        msgs->metadata_size_hint = (size_t) i;
932    }
933
934    /* look for upload_only (BEP 21) */
935    if (tr_variantDictFindInt (&val, TR_KEY_upload_only, &i))
936        seedProbability = i==0 ? 0 : 100;
937
938    /* get peer's listening port */
939    if (tr_variantDictFindInt (&val, TR_KEY_p, &i)) {
940        pex.port = htons ((uint16_t)i);
941        fireClientGotPort (msgs, pex.port);
942        dbgmsg (msgs, "peer's port is now %d", (int)i);
943    }
944
945    if (tr_peerIoIsIncoming (msgs->peer->io)
946        && tr_variantDictFindRaw (&val, TR_KEY_ipv4, &addr, &addr_len)
947        && (addr_len == 4))
948    {
949        pex.addr.type = TR_AF_INET;
950        memcpy (&pex.addr.addr.addr4, addr, 4);
951        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
952    }
953
954    if (tr_peerIoIsIncoming (msgs->peer->io)
955        && tr_variantDictFindRaw (&val, TR_KEY_ipv6, &addr, &addr_len)
956        && (addr_len == 16))
957    {
958        pex.addr.type = TR_AF_INET6;
959        memcpy (&pex.addr.addr.addr6, addr, 16);
960        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
961    }
962
963    /* get peer's maximum request queue size */
964    if (tr_variantDictFindInt (&val, TR_KEY_reqq, &i))
965        msgs->reqq = i;
966
967    tr_variantFree (&val);
968    tr_free (tmp);
969}
970
971static void
972parseUtMetadata (tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf)
973{
974    tr_variant dict;
975    char * msg_end;
976    const char * benc_end;
977    int64_t msg_type = -1;
978    int64_t piece = -1;
979    int64_t total_size = 0;
980    uint8_t * tmp = tr_new (uint8_t, msglen);
981
982    tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, msglen);
983    msg_end = (char*)tmp + msglen;
984
985    if (!tr_variantFromBencFull (&dict, tmp, msglen, NULL, &benc_end))
986    {
987        tr_variantDictFindInt (&dict, TR_KEY_msg_type, &msg_type);
988        tr_variantDictFindInt (&dict, TR_KEY_piece, &piece);
989        tr_variantDictFindInt (&dict, TR_KEY_total_size, &total_size);
990        tr_variantFree (&dict);
991    }
992
993    dbgmsg (msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
994          (int)msg_type, (int)piece, (int)total_size);
995
996    if (msg_type == METADATA_MSG_TYPE_REJECT)
997    {
998        /* NOOP */
999    }
1000
1001    if ((msg_type == METADATA_MSG_TYPE_DATA)
1002        && (!tr_torrentHasMetadata (msgs->torrent))
1003        && (msg_end - benc_end <= METADATA_PIECE_SIZE)
1004        && (piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size))
1005    {
1006        const int pieceLen = msg_end - benc_end;
1007        tr_torrentSetMetadataPiece (msgs->torrent, piece, benc_end, pieceLen);
1008    }
1009
1010    if (msg_type == METADATA_MSG_TYPE_REQUEST)
1011    {
1012        if ((piece >= 0)
1013            && tr_torrentHasMetadata (msgs->torrent)
1014            && !tr_torrentIsPrivate (msgs->torrent)
1015            && (msgs->peerAskedForMetadataCount < METADATA_REQQ))
1016        {
1017            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1018        }
1019        else
1020        {
1021            tr_variant tmp;
1022            struct evbuffer * payload;
1023            struct evbuffer * out = msgs->outMessages;
1024
1025            /* build the rejection message */
1026            tr_variantInitDict (&tmp, 2);
1027            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1028            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1029            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1030
1031            /* write it out as a LTEP message to our outMessages buffer */
1032            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1033            evbuffer_add_uint8 (out, BT_LTEP);
1034            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1035            evbuffer_add_buffer (out, payload);
1036            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1037            dbgOutMessageLen (msgs);
1038
1039            /* cleanup */
1040            evbuffer_free (payload);
1041            tr_variantFree (&tmp);
1042        }
1043    }
1044
1045    tr_free (tmp);
1046}
1047
1048static void
1049parseUtPex (tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf)
1050{
1051    int loaded = 0;
1052    uint8_t * tmp = tr_new (uint8_t, msglen);
1053    tr_variant val;
1054    tr_torrent * tor = msgs->torrent;
1055    const uint8_t * added;
1056    size_t added_len;
1057
1058    tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, msglen);
1059
1060    if (tr_torrentAllowsPex (tor)
1061      && ((loaded = !tr_variantFromBenc (&val, tmp, msglen))))
1062    {
1063        if (tr_variantDictFindRaw (&val, TR_KEY_added, &added, &added_len))
1064        {
1065            tr_pex * pex;
1066            size_t i, n;
1067            size_t added_f_len = 0;
1068            const uint8_t * added_f = NULL;
1069
1070            tr_variantDictFindRaw (&val, TR_KEY_added_f, &added_f, &added_f_len);
1071            pex = tr_peerMgrCompactToPex (added, added_len, added_f, added_f_len, &n);
1072
1073            n = MIN (n, MAX_PEX_PEER_COUNT);
1074            for (i=0; i<n; ++i)
1075            {
1076                int seedProbability = -1;
1077                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1078                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1079            }
1080
1081            tr_free (pex);
1082        }
1083
1084        if (tr_variantDictFindRaw (&val, TR_KEY_added6, &added, &added_len))
1085        {
1086            tr_pex * pex;
1087            size_t i, n;
1088            size_t added_f_len = 0;
1089            const uint8_t * added_f = NULL;
1090
1091            tr_variantDictFindRaw (&val, TR_KEY_added6_f, &added_f, &added_f_len);
1092            pex = tr_peerMgrCompact6ToPex (added, added_len, added_f, added_f_len, &n);
1093
1094            n = MIN (n, MAX_PEX_PEER_COUNT);
1095            for (i=0; i<n; ++i)
1096            {
1097                int seedProbability = -1;
1098                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1099                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1100            }
1101
1102            tr_free (pex);
1103        }
1104    }
1105
1106    if (loaded)
1107        tr_variantFree (&val);
1108    tr_free (tmp);
1109}
1110
1111static void sendPex (tr_peermsgs * msgs);
1112
1113static void
1114parseLtep (tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf)
1115{
1116    uint8_t ltep_msgid;
1117
1118    tr_peerIoReadUint8 (msgs->peer->io, inbuf, &ltep_msgid);
1119    msglen--;
1120
1121    if (ltep_msgid == LTEP_HANDSHAKE)
1122    {
1123        dbgmsg (msgs, "got ltep handshake");
1124        parseLtepHandshake (msgs, msglen, inbuf);
1125        if (tr_peerIoSupportsLTEP (msgs->peer->io))
1126        {
1127            sendLtepHandshake (msgs);
1128            sendPex (msgs);
1129        }
1130    }
1131    else if (ltep_msgid == UT_PEX_ID)
1132    {
1133        dbgmsg (msgs, "got ut pex");
1134        msgs->peerSupportsPex = 1;
1135        parseUtPex (msgs, msglen, inbuf);
1136    }
1137    else if (ltep_msgid == UT_METADATA_ID)
1138    {
1139        dbgmsg (msgs, "got ut metadata");
1140        msgs->peerSupportsMetadataXfer = 1;
1141        parseUtMetadata (msgs, msglen, inbuf);
1142    }
1143    else
1144    {
1145        dbgmsg (msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1146        evbuffer_drain (inbuf, msglen);
1147    }
1148}
1149
1150static int
1151readBtLength (tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1152{
1153    uint32_t len;
1154
1155    if (inlen < sizeof (len))
1156        return READ_LATER;
1157
1158    tr_peerIoReadUint32 (msgs->peer->io, inbuf, &len);
1159
1160    if (len == 0) /* peer sent us a keepalive message */
1161        dbgmsg (msgs, "got KeepAlive");
1162    else
1163    {
1164        msgs->incoming.length = len;
1165        msgs->state = AWAITING_BT_ID;
1166    }
1167
1168    return READ_NOW;
1169}
1170
1171static int readBtMessage (tr_peermsgs *, struct evbuffer *, size_t);
1172
1173static int
1174readBtId (tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1175{
1176    uint8_t id;
1177
1178    if (inlen < sizeof (uint8_t))
1179        return READ_LATER;
1180
1181    tr_peerIoReadUint8 (msgs->peer->io, inbuf, &id);
1182    msgs->incoming.id = id;
1183    dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);
1184
1185    if (id == BT_PIECE)
1186    {
1187        msgs->state = AWAITING_BT_PIECE;
1188        return READ_NOW;
1189    }
1190    else if (msgs->incoming.length != 1)
1191    {
1192        msgs->state = AWAITING_BT_MESSAGE;
1193        return READ_NOW;
1194    }
1195    else return readBtMessage (msgs, inbuf, inlen - 1);
1196}
1197
1198static void
1199updatePeerProgress (tr_peermsgs * msgs)
1200{
1201    tr_peerUpdateProgress (msgs->torrent, msgs->peer);
1202
1203    /*updateFastSet (msgs);*/
1204    updateInterest (msgs);
1205}
1206
1207static void
1208prefetchPieces (tr_peermsgs *msgs)
1209{
1210    int i;
1211
1212    if (!getSession (msgs)->isPrefetchEnabled)
1213        return;
1214
1215    for (i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<PREFETCH_SIZE; ++i)
1216    {
1217        const struct peer_request * req = msgs->peerAskedFor + i;
1218        if (requestIsValid (msgs, req))
1219        {
1220            tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1221            ++msgs->prefetchCount;
1222        }
1223    }
1224}
1225
1226static void
1227peerMadeRequest (tr_peermsgs * msgs, const struct peer_request * req)
1228{
1229    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
1230    const int reqIsValid = requestIsValid (msgs, req);
1231    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete (&msgs->torrent->completion, req->index);
1232    const int peerIsChoked = msgs->peer->peerIsChoked;
1233
1234    int allow = false;
1235
1236    if (!reqIsValid)
1237        dbgmsg (msgs, "rejecting an invalid request.");
1238    else if (!clientHasPiece)
1239        dbgmsg (msgs, "rejecting request for a piece we don't have.");
1240    else if (peerIsChoked)
1241        dbgmsg (msgs, "rejecting request from choked peer");
1242    else if (msgs->peer->pendingReqsToClient + 1 >= REQQ)
1243        dbgmsg (msgs, "rejecting request ... reqq is full");
1244    else
1245        allow = true;
1246
1247    if (allow) {
1248        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1249        prefetchPieces (msgs);
1250    } else if (fext) {
1251        protocolSendReject (msgs, req);
1252    }
1253}
1254
1255static bool
1256messageLengthIsCorrect (const tr_peermsgs * msg, uint8_t id, uint32_t len)
1257{
1258    switch (id)
1259    {
1260        case BT_CHOKE:
1261        case BT_UNCHOKE:
1262        case BT_INTERESTED:
1263        case BT_NOT_INTERESTED:
1264        case BT_FEXT_HAVE_ALL:
1265        case BT_FEXT_HAVE_NONE:
1266            return len == 1;
1267
1268        case BT_HAVE:
1269        case BT_FEXT_SUGGEST:
1270        case BT_FEXT_ALLOWED_FAST:
1271            return len == 5;
1272
1273        case BT_BITFIELD:
1274            if (tr_torrentHasMetadata (msg->torrent))
1275                return len == (msg->torrent->info.pieceCount + 7u) / 8u + 1u;
1276            /* we don't know the piece count yet,
1277               so we can only guess whether to send true or false */
1278            if (msg->metadata_size_hint > 0)
1279                return len <= msg->metadata_size_hint;
1280            return true;
1281
1282        case BT_REQUEST:
1283        case BT_CANCEL:
1284        case BT_FEXT_REJECT:
1285            return len == 13;
1286
1287        case BT_PIECE:
1288            return len > 9 && len <= 16393;
1289
1290        case BT_PORT:
1291            return len == 3;
1292
1293        case BT_LTEP:
1294            return len >= 2;
1295
1296        default:
1297            return false;
1298    }
1299}
1300
1301static int clientGotBlock (tr_peermsgs *               msgs,
1302                           struct evbuffer *           block,
1303                           const struct peer_request * req);
1304
1305static int
1306readBtPiece (tr_peermsgs      * msgs,
1307             struct evbuffer  * inbuf,
1308             size_t             inlen,
1309             size_t           * setme_piece_bytes_read)
1310{
1311    struct peer_request * req = &msgs->incoming.blockReq;
1312
1313    assert (evbuffer_get_length (inbuf) >= inlen);
1314    dbgmsg (msgs, "In readBtPiece");
1315
1316    if (!req->length)
1317    {
1318        if (inlen < 8)
1319            return READ_LATER;
1320
1321        tr_peerIoReadUint32 (msgs->peer->io, inbuf, &req->index);
1322        tr_peerIoReadUint32 (msgs->peer->io, inbuf, &req->offset);
1323        req->length = msgs->incoming.length - 9;
1324        dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1325        return READ_NOW;
1326    }
1327    else
1328    {
1329        int err;
1330        size_t n;
1331        size_t nLeft;
1332        struct evbuffer * block_buffer;
1333
1334        if (msgs->incoming.block == NULL)
1335            msgs->incoming.block = evbuffer_new ();
1336        block_buffer = msgs->incoming.block;
1337
1338        /* read in another chunk of data */
1339        nLeft = req->length - evbuffer_get_length (block_buffer);
1340        n = MIN (nLeft, inlen);
1341
1342        tr_peerIoReadBytesToBuf (msgs->peer->io, inbuf, block_buffer, n);
1343
1344        fireClientGotPieceData (msgs, n);
1345        *setme_piece_bytes_read += n;
1346        dbgmsg (msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1347               n, req->index, req->offset, req->length,
1348             (int)(req->length - evbuffer_get_length (block_buffer)));
1349        if (evbuffer_get_length (block_buffer) < req->length)
1350            return READ_LATER;
1351
1352        /* pass the block along... */
1353        err = clientGotBlock (msgs, block_buffer, req);
1354        evbuffer_drain (block_buffer, evbuffer_get_length (block_buffer));
1355
1356        /* cleanup */
1357        req->length = 0;
1358        msgs->state = AWAITING_BT_LENGTH;
1359        return err ? READ_ERR : READ_NOW;
1360    }
1361}
1362
1363static void updateDesiredRequestCount (tr_peermsgs * msgs);
1364
1365static int
1366readBtMessage (tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1367{
1368    uint32_t      ui32;
1369    uint32_t      msglen = msgs->incoming.length;
1370    const uint8_t id = msgs->incoming.id;
1371#ifndef NDEBUG
1372    const size_t  startBufLen = evbuffer_get_length (inbuf);
1373#endif
1374    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
1375
1376    --msglen; /* id length */
1377
1378    dbgmsg (msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);
1379
1380    if (inlen < msglen)
1381        return READ_LATER;
1382
1383    if (!messageLengthIsCorrect (msgs, id, msglen + 1))
1384    {
1385        dbgmsg (msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1386        fireError (msgs, EMSGSIZE);
1387        return READ_ERR;
1388    }
1389
1390    switch (id)
1391    {
1392        case BT_CHOKE:
1393            dbgmsg (msgs, "got Choke");
1394            msgs->peer->clientIsChoked = 1;
1395            if (!fext)
1396                fireGotChoke (msgs);
1397            break;
1398
1399        case BT_UNCHOKE:
1400            dbgmsg (msgs, "got Unchoke");
1401            msgs->peer->clientIsChoked = 0;
1402            updateDesiredRequestCount (msgs);
1403            break;
1404
1405        case BT_INTERESTED:
1406            dbgmsg (msgs, "got Interested");
1407            msgs->peer->peerIsInterested = 1;
1408            break;
1409
1410        case BT_NOT_INTERESTED:
1411            dbgmsg (msgs, "got Not Interested");
1412            msgs->peer->peerIsInterested = 0;
1413            break;
1414
1415        case BT_HAVE:
1416            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &ui32);
1417            dbgmsg (msgs, "got Have: %u", ui32);
1418            if (tr_torrentHasMetadata (msgs->torrent)
1419                    && (ui32 >= msgs->torrent->info.pieceCount))
1420            {
1421                fireError (msgs, ERANGE);
1422                return READ_ERR;
1423            }
1424
1425            /* a peer can send the same HAVE message twice... */
1426            if (!tr_bitfieldHas (&msgs->peer->have, ui32)) {
1427                tr_bitfieldAdd (&msgs->peer->have, ui32);
1428                fireClientGotHave (msgs, ui32);
1429            }
1430            updatePeerProgress (msgs);
1431            break;
1432
1433        case BT_BITFIELD: {
1434            uint8_t * tmp = tr_new (uint8_t, msglen);
1435            dbgmsg (msgs, "got a bitfield");
1436            tr_peerIoReadBytes (msgs->peer->io, inbuf, tmp, msglen);
1437            tr_bitfieldSetRaw (&msgs->peer->have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));
1438            fireClientGotBitfield (msgs, &msgs->peer->have);
1439            updatePeerProgress (msgs);
1440            tr_free (tmp);
1441            break;
1442        }
1443
1444        case BT_REQUEST:
1445        {
1446            struct peer_request r;
1447            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.index);
1448            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.offset);
1449            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.length);
1450            dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1451            peerMadeRequest (msgs, &r);
1452            break;
1453        }
1454
1455        case BT_CANCEL:
1456        {
1457            int i;
1458            struct peer_request r;
1459            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.index);
1460            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.offset);
1461            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.length);
1462            tr_historyAdd (&msgs->peer->cancelsSentToClient, tr_time (), 1);
1463            dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1464
1465            for (i=0; i<msgs->peer->pendingReqsToClient; ++i) {
1466                const struct peer_request * req = msgs->peerAskedFor + i;
1467                if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length))
1468                    break;
1469            }
1470
1471            if (i < msgs->peer->pendingReqsToClient)
1472                tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request),
1473                                           msgs->peer->pendingReqsToClient--);
1474            break;
1475        }
1476
1477        case BT_PIECE:
1478            assert (0); /* handled elsewhere! */
1479            break;
1480
1481        case BT_PORT:
1482            dbgmsg (msgs, "Got a BT_PORT");
1483            tr_peerIoReadUint16 (msgs->peer->io, inbuf, &msgs->dht_port);
1484            if (msgs->dht_port > 0)
1485                tr_dhtAddNode (getSession (msgs),
1486                               tr_peerAddress (msgs->peer),
1487                               msgs->dht_port, 0);
1488            break;
1489
1490        case BT_FEXT_SUGGEST:
1491            dbgmsg (msgs, "Got a BT_FEXT_SUGGEST");
1492            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &ui32);
1493            if (fext)
1494                fireClientGotSuggest (msgs, ui32);
1495            else {
1496                fireError (msgs, EMSGSIZE);
1497                return READ_ERR;
1498            }
1499            break;
1500
1501        case BT_FEXT_ALLOWED_FAST:
1502            dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST");
1503            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &ui32);
1504            if (fext)
1505                fireClientGotAllowedFast (msgs, ui32);
1506            else {
1507                fireError (msgs, EMSGSIZE);
1508                return READ_ERR;
1509            }
1510            break;
1511
1512        case BT_FEXT_HAVE_ALL:
1513            dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL");
1514            if (fext) {
1515                tr_bitfieldSetHasAll (&msgs->peer->have);
1516assert (tr_bitfieldHasAll (&msgs->peer->have));
1517                fireClientGotHaveAll (msgs);
1518                updatePeerProgress (msgs);
1519            } else {
1520                fireError (msgs, EMSGSIZE);
1521                return READ_ERR;
1522            }
1523            break;
1524
1525        case BT_FEXT_HAVE_NONE:
1526            dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE");
1527            if (fext) {
1528                tr_bitfieldSetHasNone (&msgs->peer->have);
1529                fireClientGotHaveNone (msgs);
1530                updatePeerProgress (msgs);
1531            } else {
1532                fireError (msgs, EMSGSIZE);
1533                return READ_ERR;
1534            }
1535            break;
1536
1537        case BT_FEXT_REJECT:
1538        {
1539            struct peer_request r;
1540            dbgmsg (msgs, "Got a BT_FEXT_REJECT");
1541            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.index);
1542            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.offset);
1543            tr_peerIoReadUint32 (msgs->peer->io, inbuf, &r.length);
1544            if (fext)
1545                fireGotRej (msgs, &r);
1546            else {
1547                fireError (msgs, EMSGSIZE);
1548                return READ_ERR;
1549            }
1550            break;
1551        }
1552
1553        case BT_LTEP:
1554            dbgmsg (msgs, "Got a BT_LTEP");
1555            parseLtep (msgs, msglen, inbuf);
1556            break;
1557
1558        default:
1559            dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id);
1560            tr_peerIoDrain (msgs->peer->io, inbuf, msglen);
1561            break;
1562    }
1563
1564    assert (msglen + 1 == msgs->incoming.length);
1565    assert (evbuffer_get_length (inbuf) == startBufLen - msglen);
1566
1567    msgs->state = AWAITING_BT_LENGTH;
1568    return READ_NOW;
1569}
1570
1571/* returns 0 on success, or an errno on failure */
1572static int
1573clientGotBlock (tr_peermsgs                * msgs,
1574                struct evbuffer            * data,
1575                const struct peer_request  * req)
1576{
1577    int err;
1578    tr_torrent * tor = msgs->torrent;
1579    const tr_block_index_t block = _tr_block (tor, req->index, req->offset);
1580
1581    assert (msgs);
1582    assert (req);
1583
1584    if (req->length != tr_torBlockCountBytes (msgs->torrent, block)) {
1585        dbgmsg (msgs, "wrong block size -- expected %u, got %d",
1586                tr_torBlockCountBytes (msgs->torrent, block), req->length);
1587        return EMSGSIZE;
1588    }
1589
1590    dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1591
1592    if (!tr_peerMgrDidPeerRequest (msgs->torrent, msgs->peer, block)) {
1593        dbgmsg (msgs, "we didn't ask for this message...");
1594        return 0;
1595    }
1596    if (tr_cpPieceIsComplete (&msgs->torrent->completion, req->index)) {
1597        dbgmsg (msgs, "we did ask for this message, but the piece is already complete...");
1598        return 0;
1599    }
1600
1601    /**
1602    ***  Save the block
1603    **/
1604
1605    if ((err = tr_cacheWriteBlock (getSession (msgs)->cache, tor, req->index, req->offset, req->length, data)))
1606        return err;
1607
1608    tr_bitfieldAdd (&msgs->peer->blame, req->index);
1609    fireGotBlock (msgs, req);
1610    return 0;
1611}
1612
1613static int peerPulse (void * vmsgs);
1614
1615static void
1616didWrite (tr_peerIo * io UNUSED, size_t bytesWritten, bool wasPieceData, void * vmsgs)
1617{
1618    tr_peermsgs * msgs = vmsgs;
1619
1620    if (wasPieceData)
1621      firePeerGotPieceData (msgs, bytesWritten);
1622
1623    if (tr_isPeerIo (io) && io->userData)
1624        peerPulse (msgs);
1625}
1626
1627static ReadState
1628canRead (tr_peerIo * io, void * vmsgs, size_t * piece)
1629{
1630    ReadState         ret;
1631    tr_peermsgs *     msgs = vmsgs;
1632    struct evbuffer * in = tr_peerIoGetReadBuffer (io);
1633    const size_t      inlen = evbuffer_get_length (in);
1634
1635    dbgmsg (msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);
1636
1637    if (!inlen)
1638    {
1639        ret = READ_LATER;
1640    }
1641    else if (msgs->state == AWAITING_BT_PIECE)
1642    {
1643        ret = readBtPiece (msgs, in, inlen, piece);
1644    }
1645    else switch (msgs->state)
1646    {
1647        case AWAITING_BT_LENGTH:
1648            ret = readBtLength (msgs, in, inlen); break;
1649
1650        case AWAITING_BT_ID:
1651            ret = readBtId   (msgs, in, inlen); break;
1652
1653        case AWAITING_BT_MESSAGE:
1654            ret = readBtMessage (msgs, in, inlen); break;
1655
1656        default:
1657            ret = READ_ERR;
1658            assert (0);
1659    }
1660
1661    dbgmsg (msgs, "canRead: ret is %d", (int)ret);
1662
1663    return ret;
1664}
1665
1666int
1667tr_peerMsgsIsReadingBlock (const tr_peermsgs * msgs, tr_block_index_t block)
1668{
1669    if (msgs->state != AWAITING_BT_PIECE)
1670        return false;
1671
1672    return block == _tr_block (msgs->torrent,
1673                               msgs->incoming.blockReq.index,
1674                               msgs->incoming.blockReq.offset);
1675}
1676
1677/**
1678***
1679**/
1680
1681static void
1682updateDesiredRequestCount (tr_peermsgs * msgs)
1683{
1684    tr_torrent * const torrent = msgs->torrent;
1685
1686    /* there are lots of reasons we might not want to request any blocks... */
1687    if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent)
1688                                    || msgs->peer->clientIsChoked
1689                                    || !msgs->peer->clientIsInterested)
1690    {
1691        msgs->desiredRequestCount = 0;
1692    }
1693    else
1694    {
1695        int estimatedBlocksInPeriod;
1696        unsigned int rate_Bps;
1697        unsigned int irate_Bps;
1698        const int floor = 4;
1699        const int seconds = REQUEST_BUF_SECS;
1700        const uint64_t now = tr_time_msec ();
1701
1702        /* Get the rate limit we should use.
1703         * FIXME: this needs to consider all the other peers as well... */
1704        rate_Bps = tr_peerGetPieceSpeed_Bps (msgs->peer, now, TR_PEER_TO_CLIENT);
1705        if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT))
1706            rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT));
1707
1708        /* honor the session limits, if enabled */
1709        if (tr_torrentUsesSessionLimits (torrent) &&
1710            tr_sessionGetActiveSpeedLimit_Bps (torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1711                rate_Bps = MIN (rate_Bps, irate_Bps);
1712
1713        /* use this desired rate to figure out how
1714         * many requests we should send to this peer */
1715        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1716        msgs->desiredRequestCount = MAX (floor, estimatedBlocksInPeriod);
1717
1718        /* honor the peer's maximum request count, if specified */
1719        if (msgs->reqq > 0)
1720            if (msgs->desiredRequestCount > msgs->reqq)
1721                msgs->desiredRequestCount = msgs->reqq;
1722    }
1723}
1724
1725static void
1726updateMetadataRequests (tr_peermsgs * msgs, time_t now)
1727{
1728    int piece;
1729
1730    if (msgs->peerSupportsMetadataXfer
1731        && tr_torrentGetNextMetadataRequest (msgs->torrent, now, &piece))
1732    {
1733        tr_variant tmp;
1734        struct evbuffer * payload;
1735        struct evbuffer * out = msgs->outMessages;
1736
1737        /* build the data message */
1738        tr_variantInitDict (&tmp, 3);
1739        tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
1740        tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1741        payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1742
1743        dbgmsg (msgs, "requesting metadata piece #%d", piece);
1744
1745        /* write it out as a LTEP message to our outMessages buffer */
1746        evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1747        evbuffer_add_uint8 (out, BT_LTEP);
1748        evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1749        evbuffer_add_buffer (out, payload);
1750        pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1751        dbgOutMessageLen (msgs);
1752
1753        /* cleanup */
1754        evbuffer_free (payload);
1755        tr_variantFree (&tmp);
1756    }
1757}
1758
1759static void
1760updateBlockRequests (tr_peermsgs * msgs)
1761{
1762    if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT)
1763        && (msgs->desiredRequestCount > 0)
1764        && (msgs->peer->pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))
1765    {
1766        int i;
1767        int n;
1768        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1769        tr_block_index_t * blocks = tr_new (tr_block_index_t, numwant);
1770
1771        tr_peerMgrGetNextRequests (msgs->torrent, msgs->peer, numwant, blocks, &n, false);
1772
1773        for (i=0; i<n; ++i)
1774        {
1775            struct peer_request req;
1776            blockToReq (msgs->torrent, blocks[i], &req);
1777            protocolSendRequest (msgs, &req);
1778        }
1779
1780        tr_free (blocks);
1781    }
1782}
1783
1784static size_t
1785fillOutputBuffer (tr_peermsgs * msgs, time_t now)
1786{
1787    int piece;
1788    size_t bytesWritten = 0;
1789    struct peer_request req;
1790    const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0;
1791    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
1792
1793    /**
1794    ***  Protocol messages
1795    **/
1796
1797    if (haveMessages && !msgs->outMessagesBatchedAt) /* fresh batch */
1798    {
1799        dbgmsg (msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length (msgs->outMessages));
1800        msgs->outMessagesBatchedAt = now;
1801    }
1802    else if (haveMessages && ((now - msgs->outMessagesBatchedAt) >= msgs->outMessagesBatchPeriod))
1803    {
1804        const size_t len = evbuffer_get_length (msgs->outMessages);
1805        /* flush the protocol messages */
1806        dbgmsg (msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len);
1807        tr_peerIoWriteBuf (msgs->peer->io, msgs->outMessages, false);
1808        msgs->clientSentAnythingAt = now;
1809        msgs->outMessagesBatchedAt = 0;
1810        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1811        bytesWritten +=  len;
1812    }
1813
1814    /**
1815    ***  Metadata Pieces
1816    **/
1817
1818    if ((tr_peerIoGetWriteBufferSpace (msgs->peer->io, now) >= METADATA_PIECE_SIZE)
1819        && popNextMetadataRequest (msgs, &piece))
1820    {
1821        char * data;
1822        int dataLen;
1823        bool ok = false;
1824
1825        data = tr_torrentGetMetadataPiece (msgs->torrent, piece, &dataLen);
1826        if ((dataLen > 0) && (data != NULL))
1827        {
1828            tr_variant tmp;
1829            struct evbuffer * payload;
1830            struct evbuffer * out = msgs->outMessages;
1831
1832            /* build the data message */
1833            tr_variantInitDict (&tmp, 3);
1834            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
1835            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1836            tr_variantDictAddInt (&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
1837            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1838
1839            /* write it out as a LTEP message to our outMessages buffer */
1840            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload) + dataLen);
1841            evbuffer_add_uint8 (out, BT_LTEP);
1842            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1843            evbuffer_add_buffer (out, payload);
1844            evbuffer_add     (out, data, dataLen);
1845            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1846            dbgOutMessageLen (msgs);
1847
1848            evbuffer_free (payload);
1849            tr_variantFree (&tmp);
1850            tr_free (data);
1851
1852            ok = true;
1853        }
1854
1855        if (!ok) /* send a rejection message */
1856        {
1857            tr_variant tmp;
1858            struct evbuffer * payload;
1859            struct evbuffer * out = msgs->outMessages;
1860
1861            /* build the rejection message */
1862            tr_variantInitDict (&tmp, 2);
1863            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1864            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1865            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1866
1867            /* write it out as a LTEP message to our outMessages buffer */
1868            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1869            evbuffer_add_uint8 (out, BT_LTEP);
1870            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1871            evbuffer_add_buffer (out, payload);
1872            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1873            dbgOutMessageLen (msgs);
1874
1875            evbuffer_free (payload);
1876            tr_variantFree (&tmp);
1877        }
1878    }
1879
1880    /**
1881    ***  Data Blocks
1882    **/
1883
1884    if ((tr_peerIoGetWriteBufferSpace (msgs->peer->io, now) >= msgs->torrent->blockSize)
1885        && popNextRequest (msgs, &req))
1886    {
1887        --msgs->prefetchCount;
1888
1889        if (requestIsValid (msgs, &req)
1890            && tr_cpPieceIsComplete (&msgs->torrent->completion, req.index))
1891        {
1892            int err;
1893            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1894            struct evbuffer * out;
1895            struct evbuffer_iovec iovec[1];
1896
1897            out = evbuffer_new ();
1898            evbuffer_expand (out, msglen);
1899
1900            evbuffer_add_uint32 (out, sizeof (uint8_t) + 2 * sizeof (uint32_t) + req.length);
1901            evbuffer_add_uint8 (out, BT_PIECE);
1902            evbuffer_add_uint32 (out, req.index);
1903            evbuffer_add_uint32 (out, req.offset);
1904
1905            evbuffer_reserve_space (out, req.length, iovec, 1);
1906            err = tr_cacheReadBlock (getSession (msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base);
1907            iovec[0].iov_len = req.length;
1908            evbuffer_commit_space (out, iovec, 1);
1909
1910            /* check the piece if it needs checking... */
1911            if (!err && tr_torrentPieceNeedsCheck (msgs->torrent, req.index))
1912                if ((err = !tr_torrentCheckPiece (msgs->torrent, req.index)))
1913                    tr_torrentSetLocalError (msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."), (size_t)req.index);
1914
1915            if (err)
1916            {
1917                if (fext)
1918                    protocolSendReject (msgs, &req);
1919            }
1920            else
1921            {
1922                const size_t n = evbuffer_get_length (out);
1923                dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
1924                assert (n == msglen);
1925                tr_peerIoWriteBuf (msgs->peer->io, out, true);
1926                bytesWritten += n;
1927                msgs->clientSentAnythingAt = now;
1928                tr_historyAdd (&msgs->peer->blocksSentToPeer, tr_time (), 1);
1929            }
1930
1931            evbuffer_free (out);
1932
1933            if (err)
1934            {
1935                bytesWritten = 0;
1936                msgs = NULL;
1937            }
1938        }
1939        else if (fext) /* peer needs a reject message */
1940        {
1941            protocolSendReject (msgs, &req);
1942        }
1943
1944        if (msgs != NULL)
1945            prefetchPieces (msgs);
1946    }
1947
1948    /**
1949    ***  Keepalive
1950    **/
1951
1952    if ((msgs != NULL)
1953        && (msgs->clientSentAnythingAt != 0)
1954        && ((now - msgs->clientSentAnythingAt) > KEEPALIVE_INTERVAL_SECS))
1955    {
1956        dbgmsg (msgs, "sending a keepalive message");
1957        evbuffer_add_uint32 (msgs->outMessages, 0);
1958        pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
1959    }
1960
1961    return bytesWritten;
1962}
1963
1964static int
1965peerPulse (void * vmsgs)
1966{
1967    tr_peermsgs * msgs = vmsgs;
1968    const time_t  now = tr_time ();
1969
1970    if (tr_isPeerIo (msgs->peer->io)) {
1971        updateDesiredRequestCount (msgs);
1972        updateBlockRequests (msgs);
1973        updateMetadataRequests (msgs, now);
1974    }
1975
1976    for (;;)
1977        if (fillOutputBuffer (msgs, now) < 1)
1978            break;
1979
1980    return true; /* loop forever */
1981}
1982
1983void
1984tr_peerMsgsPulse (tr_peermsgs * msgs)
1985{
1986    if (msgs != NULL)
1987        peerPulse (msgs);
1988}
1989
1990static void
1991gotError (tr_peerIo * io UNUSED, short what, void * vmsgs)
1992{
1993    if (what & BEV_EVENT_TIMEOUT)
1994        dbgmsg (vmsgs, "libevent got a timeout, what=%hd", what);
1995    if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
1996        dbgmsg (vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1997               what, errno, tr_strerror (errno));
1998    fireError (vmsgs, ENOTCONN);
1999}
2000
2001static void
2002sendBitfield (tr_peermsgs * msgs)
2003{
2004    void * bytes;
2005    size_t byte_count = 0;
2006    struct evbuffer * out = msgs->outMessages;
2007
2008    assert (tr_torrentHasMetadata (msgs->torrent));
2009
2010    bytes = tr_cpCreatePieceBitfield (&msgs->torrent->completion, &byte_count);
2011    evbuffer_add_uint32 (out, sizeof (uint8_t) + byte_count);
2012    evbuffer_add_uint8 (out, BT_BITFIELD);
2013    evbuffer_add     (out, bytes, byte_count);
2014    dbgmsg (msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length (out));
2015    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2016
2017    tr_free (bytes);
2018}
2019
2020static void
2021tellPeerWhatWeHave (tr_peermsgs * msgs)
2022{
2023    const bool fext = tr_peerIoSupportsFEXT (msgs->peer->io);
2024
2025    if (fext && tr_cpHasAll (&msgs->torrent->completion))
2026    {
2027        protocolSendHaveAll (msgs);
2028    }
2029    else if (fext && tr_cpHasNone (&msgs->torrent->completion))
2030    {
2031        protocolSendHaveNone (msgs);
2032    }
2033    else if (!tr_cpHasNone (&msgs->torrent->completion))
2034    {
2035        sendBitfield (msgs);
2036    }
2037}
2038
2039/**
2040***
2041**/
2042
2043/* some peers give us error messages if we send
2044   more than this many peers in a single pex message
2045   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2046#define MAX_PEX_ADDED 50
2047#define MAX_PEX_DROPPED 50
2048
2049typedef struct
2050{
2051    tr_pex *  added;
2052    tr_pex *  dropped;
2053    tr_pex *  elements;
2054    int       addedCount;
2055    int       droppedCount;
2056    int       elementCount;
2057}
2058PexDiffs;
2059
2060static void
2061pexAddedCb (void * vpex, void * userData)
2062{
2063    PexDiffs * diffs = userData;
2064    tr_pex *   pex = vpex;
2065
2066    if (diffs->addedCount < MAX_PEX_ADDED)
2067    {
2068        diffs->added[diffs->addedCount++] = *pex;
2069        diffs->elements[diffs->elementCount++] = *pex;
2070    }
2071}
2072
2073static inline void
2074pexDroppedCb (void * vpex, void * userData)
2075{
2076    PexDiffs * diffs = userData;
2077    tr_pex *   pex = vpex;
2078
2079    if (diffs->droppedCount < MAX_PEX_DROPPED)
2080    {
2081        diffs->dropped[diffs->droppedCount++] = *pex;
2082    }
2083}
2084
2085static inline void
2086pexElementCb (void * vpex, void * userData)
2087{
2088    PexDiffs * diffs = userData;
2089    tr_pex * pex = vpex;
2090
2091    diffs->elements[diffs->elementCount++] = *pex;
2092}
2093
2094typedef void (tr_set_func)(void * element, void * userData);
2095
2096/**
2097 * @brief find the differences and commonalities in two sorted sets
2098 * @param a the first set
2099 * @param aCount the number of elements in the set 'a'
2100 * @param b the second set
2101 * @param bCount the number of elements in the set 'b'
2102 * @param compare the sorting method for both sets
2103 * @param elementSize the sizeof the element in the two sorted sets
2104 * @param in_a called for items in set 'a' but not set 'b'
2105 * @param in_b called for items in set 'b' but not set 'a'
2106 * @param in_both called for items that are in both sets
2107 * @param userData user data passed along to in_a, in_b, and in_both
2108 */
2109static void
2110tr_set_compare (const void * va, size_t aCount,
2111                const void * vb, size_t bCount,
2112                int compare (const void * a, const void * b),
2113                size_t elementSize,
2114                tr_set_func in_a_cb,
2115                tr_set_func in_b_cb,
2116                tr_set_func in_both_cb,
2117                void * userData)
2118{
2119    const uint8_t * a = va;
2120    const uint8_t * b = vb;
2121    const uint8_t * aend = a + elementSize * aCount;
2122    const uint8_t * bend = b + elementSize * bCount;
2123
2124    while (a != aend || b != bend)
2125    {
2126        if (a == aend)
2127        {
2128          (*in_b_cb)((void*)b, userData);
2129            b += elementSize;
2130        }
2131        else if (b == bend)
2132        {
2133          (*in_a_cb)((void*)a, userData);
2134            a += elementSize;
2135        }
2136        else
2137        {
2138            const int val = (*compare)(a, b);
2139
2140            if (!val)
2141            {
2142              (*in_both_cb)((void*)a, userData);
2143                a += elementSize;
2144                b += elementSize;
2145            }
2146            else if (val < 0)
2147            {
2148              (*in_a_cb)((void*)a, userData);
2149                a += elementSize;
2150            }
2151            else if (val > 0)
2152            {
2153              (*in_b_cb)((void*)b, userData);
2154                b += elementSize;
2155            }
2156        }
2157    }
2158}
2159
2160
2161static void
2162sendPex (tr_peermsgs * msgs)
2163{
2164    if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent))
2165    {
2166        PexDiffs diffs;
2167        PexDiffs diffs6;
2168        tr_pex * newPex = NULL;
2169        tr_pex * newPex6 = NULL;
2170        const int newCount = tr_peerMgrGetPeers (msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2171        const int newCount6 = tr_peerMgrGetPeers (msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2172
2173        /* build the diffs */
2174        diffs.added = tr_new (tr_pex, newCount);
2175        diffs.addedCount = 0;
2176        diffs.dropped = tr_new (tr_pex, msgs->pexCount);
2177        diffs.droppedCount = 0;
2178        diffs.elements = tr_new (tr_pex, newCount + msgs->pexCount);
2179        diffs.elementCount = 0;
2180        tr_set_compare (msgs->pex, msgs->pexCount,
2181                        newPex, newCount,
2182                        tr_pexCompare, sizeof (tr_pex),
2183                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs);
2184        diffs6.added = tr_new (tr_pex, newCount6);
2185        diffs6.addedCount = 0;
2186        diffs6.dropped = tr_new (tr_pex, msgs->pexCount6);
2187        diffs6.droppedCount = 0;
2188        diffs6.elements = tr_new (tr_pex, newCount6 + msgs->pexCount6);
2189        diffs6.elementCount = 0;
2190        tr_set_compare (msgs->pex6, msgs->pexCount6,
2191                        newPex6, newCount6,
2192                        tr_pexCompare, sizeof (tr_pex),
2193                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6);
2194        dbgmsg (
2195            msgs,
2196            "pex: old peer count %d+%d, new peer count %d+%d, "
2197            "added %d+%d, removed %d+%d",
2198            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2199            diffs.addedCount, diffs6.addedCount,
2200            diffs.droppedCount, diffs6.droppedCount);
2201
2202        if (!diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2203            !diffs6.droppedCount)
2204        {
2205            tr_free (diffs.elements);
2206            tr_free (diffs6.elements);
2207        }
2208        else
2209        {
2210            int  i;
2211            tr_variant val;
2212            uint8_t * tmp, *walk;
2213            struct evbuffer * payload;
2214            struct evbuffer * out = msgs->outMessages;
2215
2216            /* update peer */
2217            tr_free (msgs->pex);
2218            msgs->pex = diffs.elements;
2219            msgs->pexCount = diffs.elementCount;
2220            tr_free (msgs->pex6);
2221            msgs->pex6 = diffs6.elements;
2222            msgs->pexCount6 = diffs6.elementCount;
2223
2224            /* build the pex payload */
2225            tr_variantInitDict (&val, 3); /* ipv6 support: left as 3:
2226                                         * speed vs. likelihood? */
2227
2228            if (diffs.addedCount > 0)
2229            {
2230                /* "added" */
2231                tmp = walk = tr_new (uint8_t, diffs.addedCount * 6);
2232                for (i = 0; i < diffs.addedCount; ++i) {
2233                    memcpy (walk, &diffs.added[i].addr.addr, 4); walk += 4;
2234                    memcpy (walk, &diffs.added[i].port, 2); walk += 2;
2235                }
2236                assert ((walk - tmp) == diffs.addedCount * 6);
2237                tr_variantDictAddRaw (&val, TR_KEY_added, tmp, walk - tmp);
2238                tr_free (tmp);
2239
2240                /* "added.f"
2241                 * unset each holepunch flag because we don't support it. */
2242                tmp = walk = tr_new (uint8_t, diffs.addedCount);
2243                for (i = 0; i < diffs.addedCount; ++i)
2244                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2245                assert ((walk - tmp) == diffs.addedCount);
2246                tr_variantDictAddRaw (&val, TR_KEY_added_f, tmp, walk - tmp);
2247                tr_free (tmp);
2248            }
2249
2250            if (diffs.droppedCount > 0)
2251            {
2252                /* "dropped" */
2253                tmp = walk = tr_new (uint8_t, diffs.droppedCount * 6);
2254                for (i = 0; i < diffs.droppedCount; ++i) {
2255                    memcpy (walk, &diffs.dropped[i].addr.addr, 4); walk += 4;
2256                    memcpy (walk, &diffs.dropped[i].port, 2); walk += 2;
2257                }
2258                assert ((walk - tmp) == diffs.droppedCount * 6);
2259                tr_variantDictAddRaw (&val, TR_KEY_dropped, tmp, walk - tmp);
2260                tr_free (tmp);
2261            }
2262
2263            if (diffs6.addedCount > 0)
2264            {
2265                /* "added6" */
2266                tmp = walk = tr_new (uint8_t, diffs6.addedCount * 18);
2267                for (i = 0; i < diffs6.addedCount; ++i) {
2268                    memcpy (walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2269                    walk += 16;
2270                    memcpy (walk, &diffs6.added[i].port, 2);
2271                    walk += 2;
2272                }
2273                assert ((walk - tmp) == diffs6.addedCount * 18);
2274                tr_variantDictAddRaw (&val, TR_KEY_added6, tmp, walk - tmp);
2275                tr_free (tmp);
2276
2277                /* "added6.f"
2278                 * unset each holepunch flag because we don't support it. */
2279                tmp = walk = tr_new (uint8_t, diffs6.addedCount);
2280                for (i = 0; i < diffs6.addedCount; ++i)
2281                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2282                assert ((walk - tmp) == diffs6.addedCount);
2283                tr_variantDictAddRaw (&val, TR_KEY_added6_f, tmp, walk - tmp);
2284                tr_free (tmp);
2285            }
2286
2287            if (diffs6.droppedCount > 0)
2288            {
2289                /* "dropped6" */
2290                tmp = walk = tr_new (uint8_t, diffs6.droppedCount * 18);
2291                for (i = 0; i < diffs6.droppedCount; ++i) {
2292                    memcpy (walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2293                    walk += 16;
2294                    memcpy (walk, &diffs6.dropped[i].port, 2);
2295                    walk += 2;
2296                }
2297                assert ((walk - tmp) == diffs6.droppedCount * 18);
2298                tr_variantDictAddRaw (&val, TR_KEY_dropped6, tmp, walk - tmp);
2299                tr_free (tmp);
2300            }
2301
2302            /* write the pex message */
2303            payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
2304            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2305            evbuffer_add_uint8 (out, BT_LTEP);
2306            evbuffer_add_uint8 (out, msgs->ut_pex_id);
2307            evbuffer_add_buffer (out, payload);
2308            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2309            dbgmsg (msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length (out));
2310            dbgOutMessageLen (msgs);
2311
2312            evbuffer_free (payload);
2313            tr_variantFree (&val);
2314        }
2315
2316        /* cleanup */
2317        tr_free (diffs.added);
2318        tr_free (diffs.dropped);
2319        tr_free (newPex);
2320        tr_free (diffs6.added);
2321        tr_free (diffs6.dropped);
2322        tr_free (newPex6);
2323
2324        /*msgs->clientSentPexAt = tr_time ();*/
2325    }
2326}
2327
2328static void
2329pexPulse (int foo UNUSED, short bar UNUSED, void * vmsgs)
2330{
2331    struct tr_peermsgs * msgs = vmsgs;
2332
2333    sendPex (msgs);
2334
2335    assert (msgs->pexTimer != NULL);
2336    tr_timerAdd (msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2337}
2338
2339/**
2340***
2341**/
2342
2343tr_peermsgs*
2344tr_peerMsgsNew (struct tr_torrent    * torrent,
2345                struct tr_peer       * peer,
2346                tr_peer_callback     * callback,
2347                void                 * callbackData)
2348{
2349    tr_peermsgs * m;
2350
2351    assert (peer);
2352    assert (peer->io);
2353
2354    m = tr_new0 (tr_peermsgs, 1);
2355    m->callback = callback;
2356    m->callbackData = callbackData;
2357    m->peer = peer;
2358    m->torrent = torrent;
2359    m->peer->clientIsChoked = 1;
2360    m->peer->peerIsChoked = 1;
2361    m->peer->clientIsInterested = 0;
2362    m->peer->peerIsInterested = 0;
2363    m->state = AWAITING_BT_LENGTH;
2364    m->outMessages = evbuffer_new ();
2365    m->outMessagesBatchedAt = 0;
2366    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2367    peer->msgs = m;
2368
2369    if (tr_torrentAllowsPex (torrent)) {
2370        m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m);
2371        tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0);
2372    }
2373
2374    if (tr_peerIoSupportsUTP (peer->io)) {
2375        const tr_address * addr = tr_peerIoGetAddress (peer->io, NULL);
2376        tr_peerMgrSetUtpSupported (torrent, addr);
2377        tr_peerMgrSetUtpFailed (torrent, addr, false);
2378    }
2379
2380    if (tr_peerIoSupportsLTEP (peer->io))
2381        sendLtepHandshake (m);
2382
2383    tellPeerWhatWeHave (m);
2384
2385    if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (peer->io))
2386    {
2387        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2388        const struct tr_address *addr = tr_peerIoGetAddress (peer->io, NULL);
2389        if (addr->type == TR_AF_INET || tr_globalIPv6 ()) {
2390            protocolSendPort (m, tr_dhtPort (torrent->session));
2391        }
2392    }
2393
2394    tr_peerIoSetIOFuncs (m->peer->io, canRead, didWrite, gotError, m);
2395    updateDesiredRequestCount (m);
2396
2397    return m;
2398}
2399
2400void
2401tr_peerMsgsFree (tr_peermsgs* msgs)
2402{
2403    if (msgs)
2404    {
2405        if (msgs->pexTimer != NULL)
2406            event_free (msgs->pexTimer);
2407
2408        if (msgs->incoming.block != NULL)
2409            evbuffer_free (msgs->incoming.block);
2410
2411        evbuffer_free (msgs->outMessages);
2412        tr_free (msgs->pex6);
2413        tr_free (msgs->pex);
2414
2415        memset (msgs, ~0, sizeof (tr_peermsgs));
2416        tr_free (msgs);
2417    }
2418}
Note: See TracBrowser for help on using the repository browser.