source: trunk/libtransmission/peer-io.c

Last change on this file was 14644, checked in by mikedld, 5 years ago

Remove useless checks and definitions (C99)

Now that MSVC support for C99 is quite good, remove previously needed but
now unused checks and definitions, like PRI* format macros (including
PRIdMAX and TR_PRIuSIZE, replaced with %jd and %zu) and inline macro.
Also, remove ssize_t typedef and replace few occurences with ev_ssize_t.
Also, remove check for stdbool.h availability (guaranteed by C99) and
include it unconditionally (except when in C++ mode).

  • Property svn:keywords set to Date Rev Author Id
File size: 34.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: peer-io.c 14644 2015-12-29 19:37:31Z mikedld $
8 */
9
10#include <assert.h>
11#include <errno.h>
12#include <string.h>
13
14#include <event2/event.h>
15#include <event2/buffer.h>
16#include <event2/bufferevent.h>
17
18#include <libutp/utp.h>
19
20#include "transmission.h"
21#include "session.h"
22#include "bandwidth.h"
23#include "log.h"
24#include "net.h"
25#include "peer-common.h" /* MAX_BLOCK_SIZE */
26#include "peer-io.h"
27#include "trevent.h" /* tr_runInEventThread () */
28#include "tr-utp.h"
29#include "utils.h"
30
31
32#ifdef _WIN32
33 #undef  EAGAIN
34 #define EAGAIN       WSAEWOULDBLOCK
35 #undef  EINTR
36 #define EINTR        WSAEINTR
37 #undef  EINPROGRESS
38 #define EINPROGRESS  WSAEINPROGRESS
39 #undef  EPIPE
40 #define EPIPE        WSAECONNRESET
41#endif
42
43/* The amount of read bufferring that we allow for uTP sockets. */
44
45#define UTP_READ_BUFFER_SIZE (256 * 1024)
46
47static size_t
48guessPacketOverhead (size_t d)
49{
50    /**
51     * http://sd.wareonearth.com/~phil/net/overhead/
52     *
53     * TCP over Ethernet:
54     * Assuming no header compression (e.g. not PPP)
55     * Add 20 IPv4 header or 40 IPv6 header (no options)
56     * Add 20 TCP header
57     * Add 12 bytes optional TCP timestamps
58     * Max TCP Payload data rates over ethernet are thus:
59     * (1500-40)/ (38+1500) = 94.9285 %  IPv4, minimal headers
60     * (1500-52)/ (38+1500) = 94.1482 %  IPv4, TCP timestamps
61     * (1500-52)/ (42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
62     * (1500-60)/ (38+1500) = 93.6281 %  IPv6, minimal headers
63     * (1500-72)/ (38+1500) = 92.8479 %  IPv6, TCP timestamps
64     * (1500-72)/ (42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
65     */
66    const double assumed_payload_data_rate = 94.0;
67
68    return (unsigned int)(d * (100.0 / assumed_payload_data_rate) - d);
69}
70
71/**
72***
73**/
74
75#define dbgmsg(io, ...) \
76  do \
77    { \
78      if (tr_logGetDeepEnabled ()) \
79        tr_logAddDeep (__FILE__, __LINE__, tr_peerIoGetAddrStr (io), __VA_ARGS__); \
80    } \
81  while (0)
82
83/**
84***
85**/
86
87struct tr_datatype
88{
89    struct tr_datatype * next;
90    size_t length;
91    bool isPieceData;
92};
93
94static struct tr_datatype * datatype_pool = NULL;
95
96static const struct tr_datatype TR_DATATYPE_INIT = { NULL, 0, false };
97
98static struct tr_datatype *
99datatype_new (void)
100{
101    struct tr_datatype * ret;
102
103    if (datatype_pool == NULL)
104        ret = tr_new (struct tr_datatype, 1);
105    else {
106        ret = datatype_pool;
107        datatype_pool = datatype_pool->next;
108    }
109
110    *ret = TR_DATATYPE_INIT;
111    return ret;
112}
113
114static void
115datatype_free (struct tr_datatype * datatype)
116{
117    datatype->next = datatype_pool;
118    datatype_pool = datatype;
119}
120
121static void
122peer_io_pull_datatype (tr_peerIo * io)
123{
124    struct tr_datatype * tmp;
125
126    if ((tmp = io->outbuf_datatypes))
127    {
128        io->outbuf_datatypes = tmp->next;
129        datatype_free (tmp);
130    }
131}
132
133static void
134peer_io_push_datatype (tr_peerIo * io, struct tr_datatype * datatype)
135{
136    struct tr_datatype * tmp;
137
138    if ((tmp = io->outbuf_datatypes)) {
139        while (tmp->next != NULL)
140            tmp = tmp->next;
141        tmp->next = datatype;
142    } else {
143        io->outbuf_datatypes = datatype;
144    }
145}
146
147/***
148****
149***/
150
151static void
152didWriteWrapper (tr_peerIo * io, unsigned int bytes_transferred)
153{
154     while (bytes_transferred && tr_isPeerIo (io))
155     {
156        struct tr_datatype * next = io->outbuf_datatypes;
157
158        const unsigned int payload = MIN (next->length, bytes_transferred);
159        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
160        const unsigned int overhead =
161            io->socket != TR_BAD_SOCKET ? guessPacketOverhead (payload) : 0;
162        const uint64_t now = tr_time_msec ();
163
164        tr_bandwidthUsed (&io->bandwidth, TR_UP, payload, next->isPieceData, now);
165
166        if (overhead > 0)
167            tr_bandwidthUsed (&io->bandwidth, TR_UP, overhead, false, now);
168
169        if (io->didWrite)
170            io->didWrite (io, payload, next->isPieceData, io->userData);
171
172        if (tr_isPeerIo (io))
173        {
174            bytes_transferred -= payload;
175            next->length -= payload;
176            if (!next->length)
177                peer_io_pull_datatype (io);
178        }
179    }
180}
181
182static void
183canReadWrapper (tr_peerIo * io)
184{
185    bool err = false;
186    bool done = false;
187    tr_session * session;
188
189    dbgmsg (io, "canRead");
190
191    tr_peerIoRef (io);
192
193    session = io->session;
194
195    /* try to consume the input buffer */
196    if (io->canRead)
197    {
198        const uint64_t now = tr_time_msec ();
199
200        tr_sessionLock (session);
201
202        while (!done && !err)
203        {
204            size_t piece = 0;
205            const size_t oldLen = evbuffer_get_length (io->inbuf);
206            const int ret = io->canRead (io, io->userData, &piece);
207            const size_t used = oldLen - evbuffer_get_length (io->inbuf);
208            const unsigned int overhead = guessPacketOverhead (used);
209
210            if (piece || (piece!=used))
211            {
212                if (piece)
213                    tr_bandwidthUsed (&io->bandwidth, TR_DOWN, piece, true, now);
214
215                if (used != piece)
216                    tr_bandwidthUsed (&io->bandwidth, TR_DOWN, used - piece, false, now);
217            }
218
219            if (overhead > 0)
220                tr_bandwidthUsed (&io->bandwidth, TR_UP, overhead, false, now);
221
222            switch (ret)
223            {
224                case READ_NOW:
225                    if (evbuffer_get_length (io->inbuf))
226                        continue;
227                    done = true;
228                    break;
229
230                case READ_LATER:
231                    done = true;
232                    break;
233
234                case READ_ERR:
235                    err = true;
236                    break;
237            }
238
239            assert (tr_isPeerIo (io));
240        }
241
242        tr_sessionUnlock (session);
243    }
244
245    tr_peerIoUnref (io);
246}
247
248static void
249event_read_cb (evutil_socket_t fd, short event UNUSED, void * vio)
250{
251    int res;
252    int e;
253    tr_peerIo * io = vio;
254
255    /* Limit the input buffer to 256K, so it doesn't grow too large */
256    unsigned int howmuch;
257    unsigned int curlen;
258    const tr_direction dir = TR_DOWN;
259    const unsigned int max = 256 * 1024;
260
261    assert (tr_isPeerIo (io));
262    assert (io->socket != TR_BAD_SOCKET);
263
264    io->pendingEvents &= ~EV_READ;
265
266    curlen = evbuffer_get_length (io->inbuf);
267    howmuch = curlen >= max ? 0 : max - curlen;
268    howmuch = tr_bandwidthClamp (&io->bandwidth, TR_DOWN, howmuch);
269
270    dbgmsg (io, "libevent says this peer is ready to read");
271
272    /* if we don't have any bandwidth left, stop reading */
273    if (howmuch < 1) {
274        tr_peerIoSetEnabled (io, dir, false);
275        return;
276    }
277
278    EVUTIL_SET_SOCKET_ERROR (0);
279    res = evbuffer_read (io->inbuf, fd, (int)howmuch);
280    e = EVUTIL_SOCKET_ERROR ();
281
282    if (res > 0)
283    {
284        tr_peerIoSetEnabled (io, dir, true);
285
286        /* Invoke the user callback - must always be called last */
287        canReadWrapper (io);
288    }
289    else
290    {
291        char errstr[512];
292        short what = BEV_EVENT_READING;
293
294        if (res == 0) /* EOF */
295            what |= BEV_EVENT_EOF;
296        else if (res == -1) {
297            if (e == EAGAIN || e == EINTR) {
298                tr_peerIoSetEnabled (io, dir, true);
299                return;
300            }
301            what |= BEV_EVENT_ERROR;
302        }
303
304        dbgmsg (io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)",
305                res, what, e, tr_net_strerror (errstr, sizeof (errstr), e));
306
307        if (io->gotError != NULL)
308            io->gotError (io, what, io->userData);
309    }
310}
311
312static int
313tr_evbuffer_write (tr_peerIo * io, int fd, size_t howmuch)
314{
315    int e;
316    int n;
317    char errstr[256];
318
319    EVUTIL_SET_SOCKET_ERROR (0);
320    n = evbuffer_write_atmost (io->outbuf, fd, howmuch);
321    e = EVUTIL_SOCKET_ERROR ();
322    dbgmsg (io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror (errstr,sizeof (errstr),e):""));
323
324    return n;
325}
326
327static void
328event_write_cb (evutil_socket_t fd, short event UNUSED, void * vio)
329{
330    int res = 0;
331    int e;
332    short what = BEV_EVENT_WRITING;
333    tr_peerIo * io = vio;
334    size_t howmuch;
335    const tr_direction dir = TR_UP;
336    char errstr[1024];
337
338    assert (tr_isPeerIo (io));
339    assert (io->socket != TR_BAD_SOCKET);
340
341    io->pendingEvents &= ~EV_WRITE;
342
343    dbgmsg (io, "libevent says this peer is ready to write");
344
345    /* Write as much as possible, since the socket is non-blocking, write () will
346     * return if it can't write any more data without blocking */
347    howmuch = tr_bandwidthClamp (&io->bandwidth, dir, evbuffer_get_length (io->outbuf));
348
349    /* if we don't have any bandwidth left, stop writing */
350    if (howmuch < 1) {
351        tr_peerIoSetEnabled (io, dir, false);
352        return;
353    }
354
355    EVUTIL_SET_SOCKET_ERROR (0);
356    res = tr_evbuffer_write (io, fd, howmuch);
357    e = EVUTIL_SOCKET_ERROR ();
358
359    if (res == -1) {
360        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
361            goto reschedule;
362        /* error case */
363        what |= BEV_EVENT_ERROR;
364    } else if (res == 0) {
365        /* eof case */
366        what |= BEV_EVENT_EOF;
367    }
368    if (res <= 0)
369        goto error;
370
371    if (evbuffer_get_length (io->outbuf))
372        tr_peerIoSetEnabled (io, dir, true);
373
374    didWriteWrapper (io, res);
375    return;
376
377 reschedule:
378    if (evbuffer_get_length (io->outbuf))
379        tr_peerIoSetEnabled (io, dir, true);
380    return;
381
382 error:
383
384    tr_net_strerror (errstr, sizeof (errstr), e);
385    dbgmsg (io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr);
386
387    if (io->gotError != NULL)
388        io->gotError (io, what, io->userData);
389}
390
391/**
392***
393**/
394
395static void
396maybeSetCongestionAlgorithm (tr_socket_t   socket,
397                             const char  * algorithm)
398{
399    if (algorithm && *algorithm)
400        tr_netSetCongestionControl (socket, algorithm);
401}
402
403#ifdef WITH_UTP
404/* UTP callbacks */
405
406static void
407utp_on_read (void *closure, const unsigned char *buf, size_t buflen)
408{
409    int rc;
410    tr_peerIo *io = closure;
411    assert (tr_isPeerIo (io));
412
413    rc = evbuffer_add (io->inbuf, buf, buflen);
414    dbgmsg (io, "utp_on_read got %zu bytes", buflen);
415
416    if (rc < 0) {
417        tr_logAddNamedError ("UTP", "On read evbuffer_add");
418        return;
419    }
420
421    tr_peerIoSetEnabled (io, TR_DOWN, true);
422    canReadWrapper (io);
423}
424
425static void
426utp_on_write (void *closure, unsigned char *buf, size_t buflen)
427{
428    int rc;
429    tr_peerIo *io = closure;
430    assert (tr_isPeerIo (io));
431
432    rc = evbuffer_remove (io->outbuf, buf, buflen);
433    dbgmsg (io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc);
434    assert (rc == (int)buflen); /* if this fails, we've corrupted our bookkeeping somewhere */
435    if (rc < (long)buflen) {
436        tr_logAddNamedError ("UTP", "Short write: %d < %ld", rc, (long)buflen);
437    }
438
439    didWriteWrapper (io, buflen);
440}
441
442static size_t
443utp_get_rb_size (void *closure)
444{
445    size_t bytes;
446    tr_peerIo *io = closure;
447    assert (tr_isPeerIo (io));
448
449    bytes = tr_bandwidthClamp (&io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE);
450
451    dbgmsg (io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes);
452    return UTP_READ_BUFFER_SIZE - bytes;
453}
454
455static int tr_peerIoTryWrite (tr_peerIo * io, size_t howmuch);
456
457static void
458utp_on_writable (tr_peerIo *io)
459{
460    int n;
461
462    dbgmsg (io, "libutp says this peer is ready to write");
463
464    n = tr_peerIoTryWrite (io, SIZE_MAX);
465    tr_peerIoSetEnabled (io, TR_UP, n && evbuffer_get_length (io->outbuf));
466}
467
468static void
469utp_on_state_change (void *closure, int state)
470{
471    tr_peerIo *io = closure;
472    assert (tr_isPeerIo (io));
473
474    if (state == UTP_STATE_CONNECT) {
475        dbgmsg (io, "utp_on_state_change -- changed to connected");
476        io->utpSupported = true;
477    } else if (state == UTP_STATE_WRITABLE) {
478        dbgmsg (io, "utp_on_state_change -- changed to writable");
479        if (io->pendingEvents & EV_WRITE)
480            utp_on_writable (io);
481    } else if (state == UTP_STATE_EOF) {
482        if (io->gotError)
483            io->gotError (io, BEV_EVENT_EOF, io->userData);
484    } else if (state == UTP_STATE_DESTROYING) {
485        tr_logAddNamedError ("UTP", "Impossible state UTP_STATE_DESTROYING");
486        return;
487    } else {
488        tr_logAddNamedError ("UTP", "Unknown state %d", state);
489    }
490}
491
492static void
493utp_on_error (void *closure, int errcode)
494{
495    tr_peerIo *io = closure;
496    assert (tr_isPeerIo (io));
497
498    dbgmsg (io, "utp_on_error -- errcode is %d", errcode);
499
500    if (io->gotError) {
501        errno = errcode;
502        io->gotError (io, BEV_EVENT_ERROR, io->userData);
503    }
504}
505
506static void
507utp_on_overhead (void *closure, uint8_t send, size_t count, int type UNUSED)
508{
509    tr_peerIo *io = closure;
510    assert (tr_isPeerIo (io));
511
512    dbgmsg (io, "utp_on_overhead -- count is %zu", count);
513
514    tr_bandwidthUsed (&io->bandwidth, send ? TR_UP : TR_DOWN,
515                      count, false, tr_time_msec ());
516}
517
518static struct UTPFunctionTable utp_function_table = {
519    .on_read = utp_on_read,
520    .on_write = utp_on_write,
521    .get_rb_size = utp_get_rb_size,
522    .on_state = utp_on_state_change,
523    .on_error = utp_on_error,
524    .on_overhead = utp_on_overhead
525};
526
527
528/* Dummy UTP callbacks. */
529/* We switch a UTP socket to use these after the associated peerIo has been
530   destroyed -- see io_dtor. */
531
532static void
533dummy_read (void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED)
534{
535    /* This cannot happen, as far as I'm aware. */
536    tr_logAddNamedError ("UTP", "On_read called on closed socket");
537
538}
539
540static void
541dummy_write (void * closure UNUSED, unsigned char *buf, size_t buflen)
542{
543    /* This can very well happen if we've shut down a peer connection that
544       had unflushed buffers.  Complain and send zeroes. */
545    tr_logAddNamedDbg ("UTP", "On_write called on closed socket");
546    memset (buf, 0, buflen);
547}
548
549static size_t
550dummy_get_rb_size (void * closure UNUSED)
551{
552    return 0;
553}
554
555static void
556dummy_on_state_change (void * closure UNUSED, int state UNUSED)
557{
558    return;
559}
560
561static void
562dummy_on_error (void * closure UNUSED, int errcode UNUSED)
563{
564    return;
565}
566
567static void
568dummy_on_overhead (void *closure UNUSED, uint8_t send UNUSED, size_t count UNUSED, int type UNUSED)
569{
570    return;
571}
572
573static struct UTPFunctionTable dummy_utp_function_table = {
574    .on_read = dummy_read,
575    .on_write = dummy_write,
576    .get_rb_size = dummy_get_rb_size,
577    .on_state = dummy_on_state_change,
578    .on_error = dummy_on_error,
579    .on_overhead = dummy_on_overhead
580};
581
582#endif /* #ifdef WITH_UTP */
583
584static tr_peerIo*
585tr_peerIoNew (tr_session       * session,
586              tr_bandwidth     * parent,
587              const tr_address * addr,
588              tr_port            port,
589              const uint8_t    * torrentHash,
590              bool               isIncoming,
591              bool               isSeed,
592              tr_socket_t        socket,
593              struct UTPSocket * utp_socket)
594{
595    tr_peerIo * io;
596
597    assert (session != NULL);
598    assert (session->events != NULL);
599    assert (tr_isBool (isIncoming));
600    assert (tr_isBool (isSeed));
601    assert (tr_amInEventThread (session));
602    assert ((socket == TR_BAD_SOCKET) == (utp_socket != NULL));
603#ifndef WITH_UTP
604    assert (socket != TR_BAD_SOCKET);
605#endif
606
607    if (socket != TR_BAD_SOCKET) {
608        tr_netSetTOS (socket, session->peerSocketTOS);
609        maybeSetCongestionAlgorithm (socket, session->peer_congestion_algorithm);
610    }
611
612    io = tr_new0 (tr_peerIo, 1);
613    io->magicNumber = PEER_IO_MAGIC_NUMBER;
614    io->refCount = 1;
615    tr_cryptoConstruct (&io->crypto, torrentHash, isIncoming);
616    io->session = session;
617    io->addr = *addr;
618    io->isSeed = isSeed;
619    io->port = port;
620    io->socket = socket;
621    io->utp_socket = utp_socket;
622    io->isIncoming = isIncoming;
623    io->timeCreated = tr_time ();
624    io->inbuf = evbuffer_new ();
625    io->outbuf = evbuffer_new ();
626    tr_bandwidthConstruct (&io->bandwidth, session, parent);
627    tr_bandwidthSetPeer (&io->bandwidth, io);
628    dbgmsg (io, "bandwidth is %p; its parent is %p", (void*)&io->bandwidth, (void*)parent);
629    dbgmsg (io, "socket is %"TR_PRI_SOCK", utp_socket is %p", socket, (void*)utp_socket);
630
631    if (io->socket != TR_BAD_SOCKET) {
632        io->event_read = event_new (session->event_base,
633                                    io->socket, EV_READ, event_read_cb, io);
634        io->event_write = event_new (session->event_base,
635                                     io->socket, EV_WRITE, event_write_cb, io);
636    }
637#ifdef WITH_UTP
638    else {
639        UTP_SetSockopt (utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE);
640        dbgmsg (io, "%s", "calling UTP_SetCallbacks &utp_function_table");
641        UTP_SetCallbacks (utp_socket,
642                          &utp_function_table,
643                          io);
644        if (!isIncoming) {
645            dbgmsg (io, "%s", "calling UTP_Connect");
646            UTP_Connect (utp_socket);
647        }
648    }
649#endif
650
651    return io;
652}
653
654tr_peerIo*
655tr_peerIoNewIncoming (tr_session        * session,
656                      tr_bandwidth      * parent,
657                      const tr_address  * addr,
658                      tr_port             port,
659                      tr_socket_t         fd,
660                      struct UTPSocket  * utp_socket)
661{
662    assert (session);
663    assert (tr_address_is_valid (addr));
664
665    return tr_peerIoNew (session, parent, addr, port, NULL, true, false,
666                         fd, utp_socket);
667}
668
669tr_peerIo*
670tr_peerIoNewOutgoing (tr_session        * session,
671                      tr_bandwidth      * parent,
672                      const tr_address  * addr,
673                      tr_port             port,
674                      const uint8_t     * torrentHash,
675                      bool                isSeed,
676                      bool                utp)
677{
678    tr_socket_t fd = TR_BAD_SOCKET;
679    struct UTPSocket * utp_socket = NULL;
680
681    assert (session);
682    assert (tr_address_is_valid (addr));
683    assert (torrentHash);
684
685    if (utp)
686        utp_socket = tr_netOpenPeerUTPSocket (session, addr, port, isSeed);
687
688    if (!utp_socket) {
689        fd = tr_netOpenPeerSocket (session, addr, port, isSeed);
690        dbgmsg (NULL, "tr_netOpenPeerSocket returned fd %"TR_PRI_SOCK, fd);
691    }
692
693    if (fd == TR_BAD_SOCKET && utp_socket == NULL)
694        return NULL;
695
696    return tr_peerIoNew (session, parent, addr, port,
697                         torrentHash, false, isSeed, fd, utp_socket);
698}
699
700/***
701****
702***/
703
704static void
705event_enable (tr_peerIo * io, short event)
706{
707    assert (tr_amInEventThread (io->session));
708    assert (io->session != NULL);
709    assert (io->session->events != NULL);
710
711    if (io->socket != TR_BAD_SOCKET)
712    {
713        assert (event_initialized (io->event_read));
714        assert (event_initialized (io->event_write));
715    }
716
717    if ((event & EV_READ) && ! (io->pendingEvents & EV_READ))
718    {
719        dbgmsg (io, "enabling ready-to-read polling");
720        if (io->socket != TR_BAD_SOCKET)
721            event_add (io->event_read, NULL);
722        io->pendingEvents |= EV_READ;
723    }
724
725    if ((event & EV_WRITE) && ! (io->pendingEvents & EV_WRITE))
726    {
727        dbgmsg (io, "enabling ready-to-write polling");
728        if (io->socket != TR_BAD_SOCKET)
729            event_add (io->event_write, NULL);
730        io->pendingEvents |= EV_WRITE;
731    }
732}
733
734static void
735event_disable (struct tr_peerIo * io, short event)
736{
737    assert (tr_amInEventThread (io->session));
738    assert (io->session != NULL);
739    assert (io->session->events != NULL);
740
741    if (io->socket != TR_BAD_SOCKET)
742    {
743        assert (event_initialized (io->event_read));
744        assert (event_initialized (io->event_write));
745    }
746
747    if ((event & EV_READ) && (io->pendingEvents & EV_READ))
748    {
749        dbgmsg (io, "disabling ready-to-read polling");
750        if (io->socket != TR_BAD_SOCKET)
751            event_del (io->event_read);
752        io->pendingEvents &= ~EV_READ;
753    }
754
755    if ((event & EV_WRITE) && (io->pendingEvents & EV_WRITE))
756    {
757        dbgmsg (io, "disabling ready-to-write polling");
758        if (io->socket != TR_BAD_SOCKET)
759            event_del (io->event_write);
760        io->pendingEvents &= ~EV_WRITE;
761    }
762}
763
764void
765tr_peerIoSetEnabled (tr_peerIo    * io,
766                     tr_direction   dir,
767                     bool           isEnabled)
768{
769    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
770
771    assert (tr_isPeerIo (io));
772    assert (tr_isDirection (dir));
773    assert (tr_amInEventThread (io->session));
774    assert (io->session->events != NULL);
775
776    if (isEnabled)
777        event_enable (io, event);
778    else
779        event_disable (io, event);
780}
781
782/***
783****
784***/
785static void
786io_close_socket (tr_peerIo * io)
787{
788    if (io->socket != TR_BAD_SOCKET) {
789        tr_netClose (io->session, io->socket);
790        io->socket = TR_BAD_SOCKET;
791    }
792
793    if (io->event_read != NULL) {
794        event_free (io->event_read);
795        io->event_read = NULL;
796    }
797
798    if (io->event_write != NULL) {
799        event_free (io->event_write);
800        io->event_write = NULL;
801    }
802
803#ifdef WITH_UTP
804    if (io->utp_socket) {
805        UTP_SetCallbacks (io->utp_socket,
806                          &dummy_utp_function_table,
807                          NULL);
808        UTP_Close (io->utp_socket);
809
810        io->utp_socket = NULL;
811    }
812#endif
813}
814
815static void
816io_dtor (void * vio)
817{
818    tr_peerIo * io = vio;
819
820    assert (tr_isPeerIo (io));
821    assert (tr_amInEventThread (io->session));
822    assert (io->session->events != NULL);
823
824    dbgmsg (io, "in tr_peerIo destructor");
825    event_disable (io, EV_READ | EV_WRITE);
826    tr_bandwidthDestruct (&io->bandwidth);
827    evbuffer_free (io->outbuf);
828    evbuffer_free (io->inbuf);
829    io_close_socket (io);
830    tr_cryptoDestruct (&io->crypto);
831
832    while (io->outbuf_datatypes != NULL)
833        peer_io_pull_datatype (io);
834
835    memset (io, ~0, sizeof (tr_peerIo));
836    tr_free (io);
837}
838
839static void
840tr_peerIoFree (tr_peerIo * io)
841{
842    if (io)
843    {
844        dbgmsg (io, "in tr_peerIoFree");
845        io->canRead = NULL;
846        io->didWrite = NULL;
847        io->gotError = NULL;
848        tr_runInEventThread (io->session, io_dtor, io);
849    }
850}
851
852void
853tr_peerIoRefImpl (const char * file, int line, tr_peerIo * io)
854{
855    assert (tr_isPeerIo (io));
856
857    dbgmsg (io, "%s:%d is incrementing the IO's refcount from %d to %d",
858                file, line, io->refCount, io->refCount+1);
859
860    ++io->refCount;
861}
862
863void
864tr_peerIoUnrefImpl (const char * file, int line, tr_peerIo * io)
865{
866    assert (tr_isPeerIo (io));
867
868    dbgmsg (io, "%s:%d is decrementing the IO's refcount from %d to %d",
869                file, line, io->refCount, io->refCount-1);
870
871    if (!--io->refCount)
872        tr_peerIoFree (io);
873}
874
875const tr_address*
876tr_peerIoGetAddress (const tr_peerIo * io, tr_port   * port)
877{
878    assert (tr_isPeerIo (io));
879
880    if (port)
881        *port = io->port;
882
883    return &io->addr;
884}
885
886const char*
887tr_peerIoAddrStr (const tr_address * addr, tr_port port)
888{
889    static char buf[512];
890    tr_snprintf (buf, sizeof (buf), "[%s]:%u", tr_address_to_string (addr), ntohs (port));
891    return buf;
892}
893
894const char* tr_peerIoGetAddrStr (const tr_peerIo * io)
895{
896    return tr_isPeerIo (io) ? tr_peerIoAddrStr (&io->addr, io->port) : "error";
897}
898
899void
900tr_peerIoSetIOFuncs (tr_peerIo        * io,
901                     tr_can_read_cb     readcb,
902                     tr_did_write_cb    writecb,
903                     tr_net_error_cb    errcb,
904                     void             * userData)
905{
906    io->canRead = readcb;
907    io->didWrite = writecb;
908    io->gotError = errcb;
909    io->userData = userData;
910}
911
912void
913tr_peerIoClear (tr_peerIo * io)
914{
915    tr_peerIoSetIOFuncs (io, NULL, NULL, NULL, NULL);
916    tr_peerIoSetEnabled (io, TR_UP, false);
917    tr_peerIoSetEnabled (io, TR_DOWN, false);
918}
919
920int
921tr_peerIoReconnect (tr_peerIo * io)
922{
923    short int pendingEvents;
924    tr_session * session;
925
926    assert (tr_isPeerIo (io));
927    assert (!tr_peerIoIsIncoming (io));
928
929    session = tr_peerIoGetSession (io);
930
931    pendingEvents = io->pendingEvents;
932    event_disable (io, EV_READ | EV_WRITE);
933
934    io_close_socket (io);
935
936    io->socket = tr_netOpenPeerSocket (session, &io->addr, io->port, io->isSeed);
937    io->event_read = event_new (session->event_base, io->socket, EV_READ, event_read_cb, io);
938    io->event_write = event_new (session->event_base, io->socket, EV_WRITE, event_write_cb, io);
939
940    if (io->socket != TR_BAD_SOCKET)
941    {
942        event_enable (io, pendingEvents);
943        tr_netSetTOS (io->socket, session->peerSocketTOS);
944        maybeSetCongestionAlgorithm (io->socket, session->peer_congestion_algorithm);
945        return 0;
946    }
947
948    return -1;
949}
950
951/**
952***
953**/
954
955void
956tr_peerIoSetTorrentHash (tr_peerIo *     io,
957                         const uint8_t * hash)
958{
959    assert (tr_isPeerIo (io));
960
961    tr_cryptoSetTorrentHash (&io->crypto, hash);
962}
963
964const uint8_t*
965tr_peerIoGetTorrentHash (tr_peerIo * io)
966{
967    assert (tr_isPeerIo (io));
968
969    return tr_cryptoGetTorrentHash (&io->crypto);
970}
971
972bool
973tr_peerIoHasTorrentHash (const tr_peerIo * io)
974{
975    assert (tr_isPeerIo (io));
976
977    return tr_cryptoHasTorrentHash (&io->crypto);
978}
979
980/**
981***
982**/
983
984void
985tr_peerIoSetPeersId (tr_peerIo * io, const uint8_t * peer_id)
986{
987    assert (tr_isPeerIo (io));
988
989    if ((io->peerIdIsSet = peer_id != NULL))
990        memcpy (io->peerId, peer_id, 20);
991    else
992        memset (io->peerId, 0, 20);
993}
994
995/**
996***
997**/
998
999static unsigned int
1000getDesiredOutputBufferSize (const tr_peerIo * io, uint64_t now)
1001{
1002    /* this is all kind of arbitrary, but what seems to work well is
1003     * being large enough to hold the next 20 seconds' worth of input,
1004     * or a few blocks, whichever is bigger.
1005     * It's okay to tweak this as needed */
1006    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps (&io->bandwidth, now, TR_UP);
1007    const unsigned int period = 15u; /* arbitrary */
1008    /* the 3 is arbitrary; the .5 is to leave room for messages */
1009    static const unsigned int ceiling = (unsigned int)(MAX_BLOCK_SIZE * 3.5);
1010    return MAX (ceiling, currentSpeed_Bps*period);
1011}
1012
1013size_t
1014tr_peerIoGetWriteBufferSpace (const tr_peerIo * io, uint64_t now)
1015{
1016    const size_t desiredLen = getDesiredOutputBufferSize (io, now);
1017    const size_t currentLen = evbuffer_get_length (io->outbuf);
1018    size_t freeSpace = 0;
1019
1020    if (desiredLen > currentLen)
1021        freeSpace = desiredLen - currentLen;
1022
1023    return freeSpace;
1024}
1025
1026/**
1027***
1028**/
1029
1030void
1031tr_peerIoSetEncryption (tr_peerIo * io, tr_encryption_type encryption_type)
1032{
1033    assert (tr_isPeerIo (io));
1034    assert (encryption_type == PEER_ENCRYPTION_NONE
1035         || encryption_type == PEER_ENCRYPTION_RC4);
1036
1037    io->encryption_type = encryption_type;
1038}
1039
1040/**
1041***
1042**/
1043
1044static inline void
1045processBuffer (tr_crypto        * crypto,
1046               struct evbuffer  * buffer,
1047               size_t             offset,
1048               size_t             size,
1049               void            (* callback) (tr_crypto *, size_t, const void *, void *))
1050{
1051    struct evbuffer_ptr pos;
1052    struct evbuffer_iovec iovec;
1053
1054    evbuffer_ptr_set (buffer, &pos, offset, EVBUFFER_PTR_SET);
1055
1056    do
1057    {
1058        if (evbuffer_peek (buffer, size, &pos, &iovec, 1) <= 0)
1059            break;
1060
1061        callback (crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base);
1062
1063        assert (size >= iovec.iov_len);
1064        size -= iovec.iov_len;
1065    }
1066    while (!evbuffer_ptr_set (buffer, &pos, iovec.iov_len, EVBUFFER_PTR_ADD));
1067
1068    assert (size == 0);
1069}
1070
1071static void
1072addDatatype (tr_peerIo * io, size_t byteCount, bool isPieceData)
1073{
1074    struct tr_datatype * d;
1075    d = datatype_new ();
1076    d->isPieceData = isPieceData;
1077    d->length = byteCount;
1078    peer_io_push_datatype (io, d);
1079}
1080
1081static inline void
1082maybeEncryptBuffer (tr_peerIo       * io,
1083                    struct evbuffer * buf,
1084                    size_t            offset,
1085                    size_t            size)
1086{
1087    if (io->encryption_type == PEER_ENCRYPTION_RC4)
1088        processBuffer (&io->crypto, buf, offset, size, &tr_cryptoEncrypt);
1089}
1090
1091void
1092tr_peerIoWriteBuf (tr_peerIo * io, struct evbuffer * buf, bool isPieceData)
1093{
1094    const size_t byteCount = evbuffer_get_length (buf);
1095    maybeEncryptBuffer (io, buf, 0, byteCount);
1096    evbuffer_add_buffer (io->outbuf, buf);
1097    addDatatype (io, byteCount, isPieceData);
1098}
1099
1100void
1101tr_peerIoWriteBytes (tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData)
1102{
1103    struct evbuffer_iovec iovec;
1104    evbuffer_reserve_space (io->outbuf, byteCount, &iovec, 1);
1105
1106    iovec.iov_len = byteCount;
1107    if (io->encryption_type == PEER_ENCRYPTION_RC4)
1108        tr_cryptoEncrypt (&io->crypto, iovec.iov_len, bytes, iovec.iov_base);
1109    else
1110        memcpy (iovec.iov_base, bytes, iovec.iov_len);
1111    evbuffer_commit_space (io->outbuf, &iovec, 1);
1112
1113    addDatatype (io, byteCount, isPieceData);
1114}
1115
1116/***
1117****
1118***/
1119
1120void
1121evbuffer_add_uint8 (struct evbuffer * outbuf, uint8_t byte)
1122{
1123    evbuffer_add (outbuf, &byte, 1);
1124}
1125
1126void
1127evbuffer_add_uint16 (struct evbuffer * outbuf, uint16_t addme_hs)
1128{
1129    const uint16_t ns = htons (addme_hs);
1130    evbuffer_add (outbuf, &ns, sizeof (ns));
1131}
1132
1133void
1134evbuffer_add_uint32 (struct evbuffer * outbuf, uint32_t addme_hl)
1135{
1136    const uint32_t nl = htonl (addme_hl);
1137    evbuffer_add (outbuf, &nl, sizeof (nl));
1138}
1139
1140void
1141evbuffer_add_uint64 (struct evbuffer * outbuf, uint64_t addme_hll)
1142{
1143    const uint64_t nll = tr_htonll (addme_hll);
1144    evbuffer_add (outbuf, &nll, sizeof (nll));
1145}
1146
1147/***
1148****
1149***/
1150
1151static inline void
1152maybeDecryptBuffer (tr_peerIo       * io,
1153                    struct evbuffer * buf,
1154                    size_t            offset,
1155                    size_t            size)
1156{
1157    if (io->encryption_type == PEER_ENCRYPTION_RC4)
1158        processBuffer (&io->crypto, buf, offset, size, &tr_cryptoDecrypt);
1159}
1160
1161void
1162tr_peerIoReadBytesToBuf (tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount)
1163{
1164    struct evbuffer * tmp;
1165    const size_t old_length = evbuffer_get_length (outbuf);
1166
1167    assert (tr_isPeerIo (io));
1168    assert (evbuffer_get_length (inbuf) >= byteCount);
1169
1170    /* append it to outbuf */
1171    tmp = evbuffer_new ();
1172    evbuffer_remove_buffer (inbuf, tmp, byteCount);
1173    evbuffer_add_buffer (outbuf, tmp);
1174    evbuffer_free (tmp);
1175
1176    maybeDecryptBuffer (io, outbuf, old_length, byteCount);
1177}
1178
1179void
1180tr_peerIoReadBytes (tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount)
1181{
1182    assert (tr_isPeerIo (io));
1183    assert (evbuffer_get_length (inbuf)  >= byteCount);
1184
1185    switch (io->encryption_type)
1186    {
1187        case PEER_ENCRYPTION_NONE:
1188            evbuffer_remove (inbuf, bytes, byteCount);
1189            break;
1190
1191        case PEER_ENCRYPTION_RC4:
1192            evbuffer_remove (inbuf, bytes, byteCount);
1193            tr_cryptoDecrypt (&io->crypto, byteCount, bytes, bytes);
1194            break;
1195
1196        default:
1197            assert (false);
1198    }
1199}
1200
1201void
1202tr_peerIoReadUint16 (tr_peerIo        * io,
1203                     struct evbuffer  * inbuf,
1204                     uint16_t         * setme)
1205{
1206    uint16_t tmp;
1207    tr_peerIoReadBytes (io, inbuf, &tmp, sizeof (uint16_t));
1208    *setme = ntohs (tmp);
1209}
1210
1211void tr_peerIoReadUint32 (tr_peerIo        * io,
1212                          struct evbuffer  * inbuf,
1213                          uint32_t         * setme)
1214{
1215    uint32_t tmp;
1216    tr_peerIoReadBytes (io, inbuf, &tmp, sizeof (uint32_t));
1217    *setme = ntohl (tmp);
1218}
1219
1220void
1221tr_peerIoDrain (tr_peerIo       * io,
1222                struct evbuffer * inbuf,
1223                size_t            byteCount)
1224{
1225    char buf[4096];
1226    const size_t buflen = sizeof (buf);
1227
1228    while (byteCount > 0)
1229    {
1230        const size_t thisPass = MIN (byteCount, buflen);
1231        tr_peerIoReadBytes (io, inbuf, buf, thisPass);
1232        byteCount -= thisPass;
1233    }
1234}
1235
1236/***
1237****
1238***/
1239
1240static int
1241tr_peerIoTryRead (tr_peerIo * io, size_t howmuch)
1242{
1243    int res = 0;
1244
1245    if ((howmuch = tr_bandwidthClamp (&io->bandwidth, TR_DOWN, howmuch)))
1246    {
1247        if (io->utp_socket != NULL) /* utp peer connection */
1248        {
1249            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1250             * It opens up the congestion window by sending an ACK (soonish)
1251             * if one was not going to be sent. */
1252            if (evbuffer_get_length (io->inbuf) == 0)
1253                UTP_RBDrained (io->utp_socket);
1254        }
1255        else /* tcp peer connection */
1256        {
1257            int e;
1258            char err_buf[512];
1259
1260            EVUTIL_SET_SOCKET_ERROR (0);
1261            res = evbuffer_read (io->inbuf, io->socket, (int)howmuch);
1262            e = EVUTIL_SOCKET_ERROR ();
1263
1264            dbgmsg (io, "read %d from peer (%s)", res,
1265                    (res==-1?tr_net_strerror (err_buf, sizeof (err_buf), e):""));
1266
1267            if (evbuffer_get_length (io->inbuf))
1268                canReadWrapper (io);
1269
1270            if ((res <= 0) && (io->gotError) && (e != EAGAIN) && (e != EINTR) && (e != EINPROGRESS))
1271            {
1272                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1273                if (res == 0)
1274                    what |= BEV_EVENT_EOF;
1275                dbgmsg (io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)",
1276                        res, what, e, tr_net_strerror (err_buf, sizeof (err_buf), e));
1277                io->gotError (io, what, io->userData);
1278            }
1279        }
1280    }
1281
1282    return res;
1283}
1284
1285static int
1286tr_peerIoTryWrite (tr_peerIo * io, size_t howmuch)
1287{
1288    int n = 0;
1289    const size_t old_len = evbuffer_get_length (io->outbuf);
1290    dbgmsg (io, "in tr_peerIoTryWrite %zu", howmuch);
1291
1292    if (howmuch > old_len)
1293        howmuch = old_len;
1294
1295    if ((howmuch = tr_bandwidthClamp (&io->bandwidth, TR_UP, howmuch)))
1296    {
1297        if (io->utp_socket != NULL) /* utp peer connection */
1298        {
1299            UTP_Write (io->utp_socket, howmuch);
1300            n = old_len - evbuffer_get_length (io->outbuf);
1301        }
1302        else
1303        {
1304            int e;
1305
1306            EVUTIL_SET_SOCKET_ERROR (0);
1307            n = tr_evbuffer_write (io, io->socket, howmuch);
1308            e = EVUTIL_SOCKET_ERROR ();
1309
1310            if (n > 0)
1311                didWriteWrapper (io, n);
1312
1313            if ((n < 0) && (io->gotError) && e && (e != EPIPE) && (e != EAGAIN) && (e != EINTR) && (e != EINPROGRESS))
1314            {
1315                char errstr[512];
1316                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1317
1318                dbgmsg (io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)",
1319                        n, what, e, tr_net_strerror (errstr, sizeof (errstr), e));
1320
1321                if (io->gotError != NULL)
1322                    io->gotError (io, what, io->userData);
1323            }
1324        }
1325    }
1326
1327    return n;
1328}
1329
1330int
1331tr_peerIoFlush (tr_peerIo  * io, tr_direction dir, size_t limit)
1332{
1333    int bytesUsed = 0;
1334
1335    assert (tr_isPeerIo (io));
1336    assert (tr_isDirection (dir));
1337
1338    if (dir == TR_DOWN)
1339        bytesUsed = tr_peerIoTryRead (io, limit);
1340    else
1341        bytesUsed = tr_peerIoTryWrite (io, limit);
1342
1343    dbgmsg (io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed);
1344    return bytesUsed;
1345}
1346
1347int
1348tr_peerIoFlushOutgoingProtocolMsgs (tr_peerIo * io)
1349{
1350    size_t byteCount = 0;
1351    const struct tr_datatype * it;
1352
1353    /* count up how many bytes are used by non-piece-data messages
1354       at the front of our outbound queue */
1355    for (it=io->outbuf_datatypes; it!=NULL; it=it->next)
1356        if (it->isPieceData)
1357            break;
1358        else
1359            byteCount += it->length;
1360
1361    return tr_peerIoFlush (io, TR_UP, byteCount);
1362}
Note: See TracBrowser for help on using the repository browser.