source: branches/2.8x/libtransmission/peer-msgs.c @ 14302

Last change on this file since 14302 was 14302, checked in by jordan, 8 years ago

Fix peer communication vulnerability (no known exploits) reported by Ben Hawkes

  • Property svn:keywords set to Date Rev Author Id
File size: 75.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: peer-msgs.c 14302 2014-06-29 01:42:38Z jordan $
8 */
9
10#include <assert.h>
11#include <errno.h>
12#include <stdarg.h>
13#include <stdlib.h>
14#include <string.h>
15
16#include <event2/buffer.h>
17#include <event2/bufferevent.h>
18#include <event2/event.h>
19
20#include "transmission.h"
21#include "cache.h"
22#include "completion.h"
23#include "crypto.h" /* tr_sha1 () */
24#include "log.h"
25#include "peer-io.h"
26#include "peer-mgr.h"
27#include "peer-msgs.h"
28#include "session.h"
29#include "torrent.h"
30#include "torrent-magnet.h"
31#include "tr-dht.h"
32#include "utils.h"
33#include "variant.h"
34#include "version.h"
35
36#ifndef EBADMSG
37 #define EBADMSG EINVAL
38#endif
39
40/**
41***
42**/
43
44enum
45{
46  BT_CHOKE                = 0,
47  BT_UNCHOKE              = 1,
48  BT_INTERESTED           = 2,
49  BT_NOT_INTERESTED       = 3,
50  BT_HAVE                 = 4,
51  BT_BITFIELD             = 5,
52  BT_REQUEST              = 6,
53  BT_PIECE                = 7,
54  BT_CANCEL               = 8,
55  BT_PORT                 = 9,
56
57  BT_FEXT_SUGGEST         = 13,
58  BT_FEXT_HAVE_ALL        = 14,
59  BT_FEXT_HAVE_NONE       = 15,
60  BT_FEXT_REJECT          = 16,
61  BT_FEXT_ALLOWED_FAST    = 17,
62
63  BT_LTEP                 = 20,
64
65  LTEP_HANDSHAKE          = 0,
66
67  UT_PEX_ID               = 1,
68  UT_METADATA_ID          = 3,
69
70  MAX_PEX_PEER_COUNT      = 50,
71
72  MIN_CHOKE_PERIOD_SEC    = 10,
73
74  /* idle seconds before we send a keepalive */
75  KEEPALIVE_INTERVAL_SECS = 100,
76
77  PEX_INTERVAL_SECS       = 90, /* sec between sendPex () calls */
78
79  REQQ                    = 512,
80
81  METADATA_REQQ           = 64,
82
83  MAGIC_NUMBER            = 21549,
84
85  /* used in lowering the outMessages queue period */
86  IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
87  HIGH_PRIORITY_INTERVAL_SECS = 2,
88  LOW_PRIORITY_INTERVAL_SECS = 10,
89
90  /* number of pieces we'll allow in our fast set */
91  MAX_FAST_SET_SIZE = 3,
92
93  /* how many blocks to keep prefetched per peer */
94  PREFETCH_SIZE = 18,
95
96  /* when we're making requests from another peer,
97     batch them together to send enough requests to
98     meet our bandwidth goals for the next N seconds */
99  REQUEST_BUF_SECS = 10,
100
101  /* defined in BEP #9 */
102  METADATA_MSG_TYPE_REQUEST = 0,
103  METADATA_MSG_TYPE_DATA = 1,
104  METADATA_MSG_TYPE_REJECT = 2
105};
106
107enum
108{
109  AWAITING_BT_LENGTH,
110  AWAITING_BT_ID,
111  AWAITING_BT_MESSAGE,
112  AWAITING_BT_PIECE
113};
114
115typedef enum
116{
117  ENCRYPTION_PREFERENCE_UNKNOWN,
118  ENCRYPTION_PREFERENCE_YES,
119  ENCRYPTION_PREFERENCE_NO
120}
121encryption_preference_t;
122
123/**
124***
125**/
126
127struct peer_request
128{
129  uint32_t index;
130  uint32_t offset;
131  uint32_t length;
132};
133
134static void
135blockToReq (const tr_torrent     * tor,
136            tr_block_index_t       block,
137            struct peer_request  * setme)
138{
139  tr_torrentGetBlockLocation (tor, block, &setme->index,
140                                          &setme->offset,
141                                          &setme->length);
142}
143
144/**
145***
146**/
147
148/* this is raw, unchanged data from the peer regarding
149 * the current message that it's sending us. */
150struct tr_incoming
151{
152  uint8_t                id;
153  uint32_t               length; /* includes the +1 for id length */
154  struct peer_request    blockReq; /* metadata for incoming blocks */
155  struct evbuffer      * block; /* piece data for incoming blocks */
156};
157
158/**
159 * Low-level communication state information about a connected peer.
160 *
161 * This structure remembers the low-level protocol states that we're
162 * in with this peer, such as active requests, pex messages, and so on.
163 * Its fields are all private to peer-msgs.c.
164 *
165 * Data not directly involved with sending & receiving messages is
166 * stored in tr_peer, where it can be accessed by both peermsgs and
167 * the peer manager.
168 *
169 * @see struct peer_atom
170 * @see tr_peer
171 */
172struct tr_peerMsgs
173{
174  struct tr_peer peer; /* parent */
175
176  uint16_t magic_number;
177
178  /* Whether or not we've choked this peer. */
179  bool peer_is_choked;
180
181  /* whether or not the peer has indicated it will download from us. */
182  bool peer_is_interested;
183
184  /* whether or the peer is choking us. */
185  bool client_is_choked;
186
187  /* whether or not we've indicated to the peer that we would download from them if unchoked. */
188  bool client_is_interested;
189
190
191  bool peerSupportsPex;
192  bool peerSupportsMetadataXfer;
193  bool clientSentLtepHandshake;
194  bool peerSentLtepHandshake;
195
196  /*bool haveFastSet;*/
197
198  int desiredRequestCount;
199
200  int prefetchCount;
201
202  int is_active[2];
203
204  /* how long the outMessages batch should be allowed to grow before
205   * it's flushed -- some messages (like requests >:) should be sent
206   * very quickly; others aren't as urgent. */
207  int8_t          outMessagesBatchPeriod;
208
209  uint8_t         state;
210  uint8_t         ut_pex_id;
211  uint8_t         ut_metadata_id;
212  uint16_t        pexCount;
213  uint16_t        pexCount6;
214
215  tr_port         dht_port;
216
217  encryption_preference_t  encryption_preference;
218
219  size_t                   metadata_size_hint;
220#if 0
221  size_t                 fastsetSize;
222  tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
223#endif
224
225  tr_torrent *           torrent;
226
227  tr_peer_callback        callback;
228  void                  * callbackData;
229
230  struct evbuffer *      outMessages; /* all the non-piece messages */
231
232  struct peer_request    peerAskedFor[REQQ];
233
234  int peerAskedForMetadata[METADATA_REQQ];
235  int peerAskedForMetadataCount;
236
237  tr_pex * pex;
238  tr_pex * pex6;
239
240  /*time_t clientSentPexAt;*/
241  time_t clientSentAnythingAt;
242
243  time_t chokeChangedAt;
244
245  /* when we started batching the outMessages */
246  time_t outMessagesBatchedAt;
247
248  struct tr_incoming    incoming;
249
250  /* if the peer supports the Extension Protocol in BEP 10 and
251     supplied a reqq argument, it's stored here. Otherwise, the
252     value is zero and should be ignored. */
253  int64_t reqq;
254
255  struct event * pexTimer;
256
257  struct tr_peerIo * io;
258};
259
260/**
261***
262**/
263
264static inline tr_session*
265getSession (struct tr_peerMsgs * msgs)
266{
267  return msgs->torrent->session;
268}
269
270/**
271***
272**/
273
274static void
275myDebug (const char * file, int line,
276         const struct tr_peerMsgs * msgs,
277         const char * fmt, ...) TR_GNUC_PRINTF(4, 5);
278
279static void
280myDebug (const char * file, int line,
281         const struct tr_peerMsgs * msgs,
282         const char * fmt, ...)
283{
284  FILE * fp = tr_logGetFile ();
285
286  if (fp)
287    {
288      va_list           args;
289      char              timestr[64];
290      struct evbuffer * buf = evbuffer_new ();
291      char *            base = tr_basename (file);
292      char *            message;
293
294      evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ",
295                           tr_logGetTimeStr (timestr, sizeof (timestr)),
296                           tr_torrentName (msgs->torrent),
297                           tr_peerIoGetAddrStr (msgs->io),
298                           tr_quark_get_string (msgs->peer.client, NULL));
299      va_start (args, fmt);
300      evbuffer_add_vprintf (buf, fmt, args);
301      va_end (args);
302      evbuffer_add_printf (buf, " (%s:%d)\n", base, line);
303
304      message = evbuffer_free_to_str (buf);
305      fputs (message, fp);
306
307      tr_free (base);
308      tr_free (message);
309    }
310}
311
312#define dbgmsg(msgs, ...) \
313  do \
314    { \
315      if (tr_logGetDeepEnabled ()) \
316        myDebug (__FILE__, __LINE__, msgs, __VA_ARGS__); \
317    } \
318  while (0)
319
320/**
321***
322**/
323
324static void
325pokeBatchPeriod (tr_peerMsgs * msgs, int interval)
326{
327  if (msgs->outMessagesBatchPeriod > interval)
328    {
329      msgs->outMessagesBatchPeriod = interval;
330      dbgmsg (msgs, "lowering batch interval to %d seconds", interval);
331    }
332}
333
334static void
335dbgOutMessageLen (tr_peerMsgs * msgs)
336{
337  dbgmsg (msgs, "outMessage size is now %"TR_PRIuSIZE, evbuffer_get_length (msgs->outMessages));
338}
339
340static void
341protocolSendReject (tr_peerMsgs * msgs, const struct peer_request * req)
342{
343  struct evbuffer * out = msgs->outMessages;
344
345  assert (tr_peerIoSupportsFEXT (msgs->io));
346
347  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
348  evbuffer_add_uint8 (out, BT_FEXT_REJECT);
349  evbuffer_add_uint32 (out, req->index);
350  evbuffer_add_uint32 (out, req->offset);
351  evbuffer_add_uint32 (out, req->length);
352
353  dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
354  dbgOutMessageLen (msgs);
355}
356
357static void
358protocolSendRequest (tr_peerMsgs * msgs, const struct peer_request * req)
359{
360  struct evbuffer * out = msgs->outMessages;
361
362  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
363  evbuffer_add_uint8 (out, BT_REQUEST);
364  evbuffer_add_uint32 (out, req->index);
365  evbuffer_add_uint32 (out, req->offset);
366  evbuffer_add_uint32 (out, req->length);
367
368  dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
369  dbgOutMessageLen (msgs);
370  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
371}
372
373static void
374protocolSendCancel (tr_peerMsgs * msgs, const struct peer_request * req)
375{
376  struct evbuffer * out = msgs->outMessages;
377
378  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
379  evbuffer_add_uint8 (out, BT_CANCEL);
380  evbuffer_add_uint32 (out, req->index);
381  evbuffer_add_uint32 (out, req->offset);
382  evbuffer_add_uint32 (out, req->length);
383
384  dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
385  dbgOutMessageLen (msgs);
386  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
387}
388
389static void
390protocolSendPort (tr_peerMsgs *msgs, uint16_t port)
391{
392  struct evbuffer * out = msgs->outMessages;
393
394  dbgmsg (msgs, "sending Port %u", port);
395  evbuffer_add_uint32 (out, 3);
396  evbuffer_add_uint8 (out, BT_PORT);
397  evbuffer_add_uint16 (out, port);
398}
399
400static void
401protocolSendHave (tr_peerMsgs * msgs, uint32_t index)
402{
403  struct evbuffer * out = msgs->outMessages;
404
405  evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t));
406  evbuffer_add_uint8 (out, BT_HAVE);
407  evbuffer_add_uint32 (out, index);
408
409  dbgmsg (msgs, "sending Have %u", index);
410  dbgOutMessageLen (msgs);
411  pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS);
412}
413
414#if 0
415static void
416protocolSendAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
417{
418  tr_peerIo       * io  = msgs->io;
419  struct evbuffer * out = msgs->outMessages;
420
421  assert (tr_peerIoSupportsFEXT (msgs->io));
422
423  evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t));
424  evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST);
425  evbuffer_add_uint32 (io, out, pieceIndex);
426
427  dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex);
428  dbgOutMessageLen (msgs);
429}
430#endif
431
432static void
433protocolSendChoke (tr_peerMsgs * msgs, int choke)
434{
435  struct evbuffer * out = msgs->outMessages;
436
437  evbuffer_add_uint32 (out, sizeof (uint8_t));
438  evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE);
439
440  dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke");
441  dbgOutMessageLen (msgs);
442  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
443}
444
445static void
446protocolSendHaveAll (tr_peerMsgs * msgs)
447{
448  struct evbuffer * out = msgs->outMessages;
449
450  assert (tr_peerIoSupportsFEXT (msgs->io));
451
452  evbuffer_add_uint32 (out, sizeof (uint8_t));
453  evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL);
454
455  dbgmsg (msgs, "sending HAVE_ALL...");
456  dbgOutMessageLen (msgs);
457  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
458}
459
460static void
461protocolSendHaveNone (tr_peerMsgs * msgs)
462{
463  struct evbuffer * out = msgs->outMessages;
464
465  assert (tr_peerIoSupportsFEXT (msgs->io));
466
467  evbuffer_add_uint32 (out, sizeof (uint8_t));
468  evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE);
469
470  dbgmsg (msgs, "sending HAVE_NONE...");
471  dbgOutMessageLen (msgs);
472  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
473}
474
475/**
476***  EVENTS
477**/
478
479static void
480publish (tr_peerMsgs * msgs, tr_peer_event * e)
481{
482  if (msgs->callback != NULL)
483    (*msgs->callback) (&msgs->peer, e, msgs->callbackData);
484}
485
486static void
487fireError (tr_peerMsgs * msgs, int err)
488{
489  tr_peer_event e = TR_PEER_EVENT_INIT;
490  e.eventType = TR_PEER_ERROR;
491  e.err = err;
492  publish (msgs, &e);
493}
494
495static void
496fireGotBlock (tr_peerMsgs * msgs, const struct peer_request * req)
497{
498  tr_peer_event e = TR_PEER_EVENT_INIT;
499  e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
500  e.pieceIndex = req->index;
501  e.offset = req->offset;
502  e.length = req->length;
503  publish (msgs, &e);
504}
505
506static void
507fireGotRej (tr_peerMsgs * msgs, const struct peer_request * req)
508{
509  tr_peer_event e = TR_PEER_EVENT_INIT;
510  e.eventType = TR_PEER_CLIENT_GOT_REJ;
511  e.pieceIndex = req->index;
512  e.offset = req->offset;
513  e.length = req->length;
514  publish (msgs, &e);
515}
516
517static void
518fireGotChoke (tr_peerMsgs * msgs)
519{
520  tr_peer_event e = TR_PEER_EVENT_INIT;
521  e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
522  publish (msgs, &e);
523}
524
525static void
526fireClientGotHaveAll (tr_peerMsgs * msgs)
527{
528  tr_peer_event e = TR_PEER_EVENT_INIT;
529  e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
530  publish (msgs, &e);
531}
532
533static void
534fireClientGotHaveNone (tr_peerMsgs * msgs)
535{
536  tr_peer_event e = TR_PEER_EVENT_INIT;
537  e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
538  publish (msgs, &e);
539}
540
541static void
542fireClientGotPieceData (tr_peerMsgs * msgs, uint32_t length)
543{
544  tr_peer_event e = TR_PEER_EVENT_INIT;
545  e.length = length;
546  e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
547  publish (msgs, &e);
548}
549
550static void
551firePeerGotPieceData (tr_peerMsgs * msgs, uint32_t length)
552{
553  tr_peer_event e = TR_PEER_EVENT_INIT;
554  e.length = length;
555  e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
556  publish (msgs, &e);
557}
558
559static void
560fireClientGotSuggest (tr_peerMsgs * msgs, uint32_t pieceIndex)
561{
562  tr_peer_event e = TR_PEER_EVENT_INIT;
563  e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
564  e.pieceIndex = pieceIndex;
565  publish (msgs, &e);
566}
567
568static void
569fireClientGotPort (tr_peerMsgs * msgs, tr_port port)
570{
571  tr_peer_event e = TR_PEER_EVENT_INIT;
572  e.eventType = TR_PEER_CLIENT_GOT_PORT;
573  e.port = port;
574  publish (msgs, &e);
575}
576
577static void
578fireClientGotAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
579{
580  tr_peer_event e = TR_PEER_EVENT_INIT;
581  e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
582  e.pieceIndex = pieceIndex;
583  publish (msgs, &e);
584}
585
586static void
587fireClientGotBitfield (tr_peerMsgs * msgs, tr_bitfield * bitfield)
588{
589  tr_peer_event e = TR_PEER_EVENT_INIT;
590  e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
591  e.bitfield = bitfield;
592  publish (msgs, &e);
593}
594
595static void
596fireClientGotHave (tr_peerMsgs * msgs, tr_piece_index_t index)
597{
598  tr_peer_event e = TR_PEER_EVENT_INIT;
599  e.eventType = TR_PEER_CLIENT_GOT_HAVE;
600  e.pieceIndex = index;
601  publish (msgs, &e);
602}
603
604
605/**
606***  ALLOWED FAST SET
607***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
608**/
609
610#if 0
611size_t
612tr_generateAllowedSet (tr_piece_index_t * setmePieces,
613                       size_t             desiredSetSize,
614                       size_t             pieceCount,
615                       const uint8_t    * infohash,
616                       const tr_address * addr)
617{
618    size_t setSize = 0;
619
620    assert (setmePieces);
621    assert (desiredSetSize <= pieceCount);
622    assert (desiredSetSize);
623    assert (pieceCount);
624    assert (infohash);
625    assert (addr);
626
627    if (addr->type == TR_AF_INET)
628    {
629        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
630        uint8_t x[SHA_DIGEST_LENGTH];
631
632        uint32_t ui32 = ntohl (htonl (addr->addr.addr4.s_addr) & 0xffffff00);   /* (1) */
633        memcpy (w, &ui32, sizeof (uint32_t));
634        walk += sizeof (uint32_t);
635        memcpy (walk, infohash, SHA_DIGEST_LENGTH);                 /* (2) */
636        walk += SHA_DIGEST_LENGTH;
637        tr_sha1 (x, w, walk-w, NULL);                               /* (3) */
638        assert (sizeof (w) == walk-w);
639
640        while (setSize<desiredSetSize)
641        {
642            int i;
643            for (i=0; i<5 && setSize<desiredSetSize; ++i)           /* (4) */
644            {
645                size_t k;
646                uint32_t j = i * 4;                                  /* (5) */
647                uint32_t y = ntohl (* (uint32_t*)(x + j));       /* (6) */
648                uint32_t index = y % pieceCount;                     /* (7) */
649
650                for (k=0; k<setSize; ++k)                           /* (8) */
651                    if (setmePieces[k] == index)
652                        break;
653
654                if (k == setSize)
655                    setmePieces[setSize++] = index;                  /* (9) */
656            }
657
658            tr_sha1 (x, x, sizeof (x), NULL);                      /* (3) */
659        }
660    }
661
662    return setSize;
663}
664
665static void
666updateFastSet (tr_peerMsgs * msgs UNUSED)
667{
668    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
669    const int peerIsNeedy = msgs->peer->progress < 0.10;
670
671    if (fext && peerIsNeedy && !msgs->haveFastSet)
672    {
673        size_t i;
674        const struct tr_address * addr = tr_peerIoGetAddress (msgs->io, NULL);
675        const tr_info * inf = &msgs->torrent->info;
676        const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount);
677
678        /* build the fast set */
679        msgs->fastsetSize = tr_generateAllowedSet (msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
680        msgs->haveFastSet = 1;
681
682        /* send it to the peer */
683        for (i=0; i<msgs->fastsetSize; ++i)
684            protocolSendAllowedFast (msgs, msgs->fastset[i]);
685    }
686}
687#endif
688
689/***
690****  ACTIVE
691***/
692
693static bool 
694tr_peerMsgsCalculateActive (const tr_peerMsgs * msgs, tr_direction direction)
695{
696  bool is_active;
697
698  assert (tr_isPeerMsgs (msgs));
699  assert (tr_isDirection (direction));
700
701  if (direction == TR_CLIENT_TO_PEER)
702    {
703      is_active = tr_peerMsgsIsPeerInterested (msgs)
704              && !tr_peerMsgsIsPeerChoked (msgs);
705
706      if (is_active)
707        assert (!tr_peerIsSeed (&msgs->peer));
708    }
709  else /* TR_PEER_TO_CLIENT */
710    {
711      if (!tr_torrentHasMetadata (msgs->torrent))
712        {
713          is_active = true;
714        }
715      else
716        {
717          is_active = tr_peerMsgsIsClientInterested (msgs)
718                  && !tr_peerMsgsIsClientChoked (msgs);
719
720          if (is_active)
721            assert (!tr_torrentIsSeed (msgs->torrent));
722        }
723    }
724
725  return is_active;
726}
727
728bool
729tr_peerMsgsIsActive (const tr_peerMsgs  * msgs, tr_direction direction)
730{
731  bool is_active;
732
733  assert (tr_isPeerMsgs (msgs));
734  assert (tr_isDirection (direction));
735
736  is_active = msgs->is_active[direction];
737
738  assert (is_active == tr_peerMsgsCalculateActive (msgs, direction));
739
740  return is_active;
741}
742
743static void
744tr_peerMsgsSetActive (tr_peerMsgs  * msgs,
745                      tr_direction   direction,
746                      bool           is_active)
747{
748  dbgmsg (msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active);
749
750  if (msgs->is_active[direction] != is_active)
751    {
752      msgs->is_active[direction] = is_active;
753
754      tr_swarmIncrementActivePeers (msgs->torrent->swarm, direction, is_active);
755    }
756}
757
758void
759tr_peerMsgsUpdateActive (tr_peerMsgs * msgs, tr_direction direction)
760{
761  const bool is_active = tr_peerMsgsCalculateActive (msgs, direction);
762
763  tr_peerMsgsSetActive (msgs, direction, is_active);
764}
765
766/**
767***  INTEREST
768**/
769
770static void
771sendInterest (tr_peerMsgs * msgs, bool b)
772{
773  struct evbuffer * out = msgs->outMessages;
774
775  assert (msgs);
776  assert (tr_isBool (b));
777
778  msgs->client_is_interested = b;
779  dbgmsg (msgs, "Sending %s", b ? "Interested" : "Not Interested");
780  evbuffer_add_uint32 (out, sizeof (uint8_t));
781  evbuffer_add_uint8 (out, b ? BT_INTERESTED : BT_NOT_INTERESTED);
782
783  pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
784  dbgOutMessageLen (msgs);
785}
786
787static void
788updateInterest (tr_peerMsgs * msgs UNUSED)
789{
790    /* FIXME -- might need to poke the mgr on startup */
791}
792
793void
794tr_peerMsgsSetInterested (tr_peerMsgs * msgs, bool b)
795{
796  assert (tr_isBool (b));
797
798  if (msgs->client_is_interested != b)
799    {
800      sendInterest (msgs, b);
801
802      tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
803    }
804}
805
806static bool
807popNextMetadataRequest (tr_peerMsgs * msgs, int * piece)
808{
809  if (msgs->peerAskedForMetadataCount == 0)
810    return false;
811
812  *piece = msgs->peerAskedForMetadata[0];
813
814  tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int),
815                             msgs->peerAskedForMetadataCount--);
816
817  return true;
818}
819
820static bool
821popNextRequest (tr_peerMsgs * msgs, struct peer_request * setme)
822{
823  if (msgs->peer.pendingReqsToClient == 0)
824    return false;
825
826  *setme = msgs->peerAskedFor[0];
827
828  tr_removeElementFromArray (msgs->peerAskedFor,
829                             0,
830                             sizeof (struct peer_request),
831                             msgs->peer.pendingReqsToClient--);
832
833  return true;
834}
835
836static void
837cancelAllRequestsToClient (tr_peerMsgs * msgs)
838{
839  struct peer_request req;
840  const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->io);
841
842  while (popNextRequest (msgs, &req))
843    if (mustSendCancel)
844      protocolSendReject (msgs, &req);
845}
846
847void
848tr_peerMsgsSetChoke (tr_peerMsgs * msgs, bool peer_is_choked)
849{
850  const time_t now = tr_time ();
851  const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
852
853  assert (msgs != NULL);
854  assert (tr_isBool (peer_is_choked));
855
856  if (msgs->chokeChangedAt > fibrillationTime)
857    {
858      dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
859    }
860  else if (msgs->peer_is_choked != peer_is_choked)
861    {
862      msgs->peer_is_choked = peer_is_choked;
863      if (peer_is_choked)
864        cancelAllRequestsToClient (msgs);
865      protocolSendChoke (msgs, peer_is_choked);
866      msgs->chokeChangedAt = now;
867      tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
868    }
869}
870
871/**
872***
873**/
874
875void
876tr_peerMsgsHave (tr_peerMsgs * msgs, uint32_t index)
877{
878  protocolSendHave (msgs, index);
879
880  /* since we have more pieces now, we might not be interested in this peer */
881  updateInterest (msgs);
882}
883
884/**
885***
886**/
887
888static bool
889reqIsValid (const tr_peerMsgs * peer,
890            uint32_t            index,
891            uint32_t            offset,
892            uint32_t            length)
893{
894    return tr_torrentReqIsValid (peer->torrent, index, offset, length);
895}
896
897static bool
898requestIsValid (const tr_peerMsgs * msgs, const struct peer_request * req)
899{
900    return reqIsValid (msgs, req->index, req->offset, req->length);
901}
902
903void
904tr_peerMsgsCancel (tr_peerMsgs * msgs, tr_block_index_t block)
905{
906    struct peer_request req;
907/*fprintf (stderr, "SENDING CANCEL MESSAGE FOR BLOCK %"TR_PRIuSIZE"\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer);*/
908    blockToReq (msgs->torrent, block, &req);
909    protocolSendCancel (msgs, &req);
910}
911
912/**
913***
914**/
915
916static void
917sendLtepHandshake (tr_peerMsgs * msgs)
918{
919    tr_variant val;
920    bool allow_pex;
921    bool allow_metadata_xfer;
922    struct evbuffer * payload;
923    struct evbuffer * out = msgs->outMessages;
924    const unsigned char * ipv6 = tr_globalIPv6 ();
925    static tr_quark version_quark = 0;
926
927    if (msgs->clientSentLtepHandshake)
928        return;
929
930    if (!version_quark)
931      version_quark = tr_quark_new (TR_NAME " " USERAGENT_PREFIX, -1);
932
933    dbgmsg (msgs, "sending an ltep handshake");
934    msgs->clientSentLtepHandshake = 1;
935
936    /* decide if we want to advertise metadata xfer support (BEP 9) */
937    if (tr_torrentIsPrivate (msgs->torrent))
938        allow_metadata_xfer = 0;
939    else
940        allow_metadata_xfer = 1;
941
942    /* decide if we want to advertise pex support */
943    if (!tr_torrentAllowsPex (msgs->torrent))
944        allow_pex = 0;
945    else if (msgs->peerSentLtepHandshake)
946        allow_pex = msgs->peerSupportsPex ? 1 : 0;
947    else
948        allow_pex = 1;
949
950    tr_variantInitDict (&val, 8);
951    tr_variantDictAddInt (&val, TR_KEY_e, getSession (msgs)->encryptionMode != TR_CLEAR_PREFERRED);
952    if (ipv6 != NULL)
953        tr_variantDictAddRaw (&val, TR_KEY_ipv6, ipv6, 16);
954    if (allow_metadata_xfer && tr_torrentHasMetadata (msgs->torrent)
955                            && (msgs->torrent->infoDictLength > 0))
956        tr_variantDictAddInt (&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
957    tr_variantDictAddInt (&val, TR_KEY_p, tr_sessionGetPublicPeerPort (getSession (msgs)));
958    tr_variantDictAddInt (&val, TR_KEY_reqq, REQQ);
959    tr_variantDictAddInt (&val, TR_KEY_upload_only, tr_torrentIsSeed (msgs->torrent));
960    tr_variantDictAddQuark (&val, TR_KEY_v, version_quark);
961    if (allow_metadata_xfer || allow_pex) {
962        tr_variant * m  = tr_variantDictAddDict (&val, TR_KEY_m, 2);
963        if (allow_metadata_xfer)
964            tr_variantDictAddInt (m, TR_KEY_ut_metadata, UT_METADATA_ID);
965        if (allow_pex)
966            tr_variantDictAddInt (m, TR_KEY_ut_pex, UT_PEX_ID);
967    }
968
969    payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
970
971    evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
972    evbuffer_add_uint8 (out, BT_LTEP);
973    evbuffer_add_uint8 (out, LTEP_HANDSHAKE);
974    evbuffer_add_buffer (out, payload);
975    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
976    dbgOutMessageLen (msgs);
977
978    /* cleanup */
979    evbuffer_free (payload);
980    tr_variantFree (&val);
981}
982
983static void
984parseLtepHandshake (tr_peerMsgs * msgs, int len, struct evbuffer * inbuf)
985{
986    int64_t   i;
987    tr_variant   val, * sub;
988    uint8_t * tmp = tr_new (uint8_t, len);
989    const uint8_t *addr;
990    size_t addr_len;
991    tr_pex pex;
992    int8_t seedProbability = -1;
993
994    memset (&pex, 0, sizeof (tr_pex));
995
996    tr_peerIoReadBytes (msgs->io, inbuf, tmp, len);
997    msgs->peerSentLtepHandshake = 1;
998
999    if (tr_variantFromBenc (&val, tmp, len) || !tr_variantIsDict (&val))
1000    {
1001        dbgmsg (msgs, "GET  extended-handshake, couldn't get dictionary");
1002        tr_free (tmp);
1003        return;
1004    }
1005
1006    dbgmsg (msgs, "here is the handshake: [%*.*s]", len, len,  tmp);
1007
1008    /* does the peer prefer encrypted connections? */
1009    if (tr_variantDictFindInt (&val, TR_KEY_e, &i)) {
1010        msgs->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1011                                        : ENCRYPTION_PREFERENCE_NO;
1012        if (i)
1013            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
1014    }
1015
1016    /* check supported messages for utorrent pex */
1017    msgs->peerSupportsPex = 0;
1018    msgs->peerSupportsMetadataXfer = 0;
1019
1020    if (tr_variantDictFindDict (&val, TR_KEY_m, &sub)) {
1021        if (tr_variantDictFindInt (sub, TR_KEY_ut_pex, &i)) {
1022            msgs->peerSupportsPex = i != 0;
1023            msgs->ut_pex_id = (uint8_t) i;
1024            dbgmsg (msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
1025        }
1026        if (tr_variantDictFindInt (sub, TR_KEY_ut_metadata, &i)) {
1027            msgs->peerSupportsMetadataXfer = i != 0;
1028            msgs->ut_metadata_id = (uint8_t) i;
1029            dbgmsg (msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
1030        }
1031        if (tr_variantDictFindInt (sub, TR_KEY_ut_holepunch, &i)) {
1032            /* Mysterious µTorrent extension that we don't grok.  However,
1033               it implies support for µTP, so use it to indicate that. */
1034            tr_peerMgrSetUtpFailed (msgs->torrent,
1035                                    tr_peerIoGetAddress (msgs->io, NULL),
1036                                    false);
1037        }
1038    }
1039
1040    /* look for metainfo size (BEP 9) */
1041    if (tr_variantDictFindInt (&val, TR_KEY_metadata_size, &i)) {
1042        tr_torrentSetMetadataSizeHint (msgs->torrent, i);
1043        msgs->metadata_size_hint = (size_t) i;
1044    }
1045
1046    /* look for upload_only (BEP 21) */
1047    if (tr_variantDictFindInt (&val, TR_KEY_upload_only, &i))
1048        seedProbability = i==0 ? 0 : 100;
1049
1050    /* get peer's listening port */
1051    if (tr_variantDictFindInt (&val, TR_KEY_p, &i)) {
1052        pex.port = htons ((uint16_t)i);
1053        fireClientGotPort (msgs, pex.port);
1054        dbgmsg (msgs, "peer's port is now %d", (int)i);
1055    }
1056
1057    if (tr_peerIoIsIncoming (msgs->io)
1058        && tr_variantDictFindRaw (&val, TR_KEY_ipv4, &addr, &addr_len)
1059        && (addr_len == 4))
1060    {
1061        pex.addr.type = TR_AF_INET;
1062        memcpy (&pex.addr.addr.addr4, addr, 4);
1063        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1064    }
1065
1066    if (tr_peerIoIsIncoming (msgs->io)
1067        && tr_variantDictFindRaw (&val, TR_KEY_ipv6, &addr, &addr_len)
1068        && (addr_len == 16))
1069    {
1070        pex.addr.type = TR_AF_INET6;
1071        memcpy (&pex.addr.addr.addr6, addr, 16);
1072        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1073    }
1074
1075    /* get peer's maximum request queue size */
1076    if (tr_variantDictFindInt (&val, TR_KEY_reqq, &i))
1077        msgs->reqq = i;
1078
1079    tr_variantFree (&val);
1080    tr_free (tmp);
1081}
1082
1083static void
1084parseUtMetadata (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
1085{
1086    tr_variant dict;
1087    char * msg_end;
1088    const char * benc_end;
1089    int64_t msg_type = -1;
1090    int64_t piece = -1;
1091    int64_t total_size = 0;
1092    uint8_t * tmp = tr_new (uint8_t, msglen);
1093
1094    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1095    msg_end = (char*)tmp + msglen;
1096
1097    if (!tr_variantFromBencFull (&dict, tmp, msglen, NULL, &benc_end))
1098    {
1099        tr_variantDictFindInt (&dict, TR_KEY_msg_type, &msg_type);
1100        tr_variantDictFindInt (&dict, TR_KEY_piece, &piece);
1101        tr_variantDictFindInt (&dict, TR_KEY_total_size, &total_size);
1102        tr_variantFree (&dict);
1103    }
1104
1105    dbgmsg (msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1106          (int)msg_type, (int)piece, (int)total_size);
1107
1108    if (msg_type == METADATA_MSG_TYPE_REJECT)
1109    {
1110        /* NOOP */
1111    }
1112
1113    if ((msg_type == METADATA_MSG_TYPE_DATA)
1114        && (!tr_torrentHasMetadata (msgs->torrent))
1115        && (msg_end - benc_end <= METADATA_PIECE_SIZE)
1116        && (piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size))
1117    {
1118        const int pieceLen = msg_end - benc_end;
1119        tr_torrentSetMetadataPiece (msgs->torrent, piece, benc_end, pieceLen);
1120    }
1121
1122    if (msg_type == METADATA_MSG_TYPE_REQUEST)
1123    {
1124        if ((piece >= 0)
1125            && tr_torrentHasMetadata (msgs->torrent)
1126            && !tr_torrentIsPrivate (msgs->torrent)
1127            && (msgs->peerAskedForMetadataCount < METADATA_REQQ))
1128        {
1129            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1130        }
1131        else
1132        {
1133            tr_variant tmp;
1134            struct evbuffer * payload;
1135            struct evbuffer * out = msgs->outMessages;
1136
1137            /* build the rejection message */
1138            tr_variantInitDict (&tmp, 2);
1139            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1140            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1141            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1142
1143            /* write it out as a LTEP message to our outMessages buffer */
1144            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1145            evbuffer_add_uint8 (out, BT_LTEP);
1146            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1147            evbuffer_add_buffer (out, payload);
1148            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1149            dbgOutMessageLen (msgs);
1150
1151            /* cleanup */
1152            evbuffer_free (payload);
1153            tr_variantFree (&tmp);
1154        }
1155    }
1156
1157    tr_free (tmp);
1158}
1159
1160static void
1161parseUtPex (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
1162{
1163    int loaded = 0;
1164    uint8_t * tmp = tr_new (uint8_t, msglen);
1165    tr_variant val;
1166    tr_torrent * tor = msgs->torrent;
1167    const uint8_t * added;
1168    size_t added_len;
1169
1170    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1171
1172    if (tr_torrentAllowsPex (tor)
1173      && ((loaded = !tr_variantFromBenc (&val, tmp, msglen))))
1174    {
1175        if (tr_variantDictFindRaw (&val, TR_KEY_added, &added, &added_len))
1176        {
1177            tr_pex * pex;
1178            size_t i, n;
1179            size_t added_f_len = 0;
1180            const uint8_t * added_f = NULL;
1181
1182            tr_variantDictFindRaw (&val, TR_KEY_added_f, &added_f, &added_f_len);
1183            pex = tr_peerMgrCompactToPex (added, added_len, added_f, added_f_len, &n);
1184
1185            n = MIN (n, MAX_PEX_PEER_COUNT);
1186            for (i=0; i<n; ++i)
1187            {
1188                int seedProbability = -1;
1189                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1190                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1191            }
1192
1193            tr_free (pex);
1194        }
1195
1196        if (tr_variantDictFindRaw (&val, TR_KEY_added6, &added, &added_len))
1197        {
1198            tr_pex * pex;
1199            size_t i, n;
1200            size_t added_f_len = 0;
1201            const uint8_t * added_f = NULL;
1202
1203            tr_variantDictFindRaw (&val, TR_KEY_added6_f, &added_f, &added_f_len);
1204            pex = tr_peerMgrCompact6ToPex (added, added_len, added_f, added_f_len, &n);
1205
1206            n = MIN (n, MAX_PEX_PEER_COUNT);
1207            for (i=0; i<n; ++i)
1208            {
1209                int seedProbability = -1;
1210                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1211                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1212            }
1213
1214            tr_free (pex);
1215        }
1216    }
1217
1218    if (loaded)
1219        tr_variantFree (&val);
1220    tr_free (tmp);
1221}
1222
1223static void sendPex (tr_peerMsgs * msgs);
1224
1225static void
1226parseLtep (tr_peerMsgs * msgs, int msglen, struct evbuffer  * inbuf)
1227{
1228    uint8_t ltep_msgid;
1229
1230    tr_peerIoReadUint8 (msgs->io, inbuf, &ltep_msgid);
1231    msglen--;
1232
1233    if (ltep_msgid == LTEP_HANDSHAKE)
1234    {
1235        dbgmsg (msgs, "got ltep handshake");
1236        parseLtepHandshake (msgs, msglen, inbuf);
1237        if (tr_peerIoSupportsLTEP (msgs->io))
1238        {
1239            sendLtepHandshake (msgs);
1240            sendPex (msgs);
1241        }
1242    }
1243    else if (ltep_msgid == UT_PEX_ID)
1244    {
1245        dbgmsg (msgs, "got ut pex");
1246        msgs->peerSupportsPex = 1;
1247        parseUtPex (msgs, msglen, inbuf);
1248    }
1249    else if (ltep_msgid == UT_METADATA_ID)
1250    {
1251        dbgmsg (msgs, "got ut metadata");
1252        msgs->peerSupportsMetadataXfer = 1;
1253        parseUtMetadata (msgs, msglen, inbuf);
1254    }
1255    else
1256    {
1257        dbgmsg (msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1258        evbuffer_drain (inbuf, msglen);
1259    }
1260}
1261
1262static int
1263readBtLength (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1264{
1265    uint32_t len;
1266
1267    if (inlen < sizeof (len))
1268        return READ_LATER;
1269
1270    tr_peerIoReadUint32 (msgs->io, inbuf, &len);
1271
1272    if (len == 0) /* peer sent us a keepalive message */
1273        dbgmsg (msgs, "got KeepAlive");
1274    else
1275    {
1276        msgs->incoming.length = len;
1277        msgs->state = AWAITING_BT_ID;
1278    }
1279
1280    return READ_NOW;
1281}
1282
1283static int readBtMessage (tr_peerMsgs *, struct evbuffer *, size_t);
1284
1285static int
1286readBtId (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1287{
1288    uint8_t id;
1289
1290    if (inlen < sizeof (uint8_t))
1291        return READ_LATER;
1292
1293    tr_peerIoReadUint8 (msgs->io, inbuf, &id);
1294    msgs->incoming.id = id;
1295    dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %"TR_PRIuSIZE, id, (size_t)msgs->incoming.length);
1296
1297    if (id == BT_PIECE)
1298    {
1299        msgs->state = AWAITING_BT_PIECE;
1300        return READ_NOW;
1301    }
1302    else if (msgs->incoming.length != 1)
1303    {
1304        msgs->state = AWAITING_BT_MESSAGE;
1305        return READ_NOW;
1306    }
1307    else return readBtMessage (msgs, inbuf, inlen - 1);
1308}
1309
1310static void
1311updatePeerProgress (tr_peerMsgs * msgs)
1312{
1313  tr_peerUpdateProgress (msgs->torrent, &msgs->peer);
1314
1315  /*updateFastSet (msgs);*/
1316  updateInterest (msgs);
1317}
1318
1319static void
1320prefetchPieces (tr_peerMsgs *msgs)
1321{
1322  int i;
1323
1324  if (!getSession (msgs)->isPrefetchEnabled)
1325    return;
1326
1327  for (i=msgs->prefetchCount; i<msgs->peer.pendingReqsToClient && i<PREFETCH_SIZE; ++i)
1328    {
1329      const struct peer_request * req = msgs->peerAskedFor + i;
1330      if (requestIsValid (msgs, req))
1331        {
1332          tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1333          ++msgs->prefetchCount;
1334        }
1335    }
1336}
1337
1338static void
1339peerMadeRequest (tr_peerMsgs * msgs, const struct peer_request * req)
1340{
1341    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1342    const int reqIsValid = requestIsValid (msgs, req);
1343    const int clientHasPiece = reqIsValid && tr_torrentPieceIsComplete (msgs->torrent, req->index);
1344    const int peerIsChoked = msgs->peer_is_choked;
1345
1346    bool allow = false;
1347
1348    if (!reqIsValid)
1349        dbgmsg (msgs, "rejecting an invalid request.");
1350    else if (!clientHasPiece)
1351        dbgmsg (msgs, "rejecting request for a piece we don't have.");
1352    else if (peerIsChoked)
1353        dbgmsg (msgs, "rejecting request from choked peer");
1354    else if (msgs->peer.pendingReqsToClient + 1 >= REQQ)
1355        dbgmsg (msgs, "rejecting request ... reqq is full");
1356    else
1357        allow = true;
1358
1359    if (allow) {
1360        msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req;
1361        prefetchPieces (msgs);
1362    } else if (fext) {
1363        protocolSendReject (msgs, req);
1364    }
1365}
1366
1367static bool
1368messageLengthIsCorrect (const tr_peerMsgs * msg, uint8_t id, uint32_t len)
1369{
1370    switch (id)
1371    {
1372        case BT_CHOKE:
1373        case BT_UNCHOKE:
1374        case BT_INTERESTED:
1375        case BT_NOT_INTERESTED:
1376        case BT_FEXT_HAVE_ALL:
1377        case BT_FEXT_HAVE_NONE:
1378            return len == 1;
1379
1380        case BT_HAVE:
1381        case BT_FEXT_SUGGEST:
1382        case BT_FEXT_ALLOWED_FAST:
1383            return len == 5;
1384
1385        case BT_BITFIELD:
1386            if (tr_torrentHasMetadata (msg->torrent))
1387                return len == (msg->torrent->info.pieceCount + 7u) / 8u + 1u;
1388            /* we don't know the piece count yet,
1389               so we can only guess whether to send true or false */
1390            if (msg->metadata_size_hint > 0)
1391                return len <= msg->metadata_size_hint;
1392            return true;
1393
1394        case BT_REQUEST:
1395        case BT_CANCEL:
1396        case BT_FEXT_REJECT:
1397            return len == 13;
1398
1399        case BT_PIECE:
1400            return len > 9 && len <= 16393;
1401
1402        case BT_PORT:
1403            return len == 3;
1404
1405        case BT_LTEP:
1406            return len >= 2;
1407
1408        default:
1409            return false;
1410    }
1411}
1412
1413static int clientGotBlock (tr_peerMsgs *               msgs,
1414                           struct evbuffer *           block,
1415                           const struct peer_request * req);
1416
1417static int
1418readBtPiece (tr_peerMsgs      * msgs,
1419             struct evbuffer  * inbuf,
1420             size_t             inlen,
1421             size_t           * setme_piece_bytes_read)
1422{
1423    struct peer_request * req = &msgs->incoming.blockReq;
1424
1425    assert (evbuffer_get_length (inbuf) >= inlen);
1426    dbgmsg (msgs, "In readBtPiece");
1427
1428    if (!req->length)
1429    {
1430        if (inlen < 8)
1431            return READ_LATER;
1432
1433        tr_peerIoReadUint32 (msgs->io, inbuf, &req->index);
1434        tr_peerIoReadUint32 (msgs->io, inbuf, &req->offset);
1435        req->length = msgs->incoming.length - 9;
1436        dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1437        return READ_NOW;
1438    }
1439    else
1440    {
1441        int err;
1442        size_t n;
1443        size_t nLeft;
1444        struct evbuffer * block_buffer;
1445
1446        if (msgs->incoming.block == NULL)
1447            msgs->incoming.block = evbuffer_new ();
1448        block_buffer = msgs->incoming.block;
1449
1450        /* read in another chunk of data */
1451        nLeft = req->length - evbuffer_get_length (block_buffer);
1452        n = MIN (nLeft, inlen);
1453
1454        tr_peerIoReadBytesToBuf (msgs->io, inbuf, block_buffer, n);
1455
1456        fireClientGotPieceData (msgs, n);
1457        *setme_piece_bytes_read += n;
1458        dbgmsg (msgs, "got %"TR_PRIuSIZE" bytes for block %u:%u->%u ... %d remain",
1459               n, req->index, req->offset, req->length,
1460             (int)(req->length - evbuffer_get_length (block_buffer)));
1461        if (evbuffer_get_length (block_buffer) < req->length)
1462            return READ_LATER;
1463
1464        /* pass the block along... */
1465        err = clientGotBlock (msgs, block_buffer, req);
1466        evbuffer_drain (block_buffer, evbuffer_get_length (block_buffer));
1467
1468        /* cleanup */
1469        req->length = 0;
1470        msgs->state = AWAITING_BT_LENGTH;
1471        return err ? READ_ERR : READ_NOW;
1472    }
1473}
1474
1475static void updateDesiredRequestCount (tr_peerMsgs * msgs);
1476
1477static int
1478readBtMessage (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1479{
1480    uint32_t      ui32;
1481    uint32_t      msglen = msgs->incoming.length;
1482    const uint8_t id = msgs->incoming.id;
1483#ifndef NDEBUG
1484    const size_t  startBufLen = evbuffer_get_length (inbuf);
1485#endif
1486    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1487
1488    --msglen; /* id length */
1489
1490    dbgmsg (msgs, "got BT id %d, len %d, buffer size is %"TR_PRIuSIZE, (int)id, (int)msglen, inlen);
1491
1492    if (inlen < msglen)
1493        return READ_LATER;
1494
1495    if (!messageLengthIsCorrect (msgs, id, msglen + 1))
1496    {
1497        dbgmsg (msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1498        fireError (msgs, EMSGSIZE);
1499        return READ_ERR;
1500    }
1501
1502    switch (id)
1503    {
1504        case BT_CHOKE:
1505            dbgmsg (msgs, "got Choke");
1506            msgs->client_is_choked = true;
1507            if (!fext)
1508                fireGotChoke (msgs);
1509            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1510            break;
1511
1512        case BT_UNCHOKE:
1513            dbgmsg (msgs, "got Unchoke");
1514            msgs->client_is_choked = false;
1515            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1516            updateDesiredRequestCount (msgs);
1517            break;
1518
1519        case BT_INTERESTED:
1520            dbgmsg (msgs, "got Interested");
1521            msgs->peer_is_interested = true;
1522            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1523            break;
1524
1525        case BT_NOT_INTERESTED:
1526            dbgmsg (msgs, "got Not Interested");
1527            msgs->peer_is_interested = false;
1528            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1529            break;
1530
1531        case BT_HAVE:
1532            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1533            dbgmsg (msgs, "got Have: %u", ui32);
1534            if (tr_torrentHasMetadata (msgs->torrent)
1535                    && (ui32 >= msgs->torrent->info.pieceCount))
1536            {
1537                fireError (msgs, ERANGE);
1538                return READ_ERR;
1539            }
1540
1541            /* a peer can send the same HAVE message twice... */
1542            if (!tr_bitfieldHas (&msgs->peer.have, ui32)) {
1543                tr_bitfieldAdd (&msgs->peer.have, ui32);
1544                fireClientGotHave (msgs, ui32);
1545            }
1546            updatePeerProgress (msgs);
1547            break;
1548
1549        case BT_BITFIELD: {
1550            uint8_t * tmp = tr_new (uint8_t, msglen);
1551            dbgmsg (msgs, "got a bitfield");
1552            tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1553            tr_bitfieldSetRaw (&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));
1554            fireClientGotBitfield (msgs, &msgs->peer.have);
1555            updatePeerProgress (msgs);
1556            tr_free (tmp);
1557            break;
1558        }
1559
1560        case BT_REQUEST:
1561        {
1562            struct peer_request r;
1563            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1564            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1565            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1566            dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1567            peerMadeRequest (msgs, &r);
1568            break;
1569        }
1570
1571        case BT_CANCEL:
1572        {
1573            int i;
1574            struct peer_request r;
1575            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1576            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1577            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1578            tr_historyAdd (&msgs->peer.cancelsSentToClient, tr_time (), 1);
1579            dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1580
1581            for (i=0; i<msgs->peer.pendingReqsToClient; ++i) {
1582                const struct peer_request * req = msgs->peerAskedFor + i;
1583                if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length))
1584                    break;
1585            }
1586
1587            if (i < msgs->peer.pendingReqsToClient)
1588                tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request),
1589                                           msgs->peer.pendingReqsToClient--);
1590            break;
1591        }
1592
1593        case BT_PIECE:
1594            assert (0); /* handled elsewhere! */
1595            break;
1596
1597        case BT_PORT:
1598            dbgmsg (msgs, "Got a BT_PORT");
1599            tr_peerIoReadUint16 (msgs->io, inbuf, &msgs->dht_port);
1600            if (msgs->dht_port > 0)
1601                tr_dhtAddNode (getSession (msgs),
1602                               tr_peerAddress (&msgs->peer),
1603                               msgs->dht_port, 0);
1604            break;
1605
1606        case BT_FEXT_SUGGEST:
1607            dbgmsg (msgs, "Got a BT_FEXT_SUGGEST");
1608            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1609            if (fext)
1610                fireClientGotSuggest (msgs, ui32);
1611            else {
1612                fireError (msgs, EMSGSIZE);
1613                return READ_ERR;
1614            }
1615            break;
1616
1617        case BT_FEXT_ALLOWED_FAST:
1618            dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST");
1619            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1620            if (fext)
1621                fireClientGotAllowedFast (msgs, ui32);
1622            else {
1623                fireError (msgs, EMSGSIZE);
1624                return READ_ERR;
1625            }
1626            break;
1627
1628        case BT_FEXT_HAVE_ALL:
1629            dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL");
1630            if (fext) {
1631                tr_bitfieldSetHasAll (&msgs->peer.have);
1632assert (tr_bitfieldHasAll (&msgs->peer.have));
1633                fireClientGotHaveAll (msgs);
1634                updatePeerProgress (msgs);
1635            } else {
1636                fireError (msgs, EMSGSIZE);
1637                return READ_ERR;
1638            }
1639            break;
1640
1641        case BT_FEXT_HAVE_NONE:
1642            dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE");
1643            if (fext) {
1644                tr_bitfieldSetHasNone (&msgs->peer.have);
1645                fireClientGotHaveNone (msgs);
1646                updatePeerProgress (msgs);
1647            } else {
1648                fireError (msgs, EMSGSIZE);
1649                return READ_ERR;
1650            }
1651            break;
1652
1653        case BT_FEXT_REJECT:
1654        {
1655            struct peer_request r;
1656            dbgmsg (msgs, "Got a BT_FEXT_REJECT");
1657            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1658            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1659            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1660            if (fext)
1661                fireGotRej (msgs, &r);
1662            else {
1663                fireError (msgs, EMSGSIZE);
1664                return READ_ERR;
1665            }
1666            break;
1667        }
1668
1669        case BT_LTEP:
1670            dbgmsg (msgs, "Got a BT_LTEP");
1671            parseLtep (msgs, msglen, inbuf);
1672            break;
1673
1674        default:
1675            dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id);
1676            tr_peerIoDrain (msgs->io, inbuf, msglen);
1677            break;
1678    }
1679
1680    assert (msglen + 1 == msgs->incoming.length);
1681    assert (evbuffer_get_length (inbuf) == startBufLen - msglen);
1682
1683    msgs->state = AWAITING_BT_LENGTH;
1684    return READ_NOW;
1685}
1686
1687/* returns 0 on success, or an errno on failure */
1688static int
1689clientGotBlock (tr_peerMsgs                * msgs,
1690                struct evbuffer            * data,
1691                const struct peer_request  * req)
1692{
1693    int err;
1694    tr_torrent * tor = msgs->torrent;
1695    const tr_block_index_t block = _tr_block (tor, req->index, req->offset);
1696
1697    assert (msgs);
1698    assert (req);
1699
1700    if (!requestIsValid (msgs, req)) {
1701        dbgmsg (msgs, "dropping invalid block %u:%u->%u",
1702                req->index, req->offset, req->length);
1703        return EBADMSG;
1704    }
1705
1706    if (req->length != tr_torBlockCountBytes (msgs->torrent, block)) {
1707        dbgmsg (msgs, "wrong block size -- expected %u, got %d",
1708                tr_torBlockCountBytes (msgs->torrent, block), req->length);
1709        return EMSGSIZE;
1710    }
1711
1712    dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1713
1714    if (!tr_peerMgrDidPeerRequest (msgs->torrent, &msgs->peer, block)) {
1715        dbgmsg (msgs, "we didn't ask for this message...");
1716        return 0;
1717    }
1718    if (tr_torrentPieceIsComplete (msgs->torrent, req->index)) {
1719        dbgmsg (msgs, "we did ask for this message, but the piece is already complete...");
1720        return 0;
1721    }
1722
1723    /**
1724    ***  Save the block
1725    **/
1726
1727    if ((err = tr_cacheWriteBlock (getSession (msgs)->cache, tor, req->index, req->offset, req->length, data)))
1728        return err;
1729
1730    tr_bitfieldAdd (&msgs->peer.blame, req->index);
1731    fireGotBlock (msgs, req);
1732    return 0;
1733}
1734
1735static int peerPulse (void * vmsgs);
1736
1737static void
1738didWrite (tr_peerIo * io UNUSED, size_t bytesWritten, bool wasPieceData, void * vmsgs)
1739{
1740    tr_peerMsgs * msgs = vmsgs;
1741
1742    if (wasPieceData)
1743      firePeerGotPieceData (msgs, bytesWritten);
1744
1745    if (tr_isPeerIo (io) && io->userData)
1746        peerPulse (msgs);
1747}
1748
1749static ReadState
1750canRead (tr_peerIo * io, void * vmsgs, size_t * piece)
1751{
1752    ReadState         ret;
1753    tr_peerMsgs *     msgs = vmsgs;
1754    struct evbuffer * in = tr_peerIoGetReadBuffer (io);
1755    const size_t      inlen = evbuffer_get_length (in);
1756
1757    dbgmsg (msgs, "canRead: inlen is %"TR_PRIuSIZE", msgs->state is %d", inlen, msgs->state);
1758
1759    if (!inlen)
1760    {
1761        ret = READ_LATER;
1762    }
1763    else if (msgs->state == AWAITING_BT_PIECE)
1764    {
1765        ret = readBtPiece (msgs, in, inlen, piece);
1766    }
1767    else switch (msgs->state)
1768    {
1769        case AWAITING_BT_LENGTH:
1770            ret = readBtLength (msgs, in, inlen); break;
1771
1772        case AWAITING_BT_ID:
1773            ret = readBtId   (msgs, in, inlen); break;
1774
1775        case AWAITING_BT_MESSAGE:
1776            ret = readBtMessage (msgs, in, inlen); break;
1777
1778        default:
1779            ret = READ_ERR;
1780            assert (0);
1781    }
1782
1783    dbgmsg (msgs, "canRead: ret is %d", (int)ret);
1784
1785    return ret;
1786}
1787
1788int
1789tr_peerMsgsIsReadingBlock (const tr_peerMsgs * msgs, tr_block_index_t block)
1790{
1791    if (msgs->state != AWAITING_BT_PIECE)
1792        return false;
1793
1794    return block == _tr_block (msgs->torrent,
1795                               msgs->incoming.blockReq.index,
1796                               msgs->incoming.blockReq.offset);
1797}
1798
1799/**
1800***
1801**/
1802
1803static void
1804updateDesiredRequestCount (tr_peerMsgs * msgs)
1805{
1806    tr_torrent * const torrent = msgs->torrent;
1807
1808    /* there are lots of reasons we might not want to request any blocks... */
1809    if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent)
1810                                    || msgs->client_is_choked
1811                                    || !msgs->client_is_interested)
1812    {
1813        msgs->desiredRequestCount = 0;
1814    }
1815    else
1816    {
1817        int estimatedBlocksInPeriod;
1818        unsigned int rate_Bps;
1819        unsigned int irate_Bps;
1820        const int floor = 4;
1821        const int seconds = REQUEST_BUF_SECS;
1822        const uint64_t now = tr_time_msec ();
1823
1824        /* Get the rate limit we should use.
1825         * FIXME: this needs to consider all the other peers as well... */
1826        rate_Bps = tr_peerGetPieceSpeed_Bps (&msgs->peer, now, TR_PEER_TO_CLIENT);
1827        if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT))
1828            rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT));
1829
1830        /* honor the session limits, if enabled */
1831        if (tr_torrentUsesSessionLimits (torrent) &&
1832            tr_sessionGetActiveSpeedLimit_Bps (torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1833                rate_Bps = MIN (rate_Bps, irate_Bps);
1834
1835        /* use this desired rate to figure out how
1836         * many requests we should send to this peer */
1837        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1838        msgs->desiredRequestCount = MAX (floor, estimatedBlocksInPeriod);
1839
1840        /* honor the peer's maximum request count, if specified */
1841        if (msgs->reqq > 0)
1842            if (msgs->desiredRequestCount > msgs->reqq)
1843                msgs->desiredRequestCount = msgs->reqq;
1844    }
1845}
1846
1847static void
1848updateMetadataRequests (tr_peerMsgs * msgs, time_t now)
1849{
1850    int piece;
1851
1852    if (msgs->peerSupportsMetadataXfer
1853        && tr_torrentGetNextMetadataRequest (msgs->torrent, now, &piece))
1854    {
1855        tr_variant tmp;
1856        struct evbuffer * payload;
1857        struct evbuffer * out = msgs->outMessages;
1858
1859        /* build the data message */
1860        tr_variantInitDict (&tmp, 3);
1861        tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
1862        tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1863        payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1864
1865        dbgmsg (msgs, "requesting metadata piece #%d", piece);
1866
1867        /* write it out as a LTEP message to our outMessages buffer */
1868        evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1869        evbuffer_add_uint8 (out, BT_LTEP);
1870        evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1871        evbuffer_add_buffer (out, payload);
1872        pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1873        dbgOutMessageLen (msgs);
1874
1875        /* cleanup */
1876        evbuffer_free (payload);
1877        tr_variantFree (&tmp);
1878    }
1879}
1880
1881static void
1882updateBlockRequests (tr_peerMsgs * msgs)
1883{
1884    if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT)
1885        && (msgs->desiredRequestCount > 0)
1886        && (msgs->peer.pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))
1887    {
1888        int i;
1889        int n;
1890        tr_block_index_t * blocks;
1891        const int numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer;
1892
1893        assert (tr_peerMsgsIsClientInterested (msgs));
1894        assert (!tr_peerMsgsIsClientChoked (msgs));
1895
1896        blocks = tr_new (tr_block_index_t, numwant);
1897        tr_peerMgrGetNextRequests (msgs->torrent, &msgs->peer, numwant, blocks, &n, false);
1898
1899        for (i=0; i<n; ++i)
1900        {
1901            struct peer_request req;
1902            blockToReq (msgs->torrent, blocks[i], &req);
1903            protocolSendRequest (msgs, &req);
1904        }
1905
1906        tr_free (blocks);
1907    }
1908}
1909
1910static size_t
1911fillOutputBuffer (tr_peerMsgs * msgs, time_t now)
1912{
1913    int piece;
1914    size_t bytesWritten = 0;
1915    struct peer_request req;
1916    const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0;
1917    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1918
1919    /**
1920    ***  Protocol messages
1921    **/
1922
1923    if (haveMessages && !msgs->outMessagesBatchedAt) /* fresh batch */
1924    {
1925        dbgmsg (msgs, "started an outMessages batch (length is %"TR_PRIuSIZE")", evbuffer_get_length (msgs->outMessages));
1926        msgs->outMessagesBatchedAt = now;
1927    }
1928    else if (haveMessages && ((now - msgs->outMessagesBatchedAt) >= msgs->outMessagesBatchPeriod))
1929    {
1930        const size_t len = evbuffer_get_length (msgs->outMessages);
1931        /* flush the protocol messages */
1932        dbgmsg (msgs, "flushing outMessages... to %p (length is %"TR_PRIuSIZE")", (void*)msgs->io, len);
1933        tr_peerIoWriteBuf (msgs->io, msgs->outMessages, false);
1934        msgs->clientSentAnythingAt = now;
1935        msgs->outMessagesBatchedAt = 0;
1936        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1937        bytesWritten +=  len;
1938    }
1939
1940    /**
1941    ***  Metadata Pieces
1942    **/
1943
1944    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= METADATA_PIECE_SIZE)
1945        && popNextMetadataRequest (msgs, &piece))
1946    {
1947        char * data;
1948        int dataLen;
1949        bool ok = false;
1950
1951        data = tr_torrentGetMetadataPiece (msgs->torrent, piece, &dataLen);
1952        if ((dataLen > 0) && (data != NULL))
1953        {
1954            tr_variant tmp;
1955            struct evbuffer * payload;
1956            struct evbuffer * out = msgs->outMessages;
1957
1958            /* build the data message */
1959            tr_variantInitDict (&tmp, 3);
1960            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
1961            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1962            tr_variantDictAddInt (&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
1963            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1964
1965            /* write it out as a LTEP message to our outMessages buffer */
1966            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload) + dataLen);
1967            evbuffer_add_uint8 (out, BT_LTEP);
1968            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1969            evbuffer_add_buffer (out, payload);
1970            evbuffer_add     (out, data, dataLen);
1971            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1972            dbgOutMessageLen (msgs);
1973
1974            evbuffer_free (payload);
1975            tr_variantFree (&tmp);
1976            tr_free (data);
1977
1978            ok = true;
1979        }
1980
1981        if (!ok) /* send a rejection message */
1982        {
1983            tr_variant tmp;
1984            struct evbuffer * payload;
1985            struct evbuffer * out = msgs->outMessages;
1986
1987            /* build the rejection message */
1988            tr_variantInitDict (&tmp, 2);
1989            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1990            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1991            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1992
1993            /* write it out as a LTEP message to our outMessages buffer */
1994            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1995            evbuffer_add_uint8 (out, BT_LTEP);
1996            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1997            evbuffer_add_buffer (out, payload);
1998            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1999            dbgOutMessageLen (msgs);
2000
2001            evbuffer_free (payload);
2002            tr_variantFree (&tmp);
2003        }
2004    }
2005
2006    /**
2007    ***  Data Blocks
2008    **/
2009
2010    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= msgs->torrent->blockSize)
2011        && popNextRequest (msgs, &req))
2012    {
2013        --msgs->prefetchCount;
2014
2015        if (requestIsValid (msgs, &req)
2016            && tr_torrentPieceIsComplete (msgs->torrent, req.index))
2017        {
2018            int err;
2019            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
2020            struct evbuffer * out;
2021            struct evbuffer_iovec iovec[1];
2022
2023            out = evbuffer_new ();
2024            evbuffer_expand (out, msglen);
2025
2026            evbuffer_add_uint32 (out, sizeof (uint8_t) + 2 * sizeof (uint32_t) + req.length);
2027            evbuffer_add_uint8 (out, BT_PIECE);
2028            evbuffer_add_uint32 (out, req.index);
2029            evbuffer_add_uint32 (out, req.offset);
2030
2031            evbuffer_reserve_space (out, req.length, iovec, 1);
2032            err = tr_cacheReadBlock (getSession (msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base);
2033            iovec[0].iov_len = req.length;
2034            evbuffer_commit_space (out, iovec, 1);
2035
2036            /* check the piece if it needs checking... */
2037            if (!err && tr_torrentPieceNeedsCheck (msgs->torrent, req.index))
2038                if ((err = !tr_torrentCheckPiece (msgs->torrent, req.index)))
2039                    tr_torrentSetLocalError (msgs->torrent, _("Please Verify Local Data! Piece #%"TR_PRIuSIZE" is corrupt."), (size_t)req.index);
2040
2041            if (err)
2042            {
2043                if (fext)
2044                    protocolSendReject (msgs, &req);
2045            }
2046            else
2047            {
2048                const size_t n = evbuffer_get_length (out);
2049                dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
2050                assert (n == msglen);
2051                tr_peerIoWriteBuf (msgs->io, out, true);
2052                bytesWritten += n;
2053                msgs->clientSentAnythingAt = now;
2054                tr_historyAdd (&msgs->peer.blocksSentToPeer, tr_time (), 1);
2055            }
2056
2057            evbuffer_free (out);
2058
2059            if (err)
2060            {
2061                bytesWritten = 0;
2062                msgs = NULL;
2063            }
2064        }
2065        else if (fext) /* peer needs a reject message */
2066        {
2067            protocolSendReject (msgs, &req);
2068        }
2069
2070        if (msgs != NULL)
2071            prefetchPieces (msgs);
2072    }
2073
2074    /**
2075    ***  Keepalive
2076    **/
2077
2078    if ((msgs != NULL)
2079        && (msgs->clientSentAnythingAt != 0)
2080        && ((now - msgs->clientSentAnythingAt) > KEEPALIVE_INTERVAL_SECS))
2081    {
2082        dbgmsg (msgs, "sending a keepalive message");
2083        evbuffer_add_uint32 (msgs->outMessages, 0);
2084        pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2085    }
2086
2087    return bytesWritten;
2088}
2089
2090static int
2091peerPulse (void * vmsgs)
2092{
2093    tr_peerMsgs * msgs = vmsgs;
2094    const time_t  now = tr_time ();
2095
2096    if (tr_isPeerIo (msgs->io)) {
2097        updateDesiredRequestCount (msgs);
2098        updateBlockRequests (msgs);
2099        updateMetadataRequests (msgs, now);
2100    }
2101
2102    for (;;)
2103        if (fillOutputBuffer (msgs, now) < 1)
2104            break;
2105
2106    return true; /* loop forever */
2107}
2108
2109void
2110tr_peerMsgsPulse (tr_peerMsgs * msgs)
2111{
2112    if (msgs != NULL)
2113        peerPulse (msgs);
2114}
2115
2116static void
2117gotError (tr_peerIo * io UNUSED, short what, void * vmsgs)
2118{
2119    if (what & BEV_EVENT_TIMEOUT)
2120        dbgmsg (vmsgs, "libevent got a timeout, what=%hd", what);
2121    if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
2122        dbgmsg (vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2123               what, errno, tr_strerror (errno));
2124    fireError (vmsgs, ENOTCONN);
2125}
2126
2127static void
2128sendBitfield (tr_peerMsgs * msgs)
2129{
2130    void * bytes;
2131    size_t byte_count = 0;
2132    struct evbuffer * out = msgs->outMessages;
2133
2134    assert (tr_torrentHasMetadata (msgs->torrent));
2135
2136    bytes = tr_torrentCreatePieceBitfield (msgs->torrent, &byte_count);
2137    evbuffer_add_uint32 (out, sizeof (uint8_t) + byte_count);
2138    evbuffer_add_uint8 (out, BT_BITFIELD);
2139    evbuffer_add     (out, bytes, byte_count);
2140    dbgmsg (msgs, "sending bitfield... outMessage size is now %"TR_PRIuSIZE, evbuffer_get_length (out));
2141    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2142
2143    tr_free (bytes);
2144}
2145
2146static void
2147tellPeerWhatWeHave (tr_peerMsgs * msgs)
2148{
2149    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
2150
2151    if (fext && tr_torrentHasAll (msgs->torrent))
2152    {
2153        protocolSendHaveAll (msgs);
2154    }
2155    else if (fext && tr_torrentHasNone (msgs->torrent))
2156    {
2157        protocolSendHaveNone (msgs);
2158    }
2159    else if (!tr_torrentHasNone (msgs->torrent))
2160    {
2161        sendBitfield (msgs);
2162    }
2163}
2164
2165/**
2166***
2167**/
2168
2169/* some peers give us error messages if we send
2170   more than this many peers in a single pex message
2171   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2172#define MAX_PEX_ADDED 50
2173#define MAX_PEX_DROPPED 50
2174
2175typedef struct
2176{
2177    tr_pex *  added;
2178    tr_pex *  dropped;
2179    tr_pex *  elements;
2180    int       addedCount;
2181    int       droppedCount;
2182    int       elementCount;
2183}
2184PexDiffs;
2185
2186static void
2187pexAddedCb (void * vpex, void * userData)
2188{
2189    PexDiffs * diffs = userData;
2190    tr_pex *   pex = vpex;
2191
2192    if (diffs->addedCount < MAX_PEX_ADDED)
2193    {
2194        diffs->added[diffs->addedCount++] = *pex;
2195        diffs->elements[diffs->elementCount++] = *pex;
2196    }
2197}
2198
2199static inline void
2200pexDroppedCb (void * vpex, void * userData)
2201{
2202    PexDiffs * diffs = userData;
2203    tr_pex *   pex = vpex;
2204
2205    if (diffs->droppedCount < MAX_PEX_DROPPED)
2206    {
2207        diffs->dropped[diffs->droppedCount++] = *pex;
2208    }
2209}
2210
2211static inline void
2212pexElementCb (void * vpex, void * userData)
2213{
2214    PexDiffs * diffs = userData;
2215    tr_pex * pex = vpex;
2216
2217    diffs->elements[diffs->elementCount++] = *pex;
2218}
2219
2220typedef void (tr_set_func)(void * element, void * userData);
2221
2222/**
2223 * @brief find the differences and commonalities in two sorted sets
2224 * @param a the first set
2225 * @param aCount the number of elements in the set 'a'
2226 * @param b the second set
2227 * @param bCount the number of elements in the set 'b'
2228 * @param compare the sorting method for both sets
2229 * @param elementSize the sizeof the element in the two sorted sets
2230 * @param in_a called for items in set 'a' but not set 'b'
2231 * @param in_b called for items in set 'b' but not set 'a'
2232 * @param in_both called for items that are in both sets
2233 * @param userData user data passed along to in_a, in_b, and in_both
2234 */
2235static void
2236tr_set_compare (const void * va, size_t aCount,
2237                const void * vb, size_t bCount,
2238                int compare (const void * a, const void * b),
2239                size_t elementSize,
2240                tr_set_func in_a_cb,
2241                tr_set_func in_b_cb,
2242                tr_set_func in_both_cb,
2243                void * userData)
2244{
2245    const uint8_t * a = va;
2246    const uint8_t * b = vb;
2247    const uint8_t * aend = a + elementSize * aCount;
2248    const uint8_t * bend = b + elementSize * bCount;
2249
2250    while (a != aend || b != bend)
2251    {
2252        if (a == aend)
2253        {
2254          (*in_b_cb)((void*)b, userData);
2255            b += elementSize;
2256        }
2257        else if (b == bend)
2258        {
2259          (*in_a_cb)((void*)a, userData);
2260            a += elementSize;
2261        }
2262        else
2263        {
2264            const int val = (*compare)(a, b);
2265
2266            if (!val)
2267            {
2268              (*in_both_cb)((void*)a, userData);
2269                a += elementSize;
2270                b += elementSize;
2271            }
2272            else if (val < 0)
2273            {
2274              (*in_a_cb)((void*)a, userData);
2275                a += elementSize;
2276            }
2277            else if (val > 0)
2278            {
2279              (*in_b_cb)((void*)b, userData);
2280                b += elementSize;
2281            }
2282        }
2283    }
2284}
2285
2286
2287static void
2288sendPex (tr_peerMsgs * msgs)
2289{
2290    if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent))
2291    {
2292        PexDiffs diffs;
2293        PexDiffs diffs6;
2294        tr_pex * newPex = NULL;
2295        tr_pex * newPex6 = NULL;
2296        const int newCount = tr_peerMgrGetPeers (msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2297        const int newCount6 = tr_peerMgrGetPeers (msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2298
2299        /* build the diffs */
2300        diffs.added = tr_new (tr_pex, newCount);
2301        diffs.addedCount = 0;
2302        diffs.dropped = tr_new (tr_pex, msgs->pexCount);
2303        diffs.droppedCount = 0;
2304        diffs.elements = tr_new (tr_pex, newCount + msgs->pexCount);
2305        diffs.elementCount = 0;
2306        tr_set_compare (msgs->pex, msgs->pexCount,
2307                        newPex, newCount,
2308                        tr_pexCompare, sizeof (tr_pex),
2309                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs);
2310        diffs6.added = tr_new (tr_pex, newCount6);
2311        diffs6.addedCount = 0;
2312        diffs6.dropped = tr_new (tr_pex, msgs->pexCount6);
2313        diffs6.droppedCount = 0;
2314        diffs6.elements = tr_new (tr_pex, newCount6 + msgs->pexCount6);
2315        diffs6.elementCount = 0;
2316        tr_set_compare (msgs->pex6, msgs->pexCount6,
2317                        newPex6, newCount6,
2318                        tr_pexCompare, sizeof (tr_pex),
2319                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6);
2320        dbgmsg (
2321            msgs,
2322            "pex: old peer count %d+%d, new peer count %d+%d, "
2323            "added %d+%d, removed %d+%d",
2324            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2325            diffs.addedCount, diffs6.addedCount,
2326            diffs.droppedCount, diffs6.droppedCount);
2327
2328        if (!diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2329            !diffs6.droppedCount)
2330        {
2331            tr_free (diffs.elements);
2332            tr_free (diffs6.elements);
2333        }
2334        else
2335        {
2336            int  i;
2337            tr_variant val;
2338            uint8_t * tmp, *walk;
2339            struct evbuffer * payload;
2340            struct evbuffer * out = msgs->outMessages;
2341
2342            /* update peer */
2343            tr_free (msgs->pex);
2344            msgs->pex = diffs.elements;
2345            msgs->pexCount = diffs.elementCount;
2346            tr_free (msgs->pex6);
2347            msgs->pex6 = diffs6.elements;
2348            msgs->pexCount6 = diffs6.elementCount;
2349
2350            /* build the pex payload */
2351            tr_variantInitDict (&val, 3); /* ipv6 support: left as 3:
2352                                         * speed vs. likelihood? */
2353
2354            if (diffs.addedCount > 0)
2355            {
2356                /* "added" */
2357                tmp = walk = tr_new (uint8_t, diffs.addedCount * 6);
2358                for (i = 0; i < diffs.addedCount; ++i) {
2359                    memcpy (walk, &diffs.added[i].addr.addr, 4); walk += 4;
2360                    memcpy (walk, &diffs.added[i].port, 2); walk += 2;
2361                }
2362                assert ((walk - tmp) == diffs.addedCount * 6);
2363                tr_variantDictAddRaw (&val, TR_KEY_added, tmp, walk - tmp);
2364                tr_free (tmp);
2365
2366                /* "added.f"
2367                 * unset each holepunch flag because we don't support it. */
2368                tmp = walk = tr_new (uint8_t, diffs.addedCount);
2369                for (i = 0; i < diffs.addedCount; ++i)
2370                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2371                assert ((walk - tmp) == diffs.addedCount);
2372                tr_variantDictAddRaw (&val, TR_KEY_added_f, tmp, walk - tmp);
2373                tr_free (tmp);
2374            }
2375
2376            if (diffs.droppedCount > 0)
2377            {
2378                /* "dropped" */
2379                tmp = walk = tr_new (uint8_t, diffs.droppedCount * 6);
2380                for (i = 0; i < diffs.droppedCount; ++i) {
2381                    memcpy (walk, &diffs.dropped[i].addr.addr, 4); walk += 4;
2382                    memcpy (walk, &diffs.dropped[i].port, 2); walk += 2;
2383                }
2384                assert ((walk - tmp) == diffs.droppedCount * 6);
2385                tr_variantDictAddRaw (&val, TR_KEY_dropped, tmp, walk - tmp);
2386                tr_free (tmp);
2387            }
2388
2389            if (diffs6.addedCount > 0)
2390            {
2391                /* "added6" */
2392                tmp = walk = tr_new (uint8_t, diffs6.addedCount * 18);
2393                for (i = 0; i < diffs6.addedCount; ++i) {
2394                    memcpy (walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2395                    walk += 16;
2396                    memcpy (walk, &diffs6.added[i].port, 2);
2397                    walk += 2;
2398                }
2399                assert ((walk - tmp) == diffs6.addedCount * 18);
2400                tr_variantDictAddRaw (&val, TR_KEY_added6, tmp, walk - tmp);
2401                tr_free (tmp);
2402
2403                /* "added6.f"
2404                 * unset each holepunch flag because we don't support it. */
2405                tmp = walk = tr_new (uint8_t, diffs6.addedCount);
2406                for (i = 0; i < diffs6.addedCount; ++i)
2407                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2408                assert ((walk - tmp) == diffs6.addedCount);
2409                tr_variantDictAddRaw (&val, TR_KEY_added6_f, tmp, walk - tmp);
2410                tr_free (tmp);
2411            }
2412
2413            if (diffs6.droppedCount > 0)
2414            {
2415                /* "dropped6" */
2416                tmp = walk = tr_new (uint8_t, diffs6.droppedCount * 18);
2417                for (i = 0; i < diffs6.droppedCount; ++i) {
2418                    memcpy (walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2419                    walk += 16;
2420                    memcpy (walk, &diffs6.dropped[i].port, 2);
2421                    walk += 2;
2422                }
2423                assert ((walk - tmp) == diffs6.droppedCount * 18);
2424                tr_variantDictAddRaw (&val, TR_KEY_dropped6, tmp, walk - tmp);
2425                tr_free (tmp);
2426            }
2427
2428            /* write the pex message */
2429            payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
2430            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2431            evbuffer_add_uint8 (out, BT_LTEP);
2432            evbuffer_add_uint8 (out, msgs->ut_pex_id);
2433            evbuffer_add_buffer (out, payload);
2434            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2435            dbgmsg (msgs, "sending a pex message; outMessage size is now %"TR_PRIuSIZE, evbuffer_get_length (out));
2436            dbgOutMessageLen (msgs);
2437
2438            evbuffer_free (payload);
2439            tr_variantFree (&val);
2440        }
2441
2442        /* cleanup */
2443        tr_free (diffs.added);
2444        tr_free (diffs.dropped);
2445        tr_free (newPex);
2446        tr_free (diffs6.added);
2447        tr_free (diffs6.dropped);
2448        tr_free (newPex6);
2449
2450        /*msgs->clientSentPexAt = tr_time ();*/
2451    }
2452}
2453
2454static void
2455pexPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmsgs)
2456{
2457    struct tr_peerMsgs * msgs = vmsgs;
2458
2459    sendPex (msgs);
2460
2461    assert (msgs->pexTimer != NULL);
2462    tr_timerAdd (msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2463}
2464
2465/***
2466****  tr_peer virtual functions
2467***/
2468
2469static bool
2470peermsgs_is_transferring_pieces (const struct tr_peer * peer,
2471                                 uint64_t               now,
2472                                 tr_direction           direction, 
2473                                 unsigned int         * setme_Bps)
2474{
2475  unsigned int Bps = 0;
2476
2477  if (tr_isPeerMsgs (peer))
2478    {
2479      const tr_peerMsgs * msgs = (const tr_peerMsgs *) peer;
2480      Bps = tr_peerIoGetPieceSpeed_Bps (msgs->io, now, direction);
2481    }
2482
2483  if (setme_Bps != NULL)
2484    *setme_Bps = Bps;
2485
2486  return Bps > 0;
2487}
2488
2489static void
2490peermsgs_destruct (tr_peer * peer)
2491{
2492  tr_peerMsgs * msgs = PEER_MSGS (peer);
2493
2494  assert (msgs != NULL);
2495
2496  tr_peerMsgsSetActive (msgs, TR_UP, false);
2497  tr_peerMsgsSetActive (msgs, TR_DOWN, false);
2498
2499  if (msgs->pexTimer != NULL)
2500    event_free (msgs->pexTimer);
2501
2502  if (msgs->incoming.block != NULL)
2503    evbuffer_free (msgs->incoming.block);
2504
2505  if (msgs->io)
2506    {
2507      tr_peerIoClear (msgs->io);
2508      tr_peerIoUnref (msgs->io); /* balanced by the ref in handshakeDoneCB () */
2509    }
2510
2511  evbuffer_free (msgs->outMessages);
2512  tr_free (msgs->pex6);
2513  tr_free (msgs->pex);
2514
2515  tr_peerDestruct (&msgs->peer);
2516
2517  memset (msgs, ~0, sizeof (tr_peerMsgs));
2518}
2519
2520static const struct tr_peer_virtual_funcs my_funcs =
2521{
2522  .destruct = peermsgs_destruct,
2523  .is_transferring_pieces = peermsgs_is_transferring_pieces
2524};
2525
2526/***
2527****
2528***/
2529
2530time_t
2531tr_peerMsgsGetConnectionAge (const tr_peerMsgs * msgs)
2532{
2533  assert (tr_isPeerMsgs (msgs));
2534
2535  return tr_peerIoGetAge (msgs->io);
2536}
2537
2538bool
2539tr_peerMsgsIsPeerChoked (const tr_peerMsgs * msgs)
2540{
2541  assert (tr_isPeerMsgs (msgs));
2542
2543  return msgs->peer_is_choked;
2544}
2545
2546bool
2547tr_peerMsgsIsPeerInterested (const tr_peerMsgs * msgs)
2548{
2549  assert (tr_isPeerMsgs (msgs));
2550
2551  return msgs->peer_is_interested;
2552}
2553
2554bool
2555tr_peerMsgsIsClientChoked (const tr_peerMsgs * msgs)
2556{
2557  assert (tr_isPeerMsgs (msgs));
2558
2559  return msgs->client_is_choked;
2560}
2561
2562bool
2563tr_peerMsgsIsClientInterested (const tr_peerMsgs * msgs)
2564{
2565  assert (tr_isPeerMsgs (msgs));
2566
2567  return msgs->client_is_interested;
2568}
2569
2570bool
2571tr_peerMsgsIsUtpConnection (const tr_peerMsgs * msgs)
2572{
2573  assert (tr_isPeerMsgs (msgs));
2574
2575  return msgs->io->utp_socket != NULL;
2576}
2577
2578bool
2579tr_peerMsgsIsEncrypted (const tr_peerMsgs * msgs)
2580{
2581  assert (tr_isPeerMsgs (msgs));
2582
2583  return tr_peerIoIsEncrypted (msgs->io);
2584}
2585
2586bool
2587tr_peerMsgsIsIncomingConnection (const tr_peerMsgs * msgs)
2588{
2589  assert (tr_isPeerMsgs (msgs));
2590
2591  return tr_peerIoIsIncoming (msgs->io);
2592}
2593
2594/***
2595****
2596***/
2597
2598bool
2599tr_isPeerMsgs (const void * msgs)
2600{
2601  /* FIXME: this is pretty crude */
2602  return (msgs != NULL)
2603      && (((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER);
2604}
2605
2606tr_peerMsgs *
2607tr_peerMsgsCast (void * vm)
2608{
2609  return tr_isPeerMsgs(vm) ? vm : NULL;
2610}
2611
2612tr_peerMsgs *
2613tr_peerMsgsNew (struct tr_torrent    * torrent,
2614                struct tr_peerIo     * io,
2615                tr_peer_callback       callback,
2616                void                 * callbackData)
2617{
2618  tr_peerMsgs * m;
2619
2620  assert (io != NULL);
2621
2622  m = tr_new0 (tr_peerMsgs, 1);
2623
2624  tr_peerConstruct (&m->peer, torrent);
2625  m->peer.funcs = &my_funcs;
2626
2627  m->magic_number = MAGIC_NUMBER;
2628  m->client_is_choked = true;
2629  m->peer_is_choked = true;
2630  m->client_is_interested = false;
2631  m->peer_is_interested = false;
2632  m->is_active[TR_UP] = false;
2633  m->is_active[TR_DOWN] = false;
2634  m->callback = callback;
2635  m->callbackData = callbackData;
2636  m->io = io;
2637  m->torrent = torrent;
2638  m->state = AWAITING_BT_LENGTH;
2639  m->outMessages = evbuffer_new ();
2640  m->outMessagesBatchedAt = 0;
2641  m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2642
2643  if (tr_torrentAllowsPex (torrent))
2644    {
2645      m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m);
2646      tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0);
2647    }
2648
2649  if (tr_peerIoSupportsUTP (m->io))
2650    {
2651      const tr_address * addr = tr_peerIoGetAddress (m->io, NULL);
2652      tr_peerMgrSetUtpSupported (torrent, addr);
2653      tr_peerMgrSetUtpFailed (torrent, addr, false);
2654    }
2655
2656  if (tr_peerIoSupportsLTEP (m->io))
2657    sendLtepHandshake (m);
2658
2659  tellPeerWhatWeHave (m);
2660
2661  if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (m->io))
2662    {
2663      /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2664      const struct tr_address *addr = tr_peerIoGetAddress (m->io, NULL);
2665      if (addr->type == TR_AF_INET || tr_globalIPv6 ())
2666        protocolSendPort (m, tr_dhtPort (torrent->session));
2667    }
2668
2669  tr_peerIoSetIOFuncs (m->io, canRead, didWrite, gotError, m);
2670  updateDesiredRequestCount (m);
2671
2672  return m;
2673}
Note: See TracBrowser for help on using the repository browser.