source: trunk/libtransmission/peer-msgs.c @ 14076

Last change on this file since 14076 was 14076, checked in by jordan, 9 years ago

undo previous commit's accidental inclusion of changes to peer-msgs.* and peer-mgr.c

  • Property svn:keywords set to Date Rev Author Id
File size: 72.8 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 14076 2013-05-22 19:06:54Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "cache.h"
25#include "completion.h"
26#include "crypto.h" /* tr_sha1 () */
27#include "log.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "variant.h"
37#include "version.h"
38
39/**
40***
41**/
42
43enum
44{
45  BT_CHOKE                = 0,
46  BT_UNCHOKE              = 1,
47  BT_INTERESTED           = 2,
48  BT_NOT_INTERESTED       = 3,
49  BT_HAVE                 = 4,
50  BT_BITFIELD             = 5,
51  BT_REQUEST              = 6,
52  BT_PIECE                = 7,
53  BT_CANCEL               = 8,
54  BT_PORT                 = 9,
55
56  BT_FEXT_SUGGEST         = 13,
57  BT_FEXT_HAVE_ALL        = 14,
58  BT_FEXT_HAVE_NONE       = 15,
59  BT_FEXT_REJECT          = 16,
60  BT_FEXT_ALLOWED_FAST    = 17,
61
62  BT_LTEP                 = 20,
63
64  LTEP_HANDSHAKE          = 0,
65
66  UT_PEX_ID               = 1,
67  UT_METADATA_ID          = 3,
68
69  MAX_PEX_PEER_COUNT      = 50,
70
71  MIN_CHOKE_PERIOD_SEC    = 10,
72
73  /* idle seconds before we send a keepalive */
74  KEEPALIVE_INTERVAL_SECS = 100,
75
76  PEX_INTERVAL_SECS       = 90, /* sec between sendPex () calls */
77
78  REQQ                    = 512,
79
80  METADATA_REQQ           = 64,
81
82  MAGIC_NUMBER            = 21549,
83
84  /* used in lowering the outMessages queue period */
85  IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
86  HIGH_PRIORITY_INTERVAL_SECS = 2,
87  LOW_PRIORITY_INTERVAL_SECS = 10,
88
89  /* number of pieces we'll allow in our fast set */
90  MAX_FAST_SET_SIZE = 3,
91
92  /* how many blocks to keep prefetched per peer */
93  PREFETCH_SIZE = 18,
94
95  /* when we're making requests from another peer,
96     batch them together to send enough requests to
97     meet our bandwidth goals for the next N seconds */
98  REQUEST_BUF_SECS = 10,
99
100  /* defined in BEP #9 */
101  METADATA_MSG_TYPE_REQUEST = 0,
102  METADATA_MSG_TYPE_DATA = 1,
103  METADATA_MSG_TYPE_REJECT = 2
104};
105
106enum
107{
108  AWAITING_BT_LENGTH,
109  AWAITING_BT_ID,
110  AWAITING_BT_MESSAGE,
111  AWAITING_BT_PIECE
112};
113
114typedef enum
115{
116  ENCRYPTION_PREFERENCE_UNKNOWN,
117  ENCRYPTION_PREFERENCE_YES,
118  ENCRYPTION_PREFERENCE_NO
119}
120encryption_preference_t;
121
122/**
123***
124**/
125
126struct peer_request
127{
128  uint32_t index;
129  uint32_t offset;
130  uint32_t length;
131};
132
133static void
134blockToReq (const tr_torrent     * tor,
135            tr_block_index_t       block,
136            struct peer_request  * setme)
137{
138  tr_torrentGetBlockLocation (tor, block, &setme->index,
139                                          &setme->offset,
140                                          &setme->length);
141}
142
143/**
144***
145**/
146
147/* this is raw, unchanged data from the peer regarding
148 * the current message that it's sending us. */
149struct tr_incoming
150{
151  uint8_t                id;
152  uint32_t               length; /* includes the +1 for id length */
153  struct peer_request    blockReq; /* metadata for incoming blocks */
154  struct evbuffer      * block; /* piece data for incoming blocks */
155};
156
157/**
158 * Low-level communication state information about a connected peer.
159 *
160 * This structure remembers the low-level protocol states that we're
161 * in with this peer, such as active requests, pex messages, and so on.
162 * Its fields are all private to peer-msgs.c.
163 *
164 * Data not directly involved with sending & receiving messages is
165 * stored in tr_peer, where it can be accessed by both peermsgs and
166 * the peer manager.
167 *
168 * @see struct peer_atom
169 * @see tr_peer
170 */
171struct tr_peerMsgs
172{
173  struct tr_peer peer; /* parent */
174
175  uint16_t magic_number;
176
177  /* Whether or not we've choked this peer. */
178  bool peer_is_choked;
179
180  /* whether or not the peer has indicated it will download from us. */
181  bool peer_is_interested;
182
183  /* whether or the peer is choking us. */
184  bool client_is_choked;
185
186  /* whether or not we've indicated to the peer that we would download from them if unchoked. */
187  bool client_is_interested;
188
189
190  bool peerSupportsPex;
191  bool peerSupportsMetadataXfer;
192  bool clientSentLtepHandshake;
193  bool peerSentLtepHandshake;
194
195  /*bool haveFastSet;*/
196
197  int desiredRequestCount;
198
199  int prefetchCount;
200
201  /* how long the outMessages batch should be allowed to grow before
202   * it's flushed -- some messages (like requests >:) should be sent
203   * very quickly; others aren't as urgent. */
204  int8_t          outMessagesBatchPeriod;
205
206  uint8_t         state;
207  uint8_t         ut_pex_id;
208  uint8_t         ut_metadata_id;
209  uint16_t        pexCount;
210  uint16_t        pexCount6;
211
212  tr_port         dht_port;
213
214  encryption_preference_t  encryption_preference;
215
216  size_t                   metadata_size_hint;
217#if 0
218  size_t                 fastsetSize;
219  tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
220#endif
221
222  tr_torrent *           torrent;
223
224  tr_peer_callback      * callback;
225  void                  * callbackData;
226
227  struct evbuffer *      outMessages; /* all the non-piece messages */
228
229  struct peer_request    peerAskedFor[REQQ];
230
231  int peerAskedForMetadata[METADATA_REQQ];
232  int peerAskedForMetadataCount;
233
234  tr_pex * pex;
235  tr_pex * pex6;
236
237  /*time_t clientSentPexAt;*/
238  time_t clientSentAnythingAt;
239
240  time_t chokeChangedAt;
241
242  /* when we started batching the outMessages */
243  time_t outMessagesBatchedAt;
244
245  struct tr_incoming    incoming;
246
247  /* if the peer supports the Extension Protocol in BEP 10 and
248     supplied a reqq argument, it's stored here. Otherwise, the
249     value is zero and should be ignored. */
250  int64_t reqq;
251
252  struct event * pexTimer;
253
254  struct tr_peerIo * io;
255};
256
257/**
258***
259**/
260
261static inline tr_session*
262getSession (struct tr_peerMsgs * msgs)
263{
264  return msgs->torrent->session;
265}
266
267/**
268***
269**/
270
271static void
272myDebug (const char * file, int line,
273         const struct tr_peerMsgs * msgs,
274         const char * fmt, ...)
275{
276  FILE * fp = tr_logGetFile ();
277
278  if (fp)
279    {
280      va_list           args;
281      char              timestr[64];
282      struct evbuffer * buf = evbuffer_new ();
283      char *            base = tr_basename (file);
284      char *            message;
285
286      evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ",
287                           tr_logGetTimeStr (timestr, sizeof (timestr)),
288                           tr_torrentName (msgs->torrent),
289                           tr_peerIoGetAddrStr (msgs->io),
290                           tr_quark_get_string (msgs->peer.client, NULL));
291      va_start (args, fmt);
292      evbuffer_add_vprintf (buf, fmt, args);
293      va_end (args);
294      evbuffer_add_printf (buf, " (%s:%d)\n", base, line);
295
296      message = evbuffer_free_to_str (buf);
297      fputs (message, fp);
298
299      tr_free (base);
300      tr_free (message);
301    }
302}
303
304#define dbgmsg(msgs, ...) \
305  do \
306    { \
307      if (tr_logGetDeepEnabled ()) \
308        myDebug (__FILE__, __LINE__, msgs, __VA_ARGS__); \
309    } \
310  while (0)
311
312/**
313***
314**/
315
316static void
317pokeBatchPeriod (tr_peerMsgs * msgs, int interval)
318{
319  if (msgs->outMessagesBatchPeriod > interval)
320    {
321      msgs->outMessagesBatchPeriod = interval;
322      dbgmsg (msgs, "lowering batch interval to %d seconds", interval);
323    }
324}
325
326static void
327dbgOutMessageLen (tr_peerMsgs * msgs)
328{
329  dbgmsg (msgs, "outMessage size is now %zu", evbuffer_get_length (msgs->outMessages));
330}
331
332static void
333protocolSendReject (tr_peerMsgs * msgs, const struct peer_request * req)
334{
335  struct evbuffer * out = msgs->outMessages;
336
337  assert (tr_peerIoSupportsFEXT (msgs->io));
338
339  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
340  evbuffer_add_uint8 (out, BT_FEXT_REJECT);
341  evbuffer_add_uint32 (out, req->index);
342  evbuffer_add_uint32 (out, req->offset);
343  evbuffer_add_uint32 (out, req->length);
344
345  dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
346  dbgOutMessageLen (msgs);
347}
348
349static void
350protocolSendRequest (tr_peerMsgs * msgs, const struct peer_request * req)
351{
352  struct evbuffer * out = msgs->outMessages;
353
354  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
355  evbuffer_add_uint8 (out, BT_REQUEST);
356  evbuffer_add_uint32 (out, req->index);
357  evbuffer_add_uint32 (out, req->offset);
358  evbuffer_add_uint32 (out, req->length);
359
360  dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
361  dbgOutMessageLen (msgs);
362  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
363}
364
365static void
366protocolSendCancel (tr_peerMsgs * msgs, const struct peer_request * req)
367{
368  struct evbuffer * out = msgs->outMessages;
369
370  evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t));
371  evbuffer_add_uint8 (out, BT_CANCEL);
372  evbuffer_add_uint32 (out, req->index);
373  evbuffer_add_uint32 (out, req->offset);
374  evbuffer_add_uint32 (out, req->length);
375
376  dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
377  dbgOutMessageLen (msgs);
378  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
379}
380
381static void
382protocolSendPort (tr_peerMsgs *msgs, uint16_t port)
383{
384  struct evbuffer * out = msgs->outMessages;
385
386  dbgmsg (msgs, "sending Port %u", port);
387  evbuffer_add_uint32 (out, 3);
388  evbuffer_add_uint8 (out, BT_PORT);
389  evbuffer_add_uint16 (out, port);
390}
391
392static void
393protocolSendHave (tr_peerMsgs * msgs, uint32_t index)
394{
395  struct evbuffer * out = msgs->outMessages;
396
397  evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t));
398  evbuffer_add_uint8 (out, BT_HAVE);
399  evbuffer_add_uint32 (out, index);
400
401  dbgmsg (msgs, "sending Have %u", index);
402  dbgOutMessageLen (msgs);
403  pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS);
404}
405
406#if 0
407static void
408protocolSendAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
409{
410  tr_peerIo       * io  = msgs->io;
411  struct evbuffer * out = msgs->outMessages;
412
413  assert (tr_peerIoSupportsFEXT (msgs->io));
414
415  evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t));
416  evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST);
417  evbuffer_add_uint32 (io, out, pieceIndex);
418
419  dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex);
420  dbgOutMessageLen (msgs);
421}
422#endif
423
424static void
425protocolSendChoke (tr_peerMsgs * msgs, int choke)
426{
427  struct evbuffer * out = msgs->outMessages;
428
429  evbuffer_add_uint32 (out, sizeof (uint8_t));
430  evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE);
431
432  dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke");
433  dbgOutMessageLen (msgs);
434  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
435}
436
437static void
438protocolSendHaveAll (tr_peerMsgs * msgs)
439{
440  struct evbuffer * out = msgs->outMessages;
441
442  assert (tr_peerIoSupportsFEXT (msgs->io));
443
444  evbuffer_add_uint32 (out, sizeof (uint8_t));
445  evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL);
446
447  dbgmsg (msgs, "sending HAVE_ALL...");
448  dbgOutMessageLen (msgs);
449  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
450}
451
452static void
453protocolSendHaveNone (tr_peerMsgs * msgs)
454{
455  struct evbuffer * out = msgs->outMessages;
456
457  assert (tr_peerIoSupportsFEXT (msgs->io));
458
459  evbuffer_add_uint32 (out, sizeof (uint8_t));
460  evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE);
461
462  dbgmsg (msgs, "sending HAVE_NONE...");
463  dbgOutMessageLen (msgs);
464  pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
465}
466
467/**
468***  EVENTS
469**/
470
471static void
472publish (tr_peerMsgs * msgs, tr_peer_event * e)
473{
474  if (msgs->callback != NULL)
475    msgs->callback (&msgs->peer, e, msgs->callbackData);
476}
477
478static void
479fireError (tr_peerMsgs * msgs, int err)
480{
481  tr_peer_event e = TR_PEER_EVENT_INIT;
482  e.eventType = TR_PEER_ERROR;
483  e.err = err;
484  publish (msgs, &e);
485}
486
487static void
488fireGotBlock (tr_peerMsgs * msgs, const struct peer_request * req)
489{
490  tr_peer_event e = TR_PEER_EVENT_INIT;
491  e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
492  e.pieceIndex = req->index;
493  e.offset = req->offset;
494  e.length = req->length;
495  publish (msgs, &e);
496}
497
498static void
499fireGotRej (tr_peerMsgs * msgs, const struct peer_request * req)
500{
501  tr_peer_event e = TR_PEER_EVENT_INIT;
502  e.eventType = TR_PEER_CLIENT_GOT_REJ;
503  e.pieceIndex = req->index;
504  e.offset = req->offset;
505  e.length = req->length;
506  publish (msgs, &e);
507}
508
509static void
510fireGotChoke (tr_peerMsgs * msgs)
511{
512  tr_peer_event e = TR_PEER_EVENT_INIT;
513  e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
514  publish (msgs, &e);
515}
516
517static void
518fireClientGotHaveAll (tr_peerMsgs * msgs)
519{
520  tr_peer_event e = TR_PEER_EVENT_INIT;
521  e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
522  publish (msgs, &e);
523}
524
525static void
526fireClientGotHaveNone (tr_peerMsgs * msgs)
527{
528  tr_peer_event e = TR_PEER_EVENT_INIT;
529  e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
530  publish (msgs, &e);
531}
532
533static void
534fireClientGotPieceData (tr_peerMsgs * msgs, uint32_t length)
535{
536  tr_peer_event e = TR_PEER_EVENT_INIT;
537  e.length = length;
538  e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
539  publish (msgs, &e);
540}
541
542static void
543firePeerGotPieceData (tr_peerMsgs * msgs, uint32_t length)
544{
545  tr_peer_event e = TR_PEER_EVENT_INIT;
546  e.length = length;
547  e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
548  publish (msgs, &e);
549}
550
551static void
552fireClientGotSuggest (tr_peerMsgs * msgs, uint32_t pieceIndex)
553{
554  tr_peer_event e = TR_PEER_EVENT_INIT;
555  e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
556  e.pieceIndex = pieceIndex;
557  publish (msgs, &e);
558}
559
560static void
561fireClientGotPort (tr_peerMsgs * msgs, tr_port port)
562{
563  tr_peer_event e = TR_PEER_EVENT_INIT;
564  e.eventType = TR_PEER_CLIENT_GOT_PORT;
565  e.port = port;
566  publish (msgs, &e);
567}
568
569static void
570fireClientGotAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex)
571{
572  tr_peer_event e = TR_PEER_EVENT_INIT;
573  e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
574  e.pieceIndex = pieceIndex;
575  publish (msgs, &e);
576}
577
578static void
579fireClientGotBitfield (tr_peerMsgs * msgs, tr_bitfield * bitfield)
580{
581  tr_peer_event e = TR_PEER_EVENT_INIT;
582  e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
583  e.bitfield = bitfield;
584  publish (msgs, &e);
585}
586
587static void
588fireClientGotHave (tr_peerMsgs * msgs, tr_piece_index_t index)
589{
590  tr_peer_event e = TR_PEER_EVENT_INIT;
591  e.eventType = TR_PEER_CLIENT_GOT_HAVE;
592  e.pieceIndex = index;
593  publish (msgs, &e);
594}
595
596
597/**
598***  ALLOWED FAST SET
599***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
600**/
601
602#if 0
603size_t
604tr_generateAllowedSet (tr_piece_index_t * setmePieces,
605                       size_t             desiredSetSize,
606                       size_t             pieceCount,
607                       const uint8_t    * infohash,
608                       const tr_address * addr)
609{
610    size_t setSize = 0;
611
612    assert (setmePieces);
613    assert (desiredSetSize <= pieceCount);
614    assert (desiredSetSize);
615    assert (pieceCount);
616    assert (infohash);
617    assert (addr);
618
619    if (addr->type == TR_AF_INET)
620    {
621        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
622        uint8_t x[SHA_DIGEST_LENGTH];
623
624        uint32_t ui32 = ntohl (htonl (addr->addr.addr4.s_addr) & 0xffffff00);   /* (1) */
625        memcpy (w, &ui32, sizeof (uint32_t));
626        walk += sizeof (uint32_t);
627        memcpy (walk, infohash, SHA_DIGEST_LENGTH);                 /* (2) */
628        walk += SHA_DIGEST_LENGTH;
629        tr_sha1 (x, w, walk-w, NULL);                               /* (3) */
630        assert (sizeof (w) == walk-w);
631
632        while (setSize<desiredSetSize)
633        {
634            int i;
635            for (i=0; i<5 && setSize<desiredSetSize; ++i)           /* (4) */
636            {
637                size_t k;
638                uint32_t j = i * 4;                                  /* (5) */
639                uint32_t y = ntohl (* (uint32_t*)(x + j));       /* (6) */
640                uint32_t index = y % pieceCount;                     /* (7) */
641
642                for (k=0; k<setSize; ++k)                           /* (8) */
643                    if (setmePieces[k] == index)
644                        break;
645
646                if (k == setSize)
647                    setmePieces[setSize++] = index;                  /* (9) */
648            }
649
650            tr_sha1 (x, x, sizeof (x), NULL);                      /* (3) */
651        }
652    }
653
654    return setSize;
655}
656
657static void
658updateFastSet (tr_peerMsgs * msgs UNUSED)
659{
660    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
661    const int peerIsNeedy = msgs->peer->progress < 0.10;
662
663    if (fext && peerIsNeedy && !msgs->haveFastSet)
664    {
665        size_t i;
666        const struct tr_address * addr = tr_peerIoGetAddress (msgs->io, NULL);
667        const tr_info * inf = &msgs->torrent->info;
668        const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount);
669
670        /* build the fast set */
671        msgs->fastsetSize = tr_generateAllowedSet (msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
672        msgs->haveFastSet = 1;
673
674        /* send it to the peer */
675        for (i=0; i<msgs->fastsetSize; ++i)
676            protocolSendAllowedFast (msgs, msgs->fastset[i]);
677    }
678}
679#endif
680
681/**
682***  INTEREST
683**/
684
685static void
686sendInterest (tr_peerMsgs * msgs, bool b)
687{
688  struct evbuffer * out = msgs->outMessages;
689
690  assert (msgs);
691  assert (tr_isBool (b));
692
693  msgs->client_is_interested = b;
694  dbgmsg (msgs, "Sending %s", b ? "Interested" : "Not Interested");
695  evbuffer_add_uint32 (out, sizeof (uint8_t));
696  evbuffer_add_uint8 (out, b ? BT_INTERESTED : BT_NOT_INTERESTED);
697
698  pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
699  dbgOutMessageLen (msgs);
700}
701
702static void
703updateInterest (tr_peerMsgs * msgs UNUSED)
704{
705    /* FIXME -- might need to poke the mgr on startup */
706}
707
708void
709tr_peerMsgsSetInterested (tr_peerMsgs * msgs, bool b)
710{
711  assert (tr_isBool (b));
712
713  if (msgs->client_is_interested != b)
714    sendInterest (msgs, b);
715}
716
717static bool
718popNextMetadataRequest (tr_peerMsgs * msgs, int * piece)
719{
720  if (msgs->peerAskedForMetadataCount == 0)
721    return false;
722
723  *piece = msgs->peerAskedForMetadata[0];
724
725  tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int),
726                             msgs->peerAskedForMetadataCount--);
727
728  return true;
729}
730
731static bool
732popNextRequest (tr_peerMsgs * msgs, struct peer_request * setme)
733{
734  if (msgs->peer.pendingReqsToClient == 0)
735    return false;
736
737  *setme = msgs->peerAskedFor[0];
738
739  tr_removeElementFromArray (msgs->peerAskedFor,
740                             0,
741                             sizeof (struct peer_request),
742                             msgs->peer.pendingReqsToClient--);
743
744  return true;
745}
746
747static void
748cancelAllRequestsToClient (tr_peerMsgs * msgs)
749{
750  struct peer_request req;
751  const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->io);
752
753  while (popNextRequest (msgs, &req))
754    if (mustSendCancel)
755      protocolSendReject (msgs, &req);
756}
757
758void
759tr_peerMsgsSetChoke (tr_peerMsgs * msgs, bool peer_is_choked)
760{
761  const time_t now = tr_time ();
762  const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
763
764  assert (msgs != NULL);
765  assert (tr_isBool (peer_is_choked));
766
767  if (msgs->chokeChangedAt > fibrillationTime)
768    {
769      dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
770    }
771  else if (msgs->peer_is_choked != peer_is_choked)
772    {
773      msgs->peer_is_choked = peer_is_choked;
774      if (peer_is_choked)
775        cancelAllRequestsToClient (msgs);
776      protocolSendChoke (msgs, peer_is_choked);
777      msgs->chokeChangedAt = now;
778    }
779}
780
781/**
782***
783**/
784
785void
786tr_peerMsgsHave (tr_peerMsgs * msgs, uint32_t index)
787{
788  protocolSendHave (msgs, index);
789
790  /* since we have more pieces now, we might not be interested in this peer */
791  updateInterest (msgs);
792}
793
794/**
795***
796**/
797
798static bool
799reqIsValid (const tr_peerMsgs * peer,
800            uint32_t            index,
801            uint32_t            offset,
802            uint32_t            length)
803{
804    return tr_torrentReqIsValid (peer->torrent, index, offset, length);
805}
806
807static bool
808requestIsValid (const tr_peerMsgs * msgs, const struct peer_request * req)
809{
810    return reqIsValid (msgs, req->index, req->offset, req->length);
811}
812
813void
814tr_peerMsgsCancel (tr_peerMsgs * msgs, tr_block_index_t block)
815{
816    struct peer_request req;
817/*fprintf (stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer);*/
818    blockToReq (msgs->torrent, block, &req);
819    protocolSendCancel (msgs, &req);
820}
821
822/**
823***
824**/
825
826static void
827sendLtepHandshake (tr_peerMsgs * msgs)
828{
829    tr_variant val;
830    bool allow_pex;
831    bool allow_metadata_xfer;
832    struct evbuffer * payload;
833    struct evbuffer * out = msgs->outMessages;
834    const unsigned char * ipv6 = tr_globalIPv6 ();
835    static tr_quark version_quark = 0;
836
837    if (msgs->clientSentLtepHandshake)
838        return;
839
840    if (!version_quark)
841      version_quark = tr_quark_new (TR_NAME " " USERAGENT_PREFIX, -1);
842
843    dbgmsg (msgs, "sending an ltep handshake");
844    msgs->clientSentLtepHandshake = 1;
845
846    /* decide if we want to advertise metadata xfer support (BEP 9) */
847    if (tr_torrentIsPrivate (msgs->torrent))
848        allow_metadata_xfer = 0;
849    else
850        allow_metadata_xfer = 1;
851
852    /* decide if we want to advertise pex support */
853    if (!tr_torrentAllowsPex (msgs->torrent))
854        allow_pex = 0;
855    else if (msgs->peerSentLtepHandshake)
856        allow_pex = msgs->peerSupportsPex ? 1 : 0;
857    else
858        allow_pex = 1;
859
860    tr_variantInitDict (&val, 8);
861    tr_variantDictAddInt (&val, TR_KEY_e, getSession (msgs)->encryptionMode != TR_CLEAR_PREFERRED);
862    if (ipv6 != NULL)
863        tr_variantDictAddRaw (&val, TR_KEY_ipv6, ipv6, 16);
864    if (allow_metadata_xfer && tr_torrentHasMetadata (msgs->torrent)
865                            && (msgs->torrent->infoDictLength > 0))
866        tr_variantDictAddInt (&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
867    tr_variantDictAddInt (&val, TR_KEY_p, tr_sessionGetPublicPeerPort (getSession (msgs)));
868    tr_variantDictAddInt (&val, TR_KEY_reqq, REQQ);
869    tr_variantDictAddInt (&val, TR_KEY_upload_only, tr_torrentIsSeed (msgs->torrent));
870    tr_variantDictAddQuark (&val, TR_KEY_v, version_quark);
871    if (allow_metadata_xfer || allow_pex) {
872        tr_variant * m  = tr_variantDictAddDict (&val, TR_KEY_m, 2);
873        if (allow_metadata_xfer)
874            tr_variantDictAddInt (m, TR_KEY_ut_metadata, UT_METADATA_ID);
875        if (allow_pex)
876            tr_variantDictAddInt (m, TR_KEY_ut_pex, UT_PEX_ID);
877    }
878
879    payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
880
881    evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
882    evbuffer_add_uint8 (out, BT_LTEP);
883    evbuffer_add_uint8 (out, LTEP_HANDSHAKE);
884    evbuffer_add_buffer (out, payload);
885    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
886    dbgOutMessageLen (msgs);
887
888    /* cleanup */
889    evbuffer_free (payload);
890    tr_variantFree (&val);
891}
892
893static void
894parseLtepHandshake (tr_peerMsgs * msgs, int len, struct evbuffer * inbuf)
895{
896    int64_t   i;
897    tr_variant   val, * sub;
898    uint8_t * tmp = tr_new (uint8_t, len);
899    const uint8_t *addr;
900    size_t addr_len;
901    tr_pex pex;
902    int8_t seedProbability = -1;
903
904    memset (&pex, 0, sizeof (tr_pex));
905
906    tr_peerIoReadBytes (msgs->io, inbuf, tmp, len);
907    msgs->peerSentLtepHandshake = 1;
908
909    if (tr_variantFromBenc (&val, tmp, len) || !tr_variantIsDict (&val))
910    {
911        dbgmsg (msgs, "GET  extended-handshake, couldn't get dictionary");
912        tr_free (tmp);
913        return;
914    }
915
916    dbgmsg (msgs, "here is the handshake: [%*.*s]", len, len,  tmp);
917
918    /* does the peer prefer encrypted connections? */
919    if (tr_variantDictFindInt (&val, TR_KEY_e, &i)) {
920        msgs->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
921                                        : ENCRYPTION_PREFERENCE_NO;
922        if (i)
923            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
924    }
925
926    /* check supported messages for utorrent pex */
927    msgs->peerSupportsPex = 0;
928    msgs->peerSupportsMetadataXfer = 0;
929
930    if (tr_variantDictFindDict (&val, TR_KEY_m, &sub)) {
931        if (tr_variantDictFindInt (sub, TR_KEY_ut_pex, &i)) {
932            msgs->peerSupportsPex = i != 0;
933            msgs->ut_pex_id = (uint8_t) i;
934            dbgmsg (msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
935        }
936        if (tr_variantDictFindInt (sub, TR_KEY_ut_metadata, &i)) {
937            msgs->peerSupportsMetadataXfer = i != 0;
938            msgs->ut_metadata_id = (uint8_t) i;
939            dbgmsg (msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
940        }
941        if (tr_variantDictFindInt (sub, TR_KEY_ut_holepunch, &i)) {
942            /* Mysterious µTorrent extension that we don't grok.  However,
943               it implies support for µTP, so use it to indicate that. */
944            tr_peerMgrSetUtpFailed (msgs->torrent,
945                                    tr_peerIoGetAddress (msgs->io, NULL),
946                                    false);
947        }
948    }
949
950    /* look for metainfo size (BEP 9) */
951    if (tr_variantDictFindInt (&val, TR_KEY_metadata_size, &i)) {
952        tr_torrentSetMetadataSizeHint (msgs->torrent, i);
953        msgs->metadata_size_hint = (size_t) i;
954    }
955
956    /* look for upload_only (BEP 21) */
957    if (tr_variantDictFindInt (&val, TR_KEY_upload_only, &i))
958        seedProbability = i==0 ? 0 : 100;
959
960    /* get peer's listening port */
961    if (tr_variantDictFindInt (&val, TR_KEY_p, &i)) {
962        pex.port = htons ((uint16_t)i);
963        fireClientGotPort (msgs, pex.port);
964        dbgmsg (msgs, "peer's port is now %d", (int)i);
965    }
966
967    if (tr_peerIoIsIncoming (msgs->io)
968        && tr_variantDictFindRaw (&val, TR_KEY_ipv4, &addr, &addr_len)
969        && (addr_len == 4))
970    {
971        pex.addr.type = TR_AF_INET;
972        memcpy (&pex.addr.addr.addr4, addr, 4);
973        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
974    }
975
976    if (tr_peerIoIsIncoming (msgs->io)
977        && tr_variantDictFindRaw (&val, TR_KEY_ipv6, &addr, &addr_len)
978        && (addr_len == 16))
979    {
980        pex.addr.type = TR_AF_INET6;
981        memcpy (&pex.addr.addr.addr6, addr, 16);
982        tr_peerMgrAddPex (msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
983    }
984
985    /* get peer's maximum request queue size */
986    if (tr_variantDictFindInt (&val, TR_KEY_reqq, &i))
987        msgs->reqq = i;
988
989    tr_variantFree (&val);
990    tr_free (tmp);
991}
992
993static void
994parseUtMetadata (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
995{
996    tr_variant dict;
997    char * msg_end;
998    const char * benc_end;
999    int64_t msg_type = -1;
1000    int64_t piece = -1;
1001    int64_t total_size = 0;
1002    uint8_t * tmp = tr_new (uint8_t, msglen);
1003
1004    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1005    msg_end = (char*)tmp + msglen;
1006
1007    if (!tr_variantFromBencFull (&dict, tmp, msglen, NULL, &benc_end))
1008    {
1009        tr_variantDictFindInt (&dict, TR_KEY_msg_type, &msg_type);
1010        tr_variantDictFindInt (&dict, TR_KEY_piece, &piece);
1011        tr_variantDictFindInt (&dict, TR_KEY_total_size, &total_size);
1012        tr_variantFree (&dict);
1013    }
1014
1015    dbgmsg (msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1016          (int)msg_type, (int)piece, (int)total_size);
1017
1018    if (msg_type == METADATA_MSG_TYPE_REJECT)
1019    {
1020        /* NOOP */
1021    }
1022
1023    if ((msg_type == METADATA_MSG_TYPE_DATA)
1024        && (!tr_torrentHasMetadata (msgs->torrent))
1025        && (msg_end - benc_end <= METADATA_PIECE_SIZE)
1026        && (piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size))
1027    {
1028        const int pieceLen = msg_end - benc_end;
1029        tr_torrentSetMetadataPiece (msgs->torrent, piece, benc_end, pieceLen);
1030    }
1031
1032    if (msg_type == METADATA_MSG_TYPE_REQUEST)
1033    {
1034        if ((piece >= 0)
1035            && tr_torrentHasMetadata (msgs->torrent)
1036            && !tr_torrentIsPrivate (msgs->torrent)
1037            && (msgs->peerAskedForMetadataCount < METADATA_REQQ))
1038        {
1039            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1040        }
1041        else
1042        {
1043            tr_variant tmp;
1044            struct evbuffer * payload;
1045            struct evbuffer * out = msgs->outMessages;
1046
1047            /* build the rejection message */
1048            tr_variantInitDict (&tmp, 2);
1049            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1050            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1051            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1052
1053            /* write it out as a LTEP message to our outMessages buffer */
1054            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1055            evbuffer_add_uint8 (out, BT_LTEP);
1056            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1057            evbuffer_add_buffer (out, payload);
1058            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1059            dbgOutMessageLen (msgs);
1060
1061            /* cleanup */
1062            evbuffer_free (payload);
1063            tr_variantFree (&tmp);
1064        }
1065    }
1066
1067    tr_free (tmp);
1068}
1069
1070static void
1071parseUtPex (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf)
1072{
1073    int loaded = 0;
1074    uint8_t * tmp = tr_new (uint8_t, msglen);
1075    tr_variant val;
1076    tr_torrent * tor = msgs->torrent;
1077    const uint8_t * added;
1078    size_t added_len;
1079
1080    tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1081
1082    if (tr_torrentAllowsPex (tor)
1083      && ((loaded = !tr_variantFromBenc (&val, tmp, msglen))))
1084    {
1085        if (tr_variantDictFindRaw (&val, TR_KEY_added, &added, &added_len))
1086        {
1087            tr_pex * pex;
1088            size_t i, n;
1089            size_t added_f_len = 0;
1090            const uint8_t * added_f = NULL;
1091
1092            tr_variantDictFindRaw (&val, TR_KEY_added_f, &added_f, &added_f_len);
1093            pex = tr_peerMgrCompactToPex (added, added_len, added_f, added_f_len, &n);
1094
1095            n = MIN (n, MAX_PEX_PEER_COUNT);
1096            for (i=0; i<n; ++i)
1097            {
1098                int seedProbability = -1;
1099                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1100                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1101            }
1102
1103            tr_free (pex);
1104        }
1105
1106        if (tr_variantDictFindRaw (&val, TR_KEY_added6, &added, &added_len))
1107        {
1108            tr_pex * pex;
1109            size_t i, n;
1110            size_t added_f_len = 0;
1111            const uint8_t * added_f = NULL;
1112
1113            tr_variantDictFindRaw (&val, TR_KEY_added6_f, &added_f, &added_f_len);
1114            pex = tr_peerMgrCompact6ToPex (added, added_len, added_f, added_f_len, &n);
1115
1116            n = MIN (n, MAX_PEX_PEER_COUNT);
1117            for (i=0; i<n; ++i)
1118            {
1119                int seedProbability = -1;
1120                if (i < added_f_len) seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) ? 100 : 0;
1121                tr_peerMgrAddPex (tor, TR_PEER_FROM_PEX, pex+i, seedProbability);
1122            }
1123
1124            tr_free (pex);
1125        }
1126    }
1127
1128    if (loaded)
1129        tr_variantFree (&val);
1130    tr_free (tmp);
1131}
1132
1133static void sendPex (tr_peerMsgs * msgs);
1134
1135static void
1136parseLtep (tr_peerMsgs * msgs, int msglen, struct evbuffer  * inbuf)
1137{
1138    uint8_t ltep_msgid;
1139
1140    tr_peerIoReadUint8 (msgs->io, inbuf, &ltep_msgid);
1141    msglen--;
1142
1143    if (ltep_msgid == LTEP_HANDSHAKE)
1144    {
1145        dbgmsg (msgs, "got ltep handshake");
1146        parseLtepHandshake (msgs, msglen, inbuf);
1147        if (tr_peerIoSupportsLTEP (msgs->io))
1148        {
1149            sendLtepHandshake (msgs);
1150            sendPex (msgs);
1151        }
1152    }
1153    else if (ltep_msgid == UT_PEX_ID)
1154    {
1155        dbgmsg (msgs, "got ut pex");
1156        msgs->peerSupportsPex = 1;
1157        parseUtPex (msgs, msglen, inbuf);
1158    }
1159    else if (ltep_msgid == UT_METADATA_ID)
1160    {
1161        dbgmsg (msgs, "got ut metadata");
1162        msgs->peerSupportsMetadataXfer = 1;
1163        parseUtMetadata (msgs, msglen, inbuf);
1164    }
1165    else
1166    {
1167        dbgmsg (msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1168        evbuffer_drain (inbuf, msglen);
1169    }
1170}
1171
1172static int
1173readBtLength (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1174{
1175    uint32_t len;
1176
1177    if (inlen < sizeof (len))
1178        return READ_LATER;
1179
1180    tr_peerIoReadUint32 (msgs->io, inbuf, &len);
1181
1182    if (len == 0) /* peer sent us a keepalive message */
1183        dbgmsg (msgs, "got KeepAlive");
1184    else
1185    {
1186        msgs->incoming.length = len;
1187        msgs->state = AWAITING_BT_ID;
1188    }
1189
1190    return READ_NOW;
1191}
1192
1193static int readBtMessage (tr_peerMsgs *, struct evbuffer *, size_t);
1194
1195static int
1196readBtId (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1197{
1198    uint8_t id;
1199
1200    if (inlen < sizeof (uint8_t))
1201        return READ_LATER;
1202
1203    tr_peerIoReadUint8 (msgs->io, inbuf, &id);
1204    msgs->incoming.id = id;
1205    dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);
1206
1207    if (id == BT_PIECE)
1208    {
1209        msgs->state = AWAITING_BT_PIECE;
1210        return READ_NOW;
1211    }
1212    else if (msgs->incoming.length != 1)
1213    {
1214        msgs->state = AWAITING_BT_MESSAGE;
1215        return READ_NOW;
1216    }
1217    else return readBtMessage (msgs, inbuf, inlen - 1);
1218}
1219
1220static void
1221updatePeerProgress (tr_peerMsgs * msgs)
1222{
1223  tr_peerUpdateProgress (msgs->torrent, &msgs->peer);
1224
1225  /*updateFastSet (msgs);*/
1226  updateInterest (msgs);
1227}
1228
1229static void
1230prefetchPieces (tr_peerMsgs *msgs)
1231{
1232  int i;
1233
1234  if (!getSession (msgs)->isPrefetchEnabled)
1235    return;
1236
1237  for (i=msgs->prefetchCount; i<msgs->peer.pendingReqsToClient && i<PREFETCH_SIZE; ++i)
1238    {
1239      const struct peer_request * req = msgs->peerAskedFor + i;
1240      if (requestIsValid (msgs, req))
1241        {
1242          tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1243          ++msgs->prefetchCount;
1244        }
1245    }
1246}
1247
1248static void
1249peerMadeRequest (tr_peerMsgs * msgs, const struct peer_request * req)
1250{
1251    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1252    const int reqIsValid = requestIsValid (msgs, req);
1253    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete (&msgs->torrent->completion, req->index);
1254    const int peerIsChoked = msgs->peer_is_choked;
1255
1256    int allow = false;
1257
1258    if (!reqIsValid)
1259        dbgmsg (msgs, "rejecting an invalid request.");
1260    else if (!clientHasPiece)
1261        dbgmsg (msgs, "rejecting request for a piece we don't have.");
1262    else if (peerIsChoked)
1263        dbgmsg (msgs, "rejecting request from choked peer");
1264    else if (msgs->peer.pendingReqsToClient + 1 >= REQQ)
1265        dbgmsg (msgs, "rejecting request ... reqq is full");
1266    else
1267        allow = true;
1268
1269    if (allow) {
1270        msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req;
1271        prefetchPieces (msgs);
1272    } else if (fext) {
1273        protocolSendReject (msgs, req);
1274    }
1275}
1276
1277static bool
1278messageLengthIsCorrect (const tr_peerMsgs * msg, uint8_t id, uint32_t len)
1279{
1280    switch (id)
1281    {
1282        case BT_CHOKE:
1283        case BT_UNCHOKE:
1284        case BT_INTERESTED:
1285        case BT_NOT_INTERESTED:
1286        case BT_FEXT_HAVE_ALL:
1287        case BT_FEXT_HAVE_NONE:
1288            return len == 1;
1289
1290        case BT_HAVE:
1291        case BT_FEXT_SUGGEST:
1292        case BT_FEXT_ALLOWED_FAST:
1293            return len == 5;
1294
1295        case BT_BITFIELD:
1296            if (tr_torrentHasMetadata (msg->torrent))
1297                return len == (msg->torrent->info.pieceCount + 7u) / 8u + 1u;
1298            /* we don't know the piece count yet,
1299               so we can only guess whether to send true or false */
1300            if (msg->metadata_size_hint > 0)
1301                return len <= msg->metadata_size_hint;
1302            return true;
1303
1304        case BT_REQUEST:
1305        case BT_CANCEL:
1306        case BT_FEXT_REJECT:
1307            return len == 13;
1308
1309        case BT_PIECE:
1310            return len > 9 && len <= 16393;
1311
1312        case BT_PORT:
1313            return len == 3;
1314
1315        case BT_LTEP:
1316            return len >= 2;
1317
1318        default:
1319            return false;
1320    }
1321}
1322
1323static int clientGotBlock (tr_peerMsgs *               msgs,
1324                           struct evbuffer *           block,
1325                           const struct peer_request * req);
1326
1327static int
1328readBtPiece (tr_peerMsgs      * msgs,
1329             struct evbuffer  * inbuf,
1330             size_t             inlen,
1331             size_t           * setme_piece_bytes_read)
1332{
1333    struct peer_request * req = &msgs->incoming.blockReq;
1334
1335    assert (evbuffer_get_length (inbuf) >= inlen);
1336    dbgmsg (msgs, "In readBtPiece");
1337
1338    if (!req->length)
1339    {
1340        if (inlen < 8)
1341            return READ_LATER;
1342
1343        tr_peerIoReadUint32 (msgs->io, inbuf, &req->index);
1344        tr_peerIoReadUint32 (msgs->io, inbuf, &req->offset);
1345        req->length = msgs->incoming.length - 9;
1346        dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1347        return READ_NOW;
1348    }
1349    else
1350    {
1351        int err;
1352        size_t n;
1353        size_t nLeft;
1354        struct evbuffer * block_buffer;
1355
1356        if (msgs->incoming.block == NULL)
1357            msgs->incoming.block = evbuffer_new ();
1358        block_buffer = msgs->incoming.block;
1359
1360        /* read in another chunk of data */
1361        nLeft = req->length - evbuffer_get_length (block_buffer);
1362        n = MIN (nLeft, inlen);
1363
1364        tr_peerIoReadBytesToBuf (msgs->io, inbuf, block_buffer, n);
1365
1366        fireClientGotPieceData (msgs, n);
1367        *setme_piece_bytes_read += n;
1368        dbgmsg (msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1369               n, req->index, req->offset, req->length,
1370             (int)(req->length - evbuffer_get_length (block_buffer)));
1371        if (evbuffer_get_length (block_buffer) < req->length)
1372            return READ_LATER;
1373
1374        /* pass the block along... */
1375        err = clientGotBlock (msgs, block_buffer, req);
1376        evbuffer_drain (block_buffer, evbuffer_get_length (block_buffer));
1377
1378        /* cleanup */
1379        req->length = 0;
1380        msgs->state = AWAITING_BT_LENGTH;
1381        return err ? READ_ERR : READ_NOW;
1382    }
1383}
1384
1385static void updateDesiredRequestCount (tr_peerMsgs * msgs);
1386
1387static int
1388readBtMessage (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen)
1389{
1390    uint32_t      ui32;
1391    uint32_t      msglen = msgs->incoming.length;
1392    const uint8_t id = msgs->incoming.id;
1393#ifndef NDEBUG
1394    const size_t  startBufLen = evbuffer_get_length (inbuf);
1395#endif
1396    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1397
1398    --msglen; /* id length */
1399
1400    dbgmsg (msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);
1401
1402    if (inlen < msglen)
1403        return READ_LATER;
1404
1405    if (!messageLengthIsCorrect (msgs, id, msglen + 1))
1406    {
1407        dbgmsg (msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1408        fireError (msgs, EMSGSIZE);
1409        return READ_ERR;
1410    }
1411
1412    switch (id)
1413    {
1414        case BT_CHOKE:
1415            dbgmsg (msgs, "got Choke");
1416            msgs->client_is_choked = true;
1417            if (!fext)
1418                fireGotChoke (msgs);
1419            break;
1420
1421        case BT_UNCHOKE:
1422            dbgmsg (msgs, "got Unchoke");
1423            msgs->client_is_choked = false;
1424            updateDesiredRequestCount (msgs);
1425            break;
1426
1427        case BT_INTERESTED:
1428            dbgmsg (msgs, "got Interested");
1429            msgs->peer_is_interested = true;
1430            break;
1431
1432        case BT_NOT_INTERESTED:
1433            dbgmsg (msgs, "got Not Interested");
1434            msgs->peer_is_interested = false;
1435            break;
1436
1437        case BT_HAVE:
1438            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1439            dbgmsg (msgs, "got Have: %u", ui32);
1440            if (tr_torrentHasMetadata (msgs->torrent)
1441                    && (ui32 >= msgs->torrent->info.pieceCount))
1442            {
1443                fireError (msgs, ERANGE);
1444                return READ_ERR;
1445            }
1446
1447            /* a peer can send the same HAVE message twice... */
1448            if (!tr_bitfieldHas (&msgs->peer.have, ui32)) {
1449                tr_bitfieldAdd (&msgs->peer.have, ui32);
1450                fireClientGotHave (msgs, ui32);
1451            }
1452            updatePeerProgress (msgs);
1453            break;
1454
1455        case BT_BITFIELD: {
1456            uint8_t * tmp = tr_new (uint8_t, msglen);
1457            dbgmsg (msgs, "got a bitfield");
1458            tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen);
1459            tr_bitfieldSetRaw (&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));
1460            fireClientGotBitfield (msgs, &msgs->peer.have);
1461            updatePeerProgress (msgs);
1462            tr_free (tmp);
1463            break;
1464        }
1465
1466        case BT_REQUEST:
1467        {
1468            struct peer_request r;
1469            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1470            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1471            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1472            dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1473            peerMadeRequest (msgs, &r);
1474            break;
1475        }
1476
1477        case BT_CANCEL:
1478        {
1479            int i;
1480            struct peer_request r;
1481            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1482            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1483            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1484            tr_historyAdd (&msgs->peer.cancelsSentToClient, tr_time (), 1);
1485            dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1486
1487            for (i=0; i<msgs->peer.pendingReqsToClient; ++i) {
1488                const struct peer_request * req = msgs->peerAskedFor + i;
1489                if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length))
1490                    break;
1491            }
1492
1493            if (i < msgs->peer.pendingReqsToClient)
1494                tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request),
1495                                           msgs->peer.pendingReqsToClient--);
1496            break;
1497        }
1498
1499        case BT_PIECE:
1500            assert (0); /* handled elsewhere! */
1501            break;
1502
1503        case BT_PORT:
1504            dbgmsg (msgs, "Got a BT_PORT");
1505            tr_peerIoReadUint16 (msgs->io, inbuf, &msgs->dht_port);
1506            if (msgs->dht_port > 0)
1507                tr_dhtAddNode (getSession (msgs),
1508                               tr_peerAddress (&msgs->peer),
1509                               msgs->dht_port, 0);
1510            break;
1511
1512        case BT_FEXT_SUGGEST:
1513            dbgmsg (msgs, "Got a BT_FEXT_SUGGEST");
1514            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1515            if (fext)
1516                fireClientGotSuggest (msgs, ui32);
1517            else {
1518                fireError (msgs, EMSGSIZE);
1519                return READ_ERR;
1520            }
1521            break;
1522
1523        case BT_FEXT_ALLOWED_FAST:
1524            dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST");
1525            tr_peerIoReadUint32 (msgs->io, inbuf, &ui32);
1526            if (fext)
1527                fireClientGotAllowedFast (msgs, ui32);
1528            else {
1529                fireError (msgs, EMSGSIZE);
1530                return READ_ERR;
1531            }
1532            break;
1533
1534        case BT_FEXT_HAVE_ALL:
1535            dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL");
1536            if (fext) {
1537                tr_bitfieldSetHasAll (&msgs->peer.have);
1538assert (tr_bitfieldHasAll (&msgs->peer.have));
1539                fireClientGotHaveAll (msgs);
1540                updatePeerProgress (msgs);
1541            } else {
1542                fireError (msgs, EMSGSIZE);
1543                return READ_ERR;
1544            }
1545            break;
1546
1547        case BT_FEXT_HAVE_NONE:
1548            dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE");
1549            if (fext) {
1550                tr_bitfieldSetHasNone (&msgs->peer.have);
1551                fireClientGotHaveNone (msgs);
1552                updatePeerProgress (msgs);
1553            } else {
1554                fireError (msgs, EMSGSIZE);
1555                return READ_ERR;
1556            }
1557            break;
1558
1559        case BT_FEXT_REJECT:
1560        {
1561            struct peer_request r;
1562            dbgmsg (msgs, "Got a BT_FEXT_REJECT");
1563            tr_peerIoReadUint32 (msgs->io, inbuf, &r.index);
1564            tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset);
1565            tr_peerIoReadUint32 (msgs->io, inbuf, &r.length);
1566            if (fext)
1567                fireGotRej (msgs, &r);
1568            else {
1569                fireError (msgs, EMSGSIZE);
1570                return READ_ERR;
1571            }
1572            break;
1573        }
1574
1575        case BT_LTEP:
1576            dbgmsg (msgs, "Got a BT_LTEP");
1577            parseLtep (msgs, msglen, inbuf);
1578            break;
1579
1580        default:
1581            dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id);
1582            tr_peerIoDrain (msgs->io, inbuf, msglen);
1583            break;
1584    }
1585
1586    assert (msglen + 1 == msgs->incoming.length);
1587    assert (evbuffer_get_length (inbuf) == startBufLen - msglen);
1588
1589    msgs->state = AWAITING_BT_LENGTH;
1590    return READ_NOW;
1591}
1592
1593/* returns 0 on success, or an errno on failure */
1594static int
1595clientGotBlock (tr_peerMsgs                * msgs,
1596                struct evbuffer            * data,
1597                const struct peer_request  * req)
1598{
1599    int err;
1600    tr_torrent * tor = msgs->torrent;
1601    const tr_block_index_t block = _tr_block (tor, req->index, req->offset);
1602
1603    assert (msgs);
1604    assert (req);
1605
1606    if (req->length != tr_torBlockCountBytes (msgs->torrent, block)) {
1607        dbgmsg (msgs, "wrong block size -- expected %u, got %d",
1608                tr_torBlockCountBytes (msgs->torrent, block), req->length);
1609        return EMSGSIZE;
1610    }
1611
1612    dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1613
1614    if (!tr_peerMgrDidPeerRequest (msgs->torrent, &msgs->peer, block)) {
1615        dbgmsg (msgs, "we didn't ask for this message...");
1616        return 0;
1617    }
1618    if (tr_cpPieceIsComplete (&msgs->torrent->completion, req->index)) {
1619        dbgmsg (msgs, "we did ask for this message, but the piece is already complete...");
1620        return 0;
1621    }
1622
1623    /**
1624    ***  Save the block
1625    **/
1626
1627    if ((err = tr_cacheWriteBlock (getSession (msgs)->cache, tor, req->index, req->offset, req->length, data)))
1628        return err;
1629
1630    tr_bitfieldAdd (&msgs->peer.blame, req->index);
1631    fireGotBlock (msgs, req);
1632    return 0;
1633}
1634
1635static int peerPulse (void * vmsgs);
1636
1637static void
1638didWrite (tr_peerIo * io UNUSED, size_t bytesWritten, bool wasPieceData, void * vmsgs)
1639{
1640    tr_peerMsgs * msgs = vmsgs;
1641
1642    if (wasPieceData)
1643      firePeerGotPieceData (msgs, bytesWritten);
1644
1645    if (tr_isPeerIo (io) && io->userData)
1646        peerPulse (msgs);
1647}
1648
1649static ReadState
1650canRead (tr_peerIo * io, void * vmsgs, size_t * piece)
1651{
1652    ReadState         ret;
1653    tr_peerMsgs *     msgs = vmsgs;
1654    struct evbuffer * in = tr_peerIoGetReadBuffer (io);
1655    const size_t      inlen = evbuffer_get_length (in);
1656
1657    dbgmsg (msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);
1658
1659    if (!inlen)
1660    {
1661        ret = READ_LATER;
1662    }
1663    else if (msgs->state == AWAITING_BT_PIECE)
1664    {
1665        ret = readBtPiece (msgs, in, inlen, piece);
1666    }
1667    else switch (msgs->state)
1668    {
1669        case AWAITING_BT_LENGTH:
1670            ret = readBtLength (msgs, in, inlen); break;
1671
1672        case AWAITING_BT_ID:
1673            ret = readBtId   (msgs, in, inlen); break;
1674
1675        case AWAITING_BT_MESSAGE:
1676            ret = readBtMessage (msgs, in, inlen); break;
1677
1678        default:
1679            ret = READ_ERR;
1680            assert (0);
1681    }
1682
1683    dbgmsg (msgs, "canRead: ret is %d", (int)ret);
1684
1685    return ret;
1686}
1687
1688int
1689tr_peerMsgsIsReadingBlock (const tr_peerMsgs * msgs, tr_block_index_t block)
1690{
1691    if (msgs->state != AWAITING_BT_PIECE)
1692        return false;
1693
1694    return block == _tr_block (msgs->torrent,
1695                               msgs->incoming.blockReq.index,
1696                               msgs->incoming.blockReq.offset);
1697}
1698
1699/**
1700***
1701**/
1702
1703static void
1704updateDesiredRequestCount (tr_peerMsgs * msgs)
1705{
1706    tr_torrent * const torrent = msgs->torrent;
1707
1708    /* there are lots of reasons we might not want to request any blocks... */
1709    if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent)
1710                                    || msgs->client_is_choked
1711                                    || !msgs->client_is_interested)
1712    {
1713        msgs->desiredRequestCount = 0;
1714    }
1715    else
1716    {
1717        int estimatedBlocksInPeriod;
1718        unsigned int rate_Bps;
1719        unsigned int irate_Bps;
1720        const int floor = 4;
1721        const int seconds = REQUEST_BUF_SECS;
1722        const uint64_t now = tr_time_msec ();
1723
1724        /* Get the rate limit we should use.
1725         * FIXME: this needs to consider all the other peers as well... */
1726        rate_Bps = tr_peerGetPieceSpeed_Bps (&msgs->peer, now, TR_PEER_TO_CLIENT);
1727        if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT))
1728            rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT));
1729
1730        /* honor the session limits, if enabled */
1731        if (tr_torrentUsesSessionLimits (torrent) &&
1732            tr_sessionGetActiveSpeedLimit_Bps (torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1733                rate_Bps = MIN (rate_Bps, irate_Bps);
1734
1735        /* use this desired rate to figure out how
1736         * many requests we should send to this peer */
1737        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1738        msgs->desiredRequestCount = MAX (floor, estimatedBlocksInPeriod);
1739
1740        /* honor the peer's maximum request count, if specified */
1741        if (msgs->reqq > 0)
1742            if (msgs->desiredRequestCount > msgs->reqq)
1743                msgs->desiredRequestCount = msgs->reqq;
1744    }
1745}
1746
1747static void
1748updateMetadataRequests (tr_peerMsgs * msgs, time_t now)
1749{
1750    int piece;
1751
1752    if (msgs->peerSupportsMetadataXfer
1753        && tr_torrentGetNextMetadataRequest (msgs->torrent, now, &piece))
1754    {
1755        tr_variant tmp;
1756        struct evbuffer * payload;
1757        struct evbuffer * out = msgs->outMessages;
1758
1759        /* build the data message */
1760        tr_variantInitDict (&tmp, 3);
1761        tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
1762        tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1763        payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1764
1765        dbgmsg (msgs, "requesting metadata piece #%d", piece);
1766
1767        /* write it out as a LTEP message to our outMessages buffer */
1768        evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1769        evbuffer_add_uint8 (out, BT_LTEP);
1770        evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1771        evbuffer_add_buffer (out, payload);
1772        pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1773        dbgOutMessageLen (msgs);
1774
1775        /* cleanup */
1776        evbuffer_free (payload);
1777        tr_variantFree (&tmp);
1778    }
1779}
1780
1781static void
1782updateBlockRequests (tr_peerMsgs * msgs)
1783{
1784    if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT)
1785        && (msgs->desiredRequestCount > 0)
1786        && (msgs->peer.pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))
1787    {
1788        int i;
1789        int n;
1790        tr_block_index_t * blocks;
1791        const int numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer;
1792
1793        assert (tr_peerMsgsIsClientInterested (msgs));
1794        assert (!tr_peerMsgsIsClientChoked (msgs));
1795
1796        blocks = tr_new (tr_block_index_t, numwant);
1797        tr_peerMgrGetNextRequests (msgs->torrent, &msgs->peer, numwant, blocks, &n, false);
1798
1799        for (i=0; i<n; ++i)
1800        {
1801            struct peer_request req;
1802            blockToReq (msgs->torrent, blocks[i], &req);
1803            protocolSendRequest (msgs, &req);
1804        }
1805
1806        tr_free (blocks);
1807    }
1808}
1809
1810static size_t
1811fillOutputBuffer (tr_peerMsgs * msgs, time_t now)
1812{
1813    int piece;
1814    size_t bytesWritten = 0;
1815    struct peer_request req;
1816    const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0;
1817    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
1818
1819    /**
1820    ***  Protocol messages
1821    **/
1822
1823    if (haveMessages && !msgs->outMessagesBatchedAt) /* fresh batch */
1824    {
1825        dbgmsg (msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length (msgs->outMessages));
1826        msgs->outMessagesBatchedAt = now;
1827    }
1828    else if (haveMessages && ((now - msgs->outMessagesBatchedAt) >= msgs->outMessagesBatchPeriod))
1829    {
1830        const size_t len = evbuffer_get_length (msgs->outMessages);
1831        /* flush the protocol messages */
1832        dbgmsg (msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len);
1833        tr_peerIoWriteBuf (msgs->io, msgs->outMessages, false);
1834        msgs->clientSentAnythingAt = now;
1835        msgs->outMessagesBatchedAt = 0;
1836        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1837        bytesWritten +=  len;
1838    }
1839
1840    /**
1841    ***  Metadata Pieces
1842    **/
1843
1844    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= METADATA_PIECE_SIZE)
1845        && popNextMetadataRequest (msgs, &piece))
1846    {
1847        char * data;
1848        int dataLen;
1849        bool ok = false;
1850
1851        data = tr_torrentGetMetadataPiece (msgs->torrent, piece, &dataLen);
1852        if ((dataLen > 0) && (data != NULL))
1853        {
1854            tr_variant tmp;
1855            struct evbuffer * payload;
1856            struct evbuffer * out = msgs->outMessages;
1857
1858            /* build the data message */
1859            tr_variantInitDict (&tmp, 3);
1860            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
1861            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1862            tr_variantDictAddInt (&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
1863            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1864
1865            /* write it out as a LTEP message to our outMessages buffer */
1866            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload) + dataLen);
1867            evbuffer_add_uint8 (out, BT_LTEP);
1868            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1869            evbuffer_add_buffer (out, payload);
1870            evbuffer_add     (out, data, dataLen);
1871            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1872            dbgOutMessageLen (msgs);
1873
1874            evbuffer_free (payload);
1875            tr_variantFree (&tmp);
1876            tr_free (data);
1877
1878            ok = true;
1879        }
1880
1881        if (!ok) /* send a rejection message */
1882        {
1883            tr_variant tmp;
1884            struct evbuffer * payload;
1885            struct evbuffer * out = msgs->outMessages;
1886
1887            /* build the rejection message */
1888            tr_variantInitDict (&tmp, 2);
1889            tr_variantDictAddInt (&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1890            tr_variantDictAddInt (&tmp, TR_KEY_piece, piece);
1891            payload = tr_variantToBuf (&tmp, TR_VARIANT_FMT_BENC);
1892
1893            /* write it out as a LTEP message to our outMessages buffer */
1894            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
1895            evbuffer_add_uint8 (out, BT_LTEP);
1896            evbuffer_add_uint8 (out, msgs->ut_metadata_id);
1897            evbuffer_add_buffer (out, payload);
1898            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
1899            dbgOutMessageLen (msgs);
1900
1901            evbuffer_free (payload);
1902            tr_variantFree (&tmp);
1903        }
1904    }
1905
1906    /**
1907    ***  Data Blocks
1908    **/
1909
1910    if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= msgs->torrent->blockSize)
1911        && popNextRequest (msgs, &req))
1912    {
1913        --msgs->prefetchCount;
1914
1915        if (requestIsValid (msgs, &req)
1916            && tr_cpPieceIsComplete (&msgs->torrent->completion, req.index))
1917        {
1918            int err;
1919            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1920            struct evbuffer * out;
1921            struct evbuffer_iovec iovec[1];
1922
1923            out = evbuffer_new ();
1924            evbuffer_expand (out, msglen);
1925
1926            evbuffer_add_uint32 (out, sizeof (uint8_t) + 2 * sizeof (uint32_t) + req.length);
1927            evbuffer_add_uint8 (out, BT_PIECE);
1928            evbuffer_add_uint32 (out, req.index);
1929            evbuffer_add_uint32 (out, req.offset);
1930
1931            evbuffer_reserve_space (out, req.length, iovec, 1);
1932            err = tr_cacheReadBlock (getSession (msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base);
1933            iovec[0].iov_len = req.length;
1934            evbuffer_commit_space (out, iovec, 1);
1935
1936            /* check the piece if it needs checking... */
1937            if (!err && tr_torrentPieceNeedsCheck (msgs->torrent, req.index))
1938                if ((err = !tr_torrentCheckPiece (msgs->torrent, req.index)))
1939                    tr_torrentSetLocalError (msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."), (size_t)req.index);
1940
1941            if (err)
1942            {
1943                if (fext)
1944                    protocolSendReject (msgs, &req);
1945            }
1946            else
1947            {
1948                const size_t n = evbuffer_get_length (out);
1949                dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
1950                assert (n == msglen);
1951                tr_peerIoWriteBuf (msgs->io, out, true);
1952                bytesWritten += n;
1953                msgs->clientSentAnythingAt = now;
1954                tr_historyAdd (&msgs->peer.blocksSentToPeer, tr_time (), 1);
1955            }
1956
1957            evbuffer_free (out);
1958
1959            if (err)
1960            {
1961                bytesWritten = 0;
1962                msgs = NULL;
1963            }
1964        }
1965        else if (fext) /* peer needs a reject message */
1966        {
1967            protocolSendReject (msgs, &req);
1968        }
1969
1970        if (msgs != NULL)
1971            prefetchPieces (msgs);
1972    }
1973
1974    /**
1975    ***  Keepalive
1976    **/
1977
1978    if ((msgs != NULL)
1979        && (msgs->clientSentAnythingAt != 0)
1980        && ((now - msgs->clientSentAnythingAt) > KEEPALIVE_INTERVAL_SECS))
1981    {
1982        dbgmsg (msgs, "sending a keepalive message");
1983        evbuffer_add_uint32 (msgs->outMessages, 0);
1984        pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
1985    }
1986
1987    return bytesWritten;
1988}
1989
1990static int
1991peerPulse (void * vmsgs)
1992{
1993    tr_peerMsgs * msgs = vmsgs;
1994    const time_t  now = tr_time ();
1995
1996    if (tr_isPeerIo (msgs->io)) {
1997        updateDesiredRequestCount (msgs);
1998        updateBlockRequests (msgs);
1999        updateMetadataRequests (msgs, now);
2000    }
2001
2002    for (;;)
2003        if (fillOutputBuffer (msgs, now) < 1)
2004            break;
2005
2006    return true; /* loop forever */
2007}
2008
2009void
2010tr_peerMsgsPulse (tr_peerMsgs * msgs)
2011{
2012    if (msgs != NULL)
2013        peerPulse (msgs);
2014}
2015
2016static void
2017gotError (tr_peerIo * io UNUSED, short what, void * vmsgs)
2018{
2019    if (what & BEV_EVENT_TIMEOUT)
2020        dbgmsg (vmsgs, "libevent got a timeout, what=%hd", what);
2021    if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
2022        dbgmsg (vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2023               what, errno, tr_strerror (errno));
2024    fireError (vmsgs, ENOTCONN);
2025}
2026
2027static void
2028sendBitfield (tr_peerMsgs * msgs)
2029{
2030    void * bytes;
2031    size_t byte_count = 0;
2032    struct evbuffer * out = msgs->outMessages;
2033
2034    assert (tr_torrentHasMetadata (msgs->torrent));
2035
2036    bytes = tr_cpCreatePieceBitfield (&msgs->torrent->completion, &byte_count);
2037    evbuffer_add_uint32 (out, sizeof (uint8_t) + byte_count);
2038    evbuffer_add_uint8 (out, BT_BITFIELD);
2039    evbuffer_add     (out, bytes, byte_count);
2040    dbgmsg (msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length (out));
2041    pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2042
2043    tr_free (bytes);
2044}
2045
2046static void
2047tellPeerWhatWeHave (tr_peerMsgs * msgs)
2048{
2049    const bool fext = tr_peerIoSupportsFEXT (msgs->io);
2050
2051    if (fext && tr_cpHasAll (&msgs->torrent->completion))
2052    {
2053        protocolSendHaveAll (msgs);
2054    }
2055    else if (fext && tr_cpHasNone (&msgs->torrent->completion))
2056    {
2057        protocolSendHaveNone (msgs);
2058    }
2059    else if (!tr_cpHasNone (&msgs->torrent->completion))
2060    {
2061        sendBitfield (msgs);
2062    }
2063}
2064
2065/**
2066***
2067**/
2068
2069/* some peers give us error messages if we send
2070   more than this many peers in a single pex message
2071   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2072#define MAX_PEX_ADDED 50
2073#define MAX_PEX_DROPPED 50
2074
2075typedef struct
2076{
2077    tr_pex *  added;
2078    tr_pex *  dropped;
2079    tr_pex *  elements;
2080    int       addedCount;
2081    int       droppedCount;
2082    int       elementCount;
2083}
2084PexDiffs;
2085
2086static void
2087pexAddedCb (void * vpex, void * userData)
2088{
2089    PexDiffs * diffs = userData;
2090    tr_pex *   pex = vpex;
2091
2092    if (diffs->addedCount < MAX_PEX_ADDED)
2093    {
2094        diffs->added[diffs->addedCount++] = *pex;
2095        diffs->elements[diffs->elementCount++] = *pex;
2096    }
2097}
2098
2099static inline void
2100pexDroppedCb (void * vpex, void * userData)
2101{
2102    PexDiffs * diffs = userData;
2103    tr_pex *   pex = vpex;
2104
2105    if (diffs->droppedCount < MAX_PEX_DROPPED)
2106    {
2107        diffs->dropped[diffs->droppedCount++] = *pex;
2108    }
2109}
2110
2111static inline void
2112pexElementCb (void * vpex, void * userData)
2113{
2114    PexDiffs * diffs = userData;
2115    tr_pex * pex = vpex;
2116
2117    diffs->elements[diffs->elementCount++] = *pex;
2118}
2119
2120typedef void (tr_set_func)(void * element, void * userData);
2121
2122/**
2123 * @brief find the differences and commonalities in two sorted sets
2124 * @param a the first set
2125 * @param aCount the number of elements in the set 'a'
2126 * @param b the second set
2127 * @param bCount the number of elements in the set 'b'
2128 * @param compare the sorting method for both sets
2129 * @param elementSize the sizeof the element in the two sorted sets
2130 * @param in_a called for items in set 'a' but not set 'b'
2131 * @param in_b called for items in set 'b' but not set 'a'
2132 * @param in_both called for items that are in both sets
2133 * @param userData user data passed along to in_a, in_b, and in_both
2134 */
2135static void
2136tr_set_compare (const void * va, size_t aCount,
2137                const void * vb, size_t bCount,
2138                int compare (const void * a, const void * b),
2139                size_t elementSize,
2140                tr_set_func in_a_cb,
2141                tr_set_func in_b_cb,
2142                tr_set_func in_both_cb,
2143                void * userData)
2144{
2145    const uint8_t * a = va;
2146    const uint8_t * b = vb;
2147    const uint8_t * aend = a + elementSize * aCount;
2148    const uint8_t * bend = b + elementSize * bCount;
2149
2150    while (a != aend || b != bend)
2151    {
2152        if (a == aend)
2153        {
2154          (*in_b_cb)((void*)b, userData);
2155            b += elementSize;
2156        }
2157        else if (b == bend)
2158        {
2159          (*in_a_cb)((void*)a, userData);
2160            a += elementSize;
2161        }
2162        else
2163        {
2164            const int val = (*compare)(a, b);
2165
2166            if (!val)
2167            {
2168              (*in_both_cb)((void*)a, userData);
2169                a += elementSize;
2170                b += elementSize;
2171            }
2172            else if (val < 0)
2173            {
2174              (*in_a_cb)((void*)a, userData);
2175                a += elementSize;
2176            }
2177            else if (val > 0)
2178            {
2179              (*in_b_cb)((void*)b, userData);
2180                b += elementSize;
2181            }
2182        }
2183    }
2184}
2185
2186
2187static void
2188sendPex (tr_peerMsgs * msgs)
2189{
2190    if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent))
2191    {
2192        PexDiffs diffs;
2193        PexDiffs diffs6;
2194        tr_pex * newPex = NULL;
2195        tr_pex * newPex6 = NULL;
2196        const int newCount = tr_peerMgrGetPeers (msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2197        const int newCount6 = tr_peerMgrGetPeers (msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2198
2199        /* build the diffs */
2200        diffs.added = tr_new (tr_pex, newCount);
2201        diffs.addedCount = 0;
2202        diffs.dropped = tr_new (tr_pex, msgs->pexCount);
2203        diffs.droppedCount = 0;
2204        diffs.elements = tr_new (tr_pex, newCount + msgs->pexCount);
2205        diffs.elementCount = 0;
2206        tr_set_compare (msgs->pex, msgs->pexCount,
2207                        newPex, newCount,
2208                        tr_pexCompare, sizeof (tr_pex),
2209                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs);
2210        diffs6.added = tr_new (tr_pex, newCount6);
2211        diffs6.addedCount = 0;
2212        diffs6.dropped = tr_new (tr_pex, msgs->pexCount6);
2213        diffs6.droppedCount = 0;
2214        diffs6.elements = tr_new (tr_pex, newCount6 + msgs->pexCount6);
2215        diffs6.elementCount = 0;
2216        tr_set_compare (msgs->pex6, msgs->pexCount6,
2217                        newPex6, newCount6,
2218                        tr_pexCompare, sizeof (tr_pex),
2219                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6);
2220        dbgmsg (
2221            msgs,
2222            "pex: old peer count %d+%d, new peer count %d+%d, "
2223            "added %d+%d, removed %d+%d",
2224            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2225            diffs.addedCount, diffs6.addedCount,
2226            diffs.droppedCount, diffs6.droppedCount);
2227
2228        if (!diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2229            !diffs6.droppedCount)
2230        {
2231            tr_free (diffs.elements);
2232            tr_free (diffs6.elements);
2233        }
2234        else
2235        {
2236            int  i;
2237            tr_variant val;
2238            uint8_t * tmp, *walk;
2239            struct evbuffer * payload;
2240            struct evbuffer * out = msgs->outMessages;
2241
2242            /* update peer */
2243            tr_free (msgs->pex);
2244            msgs->pex = diffs.elements;
2245            msgs->pexCount = diffs.elementCount;
2246            tr_free (msgs->pex6);
2247            msgs->pex6 = diffs6.elements;
2248            msgs->pexCount6 = diffs6.elementCount;
2249
2250            /* build the pex payload */
2251            tr_variantInitDict (&val, 3); /* ipv6 support: left as 3:
2252                                         * speed vs. likelihood? */
2253
2254            if (diffs.addedCount > 0)
2255            {
2256                /* "added" */
2257                tmp = walk = tr_new (uint8_t, diffs.addedCount * 6);
2258                for (i = 0; i < diffs.addedCount; ++i) {
2259                    memcpy (walk, &diffs.added[i].addr.addr, 4); walk += 4;
2260                    memcpy (walk, &diffs.added[i].port, 2); walk += 2;
2261                }
2262                assert ((walk - tmp) == diffs.addedCount * 6);
2263                tr_variantDictAddRaw (&val, TR_KEY_added, tmp, walk - tmp);
2264                tr_free (tmp);
2265
2266                /* "added.f"
2267                 * unset each holepunch flag because we don't support it. */
2268                tmp = walk = tr_new (uint8_t, diffs.addedCount);
2269                for (i = 0; i < diffs.addedCount; ++i)
2270                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2271                assert ((walk - tmp) == diffs.addedCount);
2272                tr_variantDictAddRaw (&val, TR_KEY_added_f, tmp, walk - tmp);
2273                tr_free (tmp);
2274            }
2275
2276            if (diffs.droppedCount > 0)
2277            {
2278                /* "dropped" */
2279                tmp = walk = tr_new (uint8_t, diffs.droppedCount * 6);
2280                for (i = 0; i < diffs.droppedCount; ++i) {
2281                    memcpy (walk, &diffs.dropped[i].addr.addr, 4); walk += 4;
2282                    memcpy (walk, &diffs.dropped[i].port, 2); walk += 2;
2283                }
2284                assert ((walk - tmp) == diffs.droppedCount * 6);
2285                tr_variantDictAddRaw (&val, TR_KEY_dropped, tmp, walk - tmp);
2286                tr_free (tmp);
2287            }
2288
2289            if (diffs6.addedCount > 0)
2290            {
2291                /* "added6" */
2292                tmp = walk = tr_new (uint8_t, diffs6.addedCount * 18);
2293                for (i = 0; i < diffs6.addedCount; ++i) {
2294                    memcpy (walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2295                    walk += 16;
2296                    memcpy (walk, &diffs6.added[i].port, 2);
2297                    walk += 2;
2298                }
2299                assert ((walk - tmp) == diffs6.addedCount * 18);
2300                tr_variantDictAddRaw (&val, TR_KEY_added6, tmp, walk - tmp);
2301                tr_free (tmp);
2302
2303                /* "added6.f"
2304                 * unset each holepunch flag because we don't support it. */
2305                tmp = walk = tr_new (uint8_t, diffs6.addedCount);
2306                for (i = 0; i < diffs6.addedCount; ++i)
2307                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2308                assert ((walk - tmp) == diffs6.addedCount);
2309                tr_variantDictAddRaw (&val, TR_KEY_added6_f, tmp, walk - tmp);
2310                tr_free (tmp);
2311            }
2312
2313            if (diffs6.droppedCount > 0)
2314            {
2315                /* "dropped6" */
2316                tmp = walk = tr_new (uint8_t, diffs6.droppedCount * 18);
2317                for (i = 0; i < diffs6.droppedCount; ++i) {
2318                    memcpy (walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2319                    walk += 16;
2320                    memcpy (walk, &diffs6.dropped[i].port, 2);
2321                    walk += 2;
2322                }
2323                assert ((walk - tmp) == diffs6.droppedCount * 18);
2324                tr_variantDictAddRaw (&val, TR_KEY_dropped6, tmp, walk - tmp);
2325                tr_free (tmp);
2326            }
2327
2328            /* write the pex message */
2329            payload = tr_variantToBuf (&val, TR_VARIANT_FMT_BENC);
2330            evbuffer_add_uint32 (out, 2 * sizeof (uint8_t) + evbuffer_get_length (payload));
2331            evbuffer_add_uint8 (out, BT_LTEP);
2332            evbuffer_add_uint8 (out, msgs->ut_pex_id);
2333            evbuffer_add_buffer (out, payload);
2334            pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS);
2335            dbgmsg (msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length (out));
2336            dbgOutMessageLen (msgs);
2337
2338            evbuffer_free (payload);
2339            tr_variantFree (&val);
2340        }
2341
2342        /* cleanup */
2343        tr_free (diffs.added);
2344        tr_free (diffs.dropped);
2345        tr_free (newPex);
2346        tr_free (diffs6.added);
2347        tr_free (diffs6.dropped);
2348        tr_free (newPex6);
2349
2350        /*msgs->clientSentPexAt = tr_time ();*/
2351    }
2352}
2353
2354static void
2355pexPulse (int foo UNUSED, short bar UNUSED, void * vmsgs)
2356{
2357    struct tr_peerMsgs * msgs = vmsgs;
2358
2359    sendPex (msgs);
2360
2361    assert (msgs->pexTimer != NULL);
2362    tr_timerAdd (msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2363}
2364
2365/***
2366****  tr_peer virtual functions
2367***/
2368
2369static bool
2370peermsgs_is_transferring_pieces (const struct tr_peer * peer,
2371                                 uint64_t               now,
2372                                 tr_direction           direction, 
2373                                 unsigned int         * setme_Bps)
2374{
2375  unsigned int Bps = 0;
2376
2377  if (tr_isPeerMsgs (peer))
2378    {
2379      const tr_peerMsgs * msgs = (const tr_peerMsgs *) peer;
2380      Bps = tr_peerIoGetPieceSpeed_Bps (msgs->io, now, direction);
2381    }
2382
2383  if (setme_Bps != NULL)
2384    *setme_Bps = Bps;
2385
2386  return Bps > 0;
2387}
2388
2389static void
2390peermsgs_destruct (tr_peer * peer)
2391{
2392  tr_peerMsgs * msgs = PEER_MSGS (peer);
2393
2394  assert (msgs != NULL);
2395
2396  if (msgs->pexTimer != NULL)
2397    event_free (msgs->pexTimer);
2398
2399  if (msgs->incoming.block != NULL)
2400    evbuffer_free (msgs->incoming.block);
2401
2402  if (msgs->io)
2403    {
2404      tr_peerIoClear (msgs->io);
2405      tr_peerIoUnref (msgs->io); /* balanced by the ref in handshakeDoneCB () */
2406    }
2407
2408  evbuffer_free (msgs->outMessages);
2409  tr_free (msgs->pex6);
2410  tr_free (msgs->pex);
2411
2412  tr_peerDestruct (&msgs->peer);
2413
2414  memset (msgs, ~0, sizeof (tr_peerMsgs));
2415}
2416
2417static const struct tr_peer_virtual_funcs my_funcs =
2418{
2419  .destruct = peermsgs_destruct,
2420  .is_transferring_pieces = peermsgs_is_transferring_pieces
2421};
2422
2423/***
2424****
2425***/
2426
2427time_t
2428tr_peerMsgsGetConnectionAge (const tr_peerMsgs * msgs)
2429{
2430  assert (tr_isPeerMsgs (msgs));
2431
2432  return tr_peerIoGetAge (msgs->io);
2433}
2434
2435bool
2436tr_peerMsgsIsPeerChoked (const tr_peerMsgs * msgs)
2437{
2438  assert (tr_isPeerMsgs (msgs));
2439
2440  return msgs->peer_is_choked;
2441}
2442
2443bool
2444tr_peerMsgsIsPeerInterested (const tr_peerMsgs * msgs)
2445{
2446  assert (tr_isPeerMsgs (msgs));
2447
2448  return msgs->peer_is_interested;
2449}
2450
2451bool
2452tr_peerMsgsIsClientChoked (const tr_peerMsgs * msgs)
2453{
2454  assert (tr_isPeerMsgs (msgs));
2455
2456  return msgs->client_is_choked;
2457}
2458
2459bool
2460tr_peerMsgsIsClientInterested (const tr_peerMsgs * msgs)
2461{
2462  assert (tr_isPeerMsgs (msgs));
2463
2464  return msgs->client_is_interested;
2465}
2466
2467bool
2468tr_peerMsgsIsUtpConnection (const tr_peerMsgs * msgs)
2469{
2470  assert (tr_isPeerMsgs (msgs));
2471
2472  return msgs->io->utp_socket != NULL;
2473}
2474
2475bool
2476tr_peerMsgsIsEncrypted (const tr_peerMsgs * msgs)
2477{
2478  assert (tr_isPeerMsgs (msgs));
2479
2480  return tr_peerIoIsEncrypted (msgs->io);
2481}
2482
2483bool
2484tr_peerMsgsIsIncomingConnection (const tr_peerMsgs * msgs)
2485{
2486  assert (tr_isPeerMsgs (msgs));
2487
2488  return tr_peerIoIsIncoming (msgs->io);
2489}
2490
2491/***
2492****
2493***/
2494
2495bool
2496tr_isPeerMsgs (const void * msgs)
2497{
2498  /* FIXME: this is pretty crude */
2499  return (msgs != NULL)
2500      && (((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER);
2501}
2502
2503tr_peerMsgs *
2504tr_peerMsgsCast (void * vm)
2505{
2506  return tr_isPeerMsgs(vm) ? vm : NULL;
2507}
2508
2509tr_peerMsgs *
2510tr_peerMsgsNew (struct tr_torrent    * torrent,
2511                struct tr_peerIo     * io,
2512                tr_peer_callback     * callback,
2513                void                 * callbackData)
2514{
2515  tr_peerMsgs * m;
2516
2517  assert (io != NULL);
2518
2519  m = tr_new0 (tr_peerMsgs, 1);
2520
2521  tr_peerConstruct (&m->peer, torrent);
2522  m->peer.funcs = &my_funcs;
2523
2524  m->magic_number = MAGIC_NUMBER;
2525  m->client_is_choked = true;
2526  m->peer_is_choked = true;
2527  m->client_is_interested = false;
2528  m->peer_is_interested = false;
2529  m->callback = callback;
2530  m->callbackData = callbackData;
2531  m->io = io;
2532  m->torrent = torrent;
2533  m->state = AWAITING_BT_LENGTH;
2534  m->outMessages = evbuffer_new ();
2535  m->outMessagesBatchedAt = 0;
2536  m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2537
2538  if (tr_torrentAllowsPex (torrent))
2539    {
2540      m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m);
2541      tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0);
2542    }
2543
2544  if (tr_peerIoSupportsUTP (m->io))
2545    {
2546      const tr_address * addr = tr_peerIoGetAddress (m->io, NULL);
2547      tr_peerMgrSetUtpSupported (torrent, addr);
2548      tr_peerMgrSetUtpFailed (torrent, addr, false);
2549    }
2550
2551  if (tr_peerIoSupportsLTEP (m->io))
2552    sendLtepHandshake (m);
2553
2554  tellPeerWhatWeHave (m);
2555
2556  if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (m->io))
2557    {
2558      /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2559      const struct tr_address *addr = tr_peerIoGetAddress (m->io, NULL);
2560      if (addr->type == TR_AF_INET || tr_globalIPv6 ())
2561        protocolSendPort (m, tr_dhtPort (torrent->session));
2562    }
2563
2564  tr_peerIoSetIOFuncs (m->io, canRead, didWrite, gotError, m);
2565  updateDesiredRequestCount (m);
2566
2567  return m;
2568}
Note: See TracBrowser for help on using the repository browser.