source: trunk/libtransmission/peer-msgs.c @ 14169

Last change on this file since 14169 was 14169, checked in by jordan, 8 years ago

add inline wrapper functions to tr_torrent to decouple the rest of the code from tr_completion

  • Property svn:keywords set to Date Rev Author Id
File size: 75.2 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 14169 2013-08-18 13:06:39Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "cache.h"
25#include "completion.h"
26#include "crypto.h" /* tr_sha1 () */
27#include "log.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "variant.h"
37#include "version.h"
38
39/**
40***
41**/
42
43enum
44{
45  BT_CHOKE                = 0,
46  BT_UNCHOKE              = 1,
47  BT_INTERESTED           = 2,
48  BT_NOT_INTERESTED       = 3,
49  BT_HAVE                 = 4,
50  BT_BITFIELD             = 5,
51  BT_REQUEST              = 6,
52  BT_PIECE                = 7,
53  BT_CANCEL               = 8,
54  BT_PORT                 = 9,
55
56  BT_FEXT_SUGGEST         = 13,
57  BT_FEXT_HAVE_ALL        = 14,
58  BT_FEXT_HAVE_NONE       = 15,
59  BT_FEXT_REJECT          = 16,
60  BT_FEXT_ALLOWED_FAST    = 17,
61
62  BT_LTEP                 = 20,
63
64  LTEP_HANDSHAKE          = 0,
65
66  UT_PEX_ID               = 1,
67  UT_METADATA_ID          = 3,
68
69  MAX_PEX_PEER_COUNT      = 50,
70
71  MIN_CHOKE_PERIOD_SEC    = 10,
72
73  /* idle seconds before we send a keepalive */
74  KEEPALIVE_INTERVAL_SECS = 100,
75
76  PEX_INTERVAL_SECS       = 90, /* sec between sendPex () calls */
77
78  REQQ                    = 512,
79
80  METADATA_REQQ           = 64,
81
82  MAGIC_NUMBER            = 21549,
83
84  /* used in lowering the outMessages queue period */
85  IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
86  HIGH_PRIORITY_INTERVAL_SECS = 2,
87  LOW_PRIORITY_INTERVAL_SECS = 10,
88
89  /* number of pieces we'll allow in our fast set */
90  MAX_FAST_SET_SIZE = 3,
91
92  /* how many blocks to keep prefetched per peer */
93  PREFETCH_SIZE = 18,
94
95  /* when we're making requests from another peer,
96     batch them together to send enough requests to
97     meet our bandwidth goals for the next N seconds */
98  REQUEST_BUF_SECS = 10,
99
100  /* defined in BEP #9 */
101  METADATA_MSG_TYPE_REQUEST = 0,
102  METADATA_MSG_TYPE_DATA = 1,
103  METADATA_MSG_TYPE_REJECT = 2
104};
105
106enum
107{
108  AWAITING_BT_LENGTH,
109  AWAITING_BT_ID,
110  AWAITING_BT_MESSAGE,
111  AWAITING_BT_PIECE
112};
113
114typedef enum
115{
116  ENCRYPTION_PREFERENCE_UNKNOWN,
117  ENCRYPTION_PREFERENCE_YES,
118  ENCRYPTION_PREFERENCE_NO
119}
120encryption_preference_t;
121
122/**
123***
124**/
125
126struct peer_request
127{
128  uint32_t index;
129  uint32_t offset;
130  uint32_t length;
131};
132
133static void
134blockToReq (const tr_torrent     * tor,
135            tr_block_index_t       block,
136            struct peer_request  * setme)
137{
138  tr_torrentGetBlockLocation (tor, block, &setme->index,
139                                          &setme->offset,
140                                          &setme->length);
141}
142
143/**
144***
145**/
146
147/* this is raw, unchanged data from the peer regarding
148 * the current message that it's sending us. */
149struct tr_incoming
150{
151  uint8_t                id;
152  uint32_t               length; /* includes the +1 for id length */
153  struct peer_request    blockReq; /* metadata for incoming blocks */
154  struct evbuffer      * block; /* piece data for incoming blocks */
155};
156
157/**
158 * Low-level communication state information about a connected peer.
159 *
160 * This structure remembers the low-level protocol states that we're
161 * in with this peer, such as active requests, pex messages, and so on.
162 * Its fields are all private to peer-msgs.c.
163 *
164 * Data not directly involved with sending & receiving messages is
165 * stored in tr_peer, where it can be accessed by both peermsgs and
166 * the peer manager.
167 *
168 * @see struct peer_atom
169 * @see tr_peer
170 */
171struct tr_peerMsgs
172{
173  struct tr_peer peer; /* parent */
174
175  uint16_t magic_number;
176
177  /* Whether or not we've choked this peer. */
178  bool peer_is_choked;
179
180  /* whether or not the peer has indicated it will download from us. */
181  bool peer_is_interested;
182
183  /* whether or the peer is choking us. */
184  bool client_is_choked;
185
186  /* whether or not we've indicated to the peer that we would download from them if unchoked. */
187  bool client_is_interested;
188
189
190  bool peerSupportsPex;
191  bool peerSupportsMetadataXfer;
192  bool clientSentLtepHandshake;
193  bool peerSentLtepHandshake;
194
195  /*bool haveFastSet;*/
196
197  int desiredRequestCount;
198
199  int prefetchCount;
200
201  int is_active[2];
202
203  /* how long the outMessages batch should be allowed to grow before
204   * it's flushed -- some messages (like requests >:) should be sent
205   * very quickly; others aren't as urgent. */
206  int8_t          outMessagesBatchPeriod;
207
208  uint8_t         state;
209  uint8_t         ut_pex_id;
210  uint8_t         ut_metadata_id;
211  uint16_t        pexCount;
212  uint16_t        pexCount6;
213
214  tr_port         dht_port;
215
216  encryption_preference_t  encryption_preference;
217
218  size_t                   metadata_size_hint;
219#if 0
220  size_t                 fastsetSize;
221  tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
222#endif
223
224  tr_torrent *           torrent;
225
226  tr_peer_callback      * callback;
227  void                  * callbackData;
228
229  struct evbuffer *      outMessages; /* all the non-piece messages */
230
231  struct peer_request    peerAskedFor[REQQ];
232
233  int peerAskedForMetadata[METADATA_REQQ];
234  int peerAskedForMetadataCount;
235
236  tr_pex * pex;
237  tr_pex * pex6;
238
239  /*time_t clientSentPexAt;*/
240  time_t clientSentAnythingAt;
241
242  time_t chokeChangedAt;
243
244  /* when we started batching the outMessages */
245  time_t outMessagesBatchedAt;
246
247  struct tr_incoming    incoming;
248
249  /* if the peer supports the Extension Protocol in BEP 10 and
250     supplied a reqq argument, it's stored here. Otherwise, the
251     value is zero and should be ignored. */
252  int64_t reqq;
253
254  struct event * pexTimer;
255
256  struct tr_peerIo * io;
257};
258
259/**
260***
261**/
262
263static inline tr_session*
264getSession (struct tr_peerMsgs * msgs)
265{
266  return msgs->torrent->session;
267}
268
269/**
270***
271**/
272
273static void
274myDebug (const char * file, int line,
275         const struct tr_peerMsgs * msgs,
276         const char * fmt, ...) TR_GNUC_PRINTF(4, 5);
277
278static void
279myDebug (const char * file, int line,
280         const struct tr_peerMsgs * msgs,
281         const char * fmt, ...)
282{
283  FILE * fp = tr_logGetFile ();
284
285  if (fp)
286    {
287      va_list           args;
288      char              timestr[64];
289      struct evbuffer * buf = evbuffer_new ();
290      char *            base = tr_basename (file);
291      char *            message;
292
293      evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ",
294                           tr_logGetTimeStr (timestr, sizeof (timestr)),
295                           tr_torrentName (msgs->torrent),
296                           tr_peerIoGetAddrStr (msgs->io),
297                           tr_quark_get_string (msgs->peer.client, NULL));
298      va_start (args, fmt);
299      evbuffer_add_vprintf (buf, fmt, args);
300      va_end (args);
301      evbuffer_add_printf (buf, " (%s:%d)\n", base, line);
302
303      message = evbuffer_free_to_str (buf);
304      fputs (message, fp);
305
306      tr_free (base);
307      tr_free (message);
308    }
309}
310
311#define dbgmsg(msgs, ...) \
312  do \
313    { \
314      if (tr_logGetDeepEnabled ()) \
315        myDebug (__FILE__, __LINE__, msgs, __VA_ARGS__); \
316    } \
317  while (0)
318
319/**
320***
321**/
322
323static void
324pokeBatchPeriod (tr_peerMsgs * msgs, int interval)
325{
326  if (msgs->outMessagesBatchPeriod > interval)
327    {
328      msgs->outMessagesBatchPeriod = interval;
329      dbgmsg (msgs, "lowering batch interval to %d seconds", interval);
330    }
331}
332
333static void
334dbgOutMessageLen (tr_peerMsgs * msgs)
335{
336  dbgmsg (msgs, "outMessage size is now %zu", evbuffer_get_length (msgs->outMessages));
337}
338
339static void
340protocolSendReject (tr_peerMsgs * msgs, const struct peer_request * req)
341{
342  struct evbuffer * out = msgs->outMessages;
343
344  assert (tr_peerIoSupportsFEXT (msgs->io));
345
346  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
347  evbuffer_add_uint8 (out, BT_FEXT_REJECT);
348  evbuffer_add_uint32 (out, req->index);
349  evbuffer_add_uint32 (out, req->offset);
350  evbuffer_add_uint32 (out, req->length);
351
352  dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
353  dbgOutMessageLen (msgs);
354}
355
356static void
357protocolSendRequest (tr_peerMsgs * msgs, const struct peer_request * req)
358{
359  struct evbuffer * out = msgs->outMessages;
360
361  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
362  evbuffer_add_uint8 (out, BT_REQUEST);
363  evbuffer_add_uint32 (out, req->index);
364  evbuffer_add_uint32 (out, req->offset);
365  evbuffer_add_uint32 (out, req->length);
366
367  dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
368  dbgOutMessageLen (msgs);
369  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
370}
371
372static void
373protocolSendCancel (tr_peerMsgs * msgs, const struct peer_request * req)
374{
375  struct evbuffer * out = msgs->outMessages;
376
377  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
378  evbuffer_add_uint8 (out, BT_CANCEL);
379  evbuffer_add_uint32 (out, req->index);
380  evbuffer_add_uint32 (out, req->offset);
381  evbuffer_add_uint32 (out, req->length);
382
383  dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
384  dbgOutMessageLen (msgs);
385  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
386}
387
388static void
389protocolSendPort (tr_peerMsgs *msgs, uint16_t port)
390{
391  struct evbuffer * out = msgs->outMessages;
392
393  dbgmsg (msgs, "sending Port %u", port);
394  evbuffer_add_uint32 (out, 3);
395  evbuffer_add_uint8 (out, BT_PORT);
396  evbuffer_add_uint16 (out, port);
397}
398
399static void
400protocolSendHave (tr_peerMsgs * msgs, uint32_t index)
401{
402  struct evbuffer * out = msgs->outMessages;
403
404  evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t));
405  evbuffer_add_uint8 (out, BT_HAVE);
406  evbuffer_add_uint32 (out, index);
407
408  dbgmsg (msgs, "sending Have %u", index);
409  dbgOutMessageLen (msgs);
410  pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS);
411}
412
413#if 0
414static void
415protocolSendAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
416{
417  tr_peerIo       * io  = msgs->io;
418  struct evbuffer * out = msgs->outMessages;
419
420  assert (tr_peerIoSupportsFEXT (msgs->io));
421
422  evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t));
423  evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST);
424  evbuffer_add_uint32 (io, out, pieceIndex);
425
426  dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex);
427  dbgOutMessageLen (msgs);
428}
429#endif
430
431static void
432protocolSendChoke (tr_peerMsgs * msgs, int choke)
433{
434  struct evbuffer * out = msgs->outMessages;
435
436  evbuffer_add_uint32 (out, sizeof (uint8_t));
437  evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE);
438
439  dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke");
440  dbgOutMessageLen (msgs);
441  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
442}
443
444static void
445protocolSendHaveAll (tr_peerMsgs * msgs)
446{
447  struct evbuffer * out = msgs->outMessages;
448
449  assert (tr_peerIoSupportsFEXT (msgs->io));
450
451  evbuffer_add_uint32 (out, sizeof (uint8_t));
452  evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL);
453
454  dbgmsg (msgs, "sending HAVE_ALL...");
455  dbgOutMessageLen (msgs);
456  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
457}
458
459static void
460protocolSendHaveNone (tr_peerMsgs * msgs)
461{
462  struct evbuffer * out = msgs->outMessages;
463
464  assert (tr_peerIoSupportsFEXT (msgs->io));
465
466  evbuffer_add_uint32 (out, sizeof (uint8_t));
467  evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE);
468
469  dbgmsg (msgs, "sending HAVE_NONE...");
470  dbgOutMessageLen (msgs);
471  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
472}
473
474/**
475***  EVENTS
476**/
477
478static void
479publish (tr_peerMsgs * msgs, tr_peer_event * e)
480{
481  if (msgs->callback != NULL)
482    msgs->callback (&msgs->peer, e, msgs->callbackData);
483}
484
485static void
486fireError (tr_peerMsgs * msgs, int err)
487{
488  tr_peer_event e = TR_PEER_EVENT_INIT;
489  e.eventType = TR_PEER_ERROR;
490  e.err = err;
491  publish (msgs, &e);
492}
493
494static void
495fireGotBlock (tr_peerMsgs * msgs, const struct peer_request * req)
496{
497  tr_peer_event e = TR_PEER_EVENT_INIT;
498  e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
499  e.pieceIndex = req->index;
500  e.offset = req->offset;
501  e.length = req->length;
502  publish (msgs, &e);
503}
504
505static void
506fireGotRej (tr_peerMsgs * msgs, const struct peer_request * req)
507{
508  tr_peer_event e = TR_PEER_EVENT_INIT;
509  e.eventType = TR_PEER_CLIENT_GOT_REJ;
510  e.pieceIndex = req->index;
511  e.offset = req->offset;
512  e.length = req->length;
513  publish (msgs, &e);
514}
515
516static void
517fireGotChoke (tr_peerMsgs * msgs)
518{
519  tr_peer_event e = TR_PEER_EVENT_INIT;
520  e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
521  publish (msgs, &e);
522}
523
524static void
525fireClientGotHaveAll (tr_peerMsgs * msgs)
526{
527  tr_peer_event e = TR_PEER_EVENT_INIT;
528  e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
529  publish (msgs, &e);
530}
531
532static void
533fireClientGotHaveNone (tr_peerMsgs * msgs)
534{
535  tr_peer_event e = TR_PEER_EVENT_INIT;
536  e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
537  publish (msgs, &e);
538}
539
540static void
541fireClientGotPieceData (tr_peerMsgs * msgs, uint32_t length)
542{
543  tr_peer_event e = TR_PEER_EVENT_INIT;
544  e.length = length;
545  e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
546  publish (msgs, &e);
547}
548
549static void
550firePeerGotPieceData (tr_peerMsgs * msgs, uint32_t length)
551{
552  tr_peer_event e = TR_PEER_EVENT_INIT;
553  e.length = length;
554  e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
555  publish (msgs, &e);
556}
557
558static void
559fireClientGotSuggest (tr_peerMsgs * msgs, uint32_t pieceIndex)
560{
561  tr_peer_event e = TR_PEER_EVENT_INIT;
562  e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
563  e.pieceIndex = pieceIndex;
564  publish (msgs, &e);
565}
566
567static void
568fireClientGotPort (tr_peerMsgs * msgs, tr_port port)
569{
570  tr_peer_event e = TR_PEER_EVENT_INIT;
571  e.eventType = TR_PEER_CLIENT_GOT_PORT;
572  e.port = port;
573  publish (msgs, &e);
574}
575
576static void
577fireClientGotAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
578{
579  tr_peer_event e = TR_PEER_EVENT_INIT;
580  e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
581  e.pieceIndex = pieceIndex;
582  publish (msgs, &e);
583}
584
585static void
586fireClientGotBitfield (tr_peerMsgs * msgs, tr_bitfield * bitfield)
587{
588  tr_peer_event e = TR_PEER_EVENT_INIT;
589  e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
590  e.bitfield = bitfield;
591  publish (msgs, &e);
592}
593
594static void
595fireClientGotHave (tr_peerMsgs * msgs, tr_piece_index_t index)
596{
597  tr_peer_event e = TR_PEER_EVENT_INIT;
598  e.eventType = TR_PEER_CLIENT_GOT_HAVE;
599  e.pieceIndex = index;
600  publish (msgs, &e);
601}
602
603
604/**
605***  ALLOWED FAST SET
606***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
607**/
608
609#if 0
610size_t
611tr_generateAllowedSet (tr_piece_index_t * setmePieces,
612                       size_t             desiredSetSize,
613                       size_t             pieceCount,
614                       const uint8_t    * infohash,
615                       const tr_address * addr)
616{
617    size_t setSize = 0;
618
619    assert (setmePieces);
620    assert (desiredSetSize <= pieceCount);
621    assert (desiredSetSize);
622    assert (pieceCount);
623    assert (infohash);
624    assert (addr);
625
626    if (addr->type == TR_AF_INET)
627    {
628        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
629        uint8_t x[SHA_DIGEST_LENGTH];
630
631        uint32_t ui32 = ntohl (htonl (addr->addr.addr4.s_addr) & 0xffffff00);   /* (1) */
632        memcpy (w, &ui32, sizeof (uint32_t));
633        walk += sizeof (uint32_t);
634        memcpy (walk, infohash, SHA_DIGEST_LENGTH);                 /* (2) */
635        walk += SHA_DIGEST_LENGTH;
636        tr_sha1 (x, w, walk-w, NULL);                               /* (3) */
637        assert (sizeof (w) == walk-w);
638
639        while (setSize<desiredSetSize)
640        {
641            int i;
642            for (i=0; i<5 && setSize<desiredSetSize; ++i)           /* (4) */
643            {
644                size_t k;
645                uint32_t j = i * 4;                                  /* (5) */
646                uint32_t y = ntohl (* (uint32_t*)(x + j));       /* (6) */
647                uint32_t index = y % pieceCount;                     /* (7) */
648
649                for (k=0; k<setSize; ++k)                           /* (8) */
650                    if (setmePieces[k] == index)
651                        break;
652
653                if (k == setSize)
654                    setmePieces[setSize++] = index;                  /* (9) */
655            }
656
657            tr_sha1 (x, x, sizeof (x), NULL);                      /* (3) */
658        }
659    }
660
661    return setSize;
662}
663
664static void
665updateFastSet (tr_peerMsgs * msgs UNUSED)
666{
667    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
668    const int peerIsNeedy = msgs->peer->progress < 0.10;
669
670    if (fext && peerIsNeedy && !msgs->haveFastSet)
671    {
672        size_t i;
673        const struct tr_address * addr = tr_peerIoGetAddress (msgs->io, NULL);
674        const tr_info * inf = &msgs->torrent->info;
675        const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount);
676
677        /* build the fast set */
678        msgs->fastsetSize = tr_generateAllowedSet (msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
679        msgs->haveFastSet = 1;
680
681        /* send it to the peer */
682        for (i=0; i<msgs->fastsetSize; ++i)
683            protocolSendAllowedFast (msgs, msgs->fastset[i]);
684    }
685}
686#endif
687
688/***
689****  ACTIVE
690***/
691
692static bool 
693tr_peerMsgsCalculateActive (const tr_peerMsgs * msgs, tr_direction direction)
694{
695  bool is_active;
696
697  assert (tr_isPeerMsgs (msgs));
698  assert (tr_isDirection (direction));
699
700  if (direction == TR_CLIENT_TO_PEER)
701    {
702      is_active = tr_peerMsgsIsPeerInterested (msgs)
703              && !tr_peerMsgsIsPeerChoked (msgs);
704
705      if (is_active)
706        assert (!tr_peerIsSeed (&msgs->peer));
707    }
708  else /* TR_PEER_TO_CLIENT */
709    {
710      if (!tr_torrentHasMetadata (msgs->torrent))
711        {
712          is_active = true;
713        }
714      else
715        {
716          is_active = tr_peerMsgsIsClientInterested (msgs)
717                  && !tr_peerMsgsIsClientChoked (msgs);
718
719          if (is_active)
720            assert (!tr_torrentIsSeed (msgs->torrent));
721        }
722    }
723
724  return is_active;
725}
726
727bool
728tr_peerMsgsIsActive (const tr_peerMsgs  * msgs, tr_direction direction)
729{
730  bool is_active;
731
732  assert (tr_isPeerMsgs (msgs));
733  assert (tr_isDirection (direction));
734
735  is_active = msgs->is_active[direction];
736
737  assert (is_active == tr_peerMsgsCalculateActive (msgs, direction));
738
739  return is_active;
740}
741
742static void
743tr_peerMsgsSetActive (tr_peerMsgs  * msgs,
744                      tr_direction   direction,
745                      bool           is_active)
746{
747  dbgmsg (msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active);
748
749  if (msgs->is_active[direction] != is_active)
750    {
751      msgs->is_active[direction] = is_active;
752
753      tr_swarmIncrementActivePeers (msgs->torrent->swarm, direction, is_active);
754    }
755}
756
757void
758tr_peerMsgsUpdateActive (tr_peerMsgs * msgs, tr_direction direction)
759{
760  const bool is_active = tr_peerMsgsCalculateActive (msgs, direction);
761
762  tr_peerMsgsSetActive (msgs, direction, is_active);
763}
764
765/**
766***  INTEREST
767**/
768
769static void
770sendInterest (tr_peerMsgs * msgs, bool b)
771{
772  struct evbuffer * out = msgs->outMessages;
773
774  assert (msgs);
775  assert (tr_isBool (b));
776
777  msgs->client_is_interested = b;
778  dbgmsg (msgs, "Sending %s", b ? "Interested" : "Not Interested");
779  evbuffer_add_uint32 (out, sizeof (uint8_t));
780  evbuffer_add_uint8 (out, b ? BT_INTERESTED : BT_NOT_INTERESTED);
781
782  pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
783  dbgOutMessageLen (msgs);
784}
785
786static void
787updateInterest (tr_peerMsgs * msgs UNUSED)
788{
789    /* FIXME -- might need to poke the mgr on startup */
790}
791
792void
793tr_peerMsgsSetInterested (tr_peerMsgs * msgs, bool b)
794{
795  assert (tr_isBool (b));
796
797  if (msgs->client_is_interested != b)
798    {
799      sendInterest (msgs, b);
800
801      tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
802    }
803}
804
805static bool
806popNextMetadataRequest (tr_peerMsgs * msgs, int * piece)
807{
808  if (msgs->peerAskedForMetadataCount == 0)
809    return false;
810
811  *piece = msgs->peerAskedForMetadata[0];
812
813  tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int),
814                             msgs->peerAskedForMetadataCount--);
815
816  return true;
817}
818
819static bool
820popNextRequest (tr_peerMsgs * msgs, struct peer_request * setme)
821{
822  if (msgs->peer.pendingReqsToClient == 0)
823    return false;
824
825  *setme = msgs->peerAskedFor[0];
826
827  tr_removeElementFromArray (msgs->peerAskedFor,
828                             0,
829                             sizeof (struct peer_request),
830                             msgs->peer.pendingReqsToClient--);
831
832  return true;
833}
834
835static void
836cancelAllRequestsToClient (tr_peerMsgs * msgs)
837{
838  struct peer_request req;
839  const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->io);
840
841  while (popNextRequest (msgs, &req))
842    if (mustSendCancel)
843      protocolSendReject (msgs, &req);
844}
845
846void
847tr_peerMsgsSetChoke (tr_peerMsgs * msgs, bool peer_is_choked)
848{
849  const time_t now = tr_time ();
850  const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
851
852  assert (msgs != NULL);
853  assert (tr_isBool (peer_is_choked));
854
855  if (msgs->chokeChangedAt > fibrillationTime)
856    {
857      dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
858    }
859  else if (msgs->peer_is_choked != peer_is_choked)
860    {
861      msgs->peer_is_choked = peer_is_choked;
862      if (peer_is_choked)
863        cancelAllRequestsToClient (msgs);
864      protocolSendChoke (msgs, peer_is_choked);
865      msgs->chokeChangedAt = now;
866      tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
867    }
868}
869
870/**
871***
872**/
873
874void
875tr_peerMsgsHave (tr_peerMsgs * msgs, uint32_t index)
876{
877  protocolSendHave (msgs, index);
878
879  /* since we have more pieces now, we might not be interested in this peer */
880  updateInterest (msgs);
881}
882
883/**
884***
885**/
886
887static bool
888reqIsValid (const tr_peerMsgs * peer,
889            uint32_t            index,
890            uint32_t            offset,
891            uint32_t            length)
892{
893    return tr_torrentReqIsValid (peer->torrent, index, offset, length);
894}
895
896static bool
897requestIsValid (const tr_peerMsgs * msgs, const struct peer_request * req)
898{
899    return reqIsValid (msgs, req->index, req->offset, req->length);
900}
901
902void
903tr_peerMsgsCancel (tr_peerMsgs * msgs, tr_block_index_t block)
904{
905    struct peer_request req;
906/*fprintf (stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer);*/
907    blockToReq (msgs->torrent, block, &req);
908    protocolSendCancel (msgs, &req);
909}
910
911/**
912***
913**/
914
915static void
916sendLtepHandshake (tr_peerMsgs * msgs)
917{
918    tr_variant val;
919    bool allow_pex;
920    bool allow_metadata_xfer;
921    struct evbuffer * payload;
922    struct evbuffer * out = msgs->outMessages;
923    const unsigned char * ipv6 = tr_globalIPv6 ();
924    static tr_quark version_quark = 0;
925
926    if (msgs->clientSentLtepHandshake)
927        return;
928
929    if (!version_quark)
930      version_quark = tr_quark_new (TR_NAME " " USERAGENT_PREFIX, -1);
931
932    dbgmsg (msgs, "sending an ltep handshake");
933    msgs->clientSentLtepHandshake = 1;
934
935    /* decide if we want to advertise metadata xfer support (BEP 9) */
936    if (tr_torrentIsPrivate (msgs->torrent))
937        allow_metadata_xfer = 0;
938    else
939        allow_metadata_xfer = 1;
940
941    /* decide if we want to advertise pex support */
942    if (!tr_torrentAllowsPex (msgs->torrent))
943        allow_pex = 0;
944    else if (msgs->peerSentLtepHandshake)
945        allow_pex = msgs->peerSupportsPex ? 1 : 0;
946    else
947        allow_pex = 1;
948
949    tr_variantInitDict (&val, 8);
950    tr_variantDictAddInt (&val, TR_KEY_e, getSession (msgs)->encryptionMode != TR_CLEAR_PREFERRED);
951    if (ipv6 != NULL)
952        tr_variantDictAddRaw (&val, TR_KEY_ipv6, ipv6, 16);
953    if (allow_metadata_xfer && tr_torrentHasMetadata (msgs->torrent)
954                            && (msgs->torrent->infoDictLength > 0))
955        tr_variantDictAddInt (&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
956    tr_variantDictAddInt (&val, TR_KEY_p, tr_sessionGetPublicPeerPort (getSession (msgs)));
957    tr_variantDictAddInt (&val, TR_KEY_reqq, REQQ);
958    tr_variantDictAddInt (&val, TR_KEY_upload_only, tr_torrentIsSeed (msgs->torrent));
959    tr_variantDictAddQuark (&val, TR_KEY_v, version_quark);
960    if (allow_metadata_xfer || allow_pex) {
961        tr_variant * m  = tr_variantDictAddDict (&val, TR_KEY_m, 2);
962        if (allow_metadata_xfer)
963            tr_variantDictAddInt (m, TR_KEY_ut_metadata, UT_METADATA_ID);
964        if (allow_pex)
965            tr_variantDictAddInt (m, TR_KEY_ut_pex, UT_PEX_ID);
966    }
967
968    payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
969
970    evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
971    evbuffer_add_uint8 (out, BT_LTEP);
972    evbuffer_add_uint8 (out, LTEP_HANDSHAKE);
973    evbuffer_add_buffer (out, payload);
974    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
975    dbgOutMessageLen (msgs);
976
977    /* cleanup */
978    evbuffer_free (payload);
979    tr_variantFree (&val);
980}
981
982static void
983parseLtepHandshake (tr_peerMsgs * msgs, int len, struct evbuffer * inbuf)
984{
985    int64_t   i;
986    tr_variant   val, * sub;
987    uint8_t * tmp = tr_new (uint8_t, len);
988    const uint8_t *addr;
989    size_t addr_len;
990    tr_pex pex;
991    int8_t seedProbability = -1;
992
993    memset (&pex, 0, sizeof (tr_pex));
994
995    tr_peerIoReadBytes (msgs->io, inbuf, tmp, len);
996    msgs->peerSentLtepHandshake = 1;
997
998    if (tr_variantFromBenc (&val, tmp, len) || !tr_variantIsDict (&val))
999    {
1000        dbgmsg (msgs, "GET  extended-handshake, couldn't get dictionary");
1001        tr_free (tmp);
1002        return;
1003    }
1004
1005    dbgmsg (msgs, "here is the handshake: [%*.*s]", len, len,  tmp);
1006
1007    /* does the peer prefer encrypted connections? */
1008    if (tr_variantDictFindInt (&val, TR_KEY_e, &i)) {
1009        msgs->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1010                                        : ENCRYPTION_PREFERENCE_NO;
1011        if (i)
1012            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
1013    }
1014
1015    /* check supported messages for utorrent pex */
1016    msgs->peerSupportsPex = 0;
1017    msgs->peerSupportsMetadataXfer = 0;
1018
1019    if (tr_variantDictFindDict (&val, TR_KEY_m, &sub)) {
1020        if (tr_variantDictFindInt (sub, TR_KEY_ut_pex, &i)) {
1021            msgs->peerSupportsPex = i != 0;
1022            msgs->ut_pex_id = (uint8_t) i;
1023            dbgmsg (msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
1024        }
1025        if (tr_variantDictFindInt (sub, TR_KEY_ut_metadata, &i)) {
1026            msgs->peerSupportsMetadataXfer = i != 0;
1027            msgs->ut_metadata_id = (uint8_t) i;
1028            dbgmsg (msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
1029        }
1030        if (tr_variantDictFindInt (sub, TR_KEY_ut_holepunch, &i)) {
1031            /* Mysterious µTorrent extension that we don't grok.  However,
1032               it implies support for µTP, so use it to indicate that. */
1033            tr_peerMgrSetUtpFailed (msgs->torrent,
1034                                    tr_peerIoGetAddress (msgs->io, NULL),
1035                                    false);
1036        }
1037    }
1038
1039    /* look for metainfo size (BEP 9) */
1040    if (tr_variantDictFindInt (&val, TR_KEY_metadata_size, &i)) {
1041        tr_torrentSetMetadataSizeHint (msgs->torrent, i);
1042        msgs->metadata_size_hint = (size_t) i;
1043    }
1044
1045    /* look for upload_only (BEP 21) */
1046    if (tr_variantDictFindInt (&val, TR_KEY_upload_only, &i))
1047        seedProbability = i==0 ? 0 : 100;
1048
1049    /* get peer's listening port */
1050    if (tr_variantDictFindInt (&val, TR_KEY_p, &i)) {
1051        pex.port = htons ((uint16_t)i);
1052        fireClientGotPort (msgs, pex.port);
1053        dbgmsg (msgs, "peer's port is now %d", (int)i);
1054    }
1055
1056    if (tr_peerIoIsIncoming (msgs->io)
1057        && tr_variantDictFindRaw (&val, TR_KEY_ipv4, &addr, &addr_len)
1058        && (addr_len == 4))
1059    {
1060        pex.addr.type = TR_AF_INET;
1061        memcpy (&pex.addr.addr.addr4, addr, 4);
1062        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1063    }
1064
1065    if (tr_peerIoIsIncoming (msgs->io)
1066        && tr_variantDictFindRaw (&val, TR_KEY_ipv6, &addr, &addr_len)
1067        && (addr_len == 16))
1068    {
1069        pex.addr.type = TR_AF_INET6;
1070        memcpy (&pex.addr.addr.addr6, addr, 16);
1071        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1072    }
1073
1074    /* get peer's maximum request queue size */
1075    if (tr_variantDictFindInt (&val, TR_KEY_reqq, &i))
1076        msgs->reqq = i;
1077
1078    tr_variantFree (&val);
1079    tr_free (tmp);
1080}
1081
1082static void
1083parseUtMetadata (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
1084{
1085    tr_variant dict;
1086    char * msg_end;
1087    const char * benc_end;
1088    int64_t msg_type = -1;
1089    int64_t piece = -1;
1090    int64_t total_size = 0;
1091    uint8_t * tmp = tr_new (uint8_t, msglen);
1092
1093    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1094    msg_end = (char*)tmp + msglen;
1095
1096    if (!tr_variantFromBencFull (&dict, tmp, msglen, NULL, &benc_end))
1097    {
1098        tr_variantDictFindInt (&dict, TR_KEY_msg_type, &msg_type);
1099        tr_variantDictFindInt (&dict, TR_KEY_piece, &piece);
1100        tr_variantDictFindInt (&dict, TR_KEY_total_size, &total_size);
1101        tr_variantFree (&dict);
1102    }
1103
1104    dbgmsg (msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1105          (int)msg_type, (int)piece, (int)total_size);
1106
1107    if (msg_type == METADATA_MSG_TYPE_REJECT)
1108    {
1109        /* NOOP */
1110    }
1111
1112    if ((msg_type == METADATA_MSG_TYPE_DATA)
1113        && (!tr_torrentHasMetadata (msgs->torrent))
1114        && (msg_end - benc_end <= METADATA_PIECE_SIZE)
1115        && (piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size))
1116    {
1117        const int pieceLen = msg_end - benc_end;
1118        tr_torrentSetMetadataPiece (msgs->torrent, piece, benc_end, pieceLen);
1119    }
1120
1121    if (msg_type == METADATA_MSG_TYPE_REQUEST)
1122    {
1123        if ((piece >= 0)
1124            && tr_torrentHasMetadata (msgs->torrent)
1125            && !tr_torrentIsPrivate (msgs->torrent)
1126            && (msgs->peerAskedForMetadataCount < METADATA_REQQ))
1127        {
1128            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1129        }
1130        else
1131        {
1132            tr_variant tmp;
1133            struct evbuffer * payload;
1134            struct evbuffer * out = msgs->outMessages;
1135
1136            /* build the rejection message */
1137            tr_variantInitDict (&tmp, 2);
1138            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1139            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1140            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1141
1142            /* write it out as a LTEP message to our outMessages buffer */
1143            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1144            evbuffer_add_uint8 (out, BT_LTEP);
1145            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1146            evbuffer_add_buffer (out, payload);
1147            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1148            dbgOutMessageLen (msgs);
1149
1150            /* cleanup */
1151            evbuffer_free (payload);
1152            tr_variantFree (&tmp);
1153        }
1154    }
1155
1156    tr_free (tmp);
1157}
1158
1159static void
1160parseUtPex (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
1161{
1162    int loaded = 0;
1163    uint8_t * tmp = tr_new (uint8_t, msglen);
1164    tr_variant val;
1165    tr_torrent * tor = msgs->torrent;
1166    const uint8_t * added;
1167    size_t added_len;
1168
1169    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1170
1171    if (tr_torrentAllowsPex (tor)
1172      && ((loaded = !tr_variantFromBenc (&val, tmp, msglen))))
1173    {
1174        if (tr_variantDictFindRaw (&val, TR_KEY_added, &added, &added_len))
1175        {
1176            tr_pex * pex;
1177            size_t i, n;
1178            size_t added_f_len = 0;
1179            const uint8_t * added_f = NULL;
1180
1181            tr_variantDictFindRaw (&val, TR_KEY_added_f, &added_f, &added_f_len);
1182            pex = tr_peerMgrCompactToPex (added, added_len, added_f, added_f_len, &n);
1183
1184            n = MIN (n, MAX_PEX_PEER_COUNT);
1185            for (i=0; i<n; ++i)
1186            {
1187                int seedProbability = -1;
1188                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1189                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1190            }
1191
1192            tr_free (pex);
1193        }
1194
1195        if (tr_variantDictFindRaw (&val, TR_KEY_added6, &added, &added_len))
1196        {
1197            tr_pex * pex;
1198            size_t i, n;
1199            size_t added_f_len = 0;
1200            const uint8_t * added_f = NULL;
1201
1202            tr_variantDictFindRaw (&val, TR_KEY_added6_f, &added_f, &added_f_len);
1203            pex = tr_peerMgrCompact6ToPex (added, added_len, added_f, added_f_len, &n);
1204
1205            n = MIN (n, MAX_PEX_PEER_COUNT);
1206            for (i=0; i<n; ++i)
1207            {
1208                int seedProbability = -1;
1209                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1210                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1211            }
1212
1213            tr_free (pex);
1214        }
1215    }
1216
1217    if (loaded)
1218        tr_variantFree (&val);
1219    tr_free (tmp);
1220}
1221
1222static void sendPex (tr_peerMsgs * msgs);
1223
1224static void
1225parseLtep (tr_peerMsgs * msgs, int msglen, struct evbuffer  * inbuf)
1226{
1227    uint8_t ltep_msgid;
1228
1229    tr_peerIoReadUint8 (msgs->io, inbuf, &ltep_msgid);
1230    msglen--;
1231
1232    if (ltep_msgid == LTEP_HANDSHAKE)
1233    {
1234        dbgmsg (msgs, "got ltep handshake");
1235        parseLtepHandshake (msgs, msglen, inbuf);
1236        if (tr_peerIoSupportsLTEP (msgs->io))
1237        {
1238            sendLtepHandshake (msgs);
1239            sendPex (msgs);
1240        }
1241    }
1242    else if (ltep_msgid == UT_PEX_ID)
1243    {
1244        dbgmsg (msgs, "got ut pex");
1245        msgs->peerSupportsPex = 1;
1246        parseUtPex (msgs, msglen, inbuf);
1247    }
1248    else if (ltep_msgid == UT_METADATA_ID)
1249    {
1250        dbgmsg (msgs, "got ut metadata");
1251        msgs->peerSupportsMetadataXfer = 1;
1252        parseUtMetadata (msgs, msglen, inbuf);
1253    }
1254    else
1255    {
1256        dbgmsg (msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1257        evbuffer_drain (inbuf, msglen);
1258    }
1259}
1260
1261static int
1262readBtLength (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1263{
1264    uint32_t len;
1265
1266    if (inlen < sizeof (len))
1267        return READ_LATER;
1268
1269    tr_peerIoReadUint32 (msgs->io, inbuf, &len);
1270
1271    if (len == 0) /* peer sent us a keepalive message */
1272        dbgmsg (msgs, "got KeepAlive");
1273    else
1274    {
1275        msgs->incoming.length = len;
1276        msgs->state = AWAITING_BT_ID;
1277    }
1278
1279    return READ_NOW;
1280}
1281
1282static int readBtMessage (tr_peerMsgs *, struct evbuffer *, size_t);
1283
1284static int
1285readBtId (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1286{
1287    uint8_t id;
1288
1289    if (inlen < sizeof (uint8_t))
1290        return READ_LATER;
1291
1292    tr_peerIoReadUint8 (msgs->io, inbuf, &id);
1293    msgs->incoming.id = id;
1294    dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);
1295
1296    if (id == BT_PIECE)
1297    {
1298        msgs->state = AWAITING_BT_PIECE;
1299        return READ_NOW;
1300    }
1301    else if (msgs->incoming.length != 1)
1302    {
1303        msgs->state = AWAITING_BT_MESSAGE;
1304        return READ_NOW;
1305    }
1306    else return readBtMessage (msgs, inbuf, inlen - 1);
1307}
1308
1309static void
1310updatePeerProgress (tr_peerMsgs * msgs)
1311{
1312  tr_peerUpdateProgress (msgs->torrent, &msgs->peer);
1313
1314  /*updateFastSet (msgs);*/
1315  updateInterest (msgs);
1316}
1317
1318static void
1319prefetchPieces (tr_peerMsgs *msgs)
1320{
1321  int i;
1322
1323  if (!getSession (msgs)->isPrefetchEnabled)
1324    return;
1325
1326  for (i=msgs->prefetchCount; i<msgs->peer.pendingReqsToClient && i<PREFETCH_SIZE; ++i)
1327    {
1328      const struct peer_request * req = msgs->peerAskedFor + i;
1329      if (requestIsValid (msgs, req))
1330        {
1331          tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1332          ++msgs->prefetchCount;
1333        }
1334    }
1335}
1336
1337static void
1338peerMadeRequest (tr_peerMsgs * msgs, const struct peer_request * req)
1339{
1340    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1341    const int reqIsValid = requestIsValid (msgs, req);
1342    const int clientHasPiece = reqIsValid && tr_torrentPieceIsComplete (msgs->torrent, req->index);
1343    const int peerIsChoked = msgs->peer_is_choked;
1344
1345    int allow = false;
1346
1347    if (!reqIsValid)
1348        dbgmsg (msgs, "rejecting an invalid request.");
1349    else if (!clientHasPiece)
1350        dbgmsg (msgs, "rejecting request for a piece we don't have.");
1351    else if (peerIsChoked)
1352        dbgmsg (msgs, "rejecting request from choked peer");
1353    else if (msgs->peer.pendingReqsToClient + 1 >= REQQ)
1354        dbgmsg (msgs, "rejecting request ... reqq is full");
1355    else
1356        allow = true;
1357
1358    if (allow) {
1359        msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req;
1360        prefetchPieces (msgs);
1361    } else if (fext) {
1362        protocolSendReject (msgs, req);
1363    }
1364}
1365
1366static bool
1367messageLengthIsCorrect (const tr_peerMsgs * msg, uint8_t id, uint32_t len)
1368{
1369    switch (id)
1370    {
1371        case BT_CHOKE:
1372        case BT_UNCHOKE:
1373        case BT_INTERESTED:
1374        case BT_NOT_INTERESTED:
1375        case BT_FEXT_HAVE_ALL:
1376        case BT_FEXT_HAVE_NONE:
1377            return len == 1;
1378
1379        case BT_HAVE:
1380        case BT_FEXT_SUGGEST:
1381        case BT_FEXT_ALLOWED_FAST:
1382            return len == 5;
1383
1384        case BT_BITFIELD:
1385            if (tr_torrentHasMetadata (msg->torrent))
1386                return len == (msg->torrent->info.pieceCount + 7u) / 8u + 1u;
1387            /* we don't know the piece count yet,
1388               so we can only guess whether to send true or false */
1389            if (msg->metadata_size_hint > 0)
1390                return len <= msg->metadata_size_hint;
1391            return true;
1392
1393        case BT_REQUEST:
1394        case BT_CANCEL:
1395        case BT_FEXT_REJECT:
1396            return len == 13;
1397
1398        case BT_PIECE:
1399            return len > 9 && len <= 16393;
1400
1401        case BT_PORT:
1402            return len == 3;
1403
1404        case BT_LTEP:
1405            return len >= 2;
1406
1407        default:
1408            return false;
1409    }
1410}
1411
1412static int clientGotBlock (tr_peerMsgs *               msgs,
1413                           struct evbuffer *           block,
1414                           const struct peer_request * req);
1415
1416static int
1417readBtPiece (tr_peerMsgs      * msgs,
1418             struct evbuffer  * inbuf,
1419             size_t             inlen,
1420             size_t           * setme_piece_bytes_read)
1421{
1422    struct peer_request * req = &msgs->incoming.blockReq;
1423
1424    assert (evbuffer_get_length (inbuf) >= inlen);
1425    dbgmsg (msgs, "In readBtPiece");
1426
1427    if (!req->length)
1428    {
1429        if (inlen < 8)
1430            return READ_LATER;
1431
1432        tr_peerIoReadUint32 (msgs->io, inbuf, &req->index);
1433        tr_peerIoReadUint32 (msgs->io, inbuf, &req->offset);
1434        req->length = msgs->incoming.length - 9;
1435        dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1436        return READ_NOW;
1437    }
1438    else
1439    {
1440        int err;
1441        size_t n;
1442        size_t nLeft;
1443        struct evbuffer * block_buffer;
1444
1445        if (msgs->incoming.block == NULL)
1446            msgs->incoming.block = evbuffer_new ();
1447        block_buffer = msgs->incoming.block;
1448
1449        /* read in another chunk of data */
1450        nLeft = req->length - evbuffer_get_length (block_buffer);
1451        n = MIN (nLeft, inlen);
1452
1453        tr_peerIoReadBytesToBuf (msgs->io, inbuf, block_buffer, n);
1454
1455        fireClientGotPieceData (msgs, n);
1456        *setme_piece_bytes_read += n;
1457        dbgmsg (msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1458               n, req->index, req->offset, req->length,
1459             (int)(req->length - evbuffer_get_length (block_buffer)));
1460        if (evbuffer_get_length (block_buffer) < req->length)
1461            return READ_LATER;
1462
1463        /* pass the block along... */
1464        err = clientGotBlock (msgs, block_buffer, req);
1465        evbuffer_drain (block_buffer, evbuffer_get_length (block_buffer));
1466
1467        /* cleanup */
1468        req->length = 0;
1469        msgs->state = AWAITING_BT_LENGTH;
1470        return err ? READ_ERR : READ_NOW;
1471    }
1472}
1473
1474static void updateDesiredRequestCount (tr_peerMsgs * msgs);
1475
1476static int
1477readBtMessage (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1478{
1479    uint32_t      ui32;
1480    uint32_t      msglen = msgs->incoming.length;
1481    const uint8_t id = msgs->incoming.id;
1482#ifndef NDEBUG
1483    const size_t  startBufLen = evbuffer_get_length (inbuf);
1484#endif
1485    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1486
1487    --msglen; /* id length */
1488
1489    dbgmsg (msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);
1490
1491    if (inlen < msglen)
1492        return READ_LATER;
1493
1494    if (!messageLengthIsCorrect (msgs, id, msglen + 1))
1495    {
1496        dbgmsg (msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1497        fireError (msgs, EMSGSIZE);
1498        return READ_ERR;
1499    }
1500
1501    switch (id)
1502    {
1503        case BT_CHOKE:
1504            dbgmsg (msgs, "got Choke");
1505            msgs->client_is_choked = true;
1506            if (!fext)
1507                fireGotChoke (msgs);
1508            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1509            break;
1510
1511        case BT_UNCHOKE:
1512            dbgmsg (msgs, "got Unchoke");
1513            msgs->client_is_choked = false;
1514            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1515            updateDesiredRequestCount (msgs);
1516            break;
1517
1518        case BT_INTERESTED:
1519            dbgmsg (msgs, "got Interested");
1520            msgs->peer_is_interested = true;
1521            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1522            break;
1523
1524        case BT_NOT_INTERESTED:
1525            dbgmsg (msgs, "got Not Interested");
1526            msgs->peer_is_interested = false;
1527            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1528            break;
1529
1530        case BT_HAVE:
1531            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1532            dbgmsg (msgs, "got Have: %u", ui32);
1533            if (tr_torrentHasMetadata (msgs->torrent)
1534                    && (ui32 >= msgs->torrent->info.pieceCount))
1535            {
1536                fireError (msgs, ERANGE);
1537                return READ_ERR;
1538            }
1539
1540            /* a peer can send the same HAVE message twice... */
1541            if (!tr_bitfieldHas (&msgs->peer.have, ui32)) {
1542                tr_bitfieldAdd (&msgs->peer.have, ui32);
1543                fireClientGotHave (msgs, ui32);
1544            }
1545            updatePeerProgress (msgs);
1546            break;
1547
1548        case BT_BITFIELD: {
1549            uint8_t * tmp = tr_new (uint8_t, msglen);
1550            dbgmsg (msgs, "got a bitfield");
1551            tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1552            tr_bitfieldSetRaw (&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));
1553            fireClientGotBitfield (msgs, &msgs->peer.have);
1554            updatePeerProgress (msgs);
1555            tr_free (tmp);
1556            break;
1557        }
1558
1559        case BT_REQUEST:
1560        {
1561            struct peer_request r;
1562            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1563            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1564            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1565            dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1566            peerMadeRequest (msgs, &r);
1567            break;
1568        }
1569
1570        case BT_CANCEL:
1571        {
1572            int i;
1573            struct peer_request r;
1574            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1575            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1576            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1577            tr_historyAdd (&msgs->peer.cancelsSentToClient, tr_time (), 1);
1578            dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1579
1580            for (i=0; i<msgs->peer.pendingReqsToClient; ++i) {
1581                const struct peer_request * req = msgs->peerAskedFor + i;
1582                if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length))
1583                    break;
1584            }
1585
1586            if (i < msgs->peer.pendingReqsToClient)
1587                tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request),
1588                                           msgs->peer.pendingReqsToClient--);
1589            break;
1590        }
1591
1592        case BT_PIECE:
1593            assert (0); /* handled elsewhere! */
1594            break;
1595
1596        case BT_PORT:
1597            dbgmsg (msgs, "Got a BT_PORT");
1598            tr_peerIoReadUint16 (msgs->io, inbuf, &msgs->dht_port);
1599            if (msgs->dht_port > 0)
1600                tr_dhtAddNode (getSession (msgs),
1601                               tr_peerAddress (&msgs->peer),
1602                               msgs->dht_port, 0);
1603            break;
1604
1605        case BT_FEXT_SUGGEST:
1606            dbgmsg (msgs, "Got a BT_FEXT_SUGGEST");
1607            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1608            if (fext)
1609                fireClientGotSuggest (msgs, ui32);
1610            else {
1611                fireError (msgs, EMSGSIZE);
1612                return READ_ERR;
1613            }
1614            break;
1615
1616        case BT_FEXT_ALLOWED_FAST:
1617            dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST");
1618            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1619            if (fext)
1620                fireClientGotAllowedFast (msgs, ui32);
1621            else {
1622                fireError (msgs, EMSGSIZE);
1623                return READ_ERR;
1624            }
1625            break;
1626
1627        case BT_FEXT_HAVE_ALL:
1628            dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL");
1629            if (fext) {
1630                tr_bitfieldSetHasAll (&msgs->peer.have);
1631assert (tr_bitfieldHasAll (&msgs->peer.have));
1632                fireClientGotHaveAll (msgs);
1633                updatePeerProgress (msgs);
1634            } else {
1635                fireError (msgs, EMSGSIZE);
1636                return READ_ERR;
1637            }
1638            break;
1639
1640        case BT_FEXT_HAVE_NONE:
1641            dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE");
1642            if (fext) {
1643                tr_bitfieldSetHasNone (&msgs->peer.have);
1644                fireClientGotHaveNone (msgs);
1645                updatePeerProgress (msgs);
1646            } else {
1647                fireError (msgs, EMSGSIZE);
1648                return READ_ERR;
1649            }
1650            break;
1651
1652        case BT_FEXT_REJECT:
1653        {
1654            struct peer_request r;
1655            dbgmsg (msgs, "Got a BT_FEXT_REJECT");
1656            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1657            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1658            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1659            if (fext)
1660                fireGotRej (msgs, &r);
1661            else {
1662                fireError (msgs, EMSGSIZE);
1663                return READ_ERR;
1664            }
1665            break;
1666        }
1667
1668        case BT_LTEP:
1669            dbgmsg (msgs, "Got a BT_LTEP");
1670            parseLtep (msgs, msglen, inbuf);
1671            break;
1672
1673        default:
1674            dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id);
1675            tr_peerIoDrain (msgs->io, inbuf, msglen);
1676            break;
1677    }
1678
1679    assert (msglen + 1 == msgs->incoming.length);
1680    assert (evbuffer_get_length (inbuf) == startBufLen - msglen);
1681
1682    msgs->state = AWAITING_BT_LENGTH;
1683    return READ_NOW;
1684}
1685
1686/* returns 0 on success, or an errno on failure */
1687static int
1688clientGotBlock (tr_peerMsgs                * msgs,
1689                struct evbuffer            * data,
1690                const struct peer_request  * req)
1691{
1692    int err;
1693    tr_torrent * tor = msgs->torrent;
1694    const tr_block_index_t block = _tr_block (tor, req->index, req->offset);
1695
1696    assert (msgs);
1697    assert (req);
1698
1699    if (req->length != tr_torBlockCountBytes (msgs->torrent, block)) {
1700        dbgmsg (msgs, "wrong block size -- expected %u, got %d",
1701                tr_torBlockCountBytes (msgs->torrent, block), req->length);
1702        return EMSGSIZE;
1703    }
1704
1705    dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1706
1707    if (!tr_peerMgrDidPeerRequest (msgs->torrent, &msgs->peer, block)) {
1708        dbgmsg (msgs, "we didn't ask for this message...");
1709        return 0;
1710    }
1711    if (tr_torrentPieceIsComplete (msgs->torrent, req->index)) {
1712        dbgmsg (msgs, "we did ask for this message, but the piece is already complete...");
1713        return 0;
1714    }
1715
1716    /**
1717    ***  Save the block
1718    **/
1719
1720    if ((err = tr_cacheWriteBlock (getSession (msgs)->cache, tor, req->index, req->offset, req->length, data)))
1721        return err;
1722
1723    tr_bitfieldAdd (&msgs->peer.blame, req->index);
1724    fireGotBlock (msgs, req);
1725    return 0;
1726}
1727
1728static int peerPulse (void * vmsgs);
1729
1730static void
1731didWrite (tr_peerIo * io UNUSED, size_t bytesWritten, bool wasPieceData, void * vmsgs)
1732{
1733    tr_peerMsgs * msgs = vmsgs;
1734
1735    if (wasPieceData)
1736      firePeerGotPieceData (msgs, bytesWritten);
1737
1738    if (tr_isPeerIo (io) && io->userData)
1739        peerPulse (msgs);
1740}
1741
1742static ReadState
1743canRead (tr_peerIo * io, void * vmsgs, size_t * piece)
1744{
1745    ReadState         ret;
1746    tr_peerMsgs *     msgs = vmsgs;
1747    struct evbuffer * in = tr_peerIoGetReadBuffer (io);
1748    const size_t      inlen = evbuffer_get_length (in);
1749
1750    dbgmsg (msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);
1751
1752    if (!inlen)
1753    {
1754        ret = READ_LATER;
1755    }
1756    else if (msgs->state == AWAITING_BT_PIECE)
1757    {
1758        ret = readBtPiece (msgs, in, inlen, piece);
1759    }
1760    else switch (msgs->state)
1761    {
1762        case AWAITING_BT_LENGTH:
1763            ret = readBtLength (msgs, in, inlen); break;
1764
1765        case AWAITING_BT_ID:
1766            ret = readBtId   (msgs, in, inlen); break;
1767
1768        case AWAITING_BT_MESSAGE:
1769            ret = readBtMessage (msgs, in, inlen); break;
1770
1771        default:
1772            ret = READ_ERR;
1773            assert (0);
1774    }
1775
1776    dbgmsg (msgs, "canRead: ret is %d", (int)ret);
1777
1778    return ret;
1779}
1780
1781int
1782tr_peerMsgsIsReadingBlock (const tr_peerMsgs * msgs, tr_block_index_t block)
1783{
1784    if (msgs->state != AWAITING_BT_PIECE)
1785        return false;
1786
1787    return block == _tr_block (msgs->torrent,
1788                               msgs->incoming.blockReq.index,
1789                               msgs->incoming.blockReq.offset);
1790}
1791
1792/**
1793***
1794**/
1795
1796static void
1797updateDesiredRequestCount (tr_peerMsgs * msgs)
1798{
1799    tr_torrent * const torrent = msgs->torrent;
1800
1801    /* there are lots of reasons we might not want to request any blocks... */
1802    if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent)
1803                                    || msgs->client_is_choked
1804                                    || !msgs->client_is_interested)
1805    {
1806        msgs->desiredRequestCount = 0;
1807    }
1808    else
1809    {
1810        int estimatedBlocksInPeriod;
1811        unsigned int rate_Bps;
1812        unsigned int irate_Bps;
1813        const int floor = 4;
1814        const int seconds = REQUEST_BUF_SECS;
1815        const uint64_t now = tr_time_msec ();
1816
1817        /* Get the rate limit we should use.
1818         * FIXME: this needs to consider all the other peers as well... */
1819        rate_Bps = tr_peerGetPieceSpeed_Bps (&msgs->peer, now, TR_PEER_TO_CLIENT);
1820        if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT))
1821            rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT));
1822
1823        /* honor the session limits, if enabled */
1824        if (tr_torrentUsesSessionLimits (torrent) &&
1825            tr_sessionGetActiveSpeedLimit_Bps (torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1826                rate_Bps = MIN (rate_Bps, irate_Bps);
1827
1828        /* use this desired rate to figure out how
1829         * many requests we should send to this peer */
1830        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1831        msgs->desiredRequestCount = MAX (floor, estimatedBlocksInPeriod);
1832
1833        /* honor the peer's maximum request count, if specified */
1834        if (msgs->reqq > 0)
1835            if (msgs->desiredRequestCount > msgs->reqq)
1836                msgs->desiredRequestCount = msgs->reqq;
1837    }
1838}
1839
1840static void
1841updateMetadataRequests (tr_peerMsgs * msgs, time_t now)
1842{
1843    int piece;
1844
1845    if (msgs->peerSupportsMetadataXfer
1846        && tr_torrentGetNextMetadataRequest (msgs->torrent, now, &piece))
1847    {
1848        tr_variant tmp;
1849        struct evbuffer * payload;
1850        struct evbuffer * out = msgs->outMessages;
1851
1852        /* build the data message */
1853        tr_variantInitDict (&tmp, 3);
1854        tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
1855        tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1856        payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1857
1858        dbgmsg (msgs, "requesting metadata piece #%d", piece);
1859
1860        /* write it out as a LTEP message to our outMessages buffer */
1861        evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1862        evbuffer_add_uint8 (out, BT_LTEP);
1863        evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1864        evbuffer_add_buffer (out, payload);
1865        pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1866        dbgOutMessageLen (msgs);
1867
1868        /* cleanup */
1869        evbuffer_free (payload);
1870        tr_variantFree (&tmp);
1871    }
1872}
1873
1874static void
1875updateBlockRequests (tr_peerMsgs * msgs)
1876{
1877    if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT)
1878        && (msgs->desiredRequestCount > 0)
1879        && (msgs->peer.pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))
1880    {
1881        int i;
1882        int n;
1883        tr_block_index_t * blocks;
1884        const int numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer;
1885
1886        assert (tr_peerMsgsIsClientInterested (msgs));
1887        assert (!tr_peerMsgsIsClientChoked (msgs));
1888
1889        blocks = tr_new (tr_block_index_t, numwant);
1890        tr_peerMgrGetNextRequests (msgs->torrent, &msgs->peer, numwant, blocks, &n, false);
1891
1892        for (i=0; i<n; ++i)
1893        {
1894            struct peer_request req;
1895            blockToReq (msgs->torrent, blocks[i], &req);
1896            protocolSendRequest (msgs, &req);
1897        }
1898
1899        tr_free (blocks);
1900    }
1901}
1902
1903static size_t
1904fillOutputBuffer (tr_peerMsgs * msgs, time_t now)
1905{
1906    int piece;
1907    size_t bytesWritten = 0;
1908    struct peer_request req;
1909    const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0;
1910    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1911
1912    /**
1913    ***  Protocol messages
1914    **/
1915
1916    if (haveMessages && !msgs->outMessagesBatchedAt) /* fresh batch */
1917    {
1918        dbgmsg (msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length (msgs->outMessages));
1919        msgs->outMessagesBatchedAt = now;
1920    }
1921    else if (haveMessages && ((now - msgs->outMessagesBatchedAt) >= msgs->outMessagesBatchPeriod))
1922    {
1923        const size_t len = evbuffer_get_length (msgs->outMessages);
1924        /* flush the protocol messages */
1925        dbgmsg (msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len);
1926        tr_peerIoWriteBuf (msgs->io, msgs->outMessages, false);
1927        msgs->clientSentAnythingAt = now;
1928        msgs->outMessagesBatchedAt = 0;
1929        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1930        bytesWritten +=  len;
1931    }
1932
1933    /**
1934    ***  Metadata Pieces
1935    **/
1936
1937    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= METADATA_PIECE_SIZE)
1938        && popNextMetadataRequest (msgs, &piece))
1939    {
1940        char * data;
1941        int dataLen;
1942        bool ok = false;
1943
1944        data = tr_torrentGetMetadataPiece (msgs->torrent, piece, &dataLen);
1945        if ((dataLen > 0) && (data != NULL))
1946        {
1947            tr_variant tmp;
1948            struct evbuffer * payload;
1949            struct evbuffer * out = msgs->outMessages;
1950
1951            /* build the data message */
1952            tr_variantInitDict (&tmp, 3);
1953            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
1954            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1955            tr_variantDictAddInt (&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
1956            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1957
1958            /* write it out as a LTEP message to our outMessages buffer */
1959            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload) + dataLen);
1960            evbuffer_add_uint8 (out, BT_LTEP);
1961            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1962            evbuffer_add_buffer (out, payload);
1963            evbuffer_add     (out, data, dataLen);
1964            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1965            dbgOutMessageLen (msgs);
1966
1967            evbuffer_free (payload);
1968            tr_variantFree (&tmp);
1969            tr_free (data);
1970
1971            ok = true;
1972        }
1973
1974        if (!ok) /* send a rejection message */
1975        {
1976            tr_variant tmp;
1977            struct evbuffer * payload;
1978            struct evbuffer * out = msgs->outMessages;
1979
1980            /* build the rejection message */
1981            tr_variantInitDict (&tmp, 2);
1982            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1983            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1984            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1985
1986            /* write it out as a LTEP message to our outMessages buffer */
1987            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1988            evbuffer_add_uint8 (out, BT_LTEP);
1989            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1990            evbuffer_add_buffer (out, payload);
1991            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1992            dbgOutMessageLen (msgs);
1993
1994            evbuffer_free (payload);
1995            tr_variantFree (&tmp);
1996        }
1997    }
1998
1999    /**
2000    ***  Data Blocks
2001    **/
2002
2003    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= msgs->torrent->blockSize)
2004        && popNextRequest (msgs, &req))
2005    {
2006        --msgs->prefetchCount;
2007
2008        if (requestIsValid (msgs, &req)
2009            && tr_torrentPieceIsComplete (msgs->torrent, req.index))
2010        {
2011            int err;
2012            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
2013            struct evbuffer * out;
2014            struct evbuffer_iovec iovec[1];
2015
2016            out = evbuffer_new ();
2017            evbuffer_expand (out, msglen);
2018
2019            evbuffer_add_uint32 (out, sizeof (uint8_t) + 2 * sizeof (uint32_t) + req.length);
2020            evbuffer_add_uint8 (out, BT_PIECE);
2021            evbuffer_add_uint32 (out, req.index);
2022            evbuffer_add_uint32 (out, req.offset);
2023
2024            evbuffer_reserve_space (out, req.length, iovec, 1);
2025            err = tr_cacheReadBlock (getSession (msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base);
2026            iovec[0].iov_len = req.length;
2027            evbuffer_commit_space (out, iovec, 1);
2028
2029            /* check the piece if it needs checking... */
2030            if (!err && tr_torrentPieceNeedsCheck (msgs->torrent, req.index))
2031                if ((err = !tr_torrentCheckPiece (msgs->torrent, req.index)))
2032                    tr_torrentSetLocalError (msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."), (size_t)req.index);
2033
2034            if (err)
2035            {
2036                if (fext)
2037                    protocolSendReject (msgs, &req);
2038            }
2039            else
2040            {
2041                const size_t n = evbuffer_get_length (out);
2042                dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
2043                assert (n == msglen);
2044                tr_peerIoWriteBuf (msgs->io, out, true);
2045                bytesWritten += n;
2046                msgs->clientSentAnythingAt = now;
2047                tr_historyAdd (&msgs->peer.blocksSentToPeer, tr_time (), 1);
2048            }
2049
2050            evbuffer_free (out);
2051
2052            if (err)
2053            {
2054                bytesWritten = 0;
2055                msgs = NULL;
2056            }
2057        }
2058        else if (fext) /* peer needs a reject message */
2059        {
2060            protocolSendReject (msgs, &req);
2061        }
2062
2063        if (msgs != NULL)
2064            prefetchPieces (msgs);
2065    }
2066
2067    /**
2068    ***  Keepalive
2069    **/
2070
2071    if ((msgs != NULL)
2072        && (msgs->clientSentAnythingAt != 0)
2073        && ((now - msgs->clientSentAnythingAt) > KEEPALIVE_INTERVAL_SECS))
2074    {
2075        dbgmsg (msgs, "sending a keepalive message");
2076        evbuffer_add_uint32 (msgs->outMessages, 0);
2077        pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2078    }
2079
2080    return bytesWritten;
2081}
2082
2083static int
2084peerPulse (void * vmsgs)
2085{
2086    tr_peerMsgs * msgs = vmsgs;
2087    const time_t  now = tr_time ();
2088
2089    if (tr_isPeerIo (msgs->io)) {
2090        updateDesiredRequestCount (msgs);
2091        updateBlockRequests (msgs);
2092        updateMetadataRequests (msgs, now);
2093    }
2094
2095    for (;;)
2096        if (fillOutputBuffer (msgs, now) < 1)
2097            break;
2098
2099    return true; /* loop forever */
2100}
2101
2102void
2103tr_peerMsgsPulse (tr_peerMsgs * msgs)
2104{
2105    if (msgs != NULL)
2106        peerPulse (msgs);
2107}
2108
2109static void
2110gotError (tr_peerIo * io UNUSED, short what, void * vmsgs)
2111{
2112    if (what & BEV_EVENT_TIMEOUT)
2113        dbgmsg (vmsgs, "libevent got a timeout, what=%hd", what);
2114    if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
2115        dbgmsg (vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2116               what, errno, tr_strerror (errno));
2117    fireError (vmsgs, ENOTCONN);
2118}
2119
2120static void
2121sendBitfield (tr_peerMsgs * msgs)
2122{
2123    void * bytes;
2124    size_t byte_count = 0;
2125    struct evbuffer * out = msgs->outMessages;
2126
2127    assert (tr_torrentHasMetadata (msgs->torrent));
2128
2129    bytes = tr_torrentCreatePieceBitfield (msgs->torrent, &byte_count);
2130    evbuffer_add_uint32 (out, sizeof (uint8_t) + byte_count);
2131    evbuffer_add_uint8 (out, BT_BITFIELD);
2132    evbuffer_add     (out, bytes, byte_count);
2133    dbgmsg (msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length (out));
2134    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2135
2136    tr_free (bytes);
2137}
2138
2139static void
2140tellPeerWhatWeHave (tr_peerMsgs * msgs)
2141{
2142    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
2143
2144    if (fext && tr_torrentHasAll (msgs->torrent))
2145    {
2146        protocolSendHaveAll (msgs);
2147    }
2148    else if (fext && tr_torrentHasNone (msgs->torrent))
2149    {
2150        protocolSendHaveNone (msgs);
2151    }
2152    else if (!tr_torrentHasNone (msgs->torrent))
2153    {
2154        sendBitfield (msgs);
2155    }
2156}
2157
2158/**
2159***
2160**/
2161
2162/* some peers give us error messages if we send
2163   more than this many peers in a single pex message
2164   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2165#define MAX_PEX_ADDED 50
2166#define MAX_PEX_DROPPED 50
2167
2168typedef struct
2169{
2170    tr_pex *  added;
2171    tr_pex *  dropped;
2172    tr_pex *  elements;
2173    int       addedCount;
2174    int       droppedCount;
2175    int       elementCount;
2176}
2177PexDiffs;
2178
2179static void
2180pexAddedCb (void * vpex, void * userData)
2181{
2182    PexDiffs * diffs = userData;
2183    tr_pex *   pex = vpex;
2184
2185    if (diffs->addedCount < MAX_PEX_ADDED)
2186    {
2187        diffs->added[diffs->addedCount++] = *pex;
2188        diffs->elements[diffs->elementCount++] = *pex;
2189    }
2190}
2191
2192static inline void
2193pexDroppedCb (void * vpex, void * userData)
2194{
2195    PexDiffs * diffs = userData;
2196    tr_pex *   pex = vpex;
2197
2198    if (diffs->droppedCount < MAX_PEX_DROPPED)
2199    {
2200        diffs->dropped[diffs->droppedCount++] = *pex;
2201    }
2202}
2203
2204static inline void
2205pexElementCb (void * vpex, void * userData)
2206{
2207    PexDiffs * diffs = userData;
2208    tr_pex * pex = vpex;
2209
2210    diffs->elements[diffs->elementCount++] = *pex;
2211}
2212
2213typedef void (tr_set_func)(void * element, void * userData);
2214
2215/**
2216 * @brief find the differences and commonalities in two sorted sets
2217 * @param a the first set
2218 * @param aCount the number of elements in the set 'a'
2219 * @param b the second set
2220 * @param bCount the number of elements in the set 'b'
2221 * @param compare the sorting method for both sets
2222 * @param elementSize the sizeof the element in the two sorted sets
2223 * @param in_a called for items in set 'a' but not set 'b'
2224 * @param in_b called for items in set 'b' but not set 'a'
2225 * @param in_both called for items that are in both sets
2226 * @param userData user data passed along to in_a, in_b, and in_both
2227 */
2228static void
2229tr_set_compare (const void * va, size_t aCount,
2230                const void * vb, size_t bCount,
2231                int compare (const void * a, const void * b),
2232                size_t elementSize,
2233                tr_set_func in_a_cb,
2234                tr_set_func in_b_cb,
2235                tr_set_func in_both_cb,
2236                void * userData)
2237{
2238    const uint8_t * a = va;
2239    const uint8_t * b = vb;
2240    const uint8_t * aend = a + elementSize * aCount;
2241    const uint8_t * bend = b + elementSize * bCount;
2242
2243    while (a != aend || b != bend)
2244    {
2245        if (a == aend)
2246        {
2247          (*in_b_cb)((void*)b, userData);
2248            b += elementSize;
2249        }
2250        else if (b == bend)
2251        {
2252          (*in_a_cb)((void*)a, userData);
2253            a += elementSize;
2254        }
2255        else
2256        {
2257            const int val = (*compare)(a, b);
2258
2259            if (!val)
2260            {
2261              (*in_both_cb)((void*)a, userData);
2262                a += elementSize;
2263                b += elementSize;
2264            }
2265            else if (val < 0)
2266            {
2267              (*in_a_cb)((void*)a, userData);
2268                a += elementSize;
2269            }
2270            else if (val > 0)
2271            {
2272              (*in_b_cb)((void*)b, userData);
2273                b += elementSize;
2274            }
2275        }
2276    }
2277}
2278
2279
2280static void
2281sendPex (tr_peerMsgs * msgs)
2282{
2283    if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent))
2284    {
2285        PexDiffs diffs;
2286        PexDiffs diffs6;
2287        tr_pex * newPex = NULL;
2288        tr_pex * newPex6 = NULL;
2289        const int newCount = tr_peerMgrGetPeers (msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2290        const int newCount6 = tr_peerMgrGetPeers (msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2291
2292        /* build the diffs */
2293        diffs.added = tr_new (tr_pex, newCount);
2294        diffs.addedCount = 0;
2295        diffs.dropped = tr_new (tr_pex, msgs->pexCount);
2296        diffs.droppedCount = 0;
2297        diffs.elements = tr_new (tr_pex, newCount + msgs->pexCount);
2298        diffs.elementCount = 0;
2299        tr_set_compare (msgs->pex, msgs->pexCount,
2300                        newPex, newCount,
2301                        tr_pexCompare, sizeof (tr_pex),
2302                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs);
2303        diffs6.added = tr_new (tr_pex, newCount6);
2304        diffs6.addedCount = 0;
2305        diffs6.dropped = tr_new (tr_pex, msgs->pexCount6);
2306        diffs6.droppedCount = 0;
2307        diffs6.elements = tr_new (tr_pex, newCount6 + msgs->pexCount6);
2308        diffs6.elementCount = 0;
2309        tr_set_compare (msgs->pex6, msgs->pexCount6,
2310                        newPex6, newCount6,
2311                        tr_pexCompare, sizeof (tr_pex),
2312                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6);
2313        dbgmsg (
2314            msgs,
2315            "pex: old peer count %d+%d, new peer count %d+%d, "
2316            "added %d+%d, removed %d+%d",
2317            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2318            diffs.addedCount, diffs6.addedCount,
2319            diffs.droppedCount, diffs6.droppedCount);
2320
2321        if (!diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2322            !diffs6.droppedCount)
2323        {
2324            tr_free (diffs.elements);
2325            tr_free (diffs6.elements);
2326        }
2327        else
2328        {
2329            int  i;
2330            tr_variant val;
2331            uint8_t * tmp, *walk;
2332            struct evbuffer * payload;
2333            struct evbuffer * out = msgs->outMessages;
2334
2335            /* update peer */
2336            tr_free (msgs->pex);
2337            msgs->pex = diffs.elements;
2338            msgs->pexCount = diffs.elementCount;
2339            tr_free (msgs->pex6);
2340            msgs->pex6 = diffs6.elements;
2341            msgs->pexCount6 = diffs6.elementCount;
2342
2343            /* build the pex payload */
2344            tr_variantInitDict (&val, 3); /* ipv6 support: left as 3:
2345                                         * speed vs. likelihood? */
2346
2347            if (diffs.addedCount > 0)
2348            {
2349                /* "added" */
2350                tmp = walk = tr_new (uint8_t, diffs.addedCount * 6);
2351                for (i = 0; i < diffs.addedCount; ++i) {
2352                    memcpy (walk, &diffs.added[i].addr.addr, 4); walk += 4;
2353                    memcpy (walk, &diffs.added[i].port, 2); walk += 2;
2354                }
2355                assert ((walk - tmp) == diffs.addedCount * 6);
2356                tr_variantDictAddRaw (&val, TR_KEY_added, tmp, walk - tmp);
2357                tr_free (tmp);
2358
2359                /* "added.f"
2360                 * unset each holepunch flag because we don't support it. */
2361                tmp = walk = tr_new (uint8_t, diffs.addedCount);
2362                for (i = 0; i < diffs.addedCount; ++i)
2363                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2364                assert ((walk - tmp) == diffs.addedCount);
2365                tr_variantDictAddRaw (&val, TR_KEY_added_f, tmp, walk - tmp);
2366                tr_free (tmp);
2367            }
2368
2369            if (diffs.droppedCount > 0)
2370            {
2371                /* "dropped" */
2372                tmp = walk = tr_new (uint8_t, diffs.droppedCount * 6);
2373                for (i = 0; i < diffs.droppedCount; ++i) {
2374                    memcpy (walk, &diffs.dropped[i].addr.addr, 4); walk += 4;
2375                    memcpy (walk, &diffs.dropped[i].port, 2); walk += 2;
2376                }
2377                assert ((walk - tmp) == diffs.droppedCount * 6);
2378                tr_variantDictAddRaw (&val, TR_KEY_dropped, tmp, walk - tmp);
2379                tr_free (tmp);
2380            }
2381
2382            if (diffs6.addedCount > 0)
2383            {
2384                /* "added6" */
2385                tmp = walk = tr_new (uint8_t, diffs6.addedCount * 18);
2386                for (i = 0; i < diffs6.addedCount; ++i) {
2387                    memcpy (walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2388                    walk += 16;
2389                    memcpy (walk, &diffs6.added[i].port, 2);
2390                    walk += 2;
2391                }
2392                assert ((walk - tmp) == diffs6.addedCount * 18);
2393                tr_variantDictAddRaw (&val, TR_KEY_added6, tmp, walk - tmp);
2394                tr_free (tmp);
2395
2396                /* "added6.f"
2397                 * unset each holepunch flag because we don't support it. */
2398                tmp = walk = tr_new (uint8_t, diffs6.addedCount);
2399                for (i = 0; i < diffs6.addedCount; ++i)
2400                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2401                assert ((walk - tmp) == diffs6.addedCount);
2402                tr_variantDictAddRaw (&val, TR_KEY_added6_f, tmp, walk - tmp);
2403                tr_free (tmp);
2404            }
2405
2406            if (diffs6.droppedCount > 0)
2407            {
2408                /* "dropped6" */
2409                tmp = walk = tr_new (uint8_t, diffs6.droppedCount * 18);
2410                for (i = 0; i < diffs6.droppedCount; ++i) {
2411                    memcpy (walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2412                    walk += 16;
2413                    memcpy (walk, &diffs6.dropped[i].port, 2);
2414                    walk += 2;
2415                }
2416                assert ((walk - tmp) == diffs6.droppedCount * 18);
2417                tr_variantDictAddRaw (&val, TR_KEY_dropped6, tmp, walk - tmp);
2418                tr_free (tmp);
2419            }
2420
2421            /* write the pex message */
2422            payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
2423            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2424            evbuffer_add_uint8 (out, BT_LTEP);
2425            evbuffer_add_uint8 (out, msgs->ut_pex_id);
2426            evbuffer_add_buffer (out, payload);
2427            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2428            dbgmsg (msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length (out));
2429            dbgOutMessageLen (msgs);
2430
2431            evbuffer_free (payload);
2432            tr_variantFree (&val);
2433        }
2434
2435        /* cleanup */
2436        tr_free (diffs.added);
2437        tr_free (diffs.dropped);
2438        tr_free (newPex);
2439        tr_free (diffs6.added);
2440        tr_free (diffs6.dropped);
2441        tr_free (newPex6);
2442
2443        /*msgs->clientSentPexAt = tr_time ();*/
2444    }
2445}
2446
2447static void
2448pexPulse (int foo UNUSED, short bar UNUSED, void * vmsgs)
2449{
2450    struct tr_peerMsgs * msgs = vmsgs;
2451
2452    sendPex (msgs);
2453
2454    assert (msgs->pexTimer != NULL);
2455    tr_timerAdd (msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2456}
2457
2458/***
2459****  tr_peer virtual functions
2460***/
2461
2462static bool
2463peermsgs_is_transferring_pieces (const struct tr_peer * peer,
2464                                 uint64_t               now,
2465                                 tr_direction           direction, 
2466                                 unsigned int         * setme_Bps)
2467{
2468  unsigned int Bps = 0;
2469
2470  if (tr_isPeerMsgs (peer))
2471    {
2472      const tr_peerMsgs * msgs = (const tr_peerMsgs *) peer;
2473      Bps = tr_peerIoGetPieceSpeed_Bps (msgs->io, now, direction);
2474    }
2475
2476  if (setme_Bps != NULL)
2477    *setme_Bps = Bps;
2478
2479  return Bps > 0;
2480}
2481
2482static void
2483peermsgs_destruct (tr_peer * peer)
2484{
2485  tr_peerMsgs * msgs = PEER_MSGS (peer);
2486
2487  assert (msgs != NULL);
2488
2489  tr_peerMsgsSetActive (msgs, TR_UP, false);
2490  tr_peerMsgsSetActive (msgs, TR_DOWN, false);
2491
2492  if (msgs->pexTimer != NULL)
2493    event_free (msgs->pexTimer);
2494
2495  if (msgs->incoming.block != NULL)
2496    evbuffer_free (msgs->incoming.block);
2497
2498  if (msgs->io)
2499    {
2500      tr_peerIoClear (msgs->io);
2501      tr_peerIoUnref (msgs->io); /* balanced by the ref in handshakeDoneCB () */
2502    }
2503
2504  evbuffer_free (msgs->outMessages);
2505  tr_free (msgs->pex6);
2506  tr_free (msgs->pex);
2507
2508  tr_peerDestruct (&msgs->peer);
2509
2510  memset (msgs, ~0, sizeof (tr_peerMsgs));
2511}
2512
2513static const struct tr_peer_virtual_funcs my_funcs =
2514{
2515  .destruct = peermsgs_destruct,
2516  .is_transferring_pieces = peermsgs_is_transferring_pieces
2517};
2518
2519/***
2520****
2521***/
2522
2523time_t
2524tr_peerMsgsGetConnectionAge (const tr_peerMsgs * msgs)
2525{
2526  assert (tr_isPeerMsgs (msgs));
2527
2528  return tr_peerIoGetAge (msgs->io);
2529}
2530
2531bool
2532tr_peerMsgsIsPeerChoked (const tr_peerMsgs * msgs)
2533{
2534  assert (tr_isPeerMsgs (msgs));
2535
2536  return msgs->peer_is_choked;
2537}
2538
2539bool
2540tr_peerMsgsIsPeerInterested (const tr_peerMsgs * msgs)
2541{
2542  assert (tr_isPeerMsgs (msgs));
2543
2544  return msgs->peer_is_interested;
2545}
2546
2547bool
2548tr_peerMsgsIsClientChoked (const tr_peerMsgs * msgs)
2549{
2550  assert (tr_isPeerMsgs (msgs));
2551
2552  return msgs->client_is_choked;
2553}
2554
2555bool
2556tr_peerMsgsIsClientInterested (const tr_peerMsgs * msgs)
2557{
2558  assert (tr_isPeerMsgs (msgs));
2559
2560  return msgs->client_is_interested;
2561}
2562
2563bool
2564tr_peerMsgsIsUtpConnection (const tr_peerMsgs * msgs)
2565{
2566  assert (tr_isPeerMsgs (msgs));
2567
2568  return msgs->io->utp_socket != NULL;
2569}
2570
2571bool
2572tr_peerMsgsIsEncrypted (const tr_peerMsgs * msgs)
2573{
2574  assert (tr_isPeerMsgs (msgs));
2575
2576  return tr_peerIoIsEncrypted (msgs->io);
2577}
2578
2579bool
2580tr_peerMsgsIsIncomingConnection (const tr_peerMsgs * msgs)
2581{
2582  assert (tr_isPeerMsgs (msgs));
2583
2584  return tr_peerIoIsIncoming (msgs->io);
2585}
2586
2587/***
2588****
2589***/
2590
2591bool
2592tr_isPeerMsgs (const void * msgs)
2593{
2594  /* FIXME: this is pretty crude */
2595  return (msgs != NULL)
2596      && (((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER);
2597}
2598
2599tr_peerMsgs *
2600tr_peerMsgsCast (void * vm)
2601{
2602  return tr_isPeerMsgs(vm) ? vm : NULL;
2603}
2604
2605tr_peerMsgs *
2606tr_peerMsgsNew (struct tr_torrent    * torrent,
2607                struct tr_peerIo     * io,
2608                tr_peer_callback     * callback,
2609                void                 * callbackData)
2610{
2611  tr_peerMsgs * m;
2612
2613  assert (io != NULL);
2614
2615  m = tr_new0 (tr_peerMsgs, 1);
2616
2617  tr_peerConstruct (&m->peer, torrent);
2618  m->peer.funcs = &my_funcs;
2619
2620  m->magic_number = MAGIC_NUMBER;
2621  m->client_is_choked = true;
2622  m->peer_is_choked = true;
2623  m->client_is_interested = false;
2624  m->peer_is_interested = false;
2625  m->is_active[TR_UP] = false;
2626  m->is_active[TR_DOWN] = false;
2627  m->callback = callback;
2628  m->callbackData = callbackData;
2629  m->io = io;
2630  m->torrent = torrent;
2631  m->state = AWAITING_BT_LENGTH;
2632  m->outMessages = evbuffer_new ();
2633  m->outMessagesBatchedAt = 0;
2634  m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2635
2636  if (tr_torrentAllowsPex (torrent))
2637    {
2638      m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m);
2639      tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0);
2640    }
2641
2642  if (tr_peerIoSupportsUTP (m->io))
2643    {
2644      const tr_address * addr = tr_peerIoGetAddress (m->io, NULL);
2645      tr_peerMgrSetUtpSupported (torrent, addr);
2646      tr_peerMgrSetUtpFailed (torrent, addr, false);
2647    }
2648
2649  if (tr_peerIoSupportsLTEP (m->io))
2650    sendLtepHandshake (m);
2651
2652  tellPeerWhatWeHave (m);
2653
2654  if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (m->io))
2655    {
2656      /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2657      const struct tr_address *addr = tr_peerIoGetAddress (m->io, NULL);
2658      if (addr->type == TR_AF_INET || tr_globalIPv6 ())
2659        protocolSendPort (m, tr_dhtPort (torrent->session));
2660    }
2661
2662  tr_peerIoSetIOFuncs (m->io, canRead, didWrite, gotError, m);
2663  updateDesiredRequestCount (m);
2664
2665  return m;
2666}
Note: See TracBrowser for help on using the repository browser.