source: trunk/libtransmission/peer-msgs.c @ 14083

Last change on this file since 14083 was 14083, checked in by jordan, 9 years ago

(trunk, libT) fix tr_torrentStat() regression in the nightlies reported in #5294 by mw3demo

  • Property svn:keywords set to Date Rev Author Id
File size: 74.7 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 14083 2013-05-27 21:04:48Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "cache.h"
25#include "completion.h"
26#include "crypto.h" /* tr_sha1 () */
27#include "log.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "variant.h"
37#include "version.h"
38
39/**
40***
41**/
42
43enum
44{
45  BT_CHOKE                = 0,
46  BT_UNCHOKE              = 1,
47  BT_INTERESTED           = 2,
48  BT_NOT_INTERESTED       = 3,
49  BT_HAVE                 = 4,
50  BT_BITFIELD             = 5,
51  BT_REQUEST              = 6,
52  BT_PIECE                = 7,
53  BT_CANCEL               = 8,
54  BT_PORT                 = 9,
55
56  BT_FEXT_SUGGEST         = 13,
57  BT_FEXT_HAVE_ALL        = 14,
58  BT_FEXT_HAVE_NONE       = 15,
59  BT_FEXT_REJECT          = 16,
60  BT_FEXT_ALLOWED_FAST    = 17,
61
62  BT_LTEP                 = 20,
63
64  LTEP_HANDSHAKE          = 0,
65
66  UT_PEX_ID               = 1,
67  UT_METADATA_ID          = 3,
68
69  MAX_PEX_PEER_COUNT      = 50,
70
71  MIN_CHOKE_PERIOD_SEC    = 10,
72
73  /* idle seconds before we send a keepalive */
74  KEEPALIVE_INTERVAL_SECS = 100,
75
76  PEX_INTERVAL_SECS       = 90, /* sec between sendPex () calls */
77
78  REQQ                    = 512,
79
80  METADATA_REQQ           = 64,
81
82  MAGIC_NUMBER            = 21549,
83
84  /* used in lowering the outMessages queue period */
85  IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
86  HIGH_PRIORITY_INTERVAL_SECS = 2,
87  LOW_PRIORITY_INTERVAL_SECS = 10,
88
89  /* number of pieces we'll allow in our fast set */
90  MAX_FAST_SET_SIZE = 3,
91
92  /* how many blocks to keep prefetched per peer */
93  PREFETCH_SIZE = 18,
94
95  /* when we're making requests from another peer,
96     batch them together to send enough requests to
97     meet our bandwidth goals for the next N seconds */
98  REQUEST_BUF_SECS = 10,
99
100  /* defined in BEP #9 */
101  METADATA_MSG_TYPE_REQUEST = 0,
102  METADATA_MSG_TYPE_DATA = 1,
103  METADATA_MSG_TYPE_REJECT = 2
104};
105
106enum
107{
108  AWAITING_BT_LENGTH,
109  AWAITING_BT_ID,
110  AWAITING_BT_MESSAGE,
111  AWAITING_BT_PIECE
112};
113
114typedef enum
115{
116  ENCRYPTION_PREFERENCE_UNKNOWN,
117  ENCRYPTION_PREFERENCE_YES,
118  ENCRYPTION_PREFERENCE_NO
119}
120encryption_preference_t;
121
122/**
123***
124**/
125
126struct peer_request
127{
128  uint32_t index;
129  uint32_t offset;
130  uint32_t length;
131};
132
133static void
134blockToReq (const tr_torrent     * tor,
135            tr_block_index_t       block,
136            struct peer_request  * setme)
137{
138  tr_torrentGetBlockLocation (tor, block, &setme->index,
139                                          &setme->offset,
140                                          &setme->length);
141}
142
143/**
144***
145**/
146
147/* this is raw, unchanged data from the peer regarding
148 * the current message that it's sending us. */
149struct tr_incoming
150{
151  uint8_t                id;
152  uint32_t               length; /* includes the +1 for id length */
153  struct peer_request    blockReq; /* metadata for incoming blocks */
154  struct evbuffer      * block; /* piece data for incoming blocks */
155};
156
157/**
158 * Low-level communication state information about a connected peer.
159 *
160 * This structure remembers the low-level protocol states that we're
161 * in with this peer, such as active requests, pex messages, and so on.
162 * Its fields are all private to peer-msgs.c.
163 *
164 * Data not directly involved with sending & receiving messages is
165 * stored in tr_peer, where it can be accessed by both peermsgs and
166 * the peer manager.
167 *
168 * @see struct peer_atom
169 * @see tr_peer
170 */
171struct tr_peerMsgs
172{
173  struct tr_peer peer; /* parent */
174
175  uint16_t magic_number;
176
177  /* Whether or not we've choked this peer. */
178  bool peer_is_choked;
179
180  /* whether or not the peer has indicated it will download from us. */
181  bool peer_is_interested;
182
183  /* whether or the peer is choking us. */
184  bool client_is_choked;
185
186  /* whether or not we've indicated to the peer that we would download from them if unchoked. */
187  bool client_is_interested;
188
189
190  bool peerSupportsPex;
191  bool peerSupportsMetadataXfer;
192  bool clientSentLtepHandshake;
193  bool peerSentLtepHandshake;
194
195  /*bool haveFastSet;*/
196
197  int desiredRequestCount;
198
199  int prefetchCount;
200
201  int is_active[2];
202
203  /* how long the outMessages batch should be allowed to grow before
204   * it's flushed -- some messages (like requests >:) should be sent
205   * very quickly; others aren't as urgent. */
206  int8_t          outMessagesBatchPeriod;
207
208  uint8_t         state;
209  uint8_t         ut_pex_id;
210  uint8_t         ut_metadata_id;
211  uint16_t        pexCount;
212  uint16_t        pexCount6;
213
214  tr_port         dht_port;
215
216  encryption_preference_t  encryption_preference;
217
218  size_t                   metadata_size_hint;
219#if 0
220  size_t                 fastsetSize;
221  tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
222#endif
223
224  tr_torrent *           torrent;
225
226  tr_peer_callback      * callback;
227  void                  * callbackData;
228
229  struct evbuffer *      outMessages; /* all the non-piece messages */
230
231  struct peer_request    peerAskedFor[REQQ];
232
233  int peerAskedForMetadata[METADATA_REQQ];
234  int peerAskedForMetadataCount;
235
236  tr_pex * pex;
237  tr_pex * pex6;
238
239  /*time_t clientSentPexAt;*/
240  time_t clientSentAnythingAt;
241
242  time_t chokeChangedAt;
243
244  /* when we started batching the outMessages */
245  time_t outMessagesBatchedAt;
246
247  struct tr_incoming    incoming;
248
249  /* if the peer supports the Extension Protocol in BEP 10 and
250     supplied a reqq argument, it's stored here. Otherwise, the
251     value is zero and should be ignored. */
252  int64_t reqq;
253
254  struct event * pexTimer;
255
256  struct tr_peerIo * io;
257};
258
259/**
260***
261**/
262
263static inline tr_session*
264getSession (struct tr_peerMsgs * msgs)
265{
266  return msgs->torrent->session;
267}
268
269/**
270***
271**/
272
273static void
274myDebug (const char * file, int line,
275         const struct tr_peerMsgs * msgs,
276         const char * fmt, ...) TR_GNUC_PRINTF(4, 5);
277
278static void
279myDebug (const char * file, int line,
280         const struct tr_peerMsgs * msgs,
281         const char * fmt, ...)
282{
283  FILE * fp = tr_logGetFile ();
284
285  if (fp)
286    {
287      va_list           args;
288      char              timestr[64];
289      struct evbuffer * buf = evbuffer_new ();
290      char *            base = tr_basename (file);
291      char *            message;
292
293      evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ",
294                           tr_logGetTimeStr (timestr, sizeof (timestr)),
295                           tr_torrentName (msgs->torrent),
296                           tr_peerIoGetAddrStr (msgs->io),
297                           tr_quark_get_string (msgs->peer.client, NULL));
298      va_start (args, fmt);
299      evbuffer_add_vprintf (buf, fmt, args);
300      va_end (args);
301      evbuffer_add_printf (buf, " (%s:%d)\n", base, line);
302
303      message = evbuffer_free_to_str (buf);
304      fputs (message, fp);
305
306      tr_free (base);
307      tr_free (message);
308    }
309}
310
311#define dbgmsg(msgs, ...) \
312  do \
313    { \
314      if (tr_logGetDeepEnabled ()) \
315        myDebug (__FILE__, __LINE__, msgs, __VA_ARGS__); \
316    } \
317  while (0)
318
319/**
320***
321**/
322
323static void
324pokeBatchPeriod (tr_peerMsgs * msgs, int interval)
325{
326  if (msgs->outMessagesBatchPeriod > interval)
327    {
328      msgs->outMessagesBatchPeriod = interval;
329      dbgmsg (msgs, "lowering batch interval to %d seconds", interval);
330    }
331}
332
333static void
334dbgOutMessageLen (tr_peerMsgs * msgs)
335{
336  dbgmsg (msgs, "outMessage size is now %zu", evbuffer_get_length (msgs->outMessages));
337}
338
339static void
340protocolSendReject (tr_peerMsgs * msgs, const struct peer_request * req)
341{
342  struct evbuffer * out = msgs->outMessages;
343
344  assert (tr_peerIoSupportsFEXT (msgs->io));
345
346  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
347  evbuffer_add_uint8 (out, BT_FEXT_REJECT);
348  evbuffer_add_uint32 (out, req->index);
349  evbuffer_add_uint32 (out, req->offset);
350  evbuffer_add_uint32 (out, req->length);
351
352  dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
353  dbgOutMessageLen (msgs);
354}
355
356static void
357protocolSendRequest (tr_peerMsgs * msgs, const struct peer_request * req)
358{
359  struct evbuffer * out = msgs->outMessages;
360
361  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
362  evbuffer_add_uint8 (out, BT_REQUEST);
363  evbuffer_add_uint32 (out, req->index);
364  evbuffer_add_uint32 (out, req->offset);
365  evbuffer_add_uint32 (out, req->length);
366
367  dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
368  dbgOutMessageLen (msgs);
369  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
370}
371
372static void
373protocolSendCancel (tr_peerMsgs * msgs, const struct peer_request * req)
374{
375  struct evbuffer * out = msgs->outMessages;
376
377  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
378  evbuffer_add_uint8 (out, BT_CANCEL);
379  evbuffer_add_uint32 (out, req->index);
380  evbuffer_add_uint32 (out, req->offset);
381  evbuffer_add_uint32 (out, req->length);
382
383  dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
384  dbgOutMessageLen (msgs);
385  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
386}
387
388static void
389protocolSendPort (tr_peerMsgs *msgs, uint16_t port)
390{
391  struct evbuffer * out = msgs->outMessages;
392
393  dbgmsg (msgs, "sending Port %u", port);
394  evbuffer_add_uint32 (out, 3);
395  evbuffer_add_uint8 (out, BT_PORT);
396  evbuffer_add_uint16 (out, port);
397}
398
399static void
400protocolSendHave (tr_peerMsgs * msgs, uint32_t index)
401{
402  struct evbuffer * out = msgs->outMessages;
403
404  evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t));
405  evbuffer_add_uint8 (out, BT_HAVE);
406  evbuffer_add_uint32 (out, index);
407
408  dbgmsg (msgs, "sending Have %u", index);
409  dbgOutMessageLen (msgs);
410  pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS);
411}
412
413#if 0
414static void
415protocolSendAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
416{
417  tr_peerIo       * io  = msgs->io;
418  struct evbuffer * out = msgs->outMessages;
419
420  assert (tr_peerIoSupportsFEXT (msgs->io));
421
422  evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t));
423  evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST);
424  evbuffer_add_uint32 (io, out, pieceIndex);
425
426  dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex);
427  dbgOutMessageLen (msgs);
428}
429#endif
430
431static void
432protocolSendChoke (tr_peerMsgs * msgs, int choke)
433{
434  struct evbuffer * out = msgs->outMessages;
435
436  evbuffer_add_uint32 (out, sizeof (uint8_t));
437  evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE);
438
439  dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke");
440  dbgOutMessageLen (msgs);
441  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
442}
443
444static void
445protocolSendHaveAll (tr_peerMsgs * msgs)
446{
447  struct evbuffer * out = msgs->outMessages;
448
449  assert (tr_peerIoSupportsFEXT (msgs->io));
450
451  evbuffer_add_uint32 (out, sizeof (uint8_t));
452  evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL);
453
454  dbgmsg (msgs, "sending HAVE_ALL...");
455  dbgOutMessageLen (msgs);
456  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
457}
458
459static void
460protocolSendHaveNone (tr_peerMsgs * msgs)
461{
462  struct evbuffer * out = msgs->outMessages;
463
464  assert (tr_peerIoSupportsFEXT (msgs->io));
465
466  evbuffer_add_uint32 (out, sizeof (uint8_t));
467  evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE);
468
469  dbgmsg (msgs, "sending HAVE_NONE...");
470  dbgOutMessageLen (msgs);
471  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
472}
473
474/**
475***  EVENTS
476**/
477
478static void
479publish (tr_peerMsgs * msgs, tr_peer_event * e)
480{
481  if (msgs->callback != NULL)
482    msgs->callback (&msgs->peer, e, msgs->callbackData);
483}
484
485static void
486fireError (tr_peerMsgs * msgs, int err)
487{
488  tr_peer_event e = TR_PEER_EVENT_INIT;
489  e.eventType = TR_PEER_ERROR;
490  e.err = err;
491  publish (msgs, &e);
492}
493
494static void
495fireGotBlock (tr_peerMsgs * msgs, const struct peer_request * req)
496{
497  tr_peer_event e = TR_PEER_EVENT_INIT;
498  e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
499  e.pieceIndex = req->index;
500  e.offset = req->offset;
501  e.length = req->length;
502  publish (msgs, &e);
503}
504
505static void
506fireGotRej (tr_peerMsgs * msgs, const struct peer_request * req)
507{
508  tr_peer_event e = TR_PEER_EVENT_INIT;
509  e.eventType = TR_PEER_CLIENT_GOT_REJ;
510  e.pieceIndex = req->index;
511  e.offset = req->offset;
512  e.length = req->length;
513  publish (msgs, &e);
514}
515
516static void
517fireGotChoke (tr_peerMsgs * msgs)
518{
519  tr_peer_event e = TR_PEER_EVENT_INIT;
520  e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
521  publish (msgs, &e);
522}
523
524static void
525fireClientGotHaveAll (tr_peerMsgs * msgs)
526{
527  tr_peer_event e = TR_PEER_EVENT_INIT;
528  e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
529  publish (msgs, &e);
530}
531
532static void
533fireClientGotHaveNone (tr_peerMsgs * msgs)
534{
535  tr_peer_event e = TR_PEER_EVENT_INIT;
536  e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
537  publish (msgs, &e);
538}
539
540static void
541fireClientGotPieceData (tr_peerMsgs * msgs, uint32_t length)
542{
543  tr_peer_event e = TR_PEER_EVENT_INIT;
544  e.length = length;
545  e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
546  publish (msgs, &e);
547}
548
549static void
550firePeerGotPieceData (tr_peerMsgs * msgs, uint32_t length)
551{
552  tr_peer_event e = TR_PEER_EVENT_INIT;
553  e.length = length;
554  e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
555  publish (msgs, &e);
556}
557
558static void
559fireClientGotSuggest (tr_peerMsgs * msgs, uint32_t pieceIndex)
560{
561  tr_peer_event e = TR_PEER_EVENT_INIT;
562  e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
563  e.pieceIndex = pieceIndex;
564  publish (msgs, &e);
565}
566
567static void
568fireClientGotPort (tr_peerMsgs * msgs, tr_port port)
569{
570  tr_peer_event e = TR_PEER_EVENT_INIT;
571  e.eventType = TR_PEER_CLIENT_GOT_PORT;
572  e.port = port;
573  publish (msgs, &e);
574}
575
576static void
577fireClientGotAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
578{
579  tr_peer_event e = TR_PEER_EVENT_INIT;
580  e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
581  e.pieceIndex = pieceIndex;
582  publish (msgs, &e);
583}
584
585static void
586fireClientGotBitfield (tr_peerMsgs * msgs, tr_bitfield * bitfield)
587{
588  tr_peer_event e = TR_PEER_EVENT_INIT;
589  e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
590  e.bitfield = bitfield;
591  publish (msgs, &e);
592}
593
594static void
595fireClientGotHave (tr_peerMsgs * msgs, tr_piece_index_t index)
596{
597  tr_peer_event e = TR_PEER_EVENT_INIT;
598  e.eventType = TR_PEER_CLIENT_GOT_HAVE;
599  e.pieceIndex = index;
600  publish (msgs, &e);
601}
602
603
604/**
605***  ALLOWED FAST SET
606***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
607**/
608
609#if 0
610size_t
611tr_generateAllowedSet (tr_piece_index_t * setmePieces,
612                       size_t             desiredSetSize,
613                       size_t             pieceCount,
614                       const uint8_t    * infohash,
615                       const tr_address * addr)
616{
617    size_t setSize = 0;
618
619    assert (setmePieces);
620    assert (desiredSetSize <= pieceCount);
621    assert (desiredSetSize);
622    assert (pieceCount);
623    assert (infohash);
624    assert (addr);
625
626    if (addr->type == TR_AF_INET)
627    {
628        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
629        uint8_t x[SHA_DIGEST_LENGTH];
630
631        uint32_t ui32 = ntohl (htonl (addr->addr.addr4.s_addr) & 0xffffff00);   /* (1) */
632        memcpy (w, &ui32, sizeof (uint32_t));
633        walk += sizeof (uint32_t);
634        memcpy (walk, infohash, SHA_DIGEST_LENGTH);                 /* (2) */
635        walk += SHA_DIGEST_LENGTH;
636        tr_sha1 (x, w, walk-w, NULL);                               /* (3) */
637        assert (sizeof (w) == walk-w);
638
639        while (setSize<desiredSetSize)
640        {
641            int i;
642            for (i=0; i<5 && setSize<desiredSetSize; ++i)           /* (4) */
643            {
644                size_t k;
645                uint32_t j = i * 4;                                  /* (5) */
646                uint32_t y = ntohl (* (uint32_t*)(x + j));       /* (6) */
647                uint32_t index = y % pieceCount;                     /* (7) */
648
649                for (k=0; k<setSize; ++k)                           /* (8) */
650                    if (setmePieces[k] == index)
651                        break;
652
653                if (k == setSize)
654                    setmePieces[setSize++] = index;                  /* (9) */
655            }
656
657            tr_sha1 (x, x, sizeof (x), NULL);                      /* (3) */
658        }
659    }
660
661    return setSize;
662}
663
664static void
665updateFastSet (tr_peerMsgs * msgs UNUSED)
666{
667    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
668    const int peerIsNeedy = msgs->peer->progress < 0.10;
669
670    if (fext && peerIsNeedy && !msgs->haveFastSet)
671    {
672        size_t i;
673        const struct tr_address * addr = tr_peerIoGetAddress (msgs->io, NULL);
674        const tr_info * inf = &msgs->torrent->info;
675        const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount);
676
677        /* build the fast set */
678        msgs->fastsetSize = tr_generateAllowedSet (msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
679        msgs->haveFastSet = 1;
680
681        /* send it to the peer */
682        for (i=0; i<msgs->fastsetSize; ++i)
683            protocolSendAllowedFast (msgs, msgs->fastset[i]);
684    }
685}
686#endif
687
688/***
689****  ACTIVE
690***/
691
692bool
693tr_peerMsgsIsActive (const tr_peerMsgs  * msgs, tr_direction direction)
694{
695  assert (tr_isPeerMsgs (msgs));
696  assert (tr_isDirection (direction));
697
698  return msgs->is_active[direction];
699}
700
701static void
702tr_peerMsgsSetActive (tr_peerMsgs  * msgs,
703                      tr_direction   direction,
704                      bool           is_active)
705{
706  if (msgs->is_active[direction] != is_active)
707    {
708      int n = msgs->torrent->activePeerCount[direction];
709
710      msgs->is_active[direction] = is_active;
711
712      if (is_active)
713        ++n;
714      else
715        --n;
716      assert (0 <= n);
717      assert (n <= msgs->torrent->peerCount);
718
719      msgs->torrent->activePeerCount[direction] = n;
720    }
721}
722
723void
724tr_peerMsgsUpdateActive (tr_peerMsgs * msgs, tr_direction direction)
725{
726  bool active;
727
728  assert (tr_isPeerMsgs (msgs));
729  assert (tr_isDirection (direction));
730
731  if (direction == TR_CLIENT_TO_PEER)
732    {
733      active = tr_peerMsgsIsPeerInterested (msgs)
734           && !tr_peerMsgsIsPeerChoked (msgs);
735    }
736  else /* TR_PEER_TO_CLIENT */
737    {
738      if (!tr_torrentHasMetadata (msgs->torrent))
739        active = true;
740      else
741        active = tr_peerMsgsIsClientInterested (msgs)
742             && !tr_peerMsgsIsClientChoked (msgs);
743    }
744
745  tr_peerMsgsSetActive (msgs, direction, active);
746}
747
748/**
749***  INTEREST
750**/
751
752static void
753sendInterest (tr_peerMsgs * msgs, bool b)
754{
755  struct evbuffer * out = msgs->outMessages;
756
757  assert (msgs);
758  assert (tr_isBool (b));
759
760  msgs->client_is_interested = b;
761  dbgmsg (msgs, "Sending %s", b ? "Interested" : "Not Interested");
762  evbuffer_add_uint32 (out, sizeof (uint8_t));
763  evbuffer_add_uint8 (out, b ? BT_INTERESTED : BT_NOT_INTERESTED);
764
765  pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
766  dbgOutMessageLen (msgs);
767}
768
769static void
770updateInterest (tr_peerMsgs * msgs UNUSED)
771{
772    /* FIXME -- might need to poke the mgr on startup */
773}
774
775void
776tr_peerMsgsSetInterested (tr_peerMsgs * msgs, bool b)
777{
778  assert (tr_isBool (b));
779
780  if (msgs->client_is_interested != b)
781    {
782      sendInterest (msgs, b);
783
784      tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
785    }
786}
787
788static bool
789popNextMetadataRequest (tr_peerMsgs * msgs, int * piece)
790{
791  if (msgs->peerAskedForMetadataCount == 0)
792    return false;
793
794  *piece = msgs->peerAskedForMetadata[0];
795
796  tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int),
797                             msgs->peerAskedForMetadataCount--);
798
799  return true;
800}
801
802static bool
803popNextRequest (tr_peerMsgs * msgs, struct peer_request * setme)
804{
805  if (msgs->peer.pendingReqsToClient == 0)
806    return false;
807
808  *setme = msgs->peerAskedFor[0];
809
810  tr_removeElementFromArray (msgs->peerAskedFor,
811                             0,
812                             sizeof (struct peer_request),
813                             msgs->peer.pendingReqsToClient--);
814
815  return true;
816}
817
818static void
819cancelAllRequestsToClient (tr_peerMsgs * msgs)
820{
821  struct peer_request req;
822  const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->io);
823
824  while (popNextRequest (msgs, &req))
825    if (mustSendCancel)
826      protocolSendReject (msgs, &req);
827}
828
829void
830tr_peerMsgsSetChoke (tr_peerMsgs * msgs, bool peer_is_choked)
831{
832  const time_t now = tr_time ();
833  const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
834
835  assert (msgs != NULL);
836  assert (tr_isBool (peer_is_choked));
837
838  if (msgs->chokeChangedAt > fibrillationTime)
839    {
840      dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
841    }
842  else if (msgs->peer_is_choked != peer_is_choked)
843    {
844      msgs->peer_is_choked = peer_is_choked;
845      if (peer_is_choked)
846        cancelAllRequestsToClient (msgs);
847      protocolSendChoke (msgs, peer_is_choked);
848      msgs->chokeChangedAt = now;
849    }
850}
851
852/**
853***
854**/
855
856void
857tr_peerMsgsHave (tr_peerMsgs * msgs, uint32_t index)
858{
859  protocolSendHave (msgs, index);
860
861  /* since we have more pieces now, we might not be interested in this peer */
862  updateInterest (msgs);
863}
864
865/**
866***
867**/
868
869static bool
870reqIsValid (const tr_peerMsgs * peer,
871            uint32_t            index,
872            uint32_t            offset,
873            uint32_t            length)
874{
875    return tr_torrentReqIsValid (peer->torrent, index, offset, length);
876}
877
878static bool
879requestIsValid (const tr_peerMsgs * msgs, const struct peer_request * req)
880{
881    return reqIsValid (msgs, req->index, req->offset, req->length);
882}
883
884void
885tr_peerMsgsCancel (tr_peerMsgs * msgs, tr_block_index_t block)
886{
887    struct peer_request req;
888/*fprintf (stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer);*/
889    blockToReq (msgs->torrent, block, &req);
890    protocolSendCancel (msgs, &req);
891}
892
893/**
894***
895**/
896
897static void
898sendLtepHandshake (tr_peerMsgs * msgs)
899{
900    tr_variant val;
901    bool allow_pex;
902    bool allow_metadata_xfer;
903    struct evbuffer * payload;
904    struct evbuffer * out = msgs->outMessages;
905    const unsigned char * ipv6 = tr_globalIPv6 ();
906    static tr_quark version_quark = 0;
907
908    if (msgs->clientSentLtepHandshake)
909        return;
910
911    if (!version_quark)
912      version_quark = tr_quark_new (TR_NAME " " USERAGENT_PREFIX, -1);
913
914    dbgmsg (msgs, "sending an ltep handshake");
915    msgs->clientSentLtepHandshake = 1;
916
917    /* decide if we want to advertise metadata xfer support (BEP 9) */
918    if (tr_torrentIsPrivate (msgs->torrent))
919        allow_metadata_xfer = 0;
920    else
921        allow_metadata_xfer = 1;
922
923    /* decide if we want to advertise pex support */
924    if (!tr_torrentAllowsPex (msgs->torrent))
925        allow_pex = 0;
926    else if (msgs->peerSentLtepHandshake)
927        allow_pex = msgs->peerSupportsPex ? 1 : 0;
928    else
929        allow_pex = 1;
930
931    tr_variantInitDict (&val, 8);
932    tr_variantDictAddInt (&val, TR_KEY_e, getSession (msgs)->encryptionMode != TR_CLEAR_PREFERRED);
933    if (ipv6 != NULL)
934        tr_variantDictAddRaw (&val, TR_KEY_ipv6, ipv6, 16);
935    if (allow_metadata_xfer && tr_torrentHasMetadata (msgs->torrent)
936                            && (msgs->torrent->infoDictLength > 0))
937        tr_variantDictAddInt (&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
938    tr_variantDictAddInt (&val, TR_KEY_p, tr_sessionGetPublicPeerPort (getSession (msgs)));
939    tr_variantDictAddInt (&val, TR_KEY_reqq, REQQ);
940    tr_variantDictAddInt (&val, TR_KEY_upload_only, tr_torrentIsSeed (msgs->torrent));
941    tr_variantDictAddQuark (&val, TR_KEY_v, version_quark);
942    if (allow_metadata_xfer || allow_pex) {
943        tr_variant * m  = tr_variantDictAddDict (&val, TR_KEY_m, 2);
944        if (allow_metadata_xfer)
945            tr_variantDictAddInt (m, TR_KEY_ut_metadata, UT_METADATA_ID);
946        if (allow_pex)
947            tr_variantDictAddInt (m, TR_KEY_ut_pex, UT_PEX_ID);
948    }
949
950    payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
951
952    evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
953    evbuffer_add_uint8 (out, BT_LTEP);
954    evbuffer_add_uint8 (out, LTEP_HANDSHAKE);
955    evbuffer_add_buffer (out, payload);
956    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
957    dbgOutMessageLen (msgs);
958
959    /* cleanup */
960    evbuffer_free (payload);
961    tr_variantFree (&val);
962}
963
964static void
965parseLtepHandshake (tr_peerMsgs * msgs, int len, struct evbuffer * inbuf)
966{
967    int64_t   i;
968    tr_variant   val, * sub;
969    uint8_t * tmp = tr_new (uint8_t, len);
970    const uint8_t *addr;
971    size_t addr_len;
972    tr_pex pex;
973    int8_t seedProbability = -1;
974
975    memset (&pex, 0, sizeof (tr_pex));
976
977    tr_peerIoReadBytes (msgs->io, inbuf, tmp, len);
978    msgs->peerSentLtepHandshake = 1;
979
980    if (tr_variantFromBenc (&val, tmp, len) || !tr_variantIsDict (&val))
981    {
982        dbgmsg (msgs, "GET  extended-handshake, couldn't get dictionary");
983        tr_free (tmp);
984        return;
985    }
986
987    dbgmsg (msgs, "here is the handshake: [%*.*s]", len, len,  tmp);
988
989    /* does the peer prefer encrypted connections? */
990    if (tr_variantDictFindInt (&val, TR_KEY_e, &i)) {
991        msgs->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
992                                        : ENCRYPTION_PREFERENCE_NO;
993        if (i)
994            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
995    }
996
997    /* check supported messages for utorrent pex */
998    msgs->peerSupportsPex = 0;
999    msgs->peerSupportsMetadataXfer = 0;
1000
1001    if (tr_variantDictFindDict (&val, TR_KEY_m, &sub)) {
1002        if (tr_variantDictFindInt (sub, TR_KEY_ut_pex, &i)) {
1003            msgs->peerSupportsPex = i != 0;
1004            msgs->ut_pex_id = (uint8_t) i;
1005            dbgmsg (msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
1006        }
1007        if (tr_variantDictFindInt (sub, TR_KEY_ut_metadata, &i)) {
1008            msgs->peerSupportsMetadataXfer = i != 0;
1009            msgs->ut_metadata_id = (uint8_t) i;
1010            dbgmsg (msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
1011        }
1012        if (tr_variantDictFindInt (sub, TR_KEY_ut_holepunch, &i)) {
1013            /* Mysterious µTorrent extension that we don't grok.  However,
1014               it implies support for µTP, so use it to indicate that. */
1015            tr_peerMgrSetUtpFailed (msgs->torrent,
1016                                    tr_peerIoGetAddress (msgs->io, NULL),
1017                                    false);
1018        }
1019    }
1020
1021    /* look for metainfo size (BEP 9) */
1022    if (tr_variantDictFindInt (&val, TR_KEY_metadata_size, &i)) {
1023        tr_torrentSetMetadataSizeHint (msgs->torrent, i);
1024        msgs->metadata_size_hint = (size_t) i;
1025    }
1026
1027    /* look for upload_only (BEP 21) */
1028    if (tr_variantDictFindInt (&val, TR_KEY_upload_only, &i))
1029        seedProbability = i==0 ? 0 : 100;
1030
1031    /* get peer's listening port */
1032    if (tr_variantDictFindInt (&val, TR_KEY_p, &i)) {
1033        pex.port = htons ((uint16_t)i);
1034        fireClientGotPort (msgs, pex.port);
1035        dbgmsg (msgs, "peer's port is now %d", (int)i);
1036    }
1037
1038    if (tr_peerIoIsIncoming (msgs->io)
1039        && tr_variantDictFindRaw (&val, TR_KEY_ipv4, &addr, &addr_len)
1040        && (addr_len == 4))
1041    {
1042        pex.addr.type = TR_AF_INET;
1043        memcpy (&pex.addr.addr.addr4, addr, 4);
1044        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1045    }
1046
1047    if (tr_peerIoIsIncoming (msgs->io)
1048        && tr_variantDictFindRaw (&val, TR_KEY_ipv6, &addr, &addr_len)
1049        && (addr_len == 16))
1050    {
1051        pex.addr.type = TR_AF_INET6;
1052        memcpy (&pex.addr.addr.addr6, addr, 16);
1053        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1054    }
1055
1056    /* get peer's maximum request queue size */
1057    if (tr_variantDictFindInt (&val, TR_KEY_reqq, &i))
1058        msgs->reqq = i;
1059
1060    tr_variantFree (&val);
1061    tr_free (tmp);
1062}
1063
1064static void
1065parseUtMetadata (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
1066{
1067    tr_variant dict;
1068    char * msg_end;
1069    const char * benc_end;
1070    int64_t msg_type = -1;
1071    int64_t piece = -1;
1072    int64_t total_size = 0;
1073    uint8_t * tmp = tr_new (uint8_t, msglen);
1074
1075    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1076    msg_end = (char*)tmp + msglen;
1077
1078    if (!tr_variantFromBencFull (&dict, tmp, msglen, NULL, &benc_end))
1079    {
1080        tr_variantDictFindInt (&dict, TR_KEY_msg_type, &msg_type);
1081        tr_variantDictFindInt (&dict, TR_KEY_piece, &piece);
1082        tr_variantDictFindInt (&dict, TR_KEY_total_size, &total_size);
1083        tr_variantFree (&dict);
1084    }
1085
1086    dbgmsg (msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1087          (int)msg_type, (int)piece, (int)total_size);
1088
1089    if (msg_type == METADATA_MSG_TYPE_REJECT)
1090    {
1091        /* NOOP */
1092    }
1093
1094    if ((msg_type == METADATA_MSG_TYPE_DATA)
1095        && (!tr_torrentHasMetadata (msgs->torrent))
1096        && (msg_end - benc_end <= METADATA_PIECE_SIZE)
1097        && (piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size))
1098    {
1099        const int pieceLen = msg_end - benc_end;
1100        tr_torrentSetMetadataPiece (msgs->torrent, piece, benc_end, pieceLen);
1101    }
1102
1103    if (msg_type == METADATA_MSG_TYPE_REQUEST)
1104    {
1105        if ((piece >= 0)
1106            && tr_torrentHasMetadata (msgs->torrent)
1107            && !tr_torrentIsPrivate (msgs->torrent)
1108            && (msgs->peerAskedForMetadataCount < METADATA_REQQ))
1109        {
1110            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1111        }
1112        else
1113        {
1114            tr_variant tmp;
1115            struct evbuffer * payload;
1116            struct evbuffer * out = msgs->outMessages;
1117
1118            /* build the rejection message */
1119            tr_variantInitDict (&tmp, 2);
1120            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1121            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1122            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1123
1124            /* write it out as a LTEP message to our outMessages buffer */
1125            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1126            evbuffer_add_uint8 (out, BT_LTEP);
1127            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1128            evbuffer_add_buffer (out, payload);
1129            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1130            dbgOutMessageLen (msgs);
1131
1132            /* cleanup */
1133            evbuffer_free (payload);
1134            tr_variantFree (&tmp);
1135        }
1136    }
1137
1138    tr_free (tmp);
1139}
1140
1141static void
1142parseUtPex (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
1143{
1144    int loaded = 0;
1145    uint8_t * tmp = tr_new (uint8_t, msglen);
1146    tr_variant val;
1147    tr_torrent * tor = msgs->torrent;
1148    const uint8_t * added;
1149    size_t added_len;
1150
1151    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1152
1153    if (tr_torrentAllowsPex (tor)
1154      && ((loaded = !tr_variantFromBenc (&val, tmp, msglen))))
1155    {
1156        if (tr_variantDictFindRaw (&val, TR_KEY_added, &added, &added_len))
1157        {
1158            tr_pex * pex;
1159            size_t i, n;
1160            size_t added_f_len = 0;
1161            const uint8_t * added_f = NULL;
1162
1163            tr_variantDictFindRaw (&val, TR_KEY_added_f, &added_f, &added_f_len);
1164            pex = tr_peerMgrCompactToPex (added, added_len, added_f, added_f_len, &n);
1165
1166            n = MIN (n, MAX_PEX_PEER_COUNT);
1167            for (i=0; i<n; ++i)
1168            {
1169                int seedProbability = -1;
1170                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1171                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1172            }
1173
1174            tr_free (pex);
1175        }
1176
1177        if (tr_variantDictFindRaw (&val, TR_KEY_added6, &added, &added_len))
1178        {
1179            tr_pex * pex;
1180            size_t i, n;
1181            size_t added_f_len = 0;
1182            const uint8_t * added_f = NULL;
1183
1184            tr_variantDictFindRaw (&val, TR_KEY_added6_f, &added_f, &added_f_len);
1185            pex = tr_peerMgrCompact6ToPex (added, added_len, added_f, added_f_len, &n);
1186
1187            n = MIN (n, MAX_PEX_PEER_COUNT);
1188            for (i=0; i<n; ++i)
1189            {
1190                int seedProbability = -1;
1191                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1192                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1193            }
1194
1195            tr_free (pex);
1196        }
1197    }
1198
1199    if (loaded)
1200        tr_variantFree (&val);
1201    tr_free (tmp);
1202}
1203
1204static void sendPex (tr_peerMsgs * msgs);
1205
1206static void
1207parseLtep (tr_peerMsgs * msgs, int msglen, struct evbuffer  * inbuf)
1208{
1209    uint8_t ltep_msgid;
1210
1211    tr_peerIoReadUint8 (msgs->io, inbuf, &ltep_msgid);
1212    msglen--;
1213
1214    if (ltep_msgid == LTEP_HANDSHAKE)
1215    {
1216        dbgmsg (msgs, "got ltep handshake");
1217        parseLtepHandshake (msgs, msglen, inbuf);
1218        if (tr_peerIoSupportsLTEP (msgs->io))
1219        {
1220            sendLtepHandshake (msgs);
1221            sendPex (msgs);
1222        }
1223    }
1224    else if (ltep_msgid == UT_PEX_ID)
1225    {
1226        dbgmsg (msgs, "got ut pex");
1227        msgs->peerSupportsPex = 1;
1228        parseUtPex (msgs, msglen, inbuf);
1229    }
1230    else if (ltep_msgid == UT_METADATA_ID)
1231    {
1232        dbgmsg (msgs, "got ut metadata");
1233        msgs->peerSupportsMetadataXfer = 1;
1234        parseUtMetadata (msgs, msglen, inbuf);
1235    }
1236    else
1237    {
1238        dbgmsg (msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1239        evbuffer_drain (inbuf, msglen);
1240    }
1241}
1242
1243static int
1244readBtLength (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1245{
1246    uint32_t len;
1247
1248    if (inlen < sizeof (len))
1249        return READ_LATER;
1250
1251    tr_peerIoReadUint32 (msgs->io, inbuf, &len);
1252
1253    if (len == 0) /* peer sent us a keepalive message */
1254        dbgmsg (msgs, "got KeepAlive");
1255    else
1256    {
1257        msgs->incoming.length = len;
1258        msgs->state = AWAITING_BT_ID;
1259    }
1260
1261    return READ_NOW;
1262}
1263
1264static int readBtMessage (tr_peerMsgs *, struct evbuffer *, size_t);
1265
1266static int
1267readBtId (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1268{
1269    uint8_t id;
1270
1271    if (inlen < sizeof (uint8_t))
1272        return READ_LATER;
1273
1274    tr_peerIoReadUint8 (msgs->io, inbuf, &id);
1275    msgs->incoming.id = id;
1276    dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);
1277
1278    if (id == BT_PIECE)
1279    {
1280        msgs->state = AWAITING_BT_PIECE;
1281        return READ_NOW;
1282    }
1283    else if (msgs->incoming.length != 1)
1284    {
1285        msgs->state = AWAITING_BT_MESSAGE;
1286        return READ_NOW;
1287    }
1288    else return readBtMessage (msgs, inbuf, inlen - 1);
1289}
1290
1291static void
1292updatePeerProgress (tr_peerMsgs * msgs)
1293{
1294  tr_peerUpdateProgress (msgs->torrent, &msgs->peer);
1295
1296  /*updateFastSet (msgs);*/
1297  updateInterest (msgs);
1298}
1299
1300static void
1301prefetchPieces (tr_peerMsgs *msgs)
1302{
1303  int i;
1304
1305  if (!getSession (msgs)->isPrefetchEnabled)
1306    return;
1307
1308  for (i=msgs->prefetchCount; i<msgs->peer.pendingReqsToClient && i<PREFETCH_SIZE; ++i)
1309    {
1310      const struct peer_request * req = msgs->peerAskedFor + i;
1311      if (requestIsValid (msgs, req))
1312        {
1313          tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1314          ++msgs->prefetchCount;
1315        }
1316    }
1317}
1318
1319static void
1320peerMadeRequest (tr_peerMsgs * msgs, const struct peer_request * req)
1321{
1322    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1323    const int reqIsValid = requestIsValid (msgs, req);
1324    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete (&msgs->torrent->completion, req->index);
1325    const int peerIsChoked = msgs->peer_is_choked;
1326
1327    int allow = false;
1328
1329    if (!reqIsValid)
1330        dbgmsg (msgs, "rejecting an invalid request.");
1331    else if (!clientHasPiece)
1332        dbgmsg (msgs, "rejecting request for a piece we don't have.");
1333    else if (peerIsChoked)
1334        dbgmsg (msgs, "rejecting request from choked peer");
1335    else if (msgs->peer.pendingReqsToClient + 1 >= REQQ)
1336        dbgmsg (msgs, "rejecting request ... reqq is full");
1337    else
1338        allow = true;
1339
1340    if (allow) {
1341        msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req;
1342        prefetchPieces (msgs);
1343    } else if (fext) {
1344        protocolSendReject (msgs, req);
1345    }
1346}
1347
1348static bool
1349messageLengthIsCorrect (const tr_peerMsgs * msg, uint8_t id, uint32_t len)
1350{
1351    switch (id)
1352    {
1353        case BT_CHOKE:
1354        case BT_UNCHOKE:
1355        case BT_INTERESTED:
1356        case BT_NOT_INTERESTED:
1357        case BT_FEXT_HAVE_ALL:
1358        case BT_FEXT_HAVE_NONE:
1359            return len == 1;
1360
1361        case BT_HAVE:
1362        case BT_FEXT_SUGGEST:
1363        case BT_FEXT_ALLOWED_FAST:
1364            return len == 5;
1365
1366        case BT_BITFIELD:
1367            if (tr_torrentHasMetadata (msg->torrent))
1368                return len == (msg->torrent->info.pieceCount + 7u) / 8u + 1u;
1369            /* we don't know the piece count yet,
1370               so we can only guess whether to send true or false */
1371            if (msg->metadata_size_hint > 0)
1372                return len <= msg->metadata_size_hint;
1373            return true;
1374
1375        case BT_REQUEST:
1376        case BT_CANCEL:
1377        case BT_FEXT_REJECT:
1378            return len == 13;
1379
1380        case BT_PIECE:
1381            return len > 9 && len <= 16393;
1382
1383        case BT_PORT:
1384            return len == 3;
1385
1386        case BT_LTEP:
1387            return len >= 2;
1388
1389        default:
1390            return false;
1391    }
1392}
1393
1394static int clientGotBlock (tr_peerMsgs *               msgs,
1395                           struct evbuffer *           block,
1396                           const struct peer_request * req);
1397
1398static int
1399readBtPiece (tr_peerMsgs      * msgs,
1400             struct evbuffer  * inbuf,
1401             size_t             inlen,
1402             size_t           * setme_piece_bytes_read)
1403{
1404    struct peer_request * req = &msgs->incoming.blockReq;
1405
1406    assert (evbuffer_get_length (inbuf) >= inlen);
1407    dbgmsg (msgs, "In readBtPiece");
1408
1409    if (!req->length)
1410    {
1411        if (inlen < 8)
1412            return READ_LATER;
1413
1414        tr_peerIoReadUint32 (msgs->io, inbuf, &req->index);
1415        tr_peerIoReadUint32 (msgs->io, inbuf, &req->offset);
1416        req->length = msgs->incoming.length - 9;
1417        dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1418        return READ_NOW;
1419    }
1420    else
1421    {
1422        int err;
1423        size_t n;
1424        size_t nLeft;
1425        struct evbuffer * block_buffer;
1426
1427        if (msgs->incoming.block == NULL)
1428            msgs->incoming.block = evbuffer_new ();
1429        block_buffer = msgs->incoming.block;
1430
1431        /* read in another chunk of data */
1432        nLeft = req->length - evbuffer_get_length (block_buffer);
1433        n = MIN (nLeft, inlen);
1434
1435        tr_peerIoReadBytesToBuf (msgs->io, inbuf, block_buffer, n);
1436
1437        fireClientGotPieceData (msgs, n);
1438        *setme_piece_bytes_read += n;
1439        dbgmsg (msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1440               n, req->index, req->offset, req->length,
1441             (int)(req->length - evbuffer_get_length (block_buffer)));
1442        if (evbuffer_get_length (block_buffer) < req->length)
1443            return READ_LATER;
1444
1445        /* pass the block along... */
1446        err = clientGotBlock (msgs, block_buffer, req);
1447        evbuffer_drain (block_buffer, evbuffer_get_length (block_buffer));
1448
1449        /* cleanup */
1450        req->length = 0;
1451        msgs->state = AWAITING_BT_LENGTH;
1452        return err ? READ_ERR : READ_NOW;
1453    }
1454}
1455
1456static void updateDesiredRequestCount (tr_peerMsgs * msgs);
1457
1458static int
1459readBtMessage (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1460{
1461    uint32_t      ui32;
1462    uint32_t      msglen = msgs->incoming.length;
1463    const uint8_t id = msgs->incoming.id;
1464#ifndef NDEBUG
1465    const size_t  startBufLen = evbuffer_get_length (inbuf);
1466#endif
1467    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1468
1469    --msglen; /* id length */
1470
1471    dbgmsg (msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);
1472
1473    if (inlen < msglen)
1474        return READ_LATER;
1475
1476    if (!messageLengthIsCorrect (msgs, id, msglen + 1))
1477    {
1478        dbgmsg (msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1479        fireError (msgs, EMSGSIZE);
1480        return READ_ERR;
1481    }
1482
1483    switch (id)
1484    {
1485        case BT_CHOKE:
1486            dbgmsg (msgs, "got Choke");
1487            msgs->client_is_choked = true;
1488            if (!fext)
1489                fireGotChoke (msgs);
1490            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1491            break;
1492
1493        case BT_UNCHOKE:
1494            dbgmsg (msgs, "got Unchoke");
1495            msgs->client_is_choked = false;
1496            tr_peerMsgsUpdateActive (msgs, TR_PEER_TO_CLIENT);
1497            updateDesiredRequestCount (msgs);
1498            break;
1499
1500        case BT_INTERESTED:
1501            dbgmsg (msgs, "got Interested");
1502            msgs->peer_is_interested = true;
1503            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1504            break;
1505
1506        case BT_NOT_INTERESTED:
1507            dbgmsg (msgs, "got Not Interested");
1508            msgs->peer_is_interested = false;
1509            tr_peerMsgsUpdateActive (msgs, TR_CLIENT_TO_PEER);
1510            break;
1511
1512        case BT_HAVE:
1513            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1514            dbgmsg (msgs, "got Have: %u", ui32);
1515            if (tr_torrentHasMetadata (msgs->torrent)
1516                    && (ui32 >= msgs->torrent->info.pieceCount))
1517            {
1518                fireError (msgs, ERANGE);
1519                return READ_ERR;
1520            }
1521
1522            /* a peer can send the same HAVE message twice... */
1523            if (!tr_bitfieldHas (&msgs->peer.have, ui32)) {
1524                tr_bitfieldAdd (&msgs->peer.have, ui32);
1525                fireClientGotHave (msgs, ui32);
1526            }
1527            updatePeerProgress (msgs);
1528            break;
1529
1530        case BT_BITFIELD: {
1531            uint8_t * tmp = tr_new (uint8_t, msglen);
1532            dbgmsg (msgs, "got a bitfield");
1533            tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1534            tr_bitfieldSetRaw (&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));
1535            fireClientGotBitfield (msgs, &msgs->peer.have);
1536            updatePeerProgress (msgs);
1537            tr_free (tmp);
1538            break;
1539        }
1540
1541        case BT_REQUEST:
1542        {
1543            struct peer_request r;
1544            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1545            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1546            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1547            dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1548            peerMadeRequest (msgs, &r);
1549            break;
1550        }
1551
1552        case BT_CANCEL:
1553        {
1554            int i;
1555            struct peer_request r;
1556            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1557            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1558            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1559            tr_historyAdd (&msgs->peer.cancelsSentToClient, tr_time (), 1);
1560            dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1561
1562            for (i=0; i<msgs->peer.pendingReqsToClient; ++i) {
1563                const struct peer_request * req = msgs->peerAskedFor + i;
1564                if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length))
1565                    break;
1566            }
1567
1568            if (i < msgs->peer.pendingReqsToClient)
1569                tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request),
1570                                           msgs->peer.pendingReqsToClient--);
1571            break;
1572        }
1573
1574        case BT_PIECE:
1575            assert (0); /* handled elsewhere! */
1576            break;
1577
1578        case BT_PORT:
1579            dbgmsg (msgs, "Got a BT_PORT");
1580            tr_peerIoReadUint16 (msgs->io, inbuf, &msgs->dht_port);
1581            if (msgs->dht_port > 0)
1582                tr_dhtAddNode (getSession (msgs),
1583                               tr_peerAddress (&msgs->peer),
1584                               msgs->dht_port, 0);
1585            break;
1586
1587        case BT_FEXT_SUGGEST:
1588            dbgmsg (msgs, "Got a BT_FEXT_SUGGEST");
1589            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1590            if (fext)
1591                fireClientGotSuggest (msgs, ui32);
1592            else {
1593                fireError (msgs, EMSGSIZE);
1594                return READ_ERR;
1595            }
1596            break;
1597
1598        case BT_FEXT_ALLOWED_FAST:
1599            dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST");
1600            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1601            if (fext)
1602                fireClientGotAllowedFast (msgs, ui32);
1603            else {
1604                fireError (msgs, EMSGSIZE);
1605                return READ_ERR;
1606            }
1607            break;
1608
1609        case BT_FEXT_HAVE_ALL:
1610            dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL");
1611            if (fext) {
1612                tr_bitfieldSetHasAll (&msgs->peer.have);
1613assert (tr_bitfieldHasAll (&msgs->peer.have));
1614                fireClientGotHaveAll (msgs);
1615                updatePeerProgress (msgs);
1616            } else {
1617                fireError (msgs, EMSGSIZE);
1618                return READ_ERR;
1619            }
1620            break;
1621
1622        case BT_FEXT_HAVE_NONE:
1623            dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE");
1624            if (fext) {
1625                tr_bitfieldSetHasNone (&msgs->peer.have);
1626                fireClientGotHaveNone (msgs);
1627                updatePeerProgress (msgs);
1628            } else {
1629                fireError (msgs, EMSGSIZE);
1630                return READ_ERR;
1631            }
1632            break;
1633
1634        case BT_FEXT_REJECT:
1635        {
1636            struct peer_request r;
1637            dbgmsg (msgs, "Got a BT_FEXT_REJECT");
1638            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1639            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1640            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1641            if (fext)
1642                fireGotRej (msgs, &r);
1643            else {
1644                fireError (msgs, EMSGSIZE);
1645                return READ_ERR;
1646            }
1647            break;
1648        }
1649
1650        case BT_LTEP:
1651            dbgmsg (msgs, "Got a BT_LTEP");
1652            parseLtep (msgs, msglen, inbuf);
1653            break;
1654
1655        default:
1656            dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id);
1657            tr_peerIoDrain (msgs->io, inbuf, msglen);
1658            break;
1659    }
1660
1661    assert (msglen + 1 == msgs->incoming.length);
1662    assert (evbuffer_get_length (inbuf) == startBufLen - msglen);
1663
1664    msgs->state = AWAITING_BT_LENGTH;
1665    return READ_NOW;
1666}
1667
1668/* returns 0 on success, or an errno on failure */
1669static int
1670clientGotBlock (tr_peerMsgs                * msgs,
1671                struct evbuffer            * data,
1672                const struct peer_request  * req)
1673{
1674    int err;
1675    tr_torrent * tor = msgs->torrent;
1676    const tr_block_index_t block = _tr_block (tor, req->index, req->offset);
1677
1678    assert (msgs);
1679    assert (req);
1680
1681    if (req->length != tr_torBlockCountBytes (msgs->torrent, block)) {
1682        dbgmsg (msgs, "wrong block size -- expected %u, got %d",
1683                tr_torBlockCountBytes (msgs->torrent, block), req->length);
1684        return EMSGSIZE;
1685    }
1686
1687    dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1688
1689    if (!tr_peerMgrDidPeerRequest (msgs->torrent, &msgs->peer, block)) {
1690        dbgmsg (msgs, "we didn't ask for this message...");
1691        return 0;
1692    }
1693    if (tr_cpPieceIsComplete (&msgs->torrent->completion, req->index)) {
1694        dbgmsg (msgs, "we did ask for this message, but the piece is already complete...");
1695        return 0;
1696    }
1697
1698    /**
1699    ***  Save the block
1700    **/
1701
1702    if ((err = tr_cacheWriteBlock (getSession (msgs)->cache, tor, req->index, req->offset, req->length, data)))
1703        return err;
1704
1705    tr_bitfieldAdd (&msgs->peer.blame, req->index);
1706    fireGotBlock (msgs, req);
1707    return 0;
1708}
1709
1710static int peerPulse (void * vmsgs);
1711
1712static void
1713didWrite (tr_peerIo * io UNUSED, size_t bytesWritten, bool wasPieceData, void * vmsgs)
1714{
1715    tr_peerMsgs * msgs = vmsgs;
1716
1717    if (wasPieceData)
1718      firePeerGotPieceData (msgs, bytesWritten);
1719
1720    if (tr_isPeerIo (io) && io->userData)
1721        peerPulse (msgs);
1722}
1723
1724static ReadState
1725canRead (tr_peerIo * io, void * vmsgs, size_t * piece)
1726{
1727    ReadState         ret;
1728    tr_peerMsgs *     msgs = vmsgs;
1729    struct evbuffer * in = tr_peerIoGetReadBuffer (io);
1730    const size_t      inlen = evbuffer_get_length (in);
1731
1732    dbgmsg (msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);
1733
1734    if (!inlen)
1735    {
1736        ret = READ_LATER;
1737    }
1738    else if (msgs->state == AWAITING_BT_PIECE)
1739    {
1740        ret = readBtPiece (msgs, in, inlen, piece);
1741    }
1742    else switch (msgs->state)
1743    {
1744        case AWAITING_BT_LENGTH:
1745            ret = readBtLength (msgs, in, inlen); break;
1746
1747        case AWAITING_BT_ID:
1748            ret = readBtId   (msgs, in, inlen); break;
1749
1750        case AWAITING_BT_MESSAGE:
1751            ret = readBtMessage (msgs, in, inlen); break;
1752
1753        default:
1754            ret = READ_ERR;
1755            assert (0);
1756    }
1757
1758    dbgmsg (msgs, "canRead: ret is %d", (int)ret);
1759
1760    return ret;
1761}
1762
1763int
1764tr_peerMsgsIsReadingBlock (const tr_peerMsgs * msgs, tr_block_index_t block)
1765{
1766    if (msgs->state != AWAITING_BT_PIECE)
1767        return false;
1768
1769    return block == _tr_block (msgs->torrent,
1770                               msgs->incoming.blockReq.index,
1771                               msgs->incoming.blockReq.offset);
1772}
1773
1774/**
1775***
1776**/
1777
1778static void
1779updateDesiredRequestCount (tr_peerMsgs * msgs)
1780{
1781    tr_torrent * const torrent = msgs->torrent;
1782
1783    /* there are lots of reasons we might not want to request any blocks... */
1784    if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent)
1785                                    || msgs->client_is_choked
1786                                    || !msgs->client_is_interested)
1787    {
1788        msgs->desiredRequestCount = 0;
1789    }
1790    else
1791    {
1792        int estimatedBlocksInPeriod;
1793        unsigned int rate_Bps;
1794        unsigned int irate_Bps;
1795        const int floor = 4;
1796        const int seconds = REQUEST_BUF_SECS;
1797        const uint64_t now = tr_time_msec ();
1798
1799        /* Get the rate limit we should use.
1800         * FIXME: this needs to consider all the other peers as well... */
1801        rate_Bps = tr_peerGetPieceSpeed_Bps (&msgs->peer, now, TR_PEER_TO_CLIENT);
1802        if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT))
1803            rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT));
1804
1805        /* honor the session limits, if enabled */
1806        if (tr_torrentUsesSessionLimits (torrent) &&
1807            tr_sessionGetActiveSpeedLimit_Bps (torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1808                rate_Bps = MIN (rate_Bps, irate_Bps);
1809
1810        /* use this desired rate to figure out how
1811         * many requests we should send to this peer */
1812        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1813        msgs->desiredRequestCount = MAX (floor, estimatedBlocksInPeriod);
1814
1815        /* honor the peer's maximum request count, if specified */
1816        if (msgs->reqq > 0)
1817            if (msgs->desiredRequestCount > msgs->reqq)
1818                msgs->desiredRequestCount = msgs->reqq;
1819    }
1820}
1821
1822static void
1823updateMetadataRequests (tr_peerMsgs * msgs, time_t now)
1824{
1825    int piece;
1826
1827    if (msgs->peerSupportsMetadataXfer
1828        && tr_torrentGetNextMetadataRequest (msgs->torrent, now, &piece))
1829    {
1830        tr_variant tmp;
1831        struct evbuffer * payload;
1832        struct evbuffer * out = msgs->outMessages;
1833
1834        /* build the data message */
1835        tr_variantInitDict (&tmp, 3);
1836        tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
1837        tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1838        payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1839
1840        dbgmsg (msgs, "requesting metadata piece #%d", piece);
1841
1842        /* write it out as a LTEP message to our outMessages buffer */
1843        evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1844        evbuffer_add_uint8 (out, BT_LTEP);
1845        evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1846        evbuffer_add_buffer (out, payload);
1847        pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1848        dbgOutMessageLen (msgs);
1849
1850        /* cleanup */
1851        evbuffer_free (payload);
1852        tr_variantFree (&tmp);
1853    }
1854}
1855
1856static void
1857updateBlockRequests (tr_peerMsgs * msgs)
1858{
1859    if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT)
1860        && (msgs->desiredRequestCount > 0)
1861        && (msgs->peer.pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))
1862    {
1863        int i;
1864        int n;
1865        tr_block_index_t * blocks;
1866        const int numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer;
1867
1868        assert (tr_peerMsgsIsClientInterested (msgs));
1869        assert (!tr_peerMsgsIsClientChoked (msgs));
1870
1871        blocks = tr_new (tr_block_index_t, numwant);
1872        tr_peerMgrGetNextRequests (msgs->torrent, &msgs->peer, numwant, blocks, &n, false);
1873
1874        for (i=0; i<n; ++i)
1875        {
1876            struct peer_request req;
1877            blockToReq (msgs->torrent, blocks[i], &req);
1878            protocolSendRequest (msgs, &req);
1879        }
1880
1881        tr_free (blocks);
1882    }
1883}
1884
1885static size_t
1886fillOutputBuffer (tr_peerMsgs * msgs, time_t now)
1887{
1888    int piece;
1889    size_t bytesWritten = 0;
1890    struct peer_request req;
1891    const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0;
1892    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1893
1894    /**
1895    ***  Protocol messages
1896    **/
1897
1898    if (haveMessages && !msgs->outMessagesBatchedAt) /* fresh batch */
1899    {
1900        dbgmsg (msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length (msgs->outMessages));
1901        msgs->outMessagesBatchedAt = now;
1902    }
1903    else if (haveMessages && ((now - msgs->outMessagesBatchedAt) >= msgs->outMessagesBatchPeriod))
1904    {
1905        const size_t len = evbuffer_get_length (msgs->outMessages);
1906        /* flush the protocol messages */
1907        dbgmsg (msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len);
1908        tr_peerIoWriteBuf (msgs->io, msgs->outMessages, false);
1909        msgs->clientSentAnythingAt = now;
1910        msgs->outMessagesBatchedAt = 0;
1911        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1912        bytesWritten +=  len;
1913    }
1914
1915    /**
1916    ***  Metadata Pieces
1917    **/
1918
1919    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= METADATA_PIECE_SIZE)
1920        && popNextMetadataRequest (msgs, &piece))
1921    {
1922        char * data;
1923        int dataLen;
1924        bool ok = false;
1925
1926        data = tr_torrentGetMetadataPiece (msgs->torrent, piece, &dataLen);
1927        if ((dataLen > 0) && (data != NULL))
1928        {
1929            tr_variant tmp;
1930            struct evbuffer * payload;
1931            struct evbuffer * out = msgs->outMessages;
1932
1933            /* build the data message */
1934            tr_variantInitDict (&tmp, 3);
1935            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
1936            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1937            tr_variantDictAddInt (&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
1938            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1939
1940            /* write it out as a LTEP message to our outMessages buffer */
1941            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload) + dataLen);
1942            evbuffer_add_uint8 (out, BT_LTEP);
1943            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1944            evbuffer_add_buffer (out, payload);
1945            evbuffer_add     (out, data, dataLen);
1946            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1947            dbgOutMessageLen (msgs);
1948
1949            evbuffer_free (payload);
1950            tr_variantFree (&tmp);
1951            tr_free (data);
1952
1953            ok = true;
1954        }
1955
1956        if (!ok) /* send a rejection message */
1957        {
1958            tr_variant tmp;
1959            struct evbuffer * payload;
1960            struct evbuffer * out = msgs->outMessages;
1961
1962            /* build the rejection message */
1963            tr_variantInitDict (&tmp, 2);
1964            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1965            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1966            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1967
1968            /* write it out as a LTEP message to our outMessages buffer */
1969            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1970            evbuffer_add_uint8 (out, BT_LTEP);
1971            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1972            evbuffer_add_buffer (out, payload);
1973            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1974            dbgOutMessageLen (msgs);
1975
1976            evbuffer_free (payload);
1977            tr_variantFree (&tmp);
1978        }
1979    }
1980
1981    /**
1982    ***  Data Blocks
1983    **/
1984
1985    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= msgs->torrent->blockSize)
1986        && popNextRequest (msgs, &req))
1987    {
1988        --msgs->prefetchCount;
1989
1990        if (requestIsValid (msgs, &req)
1991            && tr_cpPieceIsComplete (&msgs->torrent->completion, req.index))
1992        {
1993            int err;
1994            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1995            struct evbuffer * out;
1996            struct evbuffer_iovec iovec[1];
1997
1998            out = evbuffer_new ();
1999            evbuffer_expand (out, msglen);
2000
2001            evbuffer_add_uint32 (out, sizeof (uint8_t) + 2 * sizeof (uint32_t) + req.length);
2002            evbuffer_add_uint8 (out, BT_PIECE);
2003            evbuffer_add_uint32 (out, req.index);
2004            evbuffer_add_uint32 (out, req.offset);
2005
2006            evbuffer_reserve_space (out, req.length, iovec, 1);
2007            err = tr_cacheReadBlock (getSession (msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base);
2008            iovec[0].iov_len = req.length;
2009            evbuffer_commit_space (out, iovec, 1);
2010
2011            /* check the piece if it needs checking... */
2012            if (!err && tr_torrentPieceNeedsCheck (msgs->torrent, req.index))
2013                if ((err = !tr_torrentCheckPiece (msgs->torrent, req.index)))
2014                    tr_torrentSetLocalError (msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."), (size_t)req.index);
2015
2016            if (err)
2017            {
2018                if (fext)
2019                    protocolSendReject (msgs, &req);
2020            }
2021            else
2022            {
2023                const size_t n = evbuffer_get_length (out);
2024                dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
2025                assert (n == msglen);
2026                tr_peerIoWriteBuf (msgs->io, out, true);
2027                bytesWritten += n;
2028                msgs->clientSentAnythingAt = now;
2029                tr_historyAdd (&msgs->peer.blocksSentToPeer, tr_time (), 1);
2030            }
2031
2032            evbuffer_free (out);
2033
2034            if (err)
2035            {
2036                bytesWritten = 0;
2037                msgs = NULL;
2038            }
2039        }
2040        else if (fext) /* peer needs a reject message */
2041        {
2042            protocolSendReject (msgs, &req);
2043        }
2044
2045        if (msgs != NULL)
2046            prefetchPieces (msgs);
2047    }
2048
2049    /**
2050    ***  Keepalive
2051    **/
2052
2053    if ((msgs != NULL)
2054        && (msgs->clientSentAnythingAt != 0)
2055        && ((now - msgs->clientSentAnythingAt) > KEEPALIVE_INTERVAL_SECS))
2056    {
2057        dbgmsg (msgs, "sending a keepalive message");
2058        evbuffer_add_uint32 (msgs->outMessages, 0);
2059        pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2060    }
2061
2062    return bytesWritten;
2063}
2064
2065static int
2066peerPulse (void * vmsgs)
2067{
2068    tr_peerMsgs * msgs = vmsgs;
2069    const time_t  now = tr_time ();
2070
2071    if (tr_isPeerIo (msgs->io)) {
2072        updateDesiredRequestCount (msgs);
2073        updateBlockRequests (msgs);
2074        updateMetadataRequests (msgs, now);
2075    }
2076
2077    for (;;)
2078        if (fillOutputBuffer (msgs, now) < 1)
2079            break;
2080
2081    return true; /* loop forever */
2082}
2083
2084void
2085tr_peerMsgsPulse (tr_peerMsgs * msgs)
2086{
2087    if (msgs != NULL)
2088        peerPulse (msgs);
2089}
2090
2091static void
2092gotError (tr_peerIo * io UNUSED, short what, void * vmsgs)
2093{
2094    if (what & BEV_EVENT_TIMEOUT)
2095        dbgmsg (vmsgs, "libevent got a timeout, what=%hd", what);
2096    if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
2097        dbgmsg (vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2098               what, errno, tr_strerror (errno));
2099    fireError (vmsgs, ENOTCONN);
2100}
2101
2102static void
2103sendBitfield (tr_peerMsgs * msgs)
2104{
2105    void * bytes;
2106    size_t byte_count = 0;
2107    struct evbuffer * out = msgs->outMessages;
2108
2109    assert (tr_torrentHasMetadata (msgs->torrent));
2110
2111    bytes = tr_cpCreatePieceBitfield (&msgs->torrent->completion, &byte_count);
2112    evbuffer_add_uint32 (out, sizeof (uint8_t) + byte_count);
2113    evbuffer_add_uint8 (out, BT_BITFIELD);
2114    evbuffer_add     (out, bytes, byte_count);
2115    dbgmsg (msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length (out));
2116    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2117
2118    tr_free (bytes);
2119}
2120
2121static void
2122tellPeerWhatWeHave (tr_peerMsgs * msgs)
2123{
2124    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
2125
2126    if (fext && tr_cpHasAll (&msgs->torrent->completion))
2127    {
2128        protocolSendHaveAll (msgs);
2129    }
2130    else if (fext && tr_cpHasNone (&msgs->torrent->completion))
2131    {
2132        protocolSendHaveNone (msgs);
2133    }
2134    else if (!tr_cpHasNone (&msgs->torrent->completion))
2135    {
2136        sendBitfield (msgs);
2137    }
2138}
2139
2140/**
2141***
2142**/
2143
2144/* some peers give us error messages if we send
2145   more than this many peers in a single pex message
2146   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2147#define MAX_PEX_ADDED 50
2148#define MAX_PEX_DROPPED 50
2149
2150typedef struct
2151{
2152    tr_pex *  added;
2153    tr_pex *  dropped;
2154    tr_pex *  elements;
2155    int       addedCount;
2156    int       droppedCount;
2157    int       elementCount;
2158}
2159PexDiffs;
2160
2161static void
2162pexAddedCb (void * vpex, void * userData)
2163{
2164    PexDiffs * diffs = userData;
2165    tr_pex *   pex = vpex;
2166
2167    if (diffs->addedCount < MAX_PEX_ADDED)
2168    {
2169        diffs->added[diffs->addedCount++] = *pex;
2170        diffs->elements[diffs->elementCount++] = *pex;
2171    }
2172}
2173
2174static inline void
2175pexDroppedCb (void * vpex, void * userData)
2176{
2177    PexDiffs * diffs = userData;
2178    tr_pex *   pex = vpex;
2179
2180    if (diffs->droppedCount < MAX_PEX_DROPPED)
2181    {
2182        diffs->dropped[diffs->droppedCount++] = *pex;
2183    }
2184}
2185
2186static inline void
2187pexElementCb (void * vpex, void * userData)
2188{
2189    PexDiffs * diffs = userData;
2190    tr_pex * pex = vpex;
2191
2192    diffs->elements[diffs->elementCount++] = *pex;
2193}
2194
2195typedef void (tr_set_func)(void * element, void * userData);
2196
2197/**
2198 * @brief find the differences and commonalities in two sorted sets
2199 * @param a the first set
2200 * @param aCount the number of elements in the set 'a'
2201 * @param b the second set
2202 * @param bCount the number of elements in the set 'b'
2203 * @param compare the sorting method for both sets
2204 * @param elementSize the sizeof the element in the two sorted sets
2205 * @param in_a called for items in set 'a' but not set 'b'
2206 * @param in_b called for items in set 'b' but not set 'a'
2207 * @param in_both called for items that are in both sets
2208 * @param userData user data passed along to in_a, in_b, and in_both
2209 */
2210static void
2211tr_set_compare (const void * va, size_t aCount,
2212                const void * vb, size_t bCount,
2213                int compare (const void * a, const void * b),
2214                size_t elementSize,
2215                tr_set_func in_a_cb,
2216                tr_set_func in_b_cb,
2217                tr_set_func in_both_cb,
2218                void * userData)
2219{
2220    const uint8_t * a = va;
2221    const uint8_t * b = vb;
2222    const uint8_t * aend = a + elementSize * aCount;
2223    const uint8_t * bend = b + elementSize * bCount;
2224
2225    while (a != aend || b != bend)
2226    {
2227        if (a == aend)
2228        {
2229          (*in_b_cb)((void*)b, userData);
2230            b += elementSize;
2231        }
2232        else if (b == bend)
2233        {
2234          (*in_a_cb)((void*)a, userData);
2235            a += elementSize;
2236        }
2237        else
2238        {
2239            const int val = (*compare)(a, b);
2240
2241            if (!val)
2242            {
2243              (*in_both_cb)((void*)a, userData);
2244                a += elementSize;
2245                b += elementSize;
2246            }
2247            else if (val < 0)
2248            {
2249              (*in_a_cb)((void*)a, userData);
2250                a += elementSize;
2251            }
2252            else if (val > 0)
2253            {
2254              (*in_b_cb)((void*)b, userData);
2255                b += elementSize;
2256            }
2257        }
2258    }
2259}
2260
2261
2262static void
2263sendPex (tr_peerMsgs * msgs)
2264{
2265    if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent))
2266    {
2267        PexDiffs diffs;
2268        PexDiffs diffs6;
2269        tr_pex * newPex = NULL;
2270        tr_pex * newPex6 = NULL;
2271        const int newCount = tr_peerMgrGetPeers (msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2272        const int newCount6 = tr_peerMgrGetPeers (msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2273
2274        /* build the diffs */
2275        diffs.added = tr_new (tr_pex, newCount);
2276        diffs.addedCount = 0;
2277        diffs.dropped = tr_new (tr_pex, msgs->pexCount);
2278        diffs.droppedCount = 0;
2279        diffs.elements = tr_new (tr_pex, newCount + msgs->pexCount);
2280        diffs.elementCount = 0;
2281        tr_set_compare (msgs->pex, msgs->pexCount,
2282                        newPex, newCount,
2283                        tr_pexCompare, sizeof (tr_pex),
2284                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs);
2285        diffs6.added = tr_new (tr_pex, newCount6);
2286        diffs6.addedCount = 0;
2287        diffs6.dropped = tr_new (tr_pex, msgs->pexCount6);
2288        diffs6.droppedCount = 0;
2289        diffs6.elements = tr_new (tr_pex, newCount6 + msgs->pexCount6);
2290        diffs6.elementCount = 0;
2291        tr_set_compare (msgs->pex6, msgs->pexCount6,
2292                        newPex6, newCount6,
2293                        tr_pexCompare, sizeof (tr_pex),
2294                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6);
2295        dbgmsg (
2296            msgs,
2297            "pex: old peer count %d+%d, new peer count %d+%d, "
2298            "added %d+%d, removed %d+%d",
2299            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2300            diffs.addedCount, diffs6.addedCount,
2301            diffs.droppedCount, diffs6.droppedCount);
2302
2303        if (!diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2304            !diffs6.droppedCount)
2305        {
2306            tr_free (diffs.elements);
2307            tr_free (diffs6.elements);
2308        }
2309        else
2310        {
2311            int  i;
2312            tr_variant val;
2313            uint8_t * tmp, *walk;
2314            struct evbuffer * payload;
2315            struct evbuffer * out = msgs->outMessages;
2316
2317            /* update peer */
2318            tr_free (msgs->pex);
2319            msgs->pex = diffs.elements;
2320            msgs->pexCount = diffs.elementCount;
2321            tr_free (msgs->pex6);
2322            msgs->pex6 = diffs6.elements;
2323            msgs->pexCount6 = diffs6.elementCount;
2324
2325            /* build the pex payload */
2326            tr_variantInitDict (&val, 3); /* ipv6 support: left as 3:
2327                                         * speed vs. likelihood? */
2328
2329            if (diffs.addedCount > 0)
2330            {
2331                /* "added" */
2332                tmp = walk = tr_new (uint8_t, diffs.addedCount * 6);
2333                for (i = 0; i < diffs.addedCount; ++i) {
2334                    memcpy (walk, &diffs.added[i].addr.addr, 4); walk += 4;
2335                    memcpy (walk, &diffs.added[i].port, 2); walk += 2;
2336                }
2337                assert ((walk - tmp) == diffs.addedCount * 6);
2338                tr_variantDictAddRaw (&val, TR_KEY_added, tmp, walk - tmp);
2339                tr_free (tmp);
2340
2341                /* "added.f"
2342                 * unset each holepunch flag because we don't support it. */
2343                tmp = walk = tr_new (uint8_t, diffs.addedCount);
2344                for (i = 0; i < diffs.addedCount; ++i)
2345                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2346                assert ((walk - tmp) == diffs.addedCount);
2347                tr_variantDictAddRaw (&val, TR_KEY_added_f, tmp, walk - tmp);
2348                tr_free (tmp);
2349            }
2350
2351            if (diffs.droppedCount > 0)
2352            {
2353                /* "dropped" */
2354                tmp = walk = tr_new (uint8_t, diffs.droppedCount * 6);
2355                for (i = 0; i < diffs.droppedCount; ++i) {
2356                    memcpy (walk, &diffs.dropped[i].addr.addr, 4); walk += 4;
2357                    memcpy (walk, &diffs.dropped[i].port, 2); walk += 2;
2358                }
2359                assert ((walk - tmp) == diffs.droppedCount * 6);
2360                tr_variantDictAddRaw (&val, TR_KEY_dropped, tmp, walk - tmp);
2361                tr_free (tmp);
2362            }
2363
2364            if (diffs6.addedCount > 0)
2365            {
2366                /* "added6" */
2367                tmp = walk = tr_new (uint8_t, diffs6.addedCount * 18);
2368                for (i = 0; i < diffs6.addedCount; ++i) {
2369                    memcpy (walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2370                    walk += 16;
2371                    memcpy (walk, &diffs6.added[i].port, 2);
2372                    walk += 2;
2373                }
2374                assert ((walk - tmp) == diffs6.addedCount * 18);
2375                tr_variantDictAddRaw (&val, TR_KEY_added6, tmp, walk - tmp);
2376                tr_free (tmp);
2377
2378                /* "added6.f"
2379                 * unset each holepunch flag because we don't support it. */
2380                tmp = walk = tr_new (uint8_t, diffs6.addedCount);
2381                for (i = 0; i < diffs6.addedCount; ++i)
2382                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2383                assert ((walk - tmp) == diffs6.addedCount);
2384                tr_variantDictAddRaw (&val, TR_KEY_added6_f, tmp, walk - tmp);
2385                tr_free (tmp);
2386            }
2387
2388            if (diffs6.droppedCount > 0)
2389            {
2390                /* "dropped6" */
2391                tmp = walk = tr_new (uint8_t, diffs6.droppedCount * 18);
2392                for (i = 0; i < diffs6.droppedCount; ++i) {
2393                    memcpy (walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2394                    walk += 16;
2395                    memcpy (walk, &diffs6.dropped[i].port, 2);
2396                    walk += 2;
2397                }
2398                assert ((walk - tmp) == diffs6.droppedCount * 18);
2399                tr_variantDictAddRaw (&val, TR_KEY_dropped6, tmp, walk - tmp);
2400                tr_free (tmp);
2401            }
2402
2403            /* write the pex message */
2404            payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
2405            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2406            evbuffer_add_uint8 (out, BT_LTEP);
2407            evbuffer_add_uint8 (out, msgs->ut_pex_id);
2408            evbuffer_add_buffer (out, payload);
2409            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2410            dbgmsg (msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length (out));
2411            dbgOutMessageLen (msgs);
2412
2413            evbuffer_free (payload);
2414            tr_variantFree (&val);
2415        }
2416
2417        /* cleanup */
2418        tr_free (diffs.added);
2419        tr_free (diffs.dropped);
2420        tr_free (newPex);
2421        tr_free (diffs6.added);
2422        tr_free (diffs6.dropped);
2423        tr_free (newPex6);
2424
2425        /*msgs->clientSentPexAt = tr_time ();*/
2426    }
2427}
2428
2429static void
2430pexPulse (int foo UNUSED, short bar UNUSED, void * vmsgs)
2431{
2432    struct tr_peerMsgs * msgs = vmsgs;
2433
2434    sendPex (msgs);
2435
2436    assert (msgs->pexTimer != NULL);
2437    tr_timerAdd (msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2438}
2439
2440/***
2441****  tr_peer virtual functions
2442***/
2443
2444static bool
2445peermsgs_is_transferring_pieces (const struct tr_peer * peer,
2446                                 uint64_t               now,
2447                                 tr_direction           direction, 
2448                                 unsigned int         * setme_Bps)
2449{
2450  unsigned int Bps = 0;
2451
2452  if (tr_isPeerMsgs (peer))
2453    {
2454      const tr_peerMsgs * msgs = (const tr_peerMsgs *) peer;
2455      Bps = tr_peerIoGetPieceSpeed_Bps (msgs->io, now, direction);
2456    }
2457
2458  if (setme_Bps != NULL)
2459    *setme_Bps = Bps;
2460
2461  return Bps > 0;
2462}
2463
2464static void
2465peermsgs_destruct (tr_peer * peer)
2466{
2467  tr_peerMsgs * msgs = PEER_MSGS (peer);
2468
2469  assert (msgs != NULL);
2470
2471  tr_peerMsgsSetActive (msgs, TR_UP, false);
2472  tr_peerMsgsSetActive (msgs, TR_DOWN, false);
2473
2474  if (msgs->pexTimer != NULL)
2475    event_free (msgs->pexTimer);
2476
2477  if (msgs->incoming.block != NULL)
2478    evbuffer_free (msgs->incoming.block);
2479
2480  if (msgs->io)
2481    {
2482      tr_peerIoClear (msgs->io);
2483      tr_peerIoUnref (msgs->io); /* balanced by the ref in handshakeDoneCB () */
2484    }
2485
2486  evbuffer_free (msgs->outMessages);
2487  tr_free (msgs->pex6);
2488  tr_free (msgs->pex);
2489
2490  tr_peerDestruct (&msgs->peer);
2491
2492  memset (msgs, ~0, sizeof (tr_peerMsgs));
2493}
2494
2495static const struct tr_peer_virtual_funcs my_funcs =
2496{
2497  .destruct = peermsgs_destruct,
2498  .is_transferring_pieces = peermsgs_is_transferring_pieces,
2499};
2500
2501/***
2502****
2503***/
2504
2505time_t
2506tr_peerMsgsGetConnectionAge (const tr_peerMsgs * msgs)
2507{
2508  assert (tr_isPeerMsgs (msgs));
2509
2510  return tr_peerIoGetAge (msgs->io);
2511}
2512
2513bool
2514tr_peerMsgsIsPeerChoked (const tr_peerMsgs * msgs)
2515{
2516  assert (tr_isPeerMsgs (msgs));
2517
2518  return msgs->peer_is_choked;
2519}
2520
2521bool
2522tr_peerMsgsIsPeerInterested (const tr_peerMsgs * msgs)
2523{
2524  assert (tr_isPeerMsgs (msgs));
2525
2526  return msgs->peer_is_interested;
2527}
2528
2529bool
2530tr_peerMsgsIsClientChoked (const tr_peerMsgs * msgs)
2531{
2532  assert (tr_isPeerMsgs (msgs));
2533
2534  return msgs->client_is_choked;
2535}
2536
2537bool
2538tr_peerMsgsIsClientInterested (const tr_peerMsgs * msgs)
2539{
2540  assert (tr_isPeerMsgs (msgs));
2541
2542  return msgs->client_is_interested;
2543}
2544
2545bool
2546tr_peerMsgsIsUtpConnection (const tr_peerMsgs * msgs)
2547{
2548  assert (tr_isPeerMsgs (msgs));
2549
2550  return msgs->io->utp_socket != NULL;
2551}
2552
2553bool
2554tr_peerMsgsIsEncrypted (const tr_peerMsgs * msgs)
2555{
2556  assert (tr_isPeerMsgs (msgs));
2557
2558  return tr_peerIoIsEncrypted (msgs->io);
2559}
2560
2561bool
2562tr_peerMsgsIsIncomingConnection (const tr_peerMsgs * msgs)
2563{
2564  assert (tr_isPeerMsgs (msgs));
2565
2566  return tr_peerIoIsIncoming (msgs->io);
2567}
2568
2569/***
2570****
2571***/
2572
2573bool
2574tr_isPeerMsgs (const void * msgs)
2575{
2576  /* FIXME: this is pretty crude */
2577  return (msgs != NULL)
2578      && (((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER);
2579}
2580
2581tr_peerMsgs *
2582tr_peerMsgsCast (void * vm)
2583{
2584  return tr_isPeerMsgs(vm) ? vm : NULL;
2585}
2586
2587tr_peerMsgs *
2588tr_peerMsgsNew (struct tr_torrent    * torrent,
2589                struct tr_peerIo     * io,
2590                tr_peer_callback     * callback,
2591                void                 * callbackData)
2592{
2593  tr_peerMsgs * m;
2594
2595  assert (io != NULL);
2596
2597  m = tr_new0 (tr_peerMsgs, 1);
2598
2599  tr_peerConstruct (&m->peer, torrent);
2600  m->peer.funcs = &my_funcs;
2601
2602  m->magic_number = MAGIC_NUMBER;
2603  m->client_is_choked = true;
2604  m->peer_is_choked = true;
2605  m->client_is_interested = false;
2606  m->peer_is_interested = false;
2607  m->is_active[TR_UP] = false;
2608  m->is_active[TR_DOWN] = false;
2609  m->callback = callback;
2610  m->callbackData = callbackData;
2611  m->io = io;
2612  m->torrent = torrent;
2613  m->state = AWAITING_BT_LENGTH;
2614  m->outMessages = evbuffer_new ();
2615  m->outMessagesBatchedAt = 0;
2616  m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2617
2618  if (tr_torrentAllowsPex (torrent))
2619    {
2620      m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m);
2621      tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0);
2622    }
2623
2624  if (tr_peerIoSupportsUTP (m->io))
2625    {
2626      const tr_address * addr = tr_peerIoGetAddress (m->io, NULL);
2627      tr_peerMgrSetUtpSupported (torrent, addr);
2628      tr_peerMgrSetUtpFailed (torrent, addr, false);
2629    }
2630
2631  if (tr_peerIoSupportsLTEP (m->io))
2632    sendLtepHandshake (m);
2633
2634  tellPeerWhatWeHave (m);
2635
2636  if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (m->io))
2637    {
2638      /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2639      const struct tr_address *addr = tr_peerIoGetAddress (m->io, NULL);
2640      if (addr->type == TR_AF_INET || tr_globalIPv6 ())
2641        protocolSendPort (m, tr_dhtPort (torrent->session));
2642    }
2643
2644  tr_peerIoSetIOFuncs (m->io, canRead, didWrite, gotError, m);
2645  updateDesiredRequestCount (m);
2646
2647  return m;
2648}
Note: See TracBrowser for help on using the repository browser.