source: trunk/libtransmission/peer-msgs.c @ 14525

Last change on this file since 14525 was 14525, checked in by mikedld, 6 years ago

Fix some issues revealed by coverity

  • Property svn:keywords set to Date Rev Author Id
File size: 75.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: peer-msgs.c 14525 2015-05-09 08:37:55Z mikedld $
8 */
9
10#include <assert.h>
11#include <errno.h>
12#include <stdarg.h>
13#include <stdlib.h>
14#include <string.h>
15
16#include <event2/buffer.h>
17#include <event2/bufferevent.h>
18#include <event2/event.h>
19
20#include "transmission.h"
21#include "cache.h"
22#include "completion.h"
23#include "file.h"
24#include "log.h"
25#include "peer-io.h"
26#include "peer-mgr.h"
27#include "peer-msgs.h"
28#include "session.h"
29#include "torrent.h"
30#include "torrent-magnet.h"
31#include "tr-dht.h"
32#include "utils.h"
33#include "variant.h"
34#include "version.h"
35
36#ifndef EBADMSG
37 #define EBADMSG EINVAL
38#endif
39
40/**
41***
42**/
43
44enum
45{
46  BT_CHOKE                = 0,
47  BT_UNCHOKE              = 1,
48  BT_INTERESTED           = 2,
49  BT_NOT_INTERESTED       = 3,
50  BT_HAVE                 = 4,
51  BT_BITFIELD             = 5,
52  BT_REQUEST              = 6,
53  BT_PIECE                = 7,
54  BT_CANCEL               = 8,
55  BT_PORT                 = 9,
56
57  BT_FEXT_SUGGEST         = 13,
58  BT_FEXT_HAVE_ALL        = 14,
59  BT_FEXT_HAVE_NONE       = 15,
60  BT_FEXT_REJECT          = 16,
61  BT_FEXT_ALLOWED_FAST    = 17,
62
63  BT_LTEP                 = 20,
64
65  LTEP_HANDSHAKE          = 0,
66
67  UT_PEX_ID               = 1,
68  UT_METADATA_ID          = 3,
69
70  MAX_PEX_PEER_COUNT      = 50,
71
72  MIN_CHOKE_PERIOD_SEC    = 10,
73
74  /* idle seconds before we send a keepalive */
75  KEEPALIVE_INTERVAL_SECS = 100,
76
77  PEX_INTERVAL_SECS       = 90, /* sec between sendPex () calls */
78
79  REQQ                    = 512,
80
81  METADATA_REQQ           = 64,
82
83  MAGIC_NUMBER            = 21549,
84
85  /* used in lowering the outMessages queue period */
86  IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
87  HIGH_PRIORITY_INTERVAL_SECS = 2,
88  LOW_PRIORITY_INTERVAL_SECS = 10,
89
90  /* number of pieces we'll allow in our fast set */
91  MAX_FAST_SET_SIZE = 3,
92
93  /* how many blocks to keep prefetched per peer */
94  PREFETCH_SIZE = 18,
95
96  /* when we're making requests from another peer,
97     batch them together to send enough requests to
98     meet our bandwidth goals for the next N seconds */
99  REQUEST_BUF_SECS = 10,
100
101  /* defined in BEP #9 */
102  METADATA_MSG_TYPE_REQUEST = 0,
103  METADATA_MSG_TYPE_DATA = 1,
104  METADATA_MSG_TYPE_REJECT = 2
105};
106
107enum
108{
109  AWAITING_BT_LENGTH,
110  AWAITING_BT_ID,
111  AWAITING_BT_MESSAGE,
112  AWAITING_BT_PIECE
113};
114
115typedef enum
116{
117  ENCRYPTION_PREFERENCE_UNKNOWN,
118  ENCRYPTION_PREFERENCE_YES,
119  ENCRYPTION_PREFERENCE_NO
120}
121encryption_preference_t;
122
123/**
124***
125**/
126
127struct peer_request
128{
129  uint32_t index;
130  uint32_t offset;
131  uint32_t length;
132};
133
134static void
135blockToReq (const tr_torrent     * tor,
136            tr_block_index_t       block,
137            struct peer_request  * setme)
138{
139  tr_torrentGetBlockLocation (tor, block, &setme->index,
140                                          &setme->offset,
141                                          &setme->length);
142}
143
144/**
145***
146**/
147
148/* this is raw, unchanged data from the peer regarding
149 * the current message that it's sending us. */
150struct tr_incoming
151{
152  uint8_t                id;
153  uint32_t               length; /* includes the +1 for id length */
154  struct peer_request    blockReq; /* metadata for incoming blocks */
155  struct evbuffer      * block; /* piece data for incoming blocks */
156};
157
158/**
159 * Low-level communication state information about a connected peer.
160 *
161 * This structure remembers the low-level protocol states that we're
162 * in with this peer, such as active requests, pex messages, and so on.
163 * Its fields are all private to peer-msgs.c.
164 *
165 * Data not directly involved with sending & receiving messages is
166 * stored in tr_peer, where it can be accessed by both peermsgs and
167 * the peer manager.
168 *
169 * @see struct peer_atom
170 * @see tr_peer
171 */
172struct tr_peerMsgs
173{
174  struct tr_peer peer; /* parent */
175
176  uint16_t magic_number;
177
178  /* Whether or not we've choked this peer. */
179  bool peer_is_choked;
180
181  /* whether or not the peer has indicated it will download from us. */
182  bool peer_is_interested;
183
184  /* whether or the peer is choking us. */
185  bool client_is_choked;
186
187  /* whether or not we've indicated to the peer that we would download from them if unchoked. */
188  bool client_is_interested;
189
190
191  bool peerSupportsPex;
192  bool peerSupportsMetadataXfer;
193  bool clientSentLtepHandshake;
194  bool peerSentLtepHandshake;
195
196  /*bool haveFastSet;*/
197
198  int desiredRequestCount;
199
200  int prefetchCount;
201
202  int is_active[2];
203
204  /* how long the outMessages batch should be allowed to grow before
205   * it's flushed -- some messages (like requests >:) should be sent
206   * very quickly; others aren't as urgent. */
207  int8_t          outMessagesBatchPeriod;
208
209  uint8_t         state;
210  uint8_t         ut_pex_id;
211  uint8_t         ut_metadata_id;
212  uint16_t        pexCount;
213  uint16_t        pexCount6;
214
215  tr_port         dht_port;
216
217  encryption_preference_t  encryption_preference;
218
219  size_t                   metadata_size_hint;
220#if 0
221  size_t                 fastsetSize;
222  tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
223#endif
224
225  tr_torrent *           torrent;
226
227  tr_peer_callback        callback;
228  void                  * callbackData;
229
230  struct evbuffer *      outMessages; /* all the non-piece messages */
231
232  struct peer_request    peerAskedFor[REQQ];
233
234  int peerAskedForMetadata[METADATA_REQQ];
235  int peerAskedForMetadataCount;
236
237  tr_pex * pex;
238  tr_pex * pex6;
239
240  /*time_t clientSentPexAt;*/
241  time_t clientSentAnythingAt;
242
243  time_t chokeChangedAt;
244
245  /* when we started batching the outMessages */
246  time_t outMessagesBatchedAt;
247
248  struct tr_incoming    incoming;
249
250  /* if the peer supports the Extension Protocol in BEP 10 and
251     supplied a reqq argument, it's stored here. Otherwise, the
252     value is zero and should be ignored. */
253  int64_t reqq;
254
255  struct event * pexTimer;
256
257  struct tr_peerIo * io;
258};
259
260/**
261***
262**/
263
264static inline tr_session*
265getSession (struct tr_peerMsgs * msgs)
266{
267  return msgs->torrent->session;
268}
269
270/**
271***
272**/
273
274static void
275myDebug (const char * file, int line,
276         const struct tr_peerMsgs * msgs,
277         const char * fmt, ...) TR_GNUC_PRINTF(4, 5);
278
279static void
280myDebug (const char * file, int line,
281         const struct tr_peerMsgs * msgs,
282         const char * fmt, ...)
283{
284  const tr_sys_file_t fp = tr_logGetFile ();
285
286  if (fp != TR_BAD_SYS_FILE)
287    {
288      va_list           args;
289      char              timestr[64];
290      struct evbuffer * buf = evbuffer_new ();
291      char *            base = tr_sys_path_basename (file, NULL);
292      char *            message;
293
294      evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ",
295                           tr_logGetTimeStr (timestr, sizeof (timestr)),
296                           tr_torrentName (msgs->torrent),
297                           tr_peerIoGetAddrStr (msgs->io),
298                           tr_quark_get_string (msgs->peer.client, NULL));
299      va_start (args, fmt);
300      evbuffer_add_vprintf (buf, fmt, args);
301      va_end (args);
302      evbuffer_add_printf (buf, " (%s:%d)", base, line);
303
304      message = evbuffer_free_to_str (buf);
305      tr_sys_file_write_line (fp, message, NULL);
306
307      tr_free (base);
308      tr_free (message);
309    }
310}
311
312#define dbgmsg(msgs, ...) \
313  do \
314    { \
315      if (tr_logGetDeepEnabled ()) \
316        myDebug (__FILE__, __LINE__, msgs, __VA_ARGS__); \
317    } \
318  while (0)
319
320/**
321***
322**/
323
324static void
325pokeBatchPeriod (tr_peerMsgs * msgs, int interval)
326{
327  if (msgs->outMessagesBatchPeriod > interval)
328    {
329      msgs->outMessagesBatchPeriod = interval;
330      dbgmsg (msgs, "lowering batch interval to %d seconds", interval);
331    }
332}
333
334static void
335dbgOutMessageLen (tr_peerMsgs * msgs)
336{
337  dbgmsg (msgs, "outMessage size is now %"TR_PRIuSIZE, evbuffer_get_length (msgs->outMessages));
338}
339
340static void
341protocolSendReject (tr_peerMsgs * msgs, const struct peer_request * req)
342{
343  struct evbuffer * out = msgs->outMessages;
344
345  assert (tr_peerIoSupportsFEXT (msgs->io));
346
347  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
348  evbuffer_add_uint8 (out, BT_FEXT_REJECT);
349  evbuffer_add_uint32 (out, req->index);
350  evbuffer_add_uint32 (out, req->offset);
351  evbuffer_add_uint32 (out, req->length);
352
353  dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
354  dbgOutMessageLen (msgs);
355}
356
357static void
358protocolSendRequest (tr_peerMsgs * msgs, const struct peer_request * req)
359{
360  struct evbuffer * out = msgs->outMessages;
361
362  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
363  evbuffer_add_uint8 (out, BT_REQUEST);
364  evbuffer_add_uint32 (out, req->index);
365  evbuffer_add_uint32 (out, req->offset);
366  evbuffer_add_uint32 (out, req->length);
367
368  dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
369  dbgOutMessageLen (msgs);
370  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
371}
372
373static void
374protocolSendCancel (tr_peerMsgs * msgs, const struct peer_request * req)
375{
376  struct evbuffer * out = msgs->outMessages;
377
378  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
379  evbuffer_add_uint8 (out, BT_CANCEL);
380  evbuffer_add_uint32 (out, req->index);
381  evbuffer_add_uint32 (out, req->offset);
382  evbuffer_add_uint32 (out, req->length);
383
384  dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
385  dbgOutMessageLen (msgs);
386  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
387}
388
389static void
390protocolSendPort (tr_peerMsgs *msgs, uint16_t port)
391{
392  struct evbuffer * out = msgs->outMessages;
393
394  dbgmsg (msgs, "sending Port %u", port);
395  evbuffer_add_uint32 (out, 3);
396  evbuffer_add_uint8 (out, BT_PORT);
397  evbuffer_add_uint16 (out, port);
398}
399
400static void
401protocolSendHave (tr_peerMsgs * msgs, uint32_t index)
402{
403  struct evbuffer * out = msgs->outMessages;
404
405  evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t));
406  evbuffer_add_uint8 (out, BT_HAVE);
407  evbuffer_add_uint32 (out, index);
408
409  dbgmsg (msgs, "sending Have %u", index);
410  dbgOutMessageLen (msgs);
411  pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS);
412}
413
414#if 0
415static void
416protocolSendAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
417{
418  tr_peerIo       * io  = msgs->io;
419  struct evbuffer * out = msgs->outMessages;
420
421  assert (tr_peerIoSupportsFEXT (msgs->io));
422
423  evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t));
424  evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST);
425  evbuffer_add_uint32 (io, out, pieceIndex);
426
427  dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex);
428  dbgOutMessageLen (msgs);
429}
430#endif
431
432static void
433protocolSendChoke (tr_peerMsgs * msgs, int choke)
434{
435  struct evbuffer * out = msgs->outMessages;
436
437  evbuffer_add_uint32 (out, sizeof (uint8_t));
438  evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE);
439
440  dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke");
441  dbgOutMessageLen (msgs);
442  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
443}
444
445static void
446protocolSendHaveAll (tr_peerMsgs * msgs)
447{
448  struct evbuffer * out = msgs->outMessages;
449
450  assert (tr_peerIoSupportsFEXT (msgs->io));
451
452  evbuffer_add_uint32 (out, sizeof (uint8_t));
453  evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL);
454
455  dbgmsg (msgs, "sending HAVE_ALL...");
456  dbgOutMessageLen (msgs);
457  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
458}
459
460static void
461protocolSendHaveNone (tr_peerMsgs * msgs)
462{
463  struct evbuffer * out = msgs->outMessages;
464
465  assert (tr_peerIoSupportsFEXT (msgs->io));
466
467  evbuffer_add_uint32 (out, sizeof (uint8_t));
468  evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE);
469
470  dbgmsg (msgs, "sending HAVE_NONE...");
471  dbgOutMessageLen (msgs);
472  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
473}
474
475/**
476***  EVENTS
477**/
478
479static void
480publish (tr_peerMsgs * msgs, tr_peer_event * e)
481{
482  if (msgs->callback != NULL)
483    (*msgs->callback) (&msgs->peer, e, msgs->callbackData);
484}
485
486static void
487fireError (tr_peerMsgs * msgs, int err)
488{
489  tr_peer_event e = TR_PEER_EVENT_INIT;
490  e.eventType = TR_PEER_ERROR;
491  e.err = err;
492  publish (msgs, &e);
493}
494
495static void
496fireGotBlock (tr_peerMsgs * msgs, const struct peer_request * req)
497{
498  tr_peer_event e = TR_PEER_EVENT_INIT;
499  e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
500  e.pieceIndex = req->index;
501  e.offset = req->offset;
502  e.length = req->length;
503  publish (msgs, &e);
504}
505
506static void
507fireGotRej (tr_peerMsgs * msgs, const struct peer_request * req)
508{
509  tr_peer_event e = TR_PEER_EVENT_INIT;
510  e.eventType = TR_PEER_CLIENT_GOT_REJ;
511  e.pieceIndex = req->index;
512  e.offset = req->offset;
513  e.length = req->length;
514  publish (msgs, &e);
515}
516
517static void
518fireGotChoke (tr_peerMsgs * msgs)
519{
520  tr_peer_event e = TR_PEER_EVENT_INIT;
521  e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
522  publish (msgs, &e);
523}
524
525static void
526fireClientGotHaveAll (tr_peerMsgs * msgs)
527{
528  tr_peer_event e = TR_PEER_EVENT_INIT;
529  e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
530  publish (msgs, &e);
531}
532
533static void
534fireClientGotHaveNone (tr_peerMsgs * msgs)
535{
536  tr_peer_event e = TR_PEER_EVENT_INIT;
537  e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
538  publish (msgs, &e);
539}
540
541static void
542fireClientGotPieceData (tr_peerMsgs * msgs, uint32_t length)
543{
544  tr_peer_event e = TR_PEER_EVENT_INIT;
545  e.length = length;
546  e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
547  publish (msgs, &e);
548}
549
550static void
551firePeerGotPieceData (tr_peerMsgs * msgs, uint32_t length)
552{
553  tr_peer_event e = TR_PEER_EVENT_INIT;
554  e.length = length;
555  e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
556  publish (msgs, &e);
557}
558
559static void
560fireClientGotSuggest (tr_peerMsgs * msgs, uint32_t pieceIndex)
561{
562  tr_peer_event e = TR_PEER_EVENT_INIT;
563  e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
564  e.pieceIndex = pieceIndex;
565  publish (msgs, &e);
566}
567
568static void
569fireClientGotPort (tr_peerMsgs * msgs, tr_port port)
570{
571  tr_peer_event e = TR_PEER_EVENT_INIT;
572  e.eventType = TR_PEER_CLIENT_GOT_PORT;
573  e.port = port;
574  publish (msgs, &e);
575}
576
577static void
578fireClientGotAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
579{
580  tr_peer_event e = TR_PEER_EVENT_INIT;
581  e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
582  e.pieceIndex = pieceIndex;
583  publish (msgs, &e);
584}
585
586static void
587fireClientGotBitfield (tr_peerMsgs * msgs, tr_bitfield * bitfield)
588{
589  tr_peer_event e = TR_PEER_EVENT_INIT;
590  e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
591  e.bitfield = bitfield;
592  publish (msgs, &e);
593}
594
595static void
596fireClientGotHave (tr_peerMsgs * msgs, tr_piece_index_t index)
597{
598  tr_peer_event e = TR_PEER_EVENT_INIT;
599  e.eventType = TR_PEER_CLIENT_GOT_HAVE;
600  e.pieceIndex = index;
601  publish (msgs, &e);
602}
603
604
605/**
606***  ALLOWED FAST SET
607***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
608**/
609
610#if 0
611size_t
612tr_generateAllowedSet (tr_piece_index_t * setmePieces,
613                       size_t             desiredSetSize,
614                       size_t             pieceCount,
615                       const uint8_t    * infohash,
616                       const tr_address * addr)
617{
618    size_t setSize = 0;
619
620    assert (setmePieces);
621    assert (desiredSetSize <= pieceCount);
622    assert (desiredSetSize);
623    assert (pieceCount);
624    assert (infohash);
625    assert (addr);
626
627    if (addr->type == TR_AF_INET)
628    {
629        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
630        uint8_t x[SHA_DIGEST_LENGTH];
631
632        uint32_t ui32 = ntohl (htonl (addr->addr.addr4.s_addr) & 0xffffff00);   /* (1) */
633        memcpy (w, &ui32, sizeof (uint32_t));
634        walk += sizeof (uint32_t);
635        memcpy (walk, infohash, SHA_DIGEST_LENGTH);                 /* (2) */
636        walk += SHA_DIGEST_LENGTH;
637        tr_sha1 (x, w, walk-w, NULL);                               /* (3) */
638        assert (sizeof (w) == walk-w);
639
640        while (setSize<desiredSetSize)
641        {
642            int i;
643            for (i=0; i<5 && setSize<desiredSetSize; ++i)           /* (4) */
644            {
645                size_t k;
646                uint32_t j = i * 4;                                  /* (5) */
647                uint32_t y = ntohl (* (uint32_t*)(x + j));       /* (6) */
648                uint32_t index = y % pieceCount;                     /* (7) */
649
650                for (k=0; k<setSize; ++k)                           /* (8) */
651                    if (setmePieces[k] == index)
652                        break;
653
654                if (k == setSize)
655                    setmePieces[setSize++] = index;                  /* (9) */
656            }
657
658            tr_sha1 (x, x, sizeof (x), NULL);                      /* (3) */
659        }
660    }
661
662    return setSize;
663}
664
665static void
666updateFastSet (tr_peerMsgs * msgs UNUSED)
667{
668    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
669    const int peerIsNeedy = msgs->peer->progress < 0.10;
670
671    if (fext && peerIsNeedy && !msgs->haveFastSet)
672    {
673        size_t i;
674        const struct tr_address * addr = tr_peerIoGetAddress (msgs->io, NULL);
675        const tr_info * inf = &msgs->torrent->info;
676        const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount);
677
678        /* build the fast set */
679        msgs->fastsetSize = tr_generateAllowedSet (msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
680        msgs->haveFastSet = true;
681
682        /* send it to the peer */
683        for (i=0; i<msgs->fastsetSize; ++i)
684            protocolSendAllowedFast (msgs, msgs->fastset[i]);
685    }
686}
687#endif
688
689/***
690****  ACTIVE
691***/
692
693static bool
694tr_peerMsgsCalculateActive (const tr_peerMsgs * msgs, tr_direction direction)
695{
696  bool is_active;
697
698  assert (tr_isPeerMsgs (msgs));
699  assert (tr_isDirection (direction));
700
701  if (direction == TR_CLIENT_TO_PEER)
702    {
703      is_active = tr_peerMsgsIsPeerInterested (msgs)
704              && !tr_peerMsgsIsPeerChoked (msgs);
705
706      /* FIXME: https://trac.transmissionbt.com/ticket/5505
707      if (is_active)
708        assert (!tr_peerIsSeed (&msgs->peer));
709      */
710    }
711  else /* TR_PEER_TO_CLIENT */
712    {
713      if (!tr_torrentHasMetadata (msgs->torrent))
714        {
715          is_active = true;
716        }
717      else
718        {
719          is_active = tr_peerMsgsIsClientInterested (msgs)
720                  && !tr_peerMsgsIsClientChoked (msgs);
721
722          if (is_active)
723            assert (!tr_torrentIsSeed (msgs->torrent));
724        }
725    }
726
727  return is_active;
728}
729
730bool
731tr_peerMsgsIsActive (const tr_peerMsgs  * msgs, tr_direction direction)
732{
733  bool is_active;
734
735  assert (tr_isPeerMsgs (msgs));
736  assert (tr_isDirection (direction));
737
738  is_active = msgs->is_active[direction];
739
740  assert (is_active == tr_peerMsgsCalculateActive (msgs, direction));
741
742  return is_active;
743}
744
745static void
746tr_peerMsgsSetActive (tr_peerMsgs  * msgs,
747                      tr_direction   direction,
748                      bool           is_active)
749{
750  dbgmsg (msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active);
751
752  if (msgs->is_active[direction] != is_active)
753    {
754      msgs->is_active[direction] = is_active;
755
756      tr_swarmIncrementActivePeers (msgs->torrent->swarm, direction, is_active);
757    }
758}
759
760void
761tr_peerMsgsUpdateActive (tr_peerMsgs * msgs, tr_direction direction)
762{
763  const bool is_active = tr_peerMsgsCalculateActive (msgs, direction);
764
765  tr_peerMsgsSetActive (msgs, direction, is_active);
766}
767
768/**
769***  INTEREST
770**/
771
772static void
773sendInterest (tr_peerMsgs * msgs, bool b)
774{
775  struct evbuffer * out = msgs->outMessages;
776
777  assert (msgs);
778  assert (tr_isBool (b));
779
780  msgs->client_is_interested = b;
781  dbgmsg (msgs, "Sending %s", b ? "Interested" : "Not Interested");
782  evbuffer_add_uint32 (out, sizeof (uint8_t));
783  evbuffer_add_uint8 (out, b ? BT_INTERESTED : BT_NOT_INTERESTED);
784
785  pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
786  dbgOutMessageLen (msgs);
787}
788
789static void
790updateInterest (tr_peerMsgs * msgs UNUSED)
791{
792    /* FIXME -- might need to poke the mgr on startup */
793}
794
795void
796tr_peerMsgsSetInterested (tr_peerMsgs * msgs, bool b)
797{
798  assert (tr_isBool (b));
799
800  if (msgs->client_is_interested != b)
801    {
802      sendInterest (msgs, b);
803
804      tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
805    }
806}
807
808static bool
809popNextMetadataRequest (tr_peerMsgs * msgs, int * piece)
810{
811  if (msgs->peerAskedForMetadataCount == 0)
812    return false;
813
814  *piece = msgs->peerAskedForMetadata[0];
815
816  tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int),
817                             msgs->peerAskedForMetadataCount--);
818
819  return true;
820}
821
822static bool
823popNextRequest (tr_peerMsgs * msgs, struct peer_request * setme)
824{
825  if (msgs->peer.pendingReqsToClient == 0)
826    return false;
827
828  *setme = msgs->peerAskedFor[0];
829
830  tr_removeElementFromArray (msgs->peerAskedFor,
831                             0,
832                             sizeof (struct peer_request),
833                             msgs->peer.pendingReqsToClient--);
834
835  return true;
836}
837
838static void
839cancelAllRequestsToClient (tr_peerMsgs * msgs)
840{
841  struct peer_request req;
842  const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->io);
843
844  while (popNextRequest (msgs, &req))
845    if (mustSendCancel)
846      protocolSendReject (msgs, &req);
847}
848
849void
850tr_peerMsgsSetChoke (tr_peerMsgs * msgs, bool peer_is_choked)
851{
852  const time_t now = tr_time ();
853  const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
854
855  assert (msgs != NULL);
856  assert (tr_isBool (peer_is_choked));
857
858  if (msgs->chokeChangedAt > fibrillationTime)
859    {
860      dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
861    }
862  else if (msgs->peer_is_choked != peer_is_choked)
863    {
864      msgs->peer_is_choked = peer_is_choked;
865      if (peer_is_choked)
866        cancelAllRequestsToClient (msgs);
867      protocolSendChoke (msgs, peer_is_choked);
868      msgs->chokeChangedAt = now;
869      tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
870    }
871}
872
873/**
874***
875**/
876
877void
878tr_peerMsgsHave (tr_peerMsgs * msgs, uint32_t index)
879{
880  protocolSendHave (msgs, index);
881
882  /* since we have more pieces now, we might not be interested in this peer */
883  updateInterest (msgs);
884}
885
886/**
887***
888**/
889
890static bool
891reqIsValid (const tr_peerMsgs * peer,
892            uint32_t            index,
893            uint32_t            offset,
894            uint32_t            length)
895{
896    return tr_torrentReqIsValid (peer->torrent, index, offset, length);
897}
898
899static bool
900requestIsValid (const tr_peerMsgs * msgs, const struct peer_request * req)
901{
902    return reqIsValid (msgs, req->index, req->offset, req->length);
903}
904
905void
906tr_peerMsgsCancel (tr_peerMsgs * msgs, tr_block_index_t block)
907{
908    struct peer_request req;
909/*fprintf (stderr, "SENDING CANCEL MESSAGE FOR BLOCK %"TR_PRIuSIZE"\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer);*/
910    blockToReq (msgs->torrent, block, &req);
911    protocolSendCancel (msgs, &req);
912}
913
914/**
915***
916**/
917
918static void
919sendLtepHandshake (tr_peerMsgs * msgs)
920{
921    tr_variant val;
922    bool allow_pex;
923    bool allow_metadata_xfer;
924    struct evbuffer * payload;
925    struct evbuffer * out = msgs->outMessages;
926    const unsigned char * ipv6 = tr_globalIPv6 ();
927    static tr_quark version_quark = 0;
928
929    if (msgs->clientSentLtepHandshake)
930        return;
931
932    if (!version_quark)
933      version_quark = tr_quark_new (TR_NAME " " USERAGENT_PREFIX, -1);
934
935    dbgmsg (msgs, "sending an ltep handshake");
936    msgs->clientSentLtepHandshake = true;
937
938    /* decide if we want to advertise metadata xfer support (BEP 9) */
939    if (tr_torrentIsPrivate (msgs->torrent))
940        allow_metadata_xfer = false;
941    else
942        allow_metadata_xfer = true;
943
944    /* decide if we want to advertise pex support */
945    if (!tr_torrentAllowsPex (msgs->torrent))
946        allow_pex = false;
947    else if (msgs->peerSentLtepHandshake)
948        allow_pex = msgs->peerSupportsPex;
949    else
950        allow_pex = true;
951
952    tr_variantInitDict (&val, 8);
953    tr_variantDictAddInt (&val, TR_KEY_e, getSession (msgs)->encryptionMode != TR_CLEAR_PREFERRED);
954    if (ipv6 != NULL)
955        tr_variantDictAddRaw (&val, TR_KEY_ipv6, ipv6, 16);
956    if (allow_metadata_xfer && tr_torrentHasMetadata (msgs->torrent)
957                            && (msgs->torrent->infoDictLength > 0))
958        tr_variantDictAddInt (&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
959    tr_variantDictAddInt (&val, TR_KEY_p, tr_sessionGetPublicPeerPort (getSession (msgs)));
960    tr_variantDictAddInt (&val, TR_KEY_reqq, REQQ);
961    tr_variantDictAddInt (&val, TR_KEY_upload_only, tr_torrentIsSeed (msgs->torrent));
962    tr_variantDictAddQuark (&val, TR_KEY_v, version_quark);
963    if (allow_metadata_xfer || allow_pex) {
964        tr_variant * m  = tr_variantDictAddDict (&val, TR_KEY_m, 2);
965        if (allow_metadata_xfer)
966            tr_variantDictAddInt (m, TR_KEY_ut_metadata, UT_METADATA_ID);
967        if (allow_pex)
968            tr_variantDictAddInt (m, TR_KEY_ut_pex, UT_PEX_ID);
969    }
970
971    payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
972
973    evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
974    evbuffer_add_uint8 (out, BT_LTEP);
975    evbuffer_add_uint8 (out, LTEP_HANDSHAKE);
976    evbuffer_add_buffer (out, payload);
977    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
978    dbgOutMessageLen (msgs);
979
980    /* cleanup */
981    evbuffer_free (payload);
982    tr_variantFree (&val);
983}
984
985static void
986parseLtepHandshake (tr_peerMsgs * msgs, int len, struct evbuffer * inbuf)
987{
988    int64_t   i;
989    tr_variant   val, * sub;
990    uint8_t * tmp = tr_new (uint8_t, len);
991    const uint8_t *addr;
992    size_t addr_len;
993    tr_pex pex;
994    int8_t seedProbability = -1;
995
996    memset (&pex, 0, sizeof (tr_pex));
997
998    tr_peerIoReadBytes (msgs->io, inbuf, tmp, len);
999    msgs->peerSentLtepHandshake = true;
1000
1001    if (tr_variantFromBenc (&val, tmp, len) || !tr_variantIsDict (&val))
1002    {
1003        dbgmsg (msgs, "GET  extended-handshake, couldn't get dictionary");
1004        tr_free (tmp);
1005        return;
1006    }
1007
1008    dbgmsg (msgs, "here is the handshake: [%*.*s]", len, len,  tmp);
1009
1010    /* does the peer prefer encrypted connections? */
1011    if (tr_variantDictFindInt (&val, TR_KEY_e, &i)) {
1012        msgs->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1013                                        : ENCRYPTION_PREFERENCE_NO;
1014        if (i)
1015            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
1016    }
1017
1018    /* check supported messages for utorrent pex */
1019    msgs->peerSupportsPex = false;
1020    msgs->peerSupportsMetadataXfer = false;
1021
1022    if (tr_variantDictFindDict (&val, TR_KEY_m, &sub)) {
1023        if (tr_variantDictFindInt (sub, TR_KEY_ut_pex, &i)) {
1024            msgs->peerSupportsPex = i != 0;
1025            msgs->ut_pex_id = (uint8_t) i;
1026            dbgmsg (msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
1027        }
1028        if (tr_variantDictFindInt (sub, TR_KEY_ut_metadata, &i)) {
1029            msgs->peerSupportsMetadataXfer = i != 0;
1030            msgs->ut_metadata_id = (uint8_t) i;
1031            dbgmsg (msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
1032        }
1033        if (tr_variantDictFindInt (sub, TR_KEY_ut_holepunch, &i)) {
1034            /* Mysterious µTorrent extension that we don't grok.  However,
1035               it implies support for µTP, so use it to indicate that. */
1036            tr_peerMgrSetUtpFailed (msgs->torrent,
1037                                    tr_peerIoGetAddress (msgs->io, NULL),
1038                                    false);
1039        }
1040    }
1041
1042    /* look for metainfo size (BEP 9) */
1043    if (tr_variantDictFindInt (&val, TR_KEY_metadata_size, &i)) {
1044        tr_torrentSetMetadataSizeHint (msgs->torrent, i);
1045        msgs->metadata_size_hint = (size_t) i;
1046    }
1047
1048    /* look for upload_only (BEP 21) */
1049    if (tr_variantDictFindInt (&val, TR_KEY_upload_only, &i))
1050        seedProbability = i==0 ? 0 : 100;
1051
1052    /* get peer's listening port */
1053    if (tr_variantDictFindInt (&val, TR_KEY_p, &i)) {
1054        pex.port = htons ((uint16_t)i);
1055        fireClientGotPort (msgs, pex.port);
1056        dbgmsg (msgs, "peer's port is now %d", (int)i);
1057    }
1058
1059    if (tr_peerIoIsIncoming (msgs->io)
1060        && tr_variantDictFindRaw (&val, TR_KEY_ipv4, &addr, &addr_len)
1061        && (addr_len == 4))
1062    {
1063        pex.addr.type = TR_AF_INET;
1064        memcpy (&pex.addr.addr.addr4, addr, 4);
1065        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1066    }
1067
1068    if (tr_peerIoIsIncoming (msgs->io)
1069        && tr_variantDictFindRaw (&val, TR_KEY_ipv6, &addr, &addr_len)
1070        && (addr_len == 16))
1071    {
1072        pex.addr.type = TR_AF_INET6;
1073        memcpy (&pex.addr.addr.addr6, addr, 16);
1074        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1075    }
1076
1077    /* get peer's maximum request queue size */
1078    if (tr_variantDictFindInt (&val, TR_KEY_reqq, &i))
1079        msgs->reqq = i;
1080
1081    tr_variantFree (&val);
1082    tr_free (tmp);
1083}
1084
1085static void
1086parseUtMetadata (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
1087{
1088    tr_variant dict;
1089    char * msg_end;
1090    const char * benc_end;
1091    int64_t msg_type = -1;
1092    int64_t piece = -1;
1093    int64_t total_size = 0;
1094    uint8_t * tmp = tr_new (uint8_t, msglen);
1095
1096    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1097    msg_end = (char*)tmp + msglen;
1098
1099    if (!tr_variantFromBencFull (&dict, tmp, msglen, NULL, &benc_end))
1100    {
1101        tr_variantDictFindInt (&dict, TR_KEY_msg_type, &msg_type);
1102        tr_variantDictFindInt (&dict, TR_KEY_piece, &piece);
1103        tr_variantDictFindInt (&dict, TR_KEY_total_size, &total_size);
1104        tr_variantFree (&dict);
1105    }
1106
1107    dbgmsg (msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1108          (int)msg_type, (int)piece, (int)total_size);
1109
1110    if (msg_type == METADATA_MSG_TYPE_REJECT)
1111    {
1112        /* NOOP */
1113    }
1114
1115    if ((msg_type == METADATA_MSG_TYPE_DATA)
1116        && (!tr_torrentHasMetadata (msgs->torrent))
1117        && (msg_end - benc_end <= METADATA_PIECE_SIZE)
1118        && (piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size))
1119    {
1120        const int pieceLen = msg_end - benc_end;
1121        tr_torrentSetMetadataPiece (msgs->torrent, piece, benc_end, pieceLen);
1122    }
1123
1124    if (msg_type == METADATA_MSG_TYPE_REQUEST)
1125    {
1126        if ((piece >= 0)
1127            && tr_torrentHasMetadata (msgs->torrent)
1128            && !tr_torrentIsPrivate (msgs->torrent)
1129            && (msgs->peerAskedForMetadataCount < METADATA_REQQ))
1130        {
1131            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1132        }
1133        else
1134        {
1135            tr_variant tmp;
1136            struct evbuffer * payload;
1137            struct evbuffer * out = msgs->outMessages;
1138
1139            /* build the rejection message */
1140            tr_variantInitDict (&tmp, 2);
1141            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1142            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1143            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1144
1145            /* write it out as a LTEP message to our outMessages buffer */
1146            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1147            evbuffer_add_uint8 (out, BT_LTEP);
1148            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1149            evbuffer_add_buffer (out, payload);
1150            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1151            dbgOutMessageLen (msgs);
1152
1153            /* cleanup */
1154            evbuffer_free (payload);
1155            tr_variantFree (&tmp);
1156        }
1157    }
1158
1159    tr_free (tmp);
1160}
1161
1162static void
1163parseUtPex (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
1164{
1165    int loaded = 0;
1166    uint8_t * tmp = tr_new (uint8_t, msglen);
1167    tr_variant val;
1168    tr_torrent * tor = msgs->torrent;
1169    const uint8_t * added;
1170    size_t added_len;
1171
1172    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1173
1174    if (tr_torrentAllowsPex (tor)
1175      && ((loaded = !tr_variantFromBenc (&val, tmp, msglen))))
1176    {
1177        if (tr_variantDictFindRaw (&val, TR_KEY_added, &added, &added_len))
1178        {
1179            tr_pex * pex;
1180            size_t i, n;
1181            size_t added_f_len = 0;
1182            const uint8_t * added_f = NULL;
1183
1184            tr_variantDictFindRaw (&val, TR_KEY_added_f, &added_f, &added_f_len);
1185            pex = tr_peerMgrCompactToPex (added, added_len, added_f, added_f_len, &n);
1186
1187            n = MIN (n, MAX_PEX_PEER_COUNT);
1188            for (i=0; i<n; ++i)
1189            {
1190                int seedProbability = -1;
1191                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1192                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1193            }
1194
1195            tr_free (pex);
1196        }
1197
1198        if (tr_variantDictFindRaw (&val, TR_KEY_added6, &added, &added_len))
1199        {
1200            tr_pex * pex;
1201            size_t i, n;
1202            size_t added_f_len = 0;
1203            const uint8_t * added_f = NULL;
1204
1205            tr_variantDictFindRaw (&val, TR_KEY_added6_f, &added_f, &added_f_len);
1206            pex = tr_peerMgrCompact6ToPex (added, added_len, added_f, added_f_len, &n);
1207
1208            n = MIN (n, MAX_PEX_PEER_COUNT);
1209            for (i=0; i<n; ++i)
1210            {
1211                int seedProbability = -1;
1212                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1213                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1214            }
1215
1216            tr_free (pex);
1217        }
1218    }
1219
1220    if (loaded)
1221        tr_variantFree (&val);
1222    tr_free (tmp);
1223}
1224
1225static void sendPex (tr_peerMsgs * msgs);
1226
1227static void
1228parseLtep (tr_peerMsgs * msgs, int msglen, struct evbuffer  * inbuf)
1229{
1230    uint8_t ltep_msgid;
1231
1232    tr_peerIoReadUint8 (msgs->io, inbuf, &ltep_msgid);
1233    msglen--;
1234
1235    if (ltep_msgid == LTEP_HANDSHAKE)
1236    {
1237        dbgmsg (msgs, "got ltep handshake");
1238        parseLtepHandshake (msgs, msglen, inbuf);
1239        if (tr_peerIoSupportsLTEP (msgs->io))
1240        {
1241            sendLtepHandshake (msgs);
1242            sendPex (msgs);
1243        }
1244    }
1245    else if (ltep_msgid == UT_PEX_ID)
1246    {
1247        dbgmsg (msgs, "got ut pex");
1248        msgs->peerSupportsPex = true;
1249        parseUtPex (msgs, msglen, inbuf);
1250    }
1251    else if (ltep_msgid == UT_METADATA_ID)
1252    {
1253        dbgmsg (msgs, "got ut metadata");
1254        msgs->peerSupportsMetadataXfer = true;
1255        parseUtMetadata (msgs, msglen, inbuf);
1256    }
1257    else
1258    {
1259        dbgmsg (msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1260        evbuffer_drain (inbuf, msglen);
1261    }
1262}
1263
1264static int
1265readBtLength (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1266{
1267    uint32_t len;
1268
1269    if (inlen < sizeof (len))
1270        return READ_LATER;
1271
1272    tr_peerIoReadUint32 (msgs->io, inbuf, &len);
1273
1274    if (len == 0) /* peer sent us a keepalive message */
1275        dbgmsg (msgs, "got KeepAlive");
1276    else
1277    {
1278        msgs->incoming.length = len;
1279        msgs->state = AWAITING_BT_ID;
1280    }
1281
1282    return READ_NOW;
1283}
1284
1285static int readBtMessage (tr_peerMsgs *, struct evbuffer *, size_t);
1286
1287static int
1288readBtId (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1289{
1290    uint8_t id;
1291
1292    if (inlen < sizeof (uint8_t))
1293        return READ_LATER;
1294
1295    tr_peerIoReadUint8 (msgs->io, inbuf, &id);
1296    msgs->incoming.id = id;
1297    dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %"TR_PRIuSIZE, id, (size_t)msgs->incoming.length);
1298
1299    if (id == BT_PIECE)
1300    {
1301        msgs->state = AWAITING_BT_PIECE;
1302        return READ_NOW;
1303    }
1304    else if (msgs->incoming.length != 1)
1305    {
1306        msgs->state = AWAITING_BT_MESSAGE;
1307        return READ_NOW;
1308    }
1309    else return readBtMessage (msgs, inbuf, inlen - 1);
1310}
1311
1312static void
1313updatePeerProgress (tr_peerMsgs * msgs)
1314{
1315  tr_peerUpdateProgress (msgs->torrent, &msgs->peer);
1316
1317  /*updateFastSet (msgs);*/
1318  updateInterest (msgs);
1319}
1320
1321static void
1322prefetchPieces (tr_peerMsgs *msgs)
1323{
1324  int i;
1325
1326  if (!getSession (msgs)->isPrefetchEnabled)
1327    return;
1328
1329  for (i=msgs->prefetchCount; i<msgs->peer.pendingReqsToClient && i<PREFETCH_SIZE; ++i)
1330    {
1331      const struct peer_request * req = msgs->peerAskedFor + i;
1332      if (requestIsValid (msgs, req))
1333        {
1334          tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1335          ++msgs->prefetchCount;
1336        }
1337    }
1338}
1339
1340static void
1341peerMadeRequest (tr_peerMsgs * msgs, const struct peer_request * req)
1342{
1343    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1344    const int reqIsValid = requestIsValid (msgs, req);
1345    const int clientHasPiece = reqIsValid && tr_torrentPieceIsComplete (msgs->torrent, req->index);
1346    const int peerIsChoked = msgs->peer_is_choked;
1347
1348    bool allow = false;
1349
1350    if (!reqIsValid)
1351        dbgmsg (msgs, "rejecting an invalid request.");
1352    else if (!clientHasPiece)
1353        dbgmsg (msgs, "rejecting request for a piece we don't have.");
1354    else if (peerIsChoked)
1355        dbgmsg (msgs, "rejecting request from choked peer");
1356    else if (msgs->peer.pendingReqsToClient + 1 >= REQQ)
1357        dbgmsg (msgs, "rejecting request ... reqq is full");
1358    else
1359        allow = true;
1360
1361    if (allow) {
1362        msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req;
1363        prefetchPieces (msgs);
1364    } else if (fext) {
1365        protocolSendReject (msgs, req);
1366    }
1367}
1368
1369static bool
1370messageLengthIsCorrect (const tr_peerMsgs * msg, uint8_t id, uint32_t len)
1371{
1372    switch (id)
1373    {
1374        case BT_CHOKE:
1375        case BT_UNCHOKE:
1376        case BT_INTERESTED:
1377        case BT_NOT_INTERESTED:
1378        case BT_FEXT_HAVE_ALL:
1379        case BT_FEXT_HAVE_NONE:
1380            return len == 1;
1381
1382        case BT_HAVE:
1383        case BT_FEXT_SUGGEST:
1384        case BT_FEXT_ALLOWED_FAST:
1385            return len == 5;
1386
1387        case BT_BITFIELD:
1388            if (tr_torrentHasMetadata (msg->torrent))
1389                return len == (msg->torrent->info.pieceCount + 7u) / 8u + 1u;
1390            /* we don't know the piece count yet,
1391               so we can only guess whether to send true or false */
1392            if (msg->metadata_size_hint > 0)
1393                return len <= msg->metadata_size_hint;
1394            return true;
1395
1396        case BT_REQUEST:
1397        case BT_CANCEL:
1398        case BT_FEXT_REJECT:
1399            return len == 13;
1400
1401        case BT_PIECE:
1402            return len > 9 && len <= 16393;
1403
1404        case BT_PORT:
1405            return len == 3;
1406
1407        case BT_LTEP:
1408            return len >= 2;
1409
1410        default:
1411            return false;
1412    }
1413}
1414
1415static int clientGotBlock (tr_peerMsgs *               msgs,
1416                           struct evbuffer *           block,
1417                           const struct peer_request * req);
1418
1419static int
1420readBtPiece (tr_peerMsgs      * msgs,
1421             struct evbuffer  * inbuf,
1422             size_t             inlen,
1423             size_t           * setme_piece_bytes_read)
1424{
1425    struct peer_request * req = &msgs->incoming.blockReq;
1426
1427    assert (evbuffer_get_length (inbuf) >= inlen);
1428    dbgmsg (msgs, "In readBtPiece");
1429
1430    if (!req->length)
1431    {
1432        if (inlen < 8)
1433            return READ_LATER;
1434
1435        tr_peerIoReadUint32 (msgs->io, inbuf, &req->index);
1436        tr_peerIoReadUint32 (msgs->io, inbuf, &req->offset);
1437        req->length = msgs->incoming.length - 9;
1438        dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1439        return READ_NOW;
1440    }
1441    else
1442    {
1443        int err;
1444        size_t n;
1445        size_t nLeft;
1446        struct evbuffer * block_buffer;
1447
1448        if (msgs->incoming.block == NULL)
1449            msgs->incoming.block = evbuffer_new ();
1450        block_buffer = msgs->incoming.block;
1451
1452        /* read in another chunk of data */
1453        nLeft = req->length - evbuffer_get_length (block_buffer);
1454        n = MIN (nLeft, inlen);
1455
1456        tr_peerIoReadBytesToBuf (msgs->io, inbuf, block_buffer, n);
1457
1458        fireClientGotPieceData (msgs, n);
1459        *setme_piece_bytes_read += n;
1460        dbgmsg (msgs, "got %"TR_PRIuSIZE" bytes for block %u:%u->%u ... %d remain",
1461               n, req->index, req->offset, req->length,
1462             (int)(req->length - evbuffer_get_length (block_buffer)));
1463        if (evbuffer_get_length (block_buffer) < req->length)
1464            return READ_LATER;
1465
1466        /* pass the block along... */
1467        err = clientGotBlock (msgs, block_buffer, req);
1468        evbuffer_drain (block_buffer, evbuffer_get_length (block_buffer));
1469
1470        /* cleanup */
1471        req->length = 0;
1472        msgs->state = AWAITING_BT_LENGTH;
1473        return err ? READ_ERR : READ_NOW;
1474    }
1475}
1476
1477static void updateDesiredRequestCount (tr_peerMsgs * msgs);
1478
1479static int
1480readBtMessage (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1481{
1482    uint32_t      ui32;
1483    uint32_t      msglen = msgs->incoming.length;
1484    const uint8_t id = msgs->incoming.id;
1485#ifndef NDEBUG
1486    const size_t  startBufLen = evbuffer_get_length (inbuf);
1487#endif
1488    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1489
1490    --msglen; /* id length */
1491
1492    dbgmsg (msgs, "got BT id %d, len %d, buffer size is %"TR_PRIuSIZE, (int)id, (int)msglen, inlen);
1493
1494    if (inlen < msglen)
1495        return READ_LATER;
1496
1497    if (!messageLengthIsCorrect (msgs, id, msglen + 1))
1498    {
1499        dbgmsg (msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1500        fireError (msgs, EMSGSIZE);
1501        return READ_ERR;
1502    }
1503
1504    switch (id)
1505    {
1506        case BT_CHOKE:
1507            dbgmsg (msgs, "got Choke");
1508            msgs->client_is_choked = true;
1509            if (!fext)
1510                fireGotChoke (msgs);
1511            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1512            break;
1513
1514        case BT_UNCHOKE:
1515            dbgmsg (msgs, "got Unchoke");
1516            msgs->client_is_choked = false;
1517            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1518            updateDesiredRequestCount (msgs);
1519            break;
1520
1521        case BT_INTERESTED:
1522            dbgmsg (msgs, "got Interested");
1523            msgs->peer_is_interested = true;
1524            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1525            break;
1526
1527        case BT_NOT_INTERESTED:
1528            dbgmsg (msgs, "got Not Interested");
1529            msgs->peer_is_interested = false;
1530            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1531            break;
1532
1533        case BT_HAVE:
1534            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1535            dbgmsg (msgs, "got Have: %u", ui32);
1536            if (tr_torrentHasMetadata (msgs->torrent)
1537                    && (ui32 >= msgs->torrent->info.pieceCount))
1538            {
1539                fireError (msgs, ERANGE);
1540                return READ_ERR;
1541            }
1542
1543            /* a peer can send the same HAVE message twice... */
1544            if (!tr_bitfieldHas (&msgs->peer.have, ui32)) {
1545                tr_bitfieldAdd (&msgs->peer.have, ui32);
1546                fireClientGotHave (msgs, ui32);
1547            }
1548            updatePeerProgress (msgs);
1549            break;
1550
1551        case BT_BITFIELD: {
1552            uint8_t * tmp = tr_new (uint8_t, msglen);
1553            dbgmsg (msgs, "got a bitfield");
1554            tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1555            tr_bitfieldSetRaw (&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));
1556            fireClientGotBitfield (msgs, &msgs->peer.have);
1557            updatePeerProgress (msgs);
1558            tr_free (tmp);
1559            break;
1560        }
1561
1562        case BT_REQUEST:
1563        {
1564            struct peer_request r;
1565            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1566            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1567            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1568            dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1569            peerMadeRequest (msgs, &r);
1570            break;
1571        }
1572
1573        case BT_CANCEL:
1574        {
1575            int i;
1576            struct peer_request r;
1577            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1578            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1579            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1580            tr_historyAdd (&msgs->peer.cancelsSentToClient, tr_time (), 1);
1581            dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1582
1583            for (i=0; i<msgs->peer.pendingReqsToClient; ++i) {
1584                const struct peer_request * req = msgs->peerAskedFor + i;
1585                if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length))
1586                    break;
1587            }
1588
1589            if (i < msgs->peer.pendingReqsToClient)
1590                tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request),
1591                                           msgs->peer.pendingReqsToClient--);
1592            break;
1593        }
1594
1595        case BT_PIECE:
1596            assert (0); /* handled elsewhere! */
1597            break;
1598
1599        case BT_PORT:
1600            dbgmsg (msgs, "Got a BT_PORT");
1601            tr_peerIoReadUint16 (msgs->io, inbuf, &msgs->dht_port);
1602            if (msgs->dht_port > 0)
1603                tr_dhtAddNode (getSession (msgs),
1604                               tr_peerAddress (&msgs->peer),
1605                               msgs->dht_port, 0);
1606            break;
1607
1608        case BT_FEXT_SUGGEST:
1609            dbgmsg (msgs, "Got a BT_FEXT_SUGGEST");
1610            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1611            if (fext)
1612                fireClientGotSuggest (msgs, ui32);
1613            else {
1614                fireError (msgs, EMSGSIZE);
1615                return READ_ERR;
1616            }
1617            break;
1618
1619        case BT_FEXT_ALLOWED_FAST:
1620            dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST");
1621            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1622            if (fext)
1623                fireClientGotAllowedFast (msgs, ui32);
1624            else {
1625                fireError (msgs, EMSGSIZE);
1626                return READ_ERR;
1627            }
1628            break;
1629
1630        case BT_FEXT_HAVE_ALL:
1631            dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL");
1632            if (fext) {
1633                tr_bitfieldSetHasAll (&msgs->peer.have);
1634assert (tr_bitfieldHasAll (&msgs->peer.have));
1635                fireClientGotHaveAll (msgs);
1636                updatePeerProgress (msgs);
1637            } else {
1638                fireError (msgs, EMSGSIZE);
1639                return READ_ERR;
1640            }
1641            break;
1642
1643        case BT_FEXT_HAVE_NONE:
1644            dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE");
1645            if (fext) {
1646                tr_bitfieldSetHasNone (&msgs->peer.have);
1647                fireClientGotHaveNone (msgs);
1648                updatePeerProgress (msgs);
1649            } else {
1650                fireError (msgs, EMSGSIZE);
1651                return READ_ERR;
1652            }
1653            break;
1654
1655        case BT_FEXT_REJECT:
1656        {
1657            struct peer_request r;
1658            dbgmsg (msgs, "Got a BT_FEXT_REJECT");
1659            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1660            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1661            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1662            if (fext)
1663                fireGotRej (msgs, &r);
1664            else {
1665                fireError (msgs, EMSGSIZE);
1666                return READ_ERR;
1667            }
1668            break;
1669        }
1670
1671        case BT_LTEP:
1672            dbgmsg (msgs, "Got a BT_LTEP");
1673            parseLtep (msgs, msglen, inbuf);
1674            break;
1675
1676        default:
1677            dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id);
1678            tr_peerIoDrain (msgs->io, inbuf, msglen);
1679            break;
1680    }
1681
1682    assert (msglen + 1 == msgs->incoming.length);
1683    assert (evbuffer_get_length (inbuf) == startBufLen - msglen);
1684
1685    msgs->state = AWAITING_BT_LENGTH;
1686    return READ_NOW;
1687}
1688
1689/* returns 0 on success, or an errno on failure */
1690static int
1691clientGotBlock (tr_peerMsgs                * msgs,
1692                struct evbuffer            * data,
1693                const struct peer_request  * req)
1694{
1695    int err;
1696    tr_torrent * tor = msgs->torrent;
1697    const tr_block_index_t block = _tr_block (tor, req->index, req->offset);
1698
1699    assert (msgs);
1700    assert (req);
1701
1702    if (!requestIsValid (msgs, req)) {
1703        dbgmsg (msgs, "dropping invalid block %u:%u->%u",
1704                req->index, req->offset, req->length);
1705        return EBADMSG;
1706    }
1707
1708    if (req->length != tr_torBlockCountBytes (msgs->torrent, block)) {
1709        dbgmsg (msgs, "wrong block size -- expected %u, got %d",
1710                tr_torBlockCountBytes (msgs->torrent, block), req->length);
1711        return EMSGSIZE;
1712    }
1713
1714    dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1715
1716    if (!tr_peerMgrDidPeerRequest (msgs->torrent, &msgs->peer, block)) {
1717        dbgmsg (msgs, "we didn't ask for this message...");
1718        return 0;
1719    }
1720    if (tr_torrentPieceIsComplete (msgs->torrent, req->index)) {
1721        dbgmsg (msgs, "we did ask for this message, but the piece is already complete...");
1722        return 0;
1723    }
1724
1725    /**
1726    ***  Save the block
1727    **/
1728
1729    if ((err = tr_cacheWriteBlock (getSession (msgs)->cache, tor, req->index, req->offset, req->length, data)))
1730        return err;
1731
1732    tr_bitfieldAdd (&msgs->peer.blame, req->index);
1733    fireGotBlock (msgs, req);
1734    return 0;
1735}
1736
1737static int peerPulse (void * vmsgs);
1738
1739static void
1740didWrite (tr_peerIo * io UNUSED, size_t bytesWritten, bool wasPieceData, void * vmsgs)
1741{
1742    tr_peerMsgs * msgs = vmsgs;
1743
1744    if (wasPieceData)
1745      firePeerGotPieceData (msgs, bytesWritten);
1746
1747    if (tr_isPeerIo (io) && io->userData)
1748        peerPulse (msgs);
1749}
1750
1751static ReadState
1752canRead (tr_peerIo * io, void * vmsgs, size_t * piece)
1753{
1754    ReadState         ret;
1755    tr_peerMsgs *     msgs = vmsgs;
1756    struct evbuffer * in = tr_peerIoGetReadBuffer (io);
1757    const size_t      inlen = evbuffer_get_length (in);
1758
1759    dbgmsg (msgs, "canRead: inlen is %"TR_PRIuSIZE", msgs->state is %d", inlen, msgs->state);
1760
1761    if (!inlen)
1762    {
1763        ret = READ_LATER;
1764    }
1765    else if (msgs->state == AWAITING_BT_PIECE)
1766    {
1767        ret = readBtPiece (msgs, in, inlen, piece);
1768    }
1769    else switch (msgs->state)
1770    {
1771        case AWAITING_BT_LENGTH:
1772            ret = readBtLength (msgs, in, inlen); break;
1773
1774        case AWAITING_BT_ID:
1775            ret = readBtId   (msgs, in, inlen); break;
1776
1777        case AWAITING_BT_MESSAGE:
1778            ret = readBtMessage (msgs, in, inlen); break;
1779
1780        default:
1781            ret = READ_ERR;
1782            assert (0);
1783    }
1784
1785    dbgmsg (msgs, "canRead: ret is %d", (int)ret);
1786
1787    return ret;
1788}
1789
1790int
1791tr_peerMsgsIsReadingBlock (const tr_peerMsgs * msgs, tr_block_index_t block)
1792{
1793    if (msgs->state != AWAITING_BT_PIECE)
1794        return false;
1795
1796    return block == _tr_block (msgs->torrent,
1797                               msgs->incoming.blockReq.index,
1798                               msgs->incoming.blockReq.offset);
1799}
1800
1801/**
1802***
1803**/
1804
1805static void
1806updateDesiredRequestCount (tr_peerMsgs * msgs)
1807{
1808    tr_torrent * const torrent = msgs->torrent;
1809
1810    /* there are lots of reasons we might not want to request any blocks... */
1811    if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent)
1812                                    || msgs->client_is_choked
1813                                    || !msgs->client_is_interested)
1814    {
1815        msgs->desiredRequestCount = 0;
1816    }
1817    else
1818    {
1819        int estimatedBlocksInPeriod;
1820        unsigned int rate_Bps;
1821        unsigned int irate_Bps;
1822        const int floor = 4;
1823        const int seconds = REQUEST_BUF_SECS;
1824        const uint64_t now = tr_time_msec ();
1825
1826        /* Get the rate limit we should use.
1827         * FIXME: this needs to consider all the other peers as well... */
1828        rate_Bps = tr_peerGetPieceSpeed_Bps (&msgs->peer, now, TR_PEER_TO_CLIENT);
1829        if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT))
1830            rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT));
1831
1832        /* honor the session limits, if enabled */
1833        if (tr_torrentUsesSessionLimits (torrent) &&
1834            tr_sessionGetActiveSpeedLimit_Bps (torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1835                rate_Bps = MIN (rate_Bps, irate_Bps);
1836
1837        /* use this desired rate to figure out how
1838         * many requests we should send to this peer */
1839        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1840        msgs->desiredRequestCount = MAX (floor, estimatedBlocksInPeriod);
1841
1842        /* honor the peer's maximum request count, if specified */
1843        if (msgs->reqq > 0)
1844            if (msgs->desiredRequestCount > msgs->reqq)
1845                msgs->desiredRequestCount = msgs->reqq;
1846    }
1847}
1848
1849static void
1850updateMetadataRequests (tr_peerMsgs * msgs, time_t now)
1851{
1852    int piece;
1853
1854    if (msgs->peerSupportsMetadataXfer
1855        && tr_torrentGetNextMetadataRequest (msgs->torrent, now, &piece))
1856    {
1857        tr_variant tmp;
1858        struct evbuffer * payload;
1859        struct evbuffer * out = msgs->outMessages;
1860
1861        /* build the data message */
1862        tr_variantInitDict (&tmp, 3);
1863        tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
1864        tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1865        payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1866
1867        dbgmsg (msgs, "requesting metadata piece #%d", piece);
1868
1869        /* write it out as a LTEP message to our outMessages buffer */
1870        evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1871        evbuffer_add_uint8 (out, BT_LTEP);
1872        evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1873        evbuffer_add_buffer (out, payload);
1874        pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1875        dbgOutMessageLen (msgs);
1876
1877        /* cleanup */
1878        evbuffer_free (payload);
1879        tr_variantFree (&tmp);
1880    }
1881}
1882
1883static void
1884updateBlockRequests (tr_peerMsgs * msgs)
1885{
1886    if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT)
1887        && (msgs->desiredRequestCount > 0)
1888        && (msgs->peer.pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))
1889    {
1890        int i;
1891        int n;
1892        tr_block_index_t * blocks;
1893        const int numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer;
1894
1895        assert (tr_peerMsgsIsClientInterested (msgs));
1896        assert (!tr_peerMsgsIsClientChoked (msgs));
1897
1898        blocks = tr_new (tr_block_index_t, numwant);
1899        tr_peerMgrGetNextRequests (msgs->torrent, &msgs->peer, numwant, blocks, &n, false);
1900
1901        for (i=0; i<n; ++i)
1902        {
1903            struct peer_request req;
1904            blockToReq (msgs->torrent, blocks[i], &req);
1905            protocolSendRequest (msgs, &req);
1906        }
1907
1908        tr_free (blocks);
1909    }
1910}
1911
1912static size_t
1913fillOutputBuffer (tr_peerMsgs * msgs, time_t now)
1914{
1915    int piece;
1916    size_t bytesWritten = 0;
1917    struct peer_request req;
1918    const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0;
1919    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1920
1921    /**
1922    ***  Protocol messages
1923    **/
1924
1925    if (haveMessages && !msgs->outMessagesBatchedAt) /* fresh batch */
1926    {
1927        dbgmsg (msgs, "started an outMessages batch (length is %"TR_PRIuSIZE")", evbuffer_get_length (msgs->outMessages));
1928        msgs->outMessagesBatchedAt = now;
1929    }
1930    else if (haveMessages && ((now - msgs->outMessagesBatchedAt) >= msgs->outMessagesBatchPeriod))
1931    {
1932        const size_t len = evbuffer_get_length (msgs->outMessages);
1933        /* flush the protocol messages */
1934        dbgmsg (msgs, "flushing outMessages... to %p (length is %"TR_PRIuSIZE")", (void*)msgs->io, len);
1935        tr_peerIoWriteBuf (msgs->io, msgs->outMessages, false);
1936        msgs->clientSentAnythingAt = now;
1937        msgs->outMessagesBatchedAt = 0;
1938        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1939        bytesWritten +=  len;
1940    }
1941
1942    /**
1943    ***  Metadata Pieces
1944    **/
1945
1946    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= METADATA_PIECE_SIZE)
1947        && popNextMetadataRequest (msgs, &piece))
1948    {
1949        char * data;
1950        int dataLen;
1951        bool ok = false;
1952
1953        data = tr_torrentGetMetadataPiece (msgs->torrent, piece, &dataLen);
1954        if (data != NULL)
1955        {
1956            tr_variant tmp;
1957            struct evbuffer * payload;
1958            struct evbuffer * out = msgs->outMessages;
1959
1960            /* build the data message */
1961            tr_variantInitDict (&tmp, 3);
1962            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
1963            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1964            tr_variantDictAddInt (&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
1965            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1966
1967            /* write it out as a LTEP message to our outMessages buffer */
1968            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload) + dataLen);
1969            evbuffer_add_uint8 (out, BT_LTEP);
1970            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1971            evbuffer_add_buffer (out, payload);
1972            evbuffer_add     (out, data, dataLen);
1973            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1974            dbgOutMessageLen (msgs);
1975
1976            evbuffer_free (payload);
1977            tr_variantFree (&tmp);
1978            tr_free (data);
1979
1980            ok = true;
1981        }
1982
1983        if (!ok) /* send a rejection message */
1984        {
1985            tr_variant tmp;
1986            struct evbuffer * payload;
1987            struct evbuffer * out = msgs->outMessages;
1988
1989            /* build the rejection message */
1990            tr_variantInitDict (&tmp, 2);
1991            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1992            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1993            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1994
1995            /* write it out as a LTEP message to our outMessages buffer */
1996            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1997            evbuffer_add_uint8 (out, BT_LTEP);
1998            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1999            evbuffer_add_buffer (out, payload);
2000            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2001            dbgOutMessageLen (msgs);
2002
2003            evbuffer_free (payload);
2004            tr_variantFree (&tmp);
2005        }
2006    }
2007
2008    /**
2009    ***  Data Blocks
2010    **/
2011
2012    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= msgs->torrent->blockSize)
2013        && popNextRequest (msgs, &req))
2014    {
2015        --msgs->prefetchCount;
2016
2017        if (requestIsValid (msgs, &req)
2018            && tr_torrentPieceIsComplete (msgs->torrent, req.index))
2019        {
2020            int err;
2021            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
2022            struct evbuffer * out;
2023            struct evbuffer_iovec iovec[1];
2024
2025            out = evbuffer_new ();
2026            evbuffer_expand (out, msglen);
2027
2028            evbuffer_add_uint32 (out, sizeof (uint8_t) + 2 * sizeof (uint32_t) + req.length);
2029            evbuffer_add_uint8 (out, BT_PIECE);
2030            evbuffer_add_uint32 (out, req.index);
2031            evbuffer_add_uint32 (out, req.offset);
2032
2033            evbuffer_reserve_space (out, req.length, iovec, 1);
2034            err = tr_cacheReadBlock (getSession (msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base);
2035            iovec[0].iov_len = req.length;
2036            evbuffer_commit_space (out, iovec, 1);
2037
2038            /* check the piece if it needs checking... */
2039            if (!err && tr_torrentPieceNeedsCheck (msgs->torrent, req.index))
2040                if ((err = !tr_torrentCheckPiece (msgs->torrent, req.index)))
2041                    tr_torrentSetLocalError (msgs->torrent, _("Please Verify Local Data! Piece #%"TR_PRIuSIZE" is corrupt."), (size_t)req.index);
2042
2043            if (err)
2044            {
2045                if (fext)
2046                    protocolSendReject (msgs, &req);
2047            }
2048            else
2049            {
2050                const size_t n = evbuffer_get_length (out);
2051                dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
2052                assert (n == msglen);
2053                tr_peerIoWriteBuf (msgs->io, out, true);
2054                bytesWritten += n;
2055                msgs->clientSentAnythingAt = now;
2056                tr_historyAdd (&msgs->peer.blocksSentToPeer, tr_time (), 1);
2057            }
2058
2059            evbuffer_free (out);
2060
2061            if (err)
2062            {
2063                bytesWritten = 0;
2064                msgs = NULL;
2065            }
2066        }
2067        else if (fext) /* peer needs a reject message */
2068        {
2069            protocolSendReject (msgs, &req);
2070        }
2071
2072        if (msgs != NULL)
2073            prefetchPieces (msgs);
2074    }
2075
2076    /**
2077    ***  Keepalive
2078    **/
2079
2080    if ((msgs != NULL)
2081        && (msgs->clientSentAnythingAt != 0)
2082        && ((now - msgs->clientSentAnythingAt) > KEEPALIVE_INTERVAL_SECS))
2083    {
2084        dbgmsg (msgs, "sending a keepalive message");
2085        evbuffer_add_uint32 (msgs->outMessages, 0);
2086        pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2087    }
2088
2089    return bytesWritten;
2090}
2091
2092static int
2093peerPulse (void * vmsgs)
2094{
2095    tr_peerMsgs * msgs = vmsgs;
2096    const time_t  now = tr_time ();
2097
2098    if (tr_isPeerIo (msgs->io)) {
2099        updateDesiredRequestCount (msgs);
2100        updateBlockRequests (msgs);
2101        updateMetadataRequests (msgs, now);
2102    }
2103
2104    for (;;)
2105        if (fillOutputBuffer (msgs, now) < 1)
2106            break;
2107
2108    return true; /* loop forever */
2109}
2110
2111void
2112tr_peerMsgsPulse (tr_peerMsgs * msgs)
2113{
2114    if (msgs != NULL)
2115        peerPulse (msgs);
2116}
2117
2118static void
2119gotError (tr_peerIo * io UNUSED, short what, void * vmsgs)
2120{
2121    if (what & BEV_EVENT_TIMEOUT)
2122        dbgmsg (vmsgs, "libevent got a timeout, what=%hd", what);
2123    if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
2124        dbgmsg (vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2125               what, errno, tr_strerror (errno));
2126    fireError (vmsgs, ENOTCONN);
2127}
2128
2129static void
2130sendBitfield (tr_peerMsgs * msgs)
2131{
2132    void * bytes;
2133    size_t byte_count = 0;
2134    struct evbuffer * out = msgs->outMessages;
2135
2136    assert (tr_torrentHasMetadata (msgs->torrent));
2137
2138    bytes = tr_torrentCreatePieceBitfield (msgs->torrent, &byte_count);
2139    evbuffer_add_uint32 (out, sizeof (uint8_t) + byte_count);
2140    evbuffer_add_uint8 (out, BT_BITFIELD);
2141    evbuffer_add     (out, bytes, byte_count);
2142    dbgmsg (msgs, "sending bitfield... outMessage size is now %"TR_PRIuSIZE, evbuffer_get_length (out));
2143    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2144
2145    tr_free (bytes);
2146}
2147
2148static void
2149tellPeerWhatWeHave (tr_peerMsgs * msgs)
2150{
2151    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
2152
2153    if (fext && tr_torrentHasAll (msgs->torrent))
2154    {
2155        protocolSendHaveAll (msgs);
2156    }
2157    else if (fext && tr_torrentHasNone (msgs->torrent))
2158    {
2159        protocolSendHaveNone (msgs);
2160    }
2161    else if (!tr_torrentHasNone (msgs->torrent))
2162    {
2163        sendBitfield (msgs);
2164    }
2165}
2166
2167/**
2168***
2169**/
2170
2171/* some peers give us error messages if we send
2172   more than this many peers in a single pex message
2173   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2174#define MAX_PEX_ADDED 50
2175#define MAX_PEX_DROPPED 50
2176
2177typedef struct
2178{
2179    tr_pex *  added;
2180    tr_pex *  dropped;
2181    tr_pex *  elements;
2182    int       addedCount;
2183    int       droppedCount;
2184    int       elementCount;
2185}
2186PexDiffs;
2187
2188static void
2189pexAddedCb (void * vpex, void * userData)
2190{
2191    PexDiffs * diffs = userData;
2192    tr_pex *   pex = vpex;
2193
2194    if (diffs->addedCount < MAX_PEX_ADDED)
2195    {
2196        diffs->added[diffs->addedCount++] = *pex;
2197        diffs->elements[diffs->elementCount++] = *pex;
2198    }
2199}
2200
2201static inline void
2202pexDroppedCb (void * vpex, void * userData)
2203{
2204    PexDiffs * diffs = userData;
2205    tr_pex *   pex = vpex;
2206
2207    if (diffs->droppedCount < MAX_PEX_DROPPED)
2208    {
2209        diffs->dropped[diffs->droppedCount++] = *pex;
2210    }
2211}
2212
2213static inline void
2214pexElementCb (void * vpex, void * userData)
2215{
2216    PexDiffs * diffs = userData;
2217    tr_pex * pex = vpex;
2218
2219    diffs->elements[diffs->elementCount++] = *pex;
2220}
2221
2222typedef void (tr_set_func)(void * element, void * userData);
2223
2224/**
2225 * @brief find the differences and commonalities in two sorted sets
2226 * @param a the first set
2227 * @param aCount the number of elements in the set 'a'
2228 * @param b the second set
2229 * @param bCount the number of elements in the set 'b'
2230 * @param compare the sorting method for both sets
2231 * @param elementSize the sizeof the element in the two sorted sets
2232 * @param in_a called for items in set 'a' but not set 'b'
2233 * @param in_b called for items in set 'b' but not set 'a'
2234 * @param in_both called for items that are in both sets
2235 * @param userData user data passed along to in_a, in_b, and in_both
2236 */
2237static void
2238tr_set_compare (const void * va, size_t aCount,
2239                const void * vb, size_t bCount,
2240                int compare (const void * a, const void * b),
2241                size_t elementSize,
2242                tr_set_func in_a_cb,
2243                tr_set_func in_b_cb,
2244                tr_set_func in_both_cb,
2245                void * userData)
2246{
2247    const uint8_t * a = va;
2248    const uint8_t * b = vb;
2249    const uint8_t * aend = a + elementSize * aCount;
2250    const uint8_t * bend = b + elementSize * bCount;
2251
2252    while (a != aend || b != bend)
2253    {
2254        if (a == aend)
2255        {
2256          (*in_b_cb)((void*)b, userData);
2257            b += elementSize;
2258        }
2259        else if (b == bend)
2260        {
2261          (*in_a_cb)((void*)a, userData);
2262            a += elementSize;
2263        }
2264        else
2265        {
2266            const int val = (*compare)(a, b);
2267
2268            if (!val)
2269            {
2270              (*in_both_cb)((void*)a, userData);
2271                a += elementSize;
2272                b += elementSize;
2273            }
2274            else if (val < 0)
2275            {
2276              (*in_a_cb)((void*)a, userData);
2277                a += elementSize;
2278            }
2279            else if (val > 0)
2280            {
2281              (*in_b_cb)((void*)b, userData);
2282                b += elementSize;
2283            }
2284        }
2285    }
2286}
2287
2288
2289static void
2290sendPex (tr_peerMsgs * msgs)
2291{
2292    if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent))
2293    {
2294        PexDiffs diffs;
2295        PexDiffs diffs6;
2296        tr_pex * newPex = NULL;
2297        tr_pex * newPex6 = NULL;
2298        const int newCount = tr_peerMgrGetPeers (msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2299        const int newCount6 = tr_peerMgrGetPeers (msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2300
2301        /* build the diffs */
2302        diffs.added = tr_new (tr_pex, newCount);
2303        diffs.addedCount = 0;
2304        diffs.dropped = tr_new (tr_pex, msgs->pexCount);
2305        diffs.droppedCount = 0;
2306        diffs.elements = tr_new (tr_pex, newCount + msgs->pexCount);
2307        diffs.elementCount = 0;
2308        tr_set_compare (msgs->pex, msgs->pexCount,
2309                        newPex, newCount,
2310                        tr_pexCompare, sizeof (tr_pex),
2311                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs);
2312        diffs6.added = tr_new (tr_pex, newCount6);
2313        diffs6.addedCount = 0;
2314        diffs6.dropped = tr_new (tr_pex, msgs->pexCount6);
2315        diffs6.droppedCount = 0;
2316        diffs6.elements = tr_new (tr_pex, newCount6 + msgs->pexCount6);
2317        diffs6.elementCount = 0;
2318        tr_set_compare (msgs->pex6, msgs->pexCount6,
2319                        newPex6, newCount6,
2320                        tr_pexCompare, sizeof (tr_pex),
2321                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6);
2322        dbgmsg (
2323            msgs,
2324            "pex: old peer count %d+%d, new peer count %d+%d, "
2325            "added %d+%d, removed %d+%d",
2326            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2327            diffs.addedCount, diffs6.addedCount,
2328            diffs.droppedCount, diffs6.droppedCount);
2329
2330        if (!diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2331            !diffs6.droppedCount)
2332        {
2333            tr_free (diffs.elements);
2334            tr_free (diffs6.elements);
2335        }
2336        else
2337        {
2338            int  i;
2339            tr_variant val;
2340            uint8_t * tmp, *walk;
2341            struct evbuffer * payload;
2342            struct evbuffer * out = msgs->outMessages;
2343
2344            /* update peer */
2345            tr_free (msgs->pex);
2346            msgs->pex = diffs.elements;
2347            msgs->pexCount = diffs.elementCount;
2348            tr_free (msgs->pex6);
2349            msgs->pex6 = diffs6.elements;
2350            msgs->pexCount6 = diffs6.elementCount;
2351
2352            /* build the pex payload */
2353            tr_variantInitDict (&val, 3); /* ipv6 support: left as 3:
2354                                         * speed vs. likelihood? */
2355
2356            if (diffs.addedCount > 0)
2357            {
2358                /* "added" */
2359                tmp = walk = tr_new (uint8_t, diffs.addedCount * 6);
2360                for (i = 0; i < diffs.addedCount; ++i) {
2361                    memcpy (walk, &diffs.added[i].addr.addr, 4); walk += 4;
2362                    memcpy (walk, &diffs.added[i].port, 2); walk += 2;
2363                }
2364                assert ((walk - tmp) == diffs.addedCount * 6);
2365                tr_variantDictAddRaw (&val, TR_KEY_added, tmp, walk - tmp);
2366                tr_free (tmp);
2367
2368                /* "added.f"
2369                 * unset each holepunch flag because we don't support it. */
2370                tmp = walk = tr_new (uint8_t, diffs.addedCount);
2371                for (i = 0; i < diffs.addedCount; ++i)
2372                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2373                assert ((walk - tmp) == diffs.addedCount);
2374                tr_variantDictAddRaw (&val, TR_KEY_added_f, tmp, walk - tmp);
2375                tr_free (tmp);
2376            }
2377
2378            if (diffs.droppedCount > 0)
2379            {
2380                /* "dropped" */
2381                tmp = walk = tr_new (uint8_t, diffs.droppedCount * 6);
2382                for (i = 0; i < diffs.droppedCount; ++i) {
2383                    memcpy (walk, &diffs.dropped[i].addr.addr, 4); walk += 4;
2384                    memcpy (walk, &diffs.dropped[i].port, 2); walk += 2;
2385                }
2386                assert ((walk - tmp) == diffs.droppedCount * 6);
2387                tr_variantDictAddRaw (&val, TR_KEY_dropped, tmp, walk - tmp);
2388                tr_free (tmp);
2389            }
2390
2391            if (diffs6.addedCount > 0)
2392            {
2393                /* "added6" */
2394                tmp = walk = tr_new (uint8_t, diffs6.addedCount * 18);
2395                for (i = 0; i < diffs6.addedCount; ++i) {
2396                    memcpy (walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2397                    walk += 16;
2398                    memcpy (walk, &diffs6.added[i].port, 2);
2399                    walk += 2;
2400                }
2401                assert ((walk - tmp) == diffs6.addedCount * 18);
2402                tr_variantDictAddRaw (&val, TR_KEY_added6, tmp, walk - tmp);
2403                tr_free (tmp);
2404
2405                /* "added6.f"
2406                 * unset each holepunch flag because we don't support it. */
2407                tmp = walk = tr_new (uint8_t, diffs6.addedCount);
2408                for (i = 0; i < diffs6.addedCount; ++i)
2409                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2410                assert ((walk - tmp) == diffs6.addedCount);
2411                tr_variantDictAddRaw (&val, TR_KEY_added6_f, tmp, walk - tmp);
2412                tr_free (tmp);
2413            }
2414
2415            if (diffs6.droppedCount > 0)
2416            {
2417                /* "dropped6" */
2418                tmp = walk = tr_new (uint8_t, diffs6.droppedCount * 18);
2419                for (i = 0; i < diffs6.droppedCount; ++i) {
2420                    memcpy (walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2421                    walk += 16;
2422                    memcpy (walk, &diffs6.dropped[i].port, 2);
2423                    walk += 2;
2424                }
2425                assert ((walk - tmp) == diffs6.droppedCount * 18);
2426                tr_variantDictAddRaw (&val, TR_KEY_dropped6, tmp, walk - tmp);
2427                tr_free (tmp);
2428            }
2429
2430            /* write the pex message */
2431            payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
2432            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2433            evbuffer_add_uint8 (out, BT_LTEP);
2434            evbuffer_add_uint8 (out, msgs->ut_pex_id);
2435            evbuffer_add_buffer (out, payload);
2436            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2437            dbgmsg (msgs, "sending a pex message; outMessage size is now %"TR_PRIuSIZE, evbuffer_get_length (out));
2438            dbgOutMessageLen (msgs);
2439
2440            evbuffer_free (payload);
2441            tr_variantFree (&val);
2442        }
2443
2444        /* cleanup */
2445        tr_free (diffs.added);
2446        tr_free (diffs.dropped);
2447        tr_free (newPex);
2448        tr_free (diffs6.added);
2449        tr_free (diffs6.dropped);
2450        tr_free (newPex6);
2451
2452        /*msgs->clientSentPexAt = tr_time ();*/
2453    }
2454}
2455
2456static void
2457pexPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmsgs)
2458{
2459    struct tr_peerMsgs * msgs = vmsgs;
2460
2461    sendPex (msgs);
2462
2463    assert (msgs->pexTimer != NULL);
2464    tr_timerAdd (msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2465}
2466
2467/***
2468****  tr_peer virtual functions
2469***/
2470
2471static bool
2472peermsgs_is_transferring_pieces (const struct tr_peer * peer,
2473                                 uint64_t               now,
2474                                 tr_direction           direction,
2475                                 unsigned int         * setme_Bps)
2476{
2477  unsigned int Bps = 0;
2478
2479  if (tr_isPeerMsgs (peer))
2480    {
2481      const tr_peerMsgs * msgs = (const tr_peerMsgs *) peer;
2482      Bps = tr_peerIoGetPieceSpeed_Bps (msgs->io, now, direction);
2483    }
2484
2485  if (setme_Bps != NULL)
2486    *setme_Bps = Bps;
2487
2488  return Bps > 0;
2489}
2490
2491static void
2492peermsgs_destruct (tr_peer * peer)
2493{
2494  tr_peerMsgs * msgs = PEER_MSGS (peer);
2495
2496  assert (msgs != NULL);
2497
2498  tr_peerMsgsSetActive (msgs, TR_UP, false);
2499  tr_peerMsgsSetActive (msgs, TR_DOWN, false);
2500
2501  if (msgs->pexTimer != NULL)
2502    event_free (msgs->pexTimer);
2503
2504  if (msgs->incoming.block != NULL)
2505    evbuffer_free (msgs->incoming.block);
2506
2507  if (msgs->io)
2508    {
2509      tr_peerIoClear (msgs->io);
2510      tr_peerIoUnref (msgs->io); /* balanced by the ref in handshakeDoneCB () */
2511    }
2512
2513  evbuffer_free (msgs->outMessages);
2514  tr_free (msgs->pex6);
2515  tr_free (msgs->pex);
2516
2517  tr_peerDestruct (&msgs->peer);
2518
2519  memset (msgs, ~0, sizeof (tr_peerMsgs));
2520}
2521
2522static const struct tr_peer_virtual_funcs my_funcs =
2523{
2524  .destruct = peermsgs_destruct,
2525  .is_transferring_pieces = peermsgs_is_transferring_pieces
2526};
2527
2528/***
2529****
2530***/
2531
2532time_t
2533tr_peerMsgsGetConnectionAge (const tr_peerMsgs * msgs)
2534{
2535  assert (tr_isPeerMsgs (msgs));
2536
2537  return tr_peerIoGetAge (msgs->io);
2538}
2539
2540bool
2541tr_peerMsgsIsPeerChoked (const tr_peerMsgs * msgs)
2542{
2543  assert (tr_isPeerMsgs (msgs));
2544
2545  return msgs->peer_is_choked;
2546}
2547
2548bool
2549tr_peerMsgsIsPeerInterested (const tr_peerMsgs * msgs)
2550{
2551  assert (tr_isPeerMsgs (msgs));
2552
2553  return msgs->peer_is_interested;
2554}
2555
2556bool
2557tr_peerMsgsIsClientChoked (const tr_peerMsgs * msgs)
2558{
2559  assert (tr_isPeerMsgs (msgs));
2560
2561  return msgs->client_is_choked;
2562}
2563
2564bool
2565tr_peerMsgsIsClientInterested (const tr_peerMsgs * msgs)
2566{
2567  assert (tr_isPeerMsgs (msgs));
2568
2569  return msgs->client_is_interested;
2570}
2571
2572bool
2573tr_peerMsgsIsUtpConnection (const tr_peerMsgs * msgs)
2574{
2575  assert (tr_isPeerMsgs (msgs));
2576
2577  return msgs->io->utp_socket != NULL;
2578}
2579
2580bool
2581tr_peerMsgsIsEncrypted (const tr_peerMsgs * msgs)
2582{
2583  assert (tr_isPeerMsgs (msgs));
2584
2585  return tr_peerIoIsEncrypted (msgs->io);
2586}
2587
2588bool
2589tr_peerMsgsIsIncomingConnection (const tr_peerMsgs * msgs)
2590{
2591  assert (tr_isPeerMsgs (msgs));
2592
2593  return tr_peerIoIsIncoming (msgs->io);
2594}
2595
2596/***
2597****
2598***/
2599
2600bool
2601tr_isPeerMsgs (const void * msgs)
2602{
2603  /* FIXME: this is pretty crude */
2604  return (msgs != NULL)
2605      && (((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER);
2606}
2607
2608tr_peerMsgs *
2609tr_peerMsgsCast (void * vm)
2610{
2611  return tr_isPeerMsgs(vm) ? vm : NULL;
2612}
2613
2614tr_peerMsgs *
2615tr_peerMsgsNew (struct tr_torrent    * torrent,
2616                struct tr_peerIo     * io,
2617                tr_peer_callback       callback,
2618                void                 * callbackData)
2619{
2620  tr_peerMsgs * m;
2621
2622  assert (io != NULL);
2623
2624  m = tr_new0 (tr_peerMsgs, 1);
2625
2626  tr_peerConstruct (&m->peer, torrent);
2627  m->peer.funcs = &my_funcs;
2628
2629  m->magic_number = MAGIC_NUMBER;
2630  m->client_is_choked = true;
2631  m->peer_is_choked = true;
2632  m->client_is_interested = false;
2633  m->peer_is_interested = false;
2634  m->is_active[TR_UP] = false;
2635  m->is_active[TR_DOWN] = false;
2636  m->callback = callback;
2637  m->callbackData = callbackData;
2638  m->io = io;
2639  m->torrent = torrent;
2640  m->state = AWAITING_BT_LENGTH;
2641  m->outMessages = evbuffer_new ();
2642  m->outMessagesBatchedAt = 0;
2643  m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2644
2645  if (tr_torrentAllowsPex (torrent))
2646    {
2647      m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m);
2648      tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0);
2649    }
2650
2651  if (tr_peerIoSupportsUTP (m->io))
2652    {
2653      const tr_address * addr = tr_peerIoGetAddress (m->io, NULL);
2654      tr_peerMgrSetUtpSupported (torrent, addr);
2655      tr_peerMgrSetUtpFailed (torrent, addr, false);
2656    }
2657
2658  if (tr_peerIoSupportsLTEP (m->io))
2659    sendLtepHandshake (m);
2660
2661  tellPeerWhatWeHave (m);
2662
2663  if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (m->io))
2664    {
2665      /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2666      const struct tr_address *addr = tr_peerIoGetAddress (m->io, NULL);
2667      if (addr->type == TR_AF_INET || tr_globalIPv6 ())
2668        protocolSendPort (m, tr_dhtPort (torrent->session));
2669    }
2670
2671  tr_peerIoSetIOFuncs (m->io, canRead, didWrite, gotError, m);
2672  updateDesiredRequestCount (m);
2673
2674  return m;
2675}
Note: See TracBrowser for help on using the repository browser.