source: trunk/libtransmission/peer-msgs.c

Last change on this file was 14734, checked in by mikedld, 5 years ago

#6071: Fix typo in comment

  • Property svn:keywords set to Date Rev Author Id
File size: 75.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: peer-msgs.c 14734 2016-04-23 16:21:39Z mikedld $
8 */
9
10#include <assert.h>
11#include <errno.h>
12#include <stdarg.h>
13#include <stdlib.h>
14#include <string.h>
15
16#include <event2/buffer.h>
17#include <event2/bufferevent.h>
18#include <event2/event.h>
19
20#include "transmission.h"
21#include "cache.h"
22#include "completion.h"
23#include "file.h"
24#include "log.h"
25#include "peer-io.h"
26#include "peer-mgr.h"
27#include "peer-msgs.h"
28#include "session.h"
29#include "torrent.h"
30#include "torrent-magnet.h"
31#include "tr-dht.h"
32#include "utils.h"
33#include "variant.h"
34#include "version.h"
35
36#ifndef EBADMSG
37 #define EBADMSG EINVAL
38#endif
39
40/**
41***
42**/
43
44enum
45{
46  BT_CHOKE                = 0,
47  BT_UNCHOKE              = 1,
48  BT_INTERESTED           = 2,
49  BT_NOT_INTERESTED       = 3,
50  BT_HAVE                 = 4,
51  BT_BITFIELD             = 5,
52  BT_REQUEST              = 6,
53  BT_PIECE                = 7,
54  BT_CANCEL               = 8,
55  BT_PORT                 = 9,
56
57  BT_FEXT_SUGGEST         = 13,
58  BT_FEXT_HAVE_ALL        = 14,
59  BT_FEXT_HAVE_NONE       = 15,
60  BT_FEXT_REJECT          = 16,
61  BT_FEXT_ALLOWED_FAST    = 17,
62
63  BT_LTEP                 = 20,
64
65  LTEP_HANDSHAKE          = 0,
66
67  UT_PEX_ID               = 1,
68  UT_METADATA_ID          = 3,
69
70  MAX_PEX_PEER_COUNT      = 50,
71
72  MIN_CHOKE_PERIOD_SEC    = 10,
73
74  /* idle seconds before we send a keepalive */
75  KEEPALIVE_INTERVAL_SECS = 100,
76
77  PEX_INTERVAL_SECS       = 90, /* sec between sendPex () calls */
78
79  REQQ                    = 512,
80
81  METADATA_REQQ           = 64,
82
83  MAGIC_NUMBER            = 21549,
84
85  /* used in lowering the outMessages queue period */
86  IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
87  HIGH_PRIORITY_INTERVAL_SECS = 2,
88  LOW_PRIORITY_INTERVAL_SECS = 10,
89
90  /* number of pieces we'll allow in our fast set */
91  MAX_FAST_SET_SIZE = 3,
92
93  /* how many blocks to keep prefetched per peer */
94  PREFETCH_SIZE = 18,
95
96  /* when we're making requests from another peer,
97     batch them together to send enough requests to
98     meet our bandwidth goals for the next N seconds */
99  REQUEST_BUF_SECS = 10,
100
101  /* defined in BEP #9 */
102  METADATA_MSG_TYPE_REQUEST = 0,
103  METADATA_MSG_TYPE_DATA = 1,
104  METADATA_MSG_TYPE_REJECT = 2
105};
106
107enum
108{
109  AWAITING_BT_LENGTH,
110  AWAITING_BT_ID,
111  AWAITING_BT_MESSAGE,
112  AWAITING_BT_PIECE
113};
114
115typedef enum
116{
117  ENCRYPTION_PREFERENCE_UNKNOWN,
118  ENCRYPTION_PREFERENCE_YES,
119  ENCRYPTION_PREFERENCE_NO
120}
121encryption_preference_t;
122
123/**
124***
125**/
126
127struct peer_request
128{
129  uint32_t index;
130  uint32_t offset;
131  uint32_t length;
132};
133
134static void
135blockToReq (const tr_torrent     * tor,
136            tr_block_index_t       block,
137            struct peer_request  * setme)
138{
139  tr_torrentGetBlockLocation (tor, block, &setme->index,
140                                          &setme->offset,
141                                          &setme->length);
142}
143
144/**
145***
146**/
147
148/* this is raw, unchanged data from the peer regarding
149 * the current message that it's sending us. */
150struct tr_incoming
151{
152  uint8_t                id;
153  uint32_t               length; /* includes the +1 for id length */
154  struct peer_request    blockReq; /* metadata for incoming blocks */
155  struct evbuffer      * block; /* piece data for incoming blocks */
156};
157
158/**
159 * Low-level communication state information about a connected peer.
160 *
161 * This structure remembers the low-level protocol states that we're
162 * in with this peer, such as active requests, pex messages, and so on.
163 * Its fields are all private to peer-msgs.c.
164 *
165 * Data not directly involved with sending & receiving messages is
166 * stored in tr_peer, where it can be accessed by both peermsgs and
167 * the peer manager.
168 *
169 * @see struct peer_atom
170 * @see tr_peer
171 */
172struct tr_peerMsgs
173{
174  struct tr_peer peer; /* parent */
175
176  uint16_t magic_number;
177
178  /* Whether or not we've choked this peer. */
179  bool peer_is_choked;
180
181  /* whether or not the peer has indicated it will download from us. */
182  bool peer_is_interested;
183
184  /* whether or not the peer is choking us. */
185  bool client_is_choked;
186
187  /* whether or not we've indicated to the peer that we would download from them if unchoked. */
188  bool client_is_interested;
189
190
191  bool peerSupportsPex;
192  bool peerSupportsMetadataXfer;
193  bool clientSentLtepHandshake;
194  bool peerSentLtepHandshake;
195
196  /*bool haveFastSet;*/
197
198  int desiredRequestCount;
199
200  int prefetchCount;
201
202  int is_active[2];
203
204  /* how long the outMessages batch should be allowed to grow before
205   * it's flushed -- some messages (like requests >:) should be sent
206   * very quickly; others aren't as urgent. */
207  int8_t          outMessagesBatchPeriod;
208
209  uint8_t         state;
210  uint8_t         ut_pex_id;
211  uint8_t         ut_metadata_id;
212  uint16_t        pexCount;
213  uint16_t        pexCount6;
214
215  tr_port         dht_port;
216
217  encryption_preference_t  encryption_preference;
218
219  size_t                   metadata_size_hint;
220#if 0
221  size_t                 fastsetSize;
222  tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
223#endif
224
225  tr_torrent *           torrent;
226
227  tr_peer_callback        callback;
228  void                  * callbackData;
229
230  struct evbuffer *      outMessages; /* all the non-piece messages */
231
232  struct peer_request    peerAskedFor[REQQ];
233
234  int peerAskedForMetadata[METADATA_REQQ];
235  int peerAskedForMetadataCount;
236
237  tr_pex * pex;
238  tr_pex * pex6;
239
240  /*time_t clientSentPexAt;*/
241  time_t clientSentAnythingAt;
242
243  time_t chokeChangedAt;
244
245  /* when we started batching the outMessages */
246  time_t outMessagesBatchedAt;
247
248  struct tr_incoming    incoming;
249
250  /* if the peer supports the Extension Protocol in BEP 10 and
251     supplied a reqq argument, it's stored here. Otherwise, the
252     value is zero and should be ignored. */
253  int64_t reqq;
254
255  struct event * pexTimer;
256
257  struct tr_peerIo * io;
258};
259
260/**
261***
262**/
263
264static inline tr_session*
265getSession (struct tr_peerMsgs * msgs)
266{
267  return msgs->torrent->session;
268}
269
270/**
271***
272**/
273
274static void
275myDebug (const char * file, int line,
276         const struct tr_peerMsgs * msgs,
277         const char * fmt, ...) TR_GNUC_PRINTF(4, 5);
278
279static void
280myDebug (const char * file, int line,
281         const struct tr_peerMsgs * msgs,
282         const char * fmt, ...)
283{
284  const tr_sys_file_t fp = tr_logGetFile ();
285
286  if (fp != TR_BAD_SYS_FILE)
287    {
288      va_list           args;
289      char              timestr[64];
290      struct evbuffer * buf = evbuffer_new ();
291      char *            base = tr_sys_path_basename (file, NULL);
292      char *            message;
293
294      evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ",
295                           tr_logGetTimeStr (timestr, sizeof (timestr)),
296                           tr_torrentName (msgs->torrent),
297                           tr_peerIoGetAddrStr (msgs->io),
298                           tr_quark_get_string (msgs->peer.client, NULL));
299      va_start (args, fmt);
300      evbuffer_add_vprintf (buf, fmt, args);
301      va_end (args);
302      evbuffer_add_printf (buf, " (%s:%d)", base, line);
303
304      message = evbuffer_free_to_str (buf, NULL);
305      tr_sys_file_write_line (fp, message, NULL);
306
307      tr_free (base);
308      tr_free (message);
309    }
310}
311
312#define dbgmsg(msgs, ...) \
313  do \
314    { \
315      if (tr_logGetDeepEnabled ()) \
316        myDebug (__FILE__, __LINE__, msgs, __VA_ARGS__); \
317    } \
318  while (0)
319
320/**
321***
322**/
323
324static void
325pokeBatchPeriod (tr_peerMsgs * msgs, int interval)
326{
327  if (msgs->outMessagesBatchPeriod > interval)
328    {
329      msgs->outMessagesBatchPeriod = interval;
330      dbgmsg (msgs, "lowering batch interval to %d seconds", interval);
331    }
332}
333
334static void
335dbgOutMessageLen (tr_peerMsgs * msgs)
336{
337  dbgmsg (msgs, "outMessage size is now %zu", evbuffer_get_length (msgs->outMessages));
338}
339
340static void
341protocolSendReject (tr_peerMsgs * msgs, const struct peer_request * req)
342{
343  struct evbuffer * out = msgs->outMessages;
344
345  assert (tr_peerIoSupportsFEXT (msgs->io));
346
347  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
348  evbuffer_add_uint8 (out, BT_FEXT_REJECT);
349  evbuffer_add_uint32 (out, req->index);
350  evbuffer_add_uint32 (out, req->offset);
351  evbuffer_add_uint32 (out, req->length);
352
353  dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
354  dbgOutMessageLen (msgs);
355}
356
357static void
358protocolSendRequest (tr_peerMsgs * msgs, const struct peer_request * req)
359{
360  struct evbuffer * out = msgs->outMessages;
361
362  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
363  evbuffer_add_uint8 (out, BT_REQUEST);
364  evbuffer_add_uint32 (out, req->index);
365  evbuffer_add_uint32 (out, req->offset);
366  evbuffer_add_uint32 (out, req->length);
367
368  dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
369  dbgOutMessageLen (msgs);
370  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
371}
372
373static void
374protocolSendCancel (tr_peerMsgs * msgs, const struct peer_request * req)
375{
376  struct evbuffer * out = msgs->outMessages;
377
378  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
379  evbuffer_add_uint8 (out, BT_CANCEL);
380  evbuffer_add_uint32 (out, req->index);
381  evbuffer_add_uint32 (out, req->offset);
382  evbuffer_add_uint32 (out, req->length);
383
384  dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
385  dbgOutMessageLen (msgs);
386  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
387}
388
389static void
390protocolSendPort (tr_peerMsgs *msgs, uint16_t port)
391{
392  struct evbuffer * out = msgs->outMessages;
393
394  dbgmsg (msgs, "sending Port %u", port);
395  evbuffer_add_uint32 (out, 3);
396  evbuffer_add_uint8 (out, BT_PORT);
397  evbuffer_add_uint16 (out, port);
398}
399
400static void
401protocolSendHave (tr_peerMsgs * msgs, uint32_t index)
402{
403  struct evbuffer * out = msgs->outMessages;
404
405  evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t));
406  evbuffer_add_uint8 (out, BT_HAVE);
407  evbuffer_add_uint32 (out, index);
408
409  dbgmsg (msgs, "sending Have %u", index);
410  dbgOutMessageLen (msgs);
411  pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS);
412}
413
414#if 0
415static void
416protocolSendAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
417{
418  tr_peerIo       * io  = msgs->io;
419  struct evbuffer * out = msgs->outMessages;
420
421  assert (tr_peerIoSupportsFEXT (msgs->io));
422
423  evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t));
424  evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST);
425  evbuffer_add_uint32 (io, out, pieceIndex);
426
427  dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex);
428  dbgOutMessageLen (msgs);
429}
430#endif
431
432static void
433protocolSendChoke (tr_peerMsgs * msgs, int choke)
434{
435  struct evbuffer * out = msgs->outMessages;
436
437  evbuffer_add_uint32 (out, sizeof (uint8_t));
438  evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE);
439
440  dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke");
441  dbgOutMessageLen (msgs);
442  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
443}
444
445static void
446protocolSendHaveAll (tr_peerMsgs * msgs)
447{
448  struct evbuffer * out = msgs->outMessages;
449
450  assert (tr_peerIoSupportsFEXT (msgs->io));
451
452  evbuffer_add_uint32 (out, sizeof (uint8_t));
453  evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL);
454
455  dbgmsg (msgs, "sending HAVE_ALL...");
456  dbgOutMessageLen (msgs);
457  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
458}
459
460static void
461protocolSendHaveNone (tr_peerMsgs * msgs)
462{
463  struct evbuffer * out = msgs->outMessages;
464
465  assert (tr_peerIoSupportsFEXT (msgs->io));
466
467  evbuffer_add_uint32 (out, sizeof (uint8_t));
468  evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE);
469
470  dbgmsg (msgs, "sending HAVE_NONE...");
471  dbgOutMessageLen (msgs);
472  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
473}
474
475/**
476***  EVENTS
477**/
478
479static void
480publish (tr_peerMsgs * msgs, tr_peer_event * e)
481{
482  if (msgs->callback != NULL)
483    (*msgs->callback) (&msgs->peer, e, msgs->callbackData);
484}
485
486static void
487fireError (tr_peerMsgs * msgs, int err)
488{
489  tr_peer_event e = TR_PEER_EVENT_INIT;
490  e.eventType = TR_PEER_ERROR;
491  e.err = err;
492  publish (msgs, &e);
493}
494
495static void
496fireGotBlock (tr_peerMsgs * msgs, const struct peer_request * req)
497{
498  tr_peer_event e = TR_PEER_EVENT_INIT;
499  e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
500  e.pieceIndex = req->index;
501  e.offset = req->offset;
502  e.length = req->length;
503  publish (msgs, &e);
504}
505
506static void
507fireGotRej (tr_peerMsgs * msgs, const struct peer_request * req)
508{
509  tr_peer_event e = TR_PEER_EVENT_INIT;
510  e.eventType = TR_PEER_CLIENT_GOT_REJ;
511  e.pieceIndex = req->index;
512  e.offset = req->offset;
513  e.length = req->length;
514  publish (msgs, &e);
515}
516
517static void
518fireGotChoke (tr_peerMsgs * msgs)
519{
520  tr_peer_event e = TR_PEER_EVENT_INIT;
521  e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
522  publish (msgs, &e);
523}
524
525static void
526fireClientGotHaveAll (tr_peerMsgs * msgs)
527{
528  tr_peer_event e = TR_PEER_EVENT_INIT;
529  e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
530  publish (msgs, &e);
531}
532
533static void
534fireClientGotHaveNone (tr_peerMsgs * msgs)
535{
536  tr_peer_event e = TR_PEER_EVENT_INIT;
537  e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
538  publish (msgs, &e);
539}
540
541static void
542fireClientGotPieceData (tr_peerMsgs * msgs, uint32_t length)
543{
544  tr_peer_event e = TR_PEER_EVENT_INIT;
545  e.length = length;
546  e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
547  publish (msgs, &e);
548}
549
550static void
551firePeerGotPieceData (tr_peerMsgs * msgs, uint32_t length)
552{
553  tr_peer_event e = TR_PEER_EVENT_INIT;
554  e.length = length;
555  e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
556  publish (msgs, &e);
557}
558
559static void
560fireClientGotSuggest (tr_peerMsgs * msgs, uint32_t pieceIndex)
561{
562  tr_peer_event e = TR_PEER_EVENT_INIT;
563  e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
564  e.pieceIndex = pieceIndex;
565  publish (msgs, &e);
566}
567
568static void
569fireClientGotPort (tr_peerMsgs * msgs, tr_port port)
570{
571  tr_peer_event e = TR_PEER_EVENT_INIT;
572  e.eventType = TR_PEER_CLIENT_GOT_PORT;
573  e.port = port;
574  publish (msgs, &e);
575}
576
577static void
578fireClientGotAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
579{
580  tr_peer_event e = TR_PEER_EVENT_INIT;
581  e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
582  e.pieceIndex = pieceIndex;
583  publish (msgs, &e);
584}
585
586static void
587fireClientGotBitfield (tr_peerMsgs * msgs, tr_bitfield * bitfield)
588{
589  tr_peer_event e = TR_PEER_EVENT_INIT;
590  e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
591  e.bitfield = bitfield;
592  publish (msgs, &e);
593}
594
595static void
596fireClientGotHave (tr_peerMsgs * msgs, tr_piece_index_t index)
597{
598  tr_peer_event e = TR_PEER_EVENT_INIT;
599  e.eventType = TR_PEER_CLIENT_GOT_HAVE;
600  e.pieceIndex = index;
601  publish (msgs, &e);
602}
603
604
605/**
606***  ALLOWED FAST SET
607***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
608**/
609
610#if 0
611size_t
612tr_generateAllowedSet (tr_piece_index_t * setmePieces,
613                       size_t             desiredSetSize,
614                       size_t             pieceCount,
615                       const uint8_t    * infohash,
616                       const tr_address * addr)
617{
618    size_t setSize = 0;
619
620    assert (setmePieces);
621    assert (desiredSetSize <= pieceCount);
622    assert (desiredSetSize);
623    assert (pieceCount);
624    assert (infohash);
625    assert (addr);
626
627    if (addr->type == TR_AF_INET)
628    {
629        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
630        uint8_t x[SHA_DIGEST_LENGTH];
631
632        uint32_t ui32 = ntohl (htonl (addr->addr.addr4.s_addr) & 0xffffff00);   /* (1) */
633        memcpy (w, &ui32, sizeof (uint32_t));
634        walk += sizeof (uint32_t);
635        memcpy (walk, infohash, SHA_DIGEST_LENGTH);                 /* (2) */
636        walk += SHA_DIGEST_LENGTH;
637        tr_sha1 (x, w, walk-w, NULL);                               /* (3) */
638        assert (sizeof (w) == walk-w);
639
640        while (setSize<desiredSetSize)
641        {
642            int i;
643            for (i=0; i<5 && setSize<desiredSetSize; ++i)           /* (4) */
644            {
645                size_t k;
646                uint32_t j = i * 4;                                  /* (5) */
647                uint32_t y = ntohl (* (uint32_t*)(x + j));       /* (6) */
648                uint32_t index = y % pieceCount;                     /* (7) */
649
650                for (k=0; k<setSize; ++k)                           /* (8) */
651                    if (setmePieces[k] == index)
652                        break;
653
654                if (k == setSize)
655                    setmePieces[setSize++] = index;                  /* (9) */
656            }
657
658            tr_sha1 (x, x, sizeof (x), NULL);                      /* (3) */
659        }
660    }
661
662    return setSize;
663}
664
665static void
666updateFastSet (tr_peerMsgs * msgs UNUSED)
667{
668    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
669    const int peerIsNeedy = msgs->peer->progress < 0.10;
670
671    if (fext && peerIsNeedy && !msgs->haveFastSet)
672    {
673        size_t i;
674        const struct tr_address * addr = tr_peerIoGetAddress (msgs->io, NULL);
675        const tr_info * inf = &msgs->torrent->info;
676        const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount);
677
678        /* build the fast set */
679        msgs->fastsetSize = tr_generateAllowedSet (msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
680        msgs->haveFastSet = true;
681
682        /* send it to the peer */
683        for (i=0; i<msgs->fastsetSize; ++i)
684            protocolSendAllowedFast (msgs, msgs->fastset[i]);
685    }
686}
687#endif
688
689/***
690****  ACTIVE
691***/
692
693static bool
694tr_peerMsgsCalculateActive (const tr_peerMsgs * msgs, tr_direction direction)
695{
696  bool is_active;
697
698  assert (tr_isPeerMsgs (msgs));
699  assert (tr_isDirection (direction));
700
701  if (direction == TR_CLIENT_TO_PEER)
702    {
703      is_active = tr_peerMsgsIsPeerInterested (msgs)
704              && !tr_peerMsgsIsPeerChoked (msgs);
705
706      /* FIXME: https://trac.transmissionbt.com/ticket/5505
707      if (is_active)
708        assert (!tr_peerIsSeed (&msgs->peer));
709      */
710    }
711  else /* TR_PEER_TO_CLIENT */
712    {
713      if (!tr_torrentHasMetadata (msgs->torrent))
714        {
715          is_active = true;
716        }
717      else
718        {
719          is_active = tr_peerMsgsIsClientInterested (msgs)
720                  && !tr_peerMsgsIsClientChoked (msgs);
721
722          if (is_active)
723            assert (!tr_torrentIsSeed (msgs->torrent));
724        }
725    }
726
727  return is_active;
728}
729
730bool
731tr_peerMsgsIsActive (const tr_peerMsgs  * msgs, tr_direction direction)
732{
733  bool is_active;
734
735  assert (tr_isPeerMsgs (msgs));
736  assert (tr_isDirection (direction));
737
738  is_active = msgs->is_active[direction];
739
740  assert (is_active == tr_peerMsgsCalculateActive (msgs, direction));
741
742  return is_active;
743}
744
745static void
746tr_peerMsgsSetActive (tr_peerMsgs  * msgs,
747                      tr_direction   direction,
748                      bool           is_active)
749{
750  dbgmsg (msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active);
751
752  if (msgs->is_active[direction] != is_active)
753    {
754      msgs->is_active[direction] = is_active;
755
756      tr_swarmIncrementActivePeers (msgs->torrent->swarm, direction, is_active);
757    }
758}
759
760void
761tr_peerMsgsUpdateActive (tr_peerMsgs * msgs, tr_direction direction)
762{
763  const bool is_active = tr_peerMsgsCalculateActive (msgs, direction);
764
765  tr_peerMsgsSetActive (msgs, direction, is_active);
766}
767
768/**
769***  INTEREST
770**/
771
772static void
773sendInterest (tr_peerMsgs * msgs, bool b)
774{
775  struct evbuffer * out = msgs->outMessages;
776
777  assert (msgs);
778  assert (tr_isBool (b));
779
780  msgs->client_is_interested = b;
781  dbgmsg (msgs, "Sending %s", b ? "Interested" : "Not Interested");
782  evbuffer_add_uint32 (out, sizeof (uint8_t));
783  evbuffer_add_uint8 (out, b ? BT_INTERESTED : BT_NOT_INTERESTED);
784
785  pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
786  dbgOutMessageLen (msgs);
787}
788
789static void
790updateInterest (tr_peerMsgs * msgs UNUSED)
791{
792    /* FIXME -- might need to poke the mgr on startup */
793}
794
795void
796tr_peerMsgsSetInterested (tr_peerMsgs * msgs, bool b)
797{
798  assert (tr_isBool (b));
799
800  if (msgs->client_is_interested != b)
801    {
802      sendInterest (msgs, b);
803
804      tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
805    }
806}
807
808static bool
809popNextMetadataRequest (tr_peerMsgs * msgs, int * piece)
810{
811  if (msgs->peerAskedForMetadataCount == 0)
812    return false;
813
814  *piece = msgs->peerAskedForMetadata[0];
815
816  tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int),
817                             msgs->peerAskedForMetadataCount--);
818
819  return true;
820}
821
822static bool
823popNextRequest (tr_peerMsgs * msgs, struct peer_request * setme)
824{
825  if (msgs->peer.pendingReqsToClient == 0)
826    return false;
827
828  *setme = msgs->peerAskedFor[0];
829
830  tr_removeElementFromArray (msgs->peerAskedFor,
831                             0,
832                             sizeof (struct peer_request),
833                             msgs->peer.pendingReqsToClient--);
834
835  return true;
836}
837
838static void
839cancelAllRequestsToClient (tr_peerMsgs * msgs)
840{
841  struct peer_request req;
842  const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->io);
843
844  while (popNextRequest (msgs, &req))
845    if (mustSendCancel)
846      protocolSendReject (msgs, &req);
847}
848
849void
850tr_peerMsgsSetChoke (tr_peerMsgs * msgs, bool peer_is_choked)
851{
852  const time_t now = tr_time ();
853  const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
854
855  assert (msgs != NULL);
856  assert (tr_isBool (peer_is_choked));
857
858  if (msgs->chokeChangedAt > fibrillationTime)
859    {
860      dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
861    }
862  else if (msgs->peer_is_choked != peer_is_choked)
863    {
864      msgs->peer_is_choked = peer_is_choked;
865      if (peer_is_choked)
866        cancelAllRequestsToClient (msgs);
867      protocolSendChoke (msgs, peer_is_choked);
868      msgs->chokeChangedAt = now;
869      tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
870    }
871}
872
873/**
874***
875**/
876
877void
878tr_peerMsgsHave (tr_peerMsgs * msgs, uint32_t index)
879{
880  protocolSendHave (msgs, index);
881
882  /* since we have more pieces now, we might not be interested in this peer */
883  updateInterest (msgs);
884}
885
886/**
887***
888**/
889
890static bool
891reqIsValid (const tr_peerMsgs * peer,
892            uint32_t            index,
893            uint32_t            offset,
894            uint32_t            length)
895{
896    return tr_torrentReqIsValid (peer->torrent, index, offset, length);
897}
898
899static bool
900requestIsValid (const tr_peerMsgs * msgs, const struct peer_request * req)
901{
902    return reqIsValid (msgs, req->index, req->offset, req->length);
903}
904
905void
906tr_peerMsgsCancel (tr_peerMsgs * msgs, tr_block_index_t block)
907{
908    struct peer_request req;
909/*fprintf (stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer);*/
910    blockToReq (msgs->torrent, block, &req);
911    protocolSendCancel (msgs, &req);
912}
913
914/**
915***
916**/
917
918static void
919sendLtepHandshake (tr_peerMsgs * msgs)
920{
921    tr_variant val;
922    bool allow_pex;
923    bool allow_metadata_xfer;
924    struct evbuffer * payload;
925    struct evbuffer * out = msgs->outMessages;
926    const unsigned char * ipv6 = tr_globalIPv6 ();
927    static tr_quark version_quark = 0;
928
929    if (msgs->clientSentLtepHandshake)
930        return;
931
932    if (!version_quark)
933      version_quark = tr_quark_new (TR_NAME " " USERAGENT_PREFIX, TR_BAD_SIZE);
934
935    dbgmsg (msgs, "sending an ltep handshake");
936    msgs->clientSentLtepHandshake = true;
937
938    /* decide if we want to advertise metadata xfer support (BEP 9) */
939    if (tr_torrentIsPrivate (msgs->torrent))
940        allow_metadata_xfer = false;
941    else
942        allow_metadata_xfer = true;
943
944    /* decide if we want to advertise pex support */
945    if (!tr_torrentAllowsPex (msgs->torrent))
946        allow_pex = false;
947    else if (msgs->peerSentLtepHandshake)
948        allow_pex = msgs->peerSupportsPex;
949    else
950        allow_pex = true;
951
952    tr_variantInitDict (&val, 8);
953    tr_variantDictAddInt (&val, TR_KEY_e, getSession (msgs)->encryptionMode != TR_CLEAR_PREFERRED);
954    if (ipv6 != NULL)
955        tr_variantDictAddRaw (&val, TR_KEY_ipv6, ipv6, 16);
956    if (allow_metadata_xfer && tr_torrentHasMetadata (msgs->torrent)
957                            && (msgs->torrent->infoDictLength > 0))
958        tr_variantDictAddInt (&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
959    tr_variantDictAddInt (&val, TR_KEY_p, tr_sessionGetPublicPeerPort (getSession (msgs)));
960    tr_variantDictAddInt (&val, TR_KEY_reqq, REQQ);
961    tr_variantDictAddInt (&val, TR_KEY_upload_only, tr_torrentIsSeed (msgs->torrent));
962    tr_variantDictAddQuark (&val, TR_KEY_v, version_quark);
963    if (allow_metadata_xfer || allow_pex) {
964        tr_variant * m  = tr_variantDictAddDict (&val, TR_KEY_m, 2);
965        if (allow_metadata_xfer)
966            tr_variantDictAddInt (m, TR_KEY_ut_metadata, UT_METADATA_ID);
967        if (allow_pex)
968            tr_variantDictAddInt (m, TR_KEY_ut_pex, UT_PEX_ID);
969    }
970
971    payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
972
973    evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
974    evbuffer_add_uint8 (out, BT_LTEP);
975    evbuffer_add_uint8 (out, LTEP_HANDSHAKE);
976    evbuffer_add_buffer (out, payload);
977    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
978    dbgOutMessageLen (msgs);
979
980    /* cleanup */
981    evbuffer_free (payload);
982    tr_variantFree (&val);
983}
984
985static void
986parseLtepHandshake (tr_peerMsgs * msgs, uint32_t len, struct evbuffer * inbuf)
987{
988    int64_t   i;
989    tr_variant   val, * sub;
990    uint8_t * tmp = tr_new (uint8_t, len);
991    const uint8_t *addr;
992    size_t addr_len;
993    tr_pex pex;
994    int8_t seedProbability = -1;
995
996    memset (&pex, 0, sizeof (tr_pex));
997
998    tr_peerIoReadBytes (msgs->io, inbuf, tmp, len);
999    msgs->peerSentLtepHandshake = true;
1000
1001    if (tr_variantFromBenc (&val, tmp, len) || !tr_variantIsDict (&val))
1002    {
1003        dbgmsg (msgs, "GET  extended-handshake, couldn't get dictionary");
1004        tr_free (tmp);
1005        return;
1006    }
1007
1008    /* arbitrary limit, should be more than enough */
1009    if (len <= 4096)
1010      dbgmsg (msgs, "here is the handshake: [%*.*s]", (int) len, (int) len, tmp);
1011    else
1012      dbgmsg (msgs, "handshake length is too big (%" PRIu32 "), printing skipped", len);
1013
1014    /* does the peer prefer encrypted connections? */
1015    if (tr_variantDictFindInt (&val, TR_KEY_e, &i)) {
1016        msgs->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1017                                        : ENCRYPTION_PREFERENCE_NO;
1018        if (i)
1019            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
1020    }
1021
1022    /* check supported messages for utorrent pex */
1023    msgs->peerSupportsPex = false;
1024    msgs->peerSupportsMetadataXfer = false;
1025
1026    if (tr_variantDictFindDict (&val, TR_KEY_m, &sub)) {
1027        if (tr_variantDictFindInt (sub, TR_KEY_ut_pex, &i)) {
1028            msgs->peerSupportsPex = i != 0;
1029            msgs->ut_pex_id = (uint8_t) i;
1030            dbgmsg (msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
1031        }
1032        if (tr_variantDictFindInt (sub, TR_KEY_ut_metadata, &i)) {
1033            msgs->peerSupportsMetadataXfer = i != 0;
1034            msgs->ut_metadata_id = (uint8_t) i;
1035            dbgmsg (msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
1036        }
1037        if (tr_variantDictFindInt (sub, TR_KEY_ut_holepunch, &i)) {
1038            /* Mysterious µTorrent extension that we don't grok.  However,
1039               it implies support for µTP, so use it to indicate that. */
1040            tr_peerMgrSetUtpFailed (msgs->torrent,
1041                                    tr_peerIoGetAddress (msgs->io, NULL),
1042                                    false);
1043        }
1044    }
1045
1046    /* look for metainfo size (BEP 9) */
1047    if (tr_variantDictFindInt (&val, TR_KEY_metadata_size, &i)) {
1048        if (tr_torrentSetMetadataSizeHint (msgs->torrent, i))
1049            msgs->metadata_size_hint = (size_t) i;
1050    }
1051
1052    /* look for upload_only (BEP 21) */
1053    if (tr_variantDictFindInt (&val, TR_KEY_upload_only, &i))
1054        seedProbability = i==0 ? 0 : 100;
1055
1056    /* get peer's listening port */
1057    if (tr_variantDictFindInt (&val, TR_KEY_p, &i)) {
1058        pex.port = htons ((uint16_t)i);
1059        fireClientGotPort (msgs, pex.port);
1060        dbgmsg (msgs, "peer's port is now %d", (int)i);
1061    }
1062
1063    if (tr_peerIoIsIncoming (msgs->io)
1064        && tr_variantDictFindRaw (&val, TR_KEY_ipv4, &addr, &addr_len)
1065        && (addr_len == 4))
1066    {
1067        pex.addr.type = TR_AF_INET;
1068        memcpy (&pex.addr.addr.addr4, addr, 4);
1069        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1070    }
1071
1072    if (tr_peerIoIsIncoming (msgs->io)
1073        && tr_variantDictFindRaw (&val, TR_KEY_ipv6, &addr, &addr_len)
1074        && (addr_len == 16))
1075    {
1076        pex.addr.type = TR_AF_INET6;
1077        memcpy (&pex.addr.addr.addr6, addr, 16);
1078        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1079    }
1080
1081    /* get peer's maximum request queue size */
1082    if (tr_variantDictFindInt (&val, TR_KEY_reqq, &i))
1083        msgs->reqq = i;
1084
1085    tr_variantFree (&val);
1086    tr_free (tmp);
1087}
1088
1089static void
1090parseUtMetadata (tr_peerMsgs * msgs, uint32_t msglen, struct evbuffer * inbuf)
1091{
1092    tr_variant dict;
1093    char * msg_end;
1094    const char * benc_end;
1095    int64_t msg_type = -1;
1096    int64_t piece = -1;
1097    int64_t total_size = 0;
1098    uint8_t * tmp = tr_new (uint8_t, msglen);
1099
1100    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1101    msg_end = (char*)tmp + msglen;
1102
1103    if (!tr_variantFromBencFull (&dict, tmp, msglen, NULL, &benc_end))
1104    {
1105        tr_variantDictFindInt (&dict, TR_KEY_msg_type, &msg_type);
1106        tr_variantDictFindInt (&dict, TR_KEY_piece, &piece);
1107        tr_variantDictFindInt (&dict, TR_KEY_total_size, &total_size);
1108        tr_variantFree (&dict);
1109    }
1110
1111    dbgmsg (msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1112          (int)msg_type, (int)piece, (int)total_size);
1113
1114    if (msg_type == METADATA_MSG_TYPE_REJECT)
1115    {
1116        /* NOOP */
1117    }
1118
1119    if ((msg_type == METADATA_MSG_TYPE_DATA)
1120        && (!tr_torrentHasMetadata (msgs->torrent))
1121        && (msg_end - benc_end <= METADATA_PIECE_SIZE)
1122        && (piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size))
1123    {
1124        const int pieceLen = msg_end - benc_end;
1125        tr_torrentSetMetadataPiece (msgs->torrent, piece, benc_end, pieceLen);
1126    }
1127
1128    if (msg_type == METADATA_MSG_TYPE_REQUEST)
1129    {
1130        if ((piece >= 0)
1131            && tr_torrentHasMetadata (msgs->torrent)
1132            && !tr_torrentIsPrivate (msgs->torrent)
1133            && (msgs->peerAskedForMetadataCount < METADATA_REQQ))
1134        {
1135            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1136        }
1137        else
1138        {
1139            tr_variant tmp;
1140            struct evbuffer * payload;
1141            struct evbuffer * out = msgs->outMessages;
1142
1143            /* build the rejection message */
1144            tr_variantInitDict (&tmp, 2);
1145            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1146            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1147            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1148
1149            /* write it out as a LTEP message to our outMessages buffer */
1150            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1151            evbuffer_add_uint8 (out, BT_LTEP);
1152            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1153            evbuffer_add_buffer (out, payload);
1154            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1155            dbgOutMessageLen (msgs);
1156
1157            /* cleanup */
1158            evbuffer_free (payload);
1159            tr_variantFree (&tmp);
1160        }
1161    }
1162
1163    tr_free (tmp);
1164}
1165
1166static void
1167parseUtPex (tr_peerMsgs * msgs, uint32_t msglen, struct evbuffer * inbuf)
1168{
1169    int loaded = 0;
1170    uint8_t * tmp = tr_new (uint8_t, msglen);
1171    tr_variant val;
1172    tr_torrent * tor = msgs->torrent;
1173    const uint8_t * added;
1174    size_t added_len;
1175
1176    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1177
1178    if (tr_torrentAllowsPex (tor)
1179      && ((loaded = !tr_variantFromBenc (&val, tmp, msglen))))
1180    {
1181        if (tr_variantDictFindRaw (&val, TR_KEY_added, &added, &added_len))
1182        {
1183            tr_pex * pex;
1184            size_t i, n;
1185            size_t added_f_len = 0;
1186            const uint8_t * added_f = NULL;
1187
1188            tr_variantDictFindRaw (&val, TR_KEY_added_f, &added_f, &added_f_len);
1189            pex = tr_peerMgrCompactToPex (added, added_len, added_f, added_f_len, &n);
1190
1191            n = MIN (n, MAX_PEX_PEER_COUNT);
1192            for (i=0; i<n; ++i)
1193            {
1194                int seedProbability = -1;
1195                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1196                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1197            }
1198
1199            tr_free (pex);
1200        }
1201
1202        if (tr_variantDictFindRaw (&val, TR_KEY_added6, &added, &added_len))
1203        {
1204            tr_pex * pex;
1205            size_t i, n;
1206            size_t added_f_len = 0;
1207            const uint8_t * added_f = NULL;
1208
1209            tr_variantDictFindRaw (&val, TR_KEY_added6_f, &added_f, &added_f_len);
1210            pex = tr_peerMgrCompact6ToPex (added, added_len, added_f, added_f_len, &n);
1211
1212            n = MIN (n, MAX_PEX_PEER_COUNT);
1213            for (i=0; i<n; ++i)
1214            {
1215                int seedProbability = -1;
1216                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1217                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1218            }
1219
1220            tr_free (pex);
1221        }
1222    }
1223
1224    if (loaded)
1225        tr_variantFree (&val);
1226    tr_free (tmp);
1227}
1228
1229static void sendPex (tr_peerMsgs * msgs);
1230
1231static void
1232parseLtep (tr_peerMsgs * msgs, uint32_t msglen, struct evbuffer * inbuf)
1233{
1234    uint8_t ltep_msgid;
1235
1236    assert (msglen > 0);
1237
1238    tr_peerIoReadUint8 (msgs->io, inbuf, &ltep_msgid);
1239    msglen--;
1240
1241    if (ltep_msgid == LTEP_HANDSHAKE)
1242    {
1243        dbgmsg (msgs, "got ltep handshake");
1244        parseLtepHandshake (msgs, msglen, inbuf);
1245        if (tr_peerIoSupportsLTEP (msgs->io))
1246        {
1247            sendLtepHandshake (msgs);
1248            sendPex (msgs);
1249        }
1250    }
1251    else if (ltep_msgid == UT_PEX_ID)
1252    {
1253        dbgmsg (msgs, "got ut pex");
1254        msgs->peerSupportsPex = true;
1255        parseUtPex (msgs, msglen, inbuf);
1256    }
1257    else if (ltep_msgid == UT_METADATA_ID)
1258    {
1259        dbgmsg (msgs, "got ut metadata");
1260        msgs->peerSupportsMetadataXfer = true;
1261        parseUtMetadata (msgs, msglen, inbuf);
1262    }
1263    else
1264    {
1265        dbgmsg (msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1266        evbuffer_drain (inbuf, msglen);
1267    }
1268}
1269
1270static int
1271readBtLength (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1272{
1273    uint32_t len;
1274
1275    if (inlen < sizeof (len))
1276        return READ_LATER;
1277
1278    tr_peerIoReadUint32 (msgs->io, inbuf, &len);
1279
1280    if (len == 0) /* peer sent us a keepalive message */
1281        dbgmsg (msgs, "got KeepAlive");
1282    else
1283    {
1284        msgs->incoming.length = len;
1285        msgs->state = AWAITING_BT_ID;
1286    }
1287
1288    return READ_NOW;
1289}
1290
1291static int readBtMessage (tr_peerMsgs *, struct evbuffer *, size_t);
1292
1293static int
1294readBtId (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1295{
1296    uint8_t id;
1297
1298    if (inlen < sizeof (uint8_t))
1299        return READ_LATER;
1300
1301    tr_peerIoReadUint8 (msgs->io, inbuf, &id);
1302    msgs->incoming.id = id;
1303    dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);
1304
1305    if (id == BT_PIECE)
1306    {
1307        msgs->state = AWAITING_BT_PIECE;
1308        return READ_NOW;
1309    }
1310    else if (msgs->incoming.length != 1)
1311    {
1312        msgs->state = AWAITING_BT_MESSAGE;
1313        return READ_NOW;
1314    }
1315    else return readBtMessage (msgs, inbuf, inlen - 1);
1316}
1317
1318static void
1319updatePeerProgress (tr_peerMsgs * msgs)
1320{
1321  tr_peerUpdateProgress (msgs->torrent, &msgs->peer);
1322
1323  /*updateFastSet (msgs);*/
1324  updateInterest (msgs);
1325}
1326
1327static void
1328prefetchPieces (tr_peerMsgs *msgs)
1329{
1330  int i;
1331
1332  if (!getSession (msgs)->isPrefetchEnabled)
1333    return;
1334
1335  for (i=msgs->prefetchCount; i<msgs->peer.pendingReqsToClient && i<PREFETCH_SIZE; ++i)
1336    {
1337      const struct peer_request * req = msgs->peerAskedFor + i;
1338      if (requestIsValid (msgs, req))
1339        {
1340          tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1341          ++msgs->prefetchCount;
1342        }
1343    }
1344}
1345
1346static void
1347peerMadeRequest (tr_peerMsgs * msgs, const struct peer_request * req)
1348{
1349    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1350    const int reqIsValid = requestIsValid (msgs, req);
1351    const int clientHasPiece = reqIsValid && tr_torrentPieceIsComplete (msgs->torrent, req->index);
1352    const int peerIsChoked = msgs->peer_is_choked;
1353
1354    bool allow = false;
1355
1356    if (!reqIsValid)
1357        dbgmsg (msgs, "rejecting an invalid request.");
1358    else if (!clientHasPiece)
1359        dbgmsg (msgs, "rejecting request for a piece we don't have.");
1360    else if (peerIsChoked)
1361        dbgmsg (msgs, "rejecting request from choked peer");
1362    else if (msgs->peer.pendingReqsToClient + 1 >= REQQ)
1363        dbgmsg (msgs, "rejecting request ... reqq is full");
1364    else
1365        allow = true;
1366
1367    if (allow) {
1368        msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req;
1369        prefetchPieces (msgs);
1370    } else if (fext) {
1371        protocolSendReject (msgs, req);
1372    }
1373}
1374
1375static bool
1376messageLengthIsCorrect (const tr_peerMsgs * msg, uint8_t id, uint32_t len)
1377{
1378    switch (id)
1379    {
1380        case BT_CHOKE:
1381        case BT_UNCHOKE:
1382        case BT_INTERESTED:
1383        case BT_NOT_INTERESTED:
1384        case BT_FEXT_HAVE_ALL:
1385        case BT_FEXT_HAVE_NONE:
1386            return len == 1;
1387
1388        case BT_HAVE:
1389        case BT_FEXT_SUGGEST:
1390        case BT_FEXT_ALLOWED_FAST:
1391            return len == 5;
1392
1393        case BT_BITFIELD:
1394            if (tr_torrentHasMetadata (msg->torrent))
1395                return len == (msg->torrent->info.pieceCount >> 3) + (msg->torrent->info.pieceCount & 7 ? 1 : 0) + 1u;
1396            /* we don't know the piece count yet,
1397               so we can only guess whether to send true or false */
1398            if (msg->metadata_size_hint > 0)
1399                return len <= msg->metadata_size_hint;
1400            return true;
1401
1402        case BT_REQUEST:
1403        case BT_CANCEL:
1404        case BT_FEXT_REJECT:
1405            return len == 13;
1406
1407        case BT_PIECE:
1408            return len > 9 && len <= 16393;
1409
1410        case BT_PORT:
1411            return len == 3;
1412
1413        case BT_LTEP:
1414            return len >= 2;
1415
1416        default:
1417            return false;
1418    }
1419}
1420
1421static int clientGotBlock (tr_peerMsgs *               msgs,
1422                           struct evbuffer *           block,
1423                           const struct peer_request * req);
1424
1425static int
1426readBtPiece (tr_peerMsgs      * msgs,
1427             struct evbuffer  * inbuf,
1428             size_t             inlen,
1429             size_t           * setme_piece_bytes_read)
1430{
1431    struct peer_request * req = &msgs->incoming.blockReq;
1432
1433    assert (evbuffer_get_length (inbuf) >= inlen);
1434    dbgmsg (msgs, "In readBtPiece");
1435
1436    if (!req->length)
1437    {
1438        if (inlen < 8)
1439            return READ_LATER;
1440
1441        tr_peerIoReadUint32 (msgs->io, inbuf, &req->index);
1442        tr_peerIoReadUint32 (msgs->io, inbuf, &req->offset);
1443        req->length = msgs->incoming.length - 9;
1444        dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1445        return READ_NOW;
1446    }
1447    else
1448    {
1449        int err;
1450        size_t n;
1451        size_t nLeft;
1452        struct evbuffer * block_buffer;
1453
1454        if (msgs->incoming.block == NULL)
1455            msgs->incoming.block = evbuffer_new ();
1456        block_buffer = msgs->incoming.block;
1457
1458        /* read in another chunk of data */
1459        nLeft = req->length - evbuffer_get_length (block_buffer);
1460        n = MIN (nLeft, inlen);
1461
1462        tr_peerIoReadBytesToBuf (msgs->io, inbuf, block_buffer, n);
1463
1464        fireClientGotPieceData (msgs, n);
1465        *setme_piece_bytes_read += n;
1466        dbgmsg (msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1467               n, req->index, req->offset, req->length,
1468             (int)(req->length - evbuffer_get_length (block_buffer)));
1469        if (evbuffer_get_length (block_buffer) < req->length)
1470            return READ_LATER;
1471
1472        /* pass the block along... */
1473        err = clientGotBlock (msgs, block_buffer, req);
1474        evbuffer_drain (block_buffer, evbuffer_get_length (block_buffer));
1475
1476        /* cleanup */
1477        req->length = 0;
1478        msgs->state = AWAITING_BT_LENGTH;
1479        return err ? READ_ERR : READ_NOW;
1480    }
1481}
1482
1483static void updateDesiredRequestCount (tr_peerMsgs * msgs);
1484
1485static int
1486readBtMessage (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1487{
1488    uint32_t      ui32;
1489    uint32_t      msglen = msgs->incoming.length;
1490    const uint8_t id = msgs->incoming.id;
1491#ifndef NDEBUG
1492    const size_t  startBufLen = evbuffer_get_length (inbuf);
1493#endif
1494    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1495
1496    assert (msglen > 0);
1497
1498    --msglen; /* id length */
1499
1500    dbgmsg (msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);
1501
1502    if (inlen < msglen)
1503        return READ_LATER;
1504
1505    if (!messageLengthIsCorrect (msgs, id, msglen + 1))
1506    {
1507        dbgmsg (msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1508        fireError (msgs, EMSGSIZE);
1509        return READ_ERR;
1510    }
1511
1512    switch (id)
1513    {
1514        case BT_CHOKE:
1515            dbgmsg (msgs, "got Choke");
1516            msgs->client_is_choked = true;
1517            if (!fext)
1518                fireGotChoke (msgs);
1519            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1520            break;
1521
1522        case BT_UNCHOKE:
1523            dbgmsg (msgs, "got Unchoke");
1524            msgs->client_is_choked = false;
1525            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1526            updateDesiredRequestCount (msgs);
1527            break;
1528
1529        case BT_INTERESTED:
1530            dbgmsg (msgs, "got Interested");
1531            msgs->peer_is_interested = true;
1532            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1533            break;
1534
1535        case BT_NOT_INTERESTED:
1536            dbgmsg (msgs, "got Not Interested");
1537            msgs->peer_is_interested = false;
1538            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1539            break;
1540
1541        case BT_HAVE:
1542            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1543            dbgmsg (msgs, "got Have: %u", ui32);
1544            if (tr_torrentHasMetadata (msgs->torrent)
1545                    && (ui32 >= msgs->torrent->info.pieceCount))
1546            {
1547                fireError (msgs, ERANGE);
1548                return READ_ERR;
1549            }
1550
1551            /* a peer can send the same HAVE message twice... */
1552            if (!tr_bitfieldHas (&msgs->peer.have, ui32)) {
1553                tr_bitfieldAdd (&msgs->peer.have, ui32);
1554                fireClientGotHave (msgs, ui32);
1555            }
1556            updatePeerProgress (msgs);
1557            break;
1558
1559        case BT_BITFIELD: {
1560            uint8_t * tmp = tr_new (uint8_t, msglen);
1561            dbgmsg (msgs, "got a bitfield");
1562            tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1563            tr_bitfieldSetRaw (&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));
1564            fireClientGotBitfield (msgs, &msgs->peer.have);
1565            updatePeerProgress (msgs);
1566            tr_free (tmp);
1567            break;
1568        }
1569
1570        case BT_REQUEST:
1571        {
1572            struct peer_request r;
1573            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1574            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1575            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1576            dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1577            peerMadeRequest (msgs, &r);
1578            break;
1579        }
1580
1581        case BT_CANCEL:
1582        {
1583            int i;
1584            struct peer_request r;
1585            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1586            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1587            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1588            tr_historyAdd (&msgs->peer.cancelsSentToClient, tr_time (), 1);
1589            dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1590
1591            for (i=0; i<msgs->peer.pendingReqsToClient; ++i) {
1592                const struct peer_request * req = msgs->peerAskedFor + i;
1593                if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length))
1594                    break;
1595            }
1596
1597            if (i < msgs->peer.pendingReqsToClient)
1598                tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request),
1599                                           msgs->peer.pendingReqsToClient--);
1600            break;
1601        }
1602
1603        case BT_PIECE:
1604            assert (0); /* handled elsewhere! */
1605            break;
1606
1607        case BT_PORT:
1608            dbgmsg (msgs, "Got a BT_PORT");
1609            tr_peerIoReadUint16 (msgs->io, inbuf, &msgs->dht_port);
1610            if (msgs->dht_port > 0)
1611                tr_dhtAddNode (getSession (msgs),
1612                               tr_peerAddress (&msgs->peer),
1613                               msgs->dht_port, 0);
1614            break;
1615
1616        case BT_FEXT_SUGGEST:
1617            dbgmsg (msgs, "Got a BT_FEXT_SUGGEST");
1618            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1619            if (fext)
1620                fireClientGotSuggest (msgs, ui32);
1621            else {
1622                fireError (msgs, EMSGSIZE);
1623                return READ_ERR;
1624            }
1625            break;
1626
1627        case BT_FEXT_ALLOWED_FAST:
1628            dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST");
1629            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1630            if (fext)
1631                fireClientGotAllowedFast (msgs, ui32);
1632            else {
1633                fireError (msgs, EMSGSIZE);
1634                return READ_ERR;
1635            }
1636            break;
1637
1638        case BT_FEXT_HAVE_ALL:
1639            dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL");
1640            if (fext) {
1641                tr_bitfieldSetHasAll (&msgs->peer.have);
1642assert (tr_bitfieldHasAll (&msgs->peer.have));
1643                fireClientGotHaveAll (msgs);
1644                updatePeerProgress (msgs);
1645            } else {
1646                fireError (msgs, EMSGSIZE);
1647                return READ_ERR;
1648            }
1649            break;
1650
1651        case BT_FEXT_HAVE_NONE:
1652            dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE");
1653            if (fext) {
1654                tr_bitfieldSetHasNone (&msgs->peer.have);
1655                fireClientGotHaveNone (msgs);
1656                updatePeerProgress (msgs);
1657            } else {
1658                fireError (msgs, EMSGSIZE);
1659                return READ_ERR;
1660            }
1661            break;
1662
1663        case BT_FEXT_REJECT:
1664        {
1665            struct peer_request r;
1666            dbgmsg (msgs, "Got a BT_FEXT_REJECT");
1667            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1668            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1669            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1670            if (fext)
1671                fireGotRej (msgs, &r);
1672            else {
1673                fireError (msgs, EMSGSIZE);
1674                return READ_ERR;
1675            }
1676            break;
1677        }
1678
1679        case BT_LTEP:
1680            dbgmsg (msgs, "Got a BT_LTEP");
1681            parseLtep (msgs, msglen, inbuf);
1682            break;
1683
1684        default:
1685            dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id);
1686            tr_peerIoDrain (msgs->io, inbuf, msglen);
1687            break;
1688    }
1689
1690    assert (msglen + 1 == msgs->incoming.length);
1691    assert (evbuffer_get_length (inbuf) == startBufLen - msglen);
1692
1693    msgs->state = AWAITING_BT_LENGTH;
1694    return READ_NOW;
1695}
1696
1697/* returns 0 on success, or an errno on failure */
1698static int
1699clientGotBlock (tr_peerMsgs                * msgs,
1700                struct evbuffer            * data,
1701                const struct peer_request  * req)
1702{
1703    int err;
1704    tr_torrent * tor = msgs->torrent;
1705    const tr_block_index_t block = _tr_block (tor, req->index, req->offset);
1706
1707    assert (msgs);
1708    assert (req);
1709
1710    if (!requestIsValid (msgs, req)) {
1711        dbgmsg (msgs, "dropping invalid block %u:%u->%u",
1712                req->index, req->offset, req->length);
1713        return EBADMSG;
1714    }
1715
1716    if (req->length != tr_torBlockCountBytes (msgs->torrent, block)) {
1717        dbgmsg (msgs, "wrong block size -- expected %u, got %d",
1718                tr_torBlockCountBytes (msgs->torrent, block), req->length);
1719        return EMSGSIZE;
1720    }
1721
1722    dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1723
1724    if (!tr_peerMgrDidPeerRequest (msgs->torrent, &msgs->peer, block)) {
1725        dbgmsg (msgs, "we didn't ask for this message...");
1726        return 0;
1727    }
1728    if (tr_torrentPieceIsComplete (msgs->torrent, req->index)) {
1729        dbgmsg (msgs, "we did ask for this message, but the piece is already complete...");
1730        return 0;
1731    }
1732
1733    /**
1734    ***  Save the block
1735    **/
1736
1737    if ((err = tr_cacheWriteBlock (getSession (msgs)->cache, tor, req->index, req->offset, req->length, data)))
1738        return err;
1739
1740    tr_bitfieldAdd (&msgs->peer.blame, req->index);
1741    fireGotBlock (msgs, req);
1742    return 0;
1743}
1744
1745static int peerPulse (void * vmsgs);
1746
1747static void
1748didWrite (tr_peerIo * io, size_t bytesWritten, bool wasPieceData, void * vmsgs)
1749{
1750    tr_peerMsgs * msgs = vmsgs;
1751
1752    if (wasPieceData)
1753      firePeerGotPieceData (msgs, bytesWritten);
1754
1755    if (tr_isPeerIo (io) && io->userData)
1756        peerPulse (msgs);
1757}
1758
1759static ReadState
1760canRead (tr_peerIo * io, void * vmsgs, size_t * piece)
1761{
1762    ReadState         ret;
1763    tr_peerMsgs *     msgs = vmsgs;
1764    struct evbuffer * in = tr_peerIoGetReadBuffer (io);
1765    const size_t      inlen = evbuffer_get_length (in);
1766
1767    dbgmsg (msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);
1768
1769    if (!inlen)
1770    {
1771        ret = READ_LATER;
1772    }
1773    else if (msgs->state == AWAITING_BT_PIECE)
1774    {
1775        ret = readBtPiece (msgs, in, inlen, piece);
1776    }
1777    else switch (msgs->state)
1778    {
1779        case AWAITING_BT_LENGTH:
1780            ret = readBtLength (msgs, in, inlen); break;
1781
1782        case AWAITING_BT_ID:
1783            ret = readBtId   (msgs, in, inlen); break;
1784
1785        case AWAITING_BT_MESSAGE:
1786            ret = readBtMessage (msgs, in, inlen); break;
1787
1788        default:
1789            ret = READ_ERR;
1790            assert (0);
1791    }
1792
1793    dbgmsg (msgs, "canRead: ret is %d", (int)ret);
1794
1795    return ret;
1796}
1797
1798int
1799tr_peerMsgsIsReadingBlock (const tr_peerMsgs * msgs, tr_block_index_t block)
1800{
1801    if (msgs->state != AWAITING_BT_PIECE)
1802        return false;
1803
1804    return block == _tr_block (msgs->torrent,
1805                               msgs->incoming.blockReq.index,
1806                               msgs->incoming.blockReq.offset);
1807}
1808
1809/**
1810***
1811**/
1812
1813static void
1814updateDesiredRequestCount (tr_peerMsgs * msgs)
1815{
1816    tr_torrent * const torrent = msgs->torrent;
1817
1818    /* there are lots of reasons we might not want to request any blocks... */
1819    if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent)
1820                                    || msgs->client_is_choked
1821                                    || !msgs->client_is_interested)
1822    {
1823        msgs->desiredRequestCount = 0;
1824    }
1825    else
1826    {
1827        int estimatedBlocksInPeriod;
1828        unsigned int rate_Bps;
1829        unsigned int irate_Bps;
1830        const int floor = 4;
1831        const int seconds = REQUEST_BUF_SECS;
1832        const uint64_t now = tr_time_msec ();
1833
1834        /* Get the rate limit we should use.
1835         * FIXME: this needs to consider all the other peers as well... */
1836        rate_Bps = tr_peerGetPieceSpeed_Bps (&msgs->peer, now, TR_PEER_TO_CLIENT);
1837        if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT))
1838            rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT));
1839
1840        /* honor the session limits, if enabled */
1841        if (tr_torrentUsesSessionLimits (torrent) &&
1842            tr_sessionGetActiveSpeedLimit_Bps (torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1843                rate_Bps = MIN (rate_Bps, irate_Bps);
1844
1845        /* use this desired rate to figure out how
1846         * many requests we should send to this peer */
1847        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1848        msgs->desiredRequestCount = MAX (floor, estimatedBlocksInPeriod);
1849
1850        /* honor the peer's maximum request count, if specified */
1851        if (msgs->reqq > 0)
1852            if (msgs->desiredRequestCount > msgs->reqq)
1853                msgs->desiredRequestCount = msgs->reqq;
1854    }
1855}
1856
1857static void
1858updateMetadataRequests (tr_peerMsgs * msgs, time_t now)
1859{
1860    int piece;
1861
1862    if (msgs->peerSupportsMetadataXfer
1863        && tr_torrentGetNextMetadataRequest (msgs->torrent, now, &piece))
1864    {
1865        tr_variant tmp;
1866        struct evbuffer * payload;
1867        struct evbuffer * out = msgs->outMessages;
1868
1869        /* build the data message */
1870        tr_variantInitDict (&tmp, 3);
1871        tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
1872        tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1873        payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1874
1875        dbgmsg (msgs, "requesting metadata piece #%d", piece);
1876
1877        /* write it out as a LTEP message to our outMessages buffer */
1878        evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1879        evbuffer_add_uint8 (out, BT_LTEP);
1880        evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1881        evbuffer_add_buffer (out, payload);
1882        pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1883        dbgOutMessageLen (msgs);
1884
1885        /* cleanup */
1886        evbuffer_free (payload);
1887        tr_variantFree (&tmp);
1888    }
1889}
1890
1891static void
1892updateBlockRequests (tr_peerMsgs * msgs)
1893{
1894    if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT)
1895        && (msgs->desiredRequestCount > 0)
1896        && (msgs->peer.pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))
1897    {
1898        int i;
1899        int n;
1900        tr_block_index_t * blocks;
1901        const int numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer;
1902
1903        assert (tr_peerMsgsIsClientInterested (msgs));
1904        assert (!tr_peerMsgsIsClientChoked (msgs));
1905
1906        blocks = tr_new (tr_block_index_t, numwant);
1907        tr_peerMgrGetNextRequests (msgs->torrent, &msgs->peer, numwant, blocks, &n, false);
1908
1909        for (i=0; i<n; ++i)
1910        {
1911            struct peer_request req;
1912            blockToReq (msgs->torrent, blocks[i], &req);
1913            protocolSendRequest (msgs, &req);
1914        }
1915
1916        tr_free (blocks);
1917    }
1918}
1919
1920static size_t
1921fillOutputBuffer (tr_peerMsgs * msgs, time_t now)
1922{
1923    int piece;
1924    size_t bytesWritten = 0;
1925    struct peer_request req;
1926    const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0;
1927    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1928
1929    /**
1930    ***  Protocol messages
1931    **/
1932
1933    if (haveMessages && !msgs->outMessagesBatchedAt) /* fresh batch */
1934    {
1935        dbgmsg (msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length (msgs->outMessages));
1936        msgs->outMessagesBatchedAt = now;
1937    }
1938    else if (haveMessages && ((now - msgs->outMessagesBatchedAt) >= msgs->outMessagesBatchPeriod))
1939    {
1940        const size_t len = evbuffer_get_length (msgs->outMessages);
1941        /* flush the protocol messages */
1942        dbgmsg (msgs, "flushing outMessages... to %p (length is %zu)", (void*)msgs->io, len);
1943        tr_peerIoWriteBuf (msgs->io, msgs->outMessages, false);
1944        msgs->clientSentAnythingAt = now;
1945        msgs->outMessagesBatchedAt = 0;
1946        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1947        bytesWritten +=  len;
1948    }
1949
1950    /**
1951    ***  Metadata Pieces
1952    **/
1953
1954    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= METADATA_PIECE_SIZE)
1955        && popNextMetadataRequest (msgs, &piece))
1956    {
1957        char * data;
1958        size_t dataLen;
1959        bool ok = false;
1960
1961        data = tr_torrentGetMetadataPiece (msgs->torrent, piece, &dataLen);
1962        if (data != NULL)
1963        {
1964            tr_variant tmp;
1965            struct evbuffer * payload;
1966            struct evbuffer * out = msgs->outMessages;
1967
1968            /* build the data message */
1969            tr_variantInitDict (&tmp, 3);
1970            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
1971            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1972            tr_variantDictAddInt (&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
1973            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1974
1975            /* write it out as a LTEP message to our outMessages buffer */
1976            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload) + dataLen);
1977            evbuffer_add_uint8 (out, BT_LTEP);
1978            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1979            evbuffer_add_buffer (out, payload);
1980            evbuffer_add     (out, data, dataLen);
1981            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1982            dbgOutMessageLen (msgs);
1983
1984            evbuffer_free (payload);
1985            tr_variantFree (&tmp);
1986            tr_free (data);
1987
1988            ok = true;
1989        }
1990
1991        if (!ok) /* send a rejection message */
1992        {
1993            tr_variant tmp;
1994            struct evbuffer * payload;
1995            struct evbuffer * out = msgs->outMessages;
1996
1997            /* build the rejection message */
1998            tr_variantInitDict (&tmp, 2);
1999            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
2000            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
2001            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
2002
2003            /* write it out as a LTEP message to our outMessages buffer */
2004            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2005            evbuffer_add_uint8 (out, BT_LTEP);
2006            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
2007            evbuffer_add_buffer (out, payload);
2008            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2009            dbgOutMessageLen (msgs);
2010
2011            evbuffer_free (payload);
2012            tr_variantFree (&tmp);
2013        }
2014    }
2015
2016    /**
2017    ***  Data Blocks
2018    **/
2019
2020    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= msgs->torrent->blockSize)
2021        && popNextRequest (msgs, &req))
2022    {
2023        --msgs->prefetchCount;
2024
2025        if (requestIsValid (msgs, &req)
2026            && tr_torrentPieceIsComplete (msgs->torrent, req.index))
2027        {
2028            int err;
2029            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
2030            struct evbuffer * out;
2031            struct evbuffer_iovec iovec[1];
2032
2033            out = evbuffer_new ();
2034            evbuffer_expand (out, msglen);
2035
2036            evbuffer_add_uint32 (out, sizeof (uint8_t) + 2 * sizeof (uint32_t) + req.length);
2037            evbuffer_add_uint8 (out, BT_PIECE);
2038            evbuffer_add_uint32 (out, req.index);
2039            evbuffer_add_uint32 (out, req.offset);
2040
2041            evbuffer_reserve_space (out, req.length, iovec, 1);
2042            err = tr_cacheReadBlock (getSession (msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base);
2043            iovec[0].iov_len = req.length;
2044            evbuffer_commit_space (out, iovec, 1);
2045
2046            /* check the piece if it needs checking... */
2047            if (!err && tr_torrentPieceNeedsCheck (msgs->torrent, req.index))
2048                if ((err = !tr_torrentCheckPiece (msgs->torrent, req.index)))
2049                    tr_torrentSetLocalError (msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."), (size_t)req.index);
2050
2051            if (err)
2052            {
2053                if (fext)
2054                    protocolSendReject (msgs, &req);
2055            }
2056            else
2057            {
2058                const size_t n = evbuffer_get_length (out);
2059                dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
2060                assert (n == msglen);
2061                tr_peerIoWriteBuf (msgs->io, out, true);
2062                bytesWritten += n;
2063                msgs->clientSentAnythingAt = now;
2064                tr_historyAdd (&msgs->peer.blocksSentToPeer, tr_time (), 1);
2065            }
2066
2067            evbuffer_free (out);
2068
2069            if (err)
2070            {
2071                bytesWritten = 0;
2072                msgs = NULL;
2073            }
2074        }
2075        else if (fext) /* peer needs a reject message */
2076        {
2077            protocolSendReject (msgs, &req);
2078        }
2079
2080        if (msgs != NULL)
2081            prefetchPieces (msgs);
2082    }
2083
2084    /**
2085    ***  Keepalive
2086    **/
2087
2088    if ((msgs != NULL)
2089        && (msgs->clientSentAnythingAt != 0)
2090        && ((now - msgs->clientSentAnythingAt) > KEEPALIVE_INTERVAL_SECS))
2091    {
2092        dbgmsg (msgs, "sending a keepalive message");
2093        evbuffer_add_uint32 (msgs->outMessages, 0);
2094        pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2095    }
2096
2097    return bytesWritten;
2098}
2099
2100static int
2101peerPulse (void * vmsgs)
2102{
2103    tr_peerMsgs * msgs = vmsgs;
2104    const time_t  now = tr_time ();
2105
2106    if (tr_isPeerIo (msgs->io)) {
2107        updateDesiredRequestCount (msgs);
2108        updateBlockRequests (msgs);
2109        updateMetadataRequests (msgs, now);
2110    }
2111
2112    for (;;)
2113        if (fillOutputBuffer (msgs, now) < 1)
2114            break;
2115
2116    return true; /* loop forever */
2117}
2118
2119void
2120tr_peerMsgsPulse (tr_peerMsgs * msgs)
2121{
2122    if (msgs != NULL)
2123        peerPulse (msgs);
2124}
2125
2126static void
2127gotError (tr_peerIo * io UNUSED, short what, void * vmsgs)
2128{
2129    if (what & BEV_EVENT_TIMEOUT)
2130        dbgmsg (vmsgs, "libevent got a timeout, what=%hd", what);
2131    if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
2132        dbgmsg (vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2133               what, errno, tr_strerror (errno));
2134    fireError (vmsgs, ENOTCONN);
2135}
2136
2137static void
2138sendBitfield (tr_peerMsgs * msgs)
2139{
2140    void * bytes;
2141    size_t byte_count = 0;
2142    struct evbuffer * out = msgs->outMessages;
2143
2144    assert (tr_torrentHasMetadata (msgs->torrent));
2145
2146    bytes = tr_torrentCreatePieceBitfield (msgs->torrent, &byte_count);
2147    evbuffer_add_uint32 (out, sizeof (uint8_t) + byte_count);
2148    evbuffer_add_uint8 (out, BT_BITFIELD);
2149    evbuffer_add     (out, bytes, byte_count);
2150    dbgmsg (msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length (out));
2151    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2152
2153    tr_free (bytes);
2154}
2155
2156static void
2157tellPeerWhatWeHave (tr_peerMsgs * msgs)
2158{
2159    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
2160
2161    if (fext && tr_torrentHasAll (msgs->torrent))
2162    {
2163        protocolSendHaveAll (msgs);
2164    }
2165    else if (fext && tr_torrentHasNone (msgs->torrent))
2166    {
2167        protocolSendHaveNone (msgs);
2168    }
2169    else if (!tr_torrentHasNone (msgs->torrent))
2170    {
2171        sendBitfield (msgs);
2172    }
2173}
2174
2175/**
2176***
2177**/
2178
2179/* some peers give us error messages if we send
2180   more than this many peers in a single pex message
2181   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2182#define MAX_PEX_ADDED 50
2183#define MAX_PEX_DROPPED 50
2184
2185typedef struct
2186{
2187    tr_pex *  added;
2188    tr_pex *  dropped;
2189    tr_pex *  elements;
2190    int       addedCount;
2191    int       droppedCount;
2192    int       elementCount;
2193}
2194PexDiffs;
2195
2196static void
2197pexAddedCb (void * vpex, void * userData)
2198{
2199    PexDiffs * diffs = userData;
2200    tr_pex *   pex = vpex;
2201
2202    if (diffs->addedCount < MAX_PEX_ADDED)
2203    {
2204        diffs->added[diffs->addedCount++] = *pex;
2205        diffs->elements[diffs->elementCount++] = *pex;
2206    }
2207}
2208
2209static inline void
2210pexDroppedCb (void * vpex, void * userData)
2211{
2212    PexDiffs * diffs = userData;
2213    tr_pex *   pex = vpex;
2214
2215    if (diffs->droppedCount < MAX_PEX_DROPPED)
2216    {
2217        diffs->dropped[diffs->droppedCount++] = *pex;
2218    }
2219}
2220
2221static inline void
2222pexElementCb (void * vpex, void * userData)
2223{
2224    PexDiffs * diffs = userData;
2225    tr_pex * pex = vpex;
2226
2227    diffs->elements[diffs->elementCount++] = *pex;
2228}
2229
2230typedef void (tr_set_func)(void * element, void * userData);
2231
2232/**
2233 * @brief find the differences and commonalities in two sorted sets
2234 * @param a the first set
2235 * @param aCount the number of elements in the set 'a'
2236 * @param b the second set
2237 * @param bCount the number of elements in the set 'b'
2238 * @param compare the sorting method for both sets
2239 * @param elementSize the sizeof the element in the two sorted sets
2240 * @param in_a called for items in set 'a' but not set 'b'
2241 * @param in_b called for items in set 'b' but not set 'a'
2242 * @param in_both called for items that are in both sets
2243 * @param userData user data passed along to in_a, in_b, and in_both
2244 */
2245static void
2246tr_set_compare (const void * va, size_t aCount,
2247                const void * vb, size_t bCount,
2248                int compare (const void * a, const void * b),
2249                size_t elementSize,
2250                tr_set_func in_a_cb,
2251                tr_set_func in_b_cb,
2252                tr_set_func in_both_cb,
2253                void * userData)
2254{
2255    const uint8_t * a = va;
2256    const uint8_t * b = vb;
2257    const uint8_t * aend = a + elementSize * aCount;
2258    const uint8_t * bend = b + elementSize * bCount;
2259
2260    while (a != aend || b != bend)
2261    {
2262        if (a == aend)
2263        {
2264          (*in_b_cb)((void*)b, userData);
2265            b += elementSize;
2266        }
2267        else if (b == bend)
2268        {
2269          (*in_a_cb)((void*)a, userData);
2270            a += elementSize;
2271        }
2272        else
2273        {
2274            const int val = (*compare)(a, b);
2275
2276            if (!val)
2277            {
2278              (*in_both_cb)((void*)a, userData);
2279                a += elementSize;
2280                b += elementSize;
2281            }
2282            else if (val < 0)
2283            {
2284              (*in_a_cb)((void*)a, userData);
2285                a += elementSize;
2286            }
2287            else if (val > 0)
2288            {
2289              (*in_b_cb)((void*)b, userData);
2290                b += elementSize;
2291            }
2292        }
2293    }
2294}
2295
2296
2297static void
2298sendPex (tr_peerMsgs * msgs)
2299{
2300    if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent))
2301    {
2302        PexDiffs diffs;
2303        PexDiffs diffs6;
2304        tr_pex * newPex = NULL;
2305        tr_pex * newPex6 = NULL;
2306        const int newCount = tr_peerMgrGetPeers (msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2307        const int newCount6 = tr_peerMgrGetPeers (msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2308
2309        /* build the diffs */
2310        diffs.added = tr_new (tr_pex, newCount);
2311        diffs.addedCount = 0;
2312        diffs.dropped = tr_new (tr_pex, msgs->pexCount);
2313        diffs.droppedCount = 0;
2314        diffs.elements = tr_new (tr_pex, newCount + msgs->pexCount);
2315        diffs.elementCount = 0;
2316        tr_set_compare (msgs->pex, msgs->pexCount,
2317                        newPex, newCount,
2318                        tr_pexCompare, sizeof (tr_pex),
2319                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs);
2320        diffs6.added = tr_new (tr_pex, newCount6);
2321        diffs6.addedCount = 0;
2322        diffs6.dropped = tr_new (tr_pex, msgs->pexCount6);
2323        diffs6.droppedCount = 0;
2324        diffs6.elements = tr_new (tr_pex, newCount6 + msgs->pexCount6);
2325        diffs6.elementCount = 0;
2326        tr_set_compare (msgs->pex6, msgs->pexCount6,
2327                        newPex6, newCount6,
2328                        tr_pexCompare, sizeof (tr_pex),
2329                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6);
2330        dbgmsg (
2331            msgs,
2332            "pex: old peer count %d+%d, new peer count %d+%d, "
2333            "added %d+%d, removed %d+%d",
2334            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2335            diffs.addedCount, diffs6.addedCount,
2336            diffs.droppedCount, diffs6.droppedCount);
2337
2338        if (!diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2339            !diffs6.droppedCount)
2340        {
2341            tr_free (diffs.elements);
2342            tr_free (diffs6.elements);
2343        }
2344        else
2345        {
2346            int  i;
2347            tr_variant val;
2348            uint8_t * tmp, *walk;
2349            struct evbuffer * payload;
2350            struct evbuffer * out = msgs->outMessages;
2351
2352            /* update peer */
2353            tr_free (msgs->pex);
2354            msgs->pex = diffs.elements;
2355            msgs->pexCount = diffs.elementCount;
2356            tr_free (msgs->pex6);
2357            msgs->pex6 = diffs6.elements;
2358            msgs->pexCount6 = diffs6.elementCount;
2359
2360            /* build the pex payload */
2361            tr_variantInitDict (&val, 3); /* ipv6 support: left as 3:
2362                                         * speed vs. likelihood? */
2363
2364            if (diffs.addedCount > 0)
2365            {
2366                /* "added" */
2367                tmp = walk = tr_new (uint8_t, diffs.addedCount * 6);
2368                for (i = 0; i < diffs.addedCount; ++i) {
2369                    memcpy (walk, &diffs.added[i].addr.addr, 4); walk += 4;
2370                    memcpy (walk, &diffs.added[i].port, 2); walk += 2;
2371                }
2372                assert ((walk - tmp) == diffs.addedCount * 6);
2373                tr_variantDictAddRaw (&val, TR_KEY_added, tmp, walk - tmp);
2374                tr_free (tmp);
2375
2376                /* "added.f"
2377                 * unset each holepunch flag because we don't support it. */
2378                tmp = walk = tr_new (uint8_t, diffs.addedCount);
2379                for (i = 0; i < diffs.addedCount; ++i)
2380                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2381                assert ((walk - tmp) == diffs.addedCount);
2382                tr_variantDictAddRaw (&val, TR_KEY_added_f, tmp, walk - tmp);
2383                tr_free (tmp);
2384            }
2385
2386            if (diffs.droppedCount > 0)
2387            {
2388                /* "dropped" */
2389                tmp = walk = tr_new (uint8_t, diffs.droppedCount * 6);
2390                for (i = 0; i < diffs.droppedCount; ++i) {
2391                    memcpy (walk, &diffs.dropped[i].addr.addr, 4); walk += 4;
2392                    memcpy (walk, &diffs.dropped[i].port, 2); walk += 2;
2393                }
2394                assert ((walk - tmp) == diffs.droppedCount * 6);
2395                tr_variantDictAddRaw (&val, TR_KEY_dropped, tmp, walk - tmp);
2396                tr_free (tmp);
2397            }
2398
2399            if (diffs6.addedCount > 0)
2400            {
2401                /* "added6" */
2402                tmp = walk = tr_new (uint8_t, diffs6.addedCount * 18);
2403                for (i = 0; i < diffs6.addedCount; ++i) {
2404                    memcpy (walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2405                    walk += 16;
2406                    memcpy (walk, &diffs6.added[i].port, 2);
2407                    walk += 2;
2408                }
2409                assert ((walk - tmp) == diffs6.addedCount * 18);
2410                tr_variantDictAddRaw (&val, TR_KEY_added6, tmp, walk - tmp);
2411                tr_free (tmp);
2412
2413                /* "added6.f"
2414                 * unset each holepunch flag because we don't support it. */
2415                tmp = walk = tr_new (uint8_t, diffs6.addedCount);
2416                for (i = 0; i < diffs6.addedCount; ++i)
2417                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2418                assert ((walk - tmp) == diffs6.addedCount);
2419                tr_variantDictAddRaw (&val, TR_KEY_added6_f, tmp, walk - tmp);
2420                tr_free (tmp);
2421            }
2422
2423            if (diffs6.droppedCount > 0)
2424            {
2425                /* "dropped6" */
2426                tmp = walk = tr_new (uint8_t, diffs6.droppedCount * 18);
2427                for (i = 0; i < diffs6.droppedCount; ++i) {
2428                    memcpy (walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2429                    walk += 16;
2430                    memcpy (walk, &diffs6.dropped[i].port, 2);
2431                    walk += 2;
2432                }
2433                assert ((walk - tmp) == diffs6.droppedCount * 18);
2434                tr_variantDictAddRaw (&val, TR_KEY_dropped6, tmp, walk - tmp);
2435                tr_free (tmp);
2436            }
2437
2438            /* write the pex message */
2439            payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
2440            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2441            evbuffer_add_uint8 (out, BT_LTEP);
2442            evbuffer_add_uint8 (out, msgs->ut_pex_id);
2443            evbuffer_add_buffer (out, payload);
2444            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2445            dbgmsg (msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length (out));
2446            dbgOutMessageLen (msgs);
2447
2448            evbuffer_free (payload);
2449            tr_variantFree (&val);
2450        }
2451
2452        /* cleanup */
2453        tr_free (diffs.added);
2454        tr_free (diffs.dropped);
2455        tr_free (newPex);
2456        tr_free (diffs6.added);
2457        tr_free (diffs6.dropped);
2458        tr_free (newPex6);
2459
2460        /*msgs->clientSentPexAt = tr_time ();*/
2461    }
2462}
2463
2464static void
2465pexPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmsgs)
2466{
2467    struct tr_peerMsgs * msgs = vmsgs;
2468
2469    sendPex (msgs);
2470
2471    assert (msgs->pexTimer != NULL);
2472    tr_timerAdd (msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2473}
2474
2475/***
2476****  tr_peer virtual functions
2477***/
2478
2479static bool
2480peermsgs_is_transferring_pieces (const struct tr_peer * peer,
2481                                 uint64_t               now,
2482                                 tr_direction           direction,
2483                                 unsigned int         * setme_Bps)
2484{
2485  unsigned int Bps = 0;
2486
2487  if (tr_isPeerMsgs (peer))
2488    {
2489      const tr_peerMsgs * msgs = (const tr_peerMsgs *) peer;
2490      Bps = tr_peerIoGetPieceSpeed_Bps (msgs->io, now, direction);
2491    }
2492
2493  if (setme_Bps != NULL)
2494    *setme_Bps = Bps;
2495
2496  return Bps > 0;
2497}
2498
2499static void
2500peermsgs_destruct (tr_peer * peer)
2501{
2502  tr_peerMsgs * msgs = PEER_MSGS (peer);
2503
2504  assert (msgs != NULL);
2505
2506  tr_peerMsgsSetActive (msgs, TR_UP, false);
2507  tr_peerMsgsSetActive (msgs, TR_DOWN, false);
2508
2509  if (msgs->pexTimer != NULL)
2510    event_free (msgs->pexTimer);
2511
2512  if (msgs->incoming.block != NULL)
2513    evbuffer_free (msgs->incoming.block);
2514
2515  if (msgs->io)
2516    {
2517      tr_peerIoClear (msgs->io);
2518      tr_peerIoUnref (msgs->io); /* balanced by the ref in handshakeDoneCB () */
2519    }
2520
2521  evbuffer_free (msgs->outMessages);
2522  tr_free (msgs->pex6);
2523  tr_free (msgs->pex);
2524
2525  tr_peerDestruct (&msgs->peer);
2526
2527  memset (msgs, ~0, sizeof (tr_peerMsgs));
2528}
2529
2530static const struct tr_peer_virtual_funcs my_funcs =
2531{
2532  .destruct = peermsgs_destruct,
2533  .is_transferring_pieces = peermsgs_is_transferring_pieces
2534};
2535
2536/***
2537****
2538***/
2539
2540time_t
2541tr_peerMsgsGetConnectionAge (const tr_peerMsgs * msgs)
2542{
2543  assert (tr_isPeerMsgs (msgs));
2544
2545  return tr_peerIoGetAge (msgs->io);
2546}
2547
2548bool
2549tr_peerMsgsIsPeerChoked (const tr_peerMsgs * msgs)
2550{
2551  assert (tr_isPeerMsgs (msgs));
2552
2553  return msgs->peer_is_choked;
2554}
2555
2556bool
2557tr_peerMsgsIsPeerInterested (const tr_peerMsgs * msgs)
2558{
2559  assert (tr_isPeerMsgs (msgs));
2560
2561  return msgs->peer_is_interested;
2562}
2563
2564bool
2565tr_peerMsgsIsClientChoked (const tr_peerMsgs * msgs)
2566{
2567  assert (tr_isPeerMsgs (msgs));
2568
2569  return msgs->client_is_choked;
2570}
2571
2572bool
2573tr_peerMsgsIsClientInterested (const tr_peerMsgs * msgs)
2574{
2575  assert (tr_isPeerMsgs (msgs));
2576
2577  return msgs->client_is_interested;
2578}
2579
2580bool
2581tr_peerMsgsIsUtpConnection (const tr_peerMsgs * msgs)
2582{
2583  assert (tr_isPeerMsgs (msgs));
2584
2585  return msgs->io->utp_socket != NULL;
2586}
2587
2588bool
2589tr_peerMsgsIsEncrypted (const tr_peerMsgs * msgs)
2590{
2591  assert (tr_isPeerMsgs (msgs));
2592
2593  return tr_peerIoIsEncrypted (msgs->io);
2594}
2595
2596bool
2597tr_peerMsgsIsIncomingConnection (const tr_peerMsgs * msgs)
2598{
2599  assert (tr_isPeerMsgs (msgs));
2600
2601  return tr_peerIoIsIncoming (msgs->io);
2602}
2603
2604/***
2605****
2606***/
2607
2608bool
2609tr_isPeerMsgs (const void * msgs)
2610{
2611  /* FIXME: this is pretty crude */
2612  return (msgs != NULL)
2613      && (((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER);
2614}
2615
2616tr_peerMsgs *
2617tr_peerMsgsCast (void * vm)
2618{
2619  return tr_isPeerMsgs(vm) ? vm : NULL;
2620}
2621
2622tr_peerMsgs *
2623tr_peerMsgsNew (struct tr_torrent    * torrent,
2624                struct tr_peerIo     * io,
2625                tr_peer_callback       callback,
2626                void                 * callbackData)
2627{
2628  tr_peerMsgs * m;
2629
2630  assert (io != NULL);
2631
2632  m = tr_new0 (tr_peerMsgs, 1);
2633
2634  tr_peerConstruct (&m->peer, torrent);
2635  m->peer.funcs = &my_funcs;
2636
2637  m->magic_number = MAGIC_NUMBER;
2638  m->client_is_choked = true;
2639  m->peer_is_choked = true;
2640  m->client_is_interested = false;
2641  m->peer_is_interested = false;
2642  m->is_active[TR_UP] = false;
2643  m->is_active[TR_DOWN] = false;
2644  m->callback = callback;
2645  m->callbackData = callbackData;
2646  m->io = io;
2647  m->torrent = torrent;
2648  m->state = AWAITING_BT_LENGTH;
2649  m->outMessages = evbuffer_new ();
2650  m->outMessagesBatchedAt = 0;
2651  m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2652
2653  if (tr_torrentAllowsPex (torrent))
2654    {
2655      m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m);
2656      tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0);
2657    }
2658
2659  if (tr_peerIoSupportsUTP (m->io))
2660    {
2661      const tr_address * addr = tr_peerIoGetAddress (m->io, NULL);
2662      tr_peerMgrSetUtpSupported (torrent, addr);
2663      tr_peerMgrSetUtpFailed (torrent, addr, false);
2664    }
2665
2666  if (tr_peerIoSupportsLTEP (m->io))
2667    sendLtepHandshake (m);
2668
2669  tellPeerWhatWeHave (m);
2670
2671  if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (m->io))
2672    {
2673      /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2674      const struct tr_address *addr = tr_peerIoGetAddress (m->io, NULL);
2675      if (addr->type == TR_AF_INET || tr_globalIPv6 ())
2676        protocolSendPort (m, tr_dhtPort (torrent->session));
2677    }
2678
2679  tr_peerIoSetIOFuncs (m->io, canRead, didWrite, gotError, m);
2680  updateDesiredRequestCount (m);
2681
2682  return m;
2683}
Note: See TracBrowser for help on using the repository browser.