source: trunk/libtransmission/trevent.c @ 14525

Last change on this file since 14525 was 14525, checked in by mikedld, 6 years ago

Fix some issues revealed by coverity

  • Property svn:keywords set to Date Rev Author Id
File size: 8.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: trevent.c 14525 2015-05-09 08:37:55Z mikedld $
8 */
9
10#include <assert.h>
11#include <errno.h>
12#include <string.h>
13
14#include <signal.h>
15
16#ifdef _WIN32
17 #include <winsock2.h>
18#else
19 #include <unistd.h> /* read (), write (), pipe () */
20#endif
21
22#include <event2/dns.h>
23#include <event2/event.h>
24
25#include "transmission.h"
26#include "log.h"
27#include "net.h"
28#include "session.h"
29
30#include "transmission.h"
31#include "platform.h" /* tr_lockLock () */
32#include "trevent.h"
33#include "utils.h"
34
35
36#ifdef _WIN32
37
38typedef SOCKET tr_pipe_end_t;
39
40static int
41pgpipe (tr_pipe_end_t handles[2])
42{
43    SOCKET s;
44    struct sockaddr_in serv_addr;
45    int len = sizeof (serv_addr);
46
47    handles[0] = handles[1] = INVALID_SOCKET;
48
49    if ((s = socket (AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
50    {
51        tr_logAddDebug ("pgpipe failed to create socket: %ui", WSAGetLastError ());
52        return -1;
53    }
54
55    memset (&serv_addr, 0, sizeof (serv_addr));
56    serv_addr.sin_family = AF_INET;
57    serv_addr.sin_port = htons (0);
58    serv_addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
59    if (bind (s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
60    {
61        tr_logAddDebug ("pgpipe failed to bind: %ui", WSAGetLastError ());
62        closesocket (s);
63        return -1;
64    }
65    if (listen (s, 1) == SOCKET_ERROR)
66    {
67        tr_logAddNamedDbg ("event","pgpipe failed to listen: %ui", WSAGetLastError ());
68        closesocket (s);
69        return -1;
70    }
71    if (getsockname (s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
72    {
73        tr_logAddDebug ("pgpipe failed to getsockname: %ui", WSAGetLastError ());
74        closesocket (s);
75        return -1;
76    }
77    if ((handles[1] = socket (PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
78    {
79        tr_logAddDebug ("pgpipe failed to create socket 2: %ui", WSAGetLastError ());
80        closesocket (s);
81        return -1;
82    }
83
84    if (connect (handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
85    {
86        tr_logAddDebug ("pgpipe failed to connect socket: %ui", WSAGetLastError ());
87        closesocket (s);
88        return -1;
89    }
90    if ((handles[0] = accept (s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
91    {
92        tr_logAddDebug ("pgpipe failed to accept socket: %ui", WSAGetLastError ());
93        closesocket (handles[1]);
94        handles[1] = INVALID_SOCKET;
95        closesocket (s);
96        return -1;
97    }
98    closesocket (s);
99    return 0;
100}
101
102static int
103piperead (tr_pipe_end_t   s,
104          void          * buf,
105          int             len)
106{
107    int ret = recv (s, buf, len, 0);
108
109    if (ret < 0) {
110        const int werror= WSAGetLastError ();
111        switch (werror) {
112          /* simplified error mapping (not valid for connect) */
113            case WSAEWOULDBLOCK:
114                errno = EAGAIN;
115                break;
116            case WSAECONNRESET:
117                /* EOF on the pipe! (win32 socket based implementation) */
118                ret = 0;
119                /* fall through */
120            default:
121                errno = werror;
122                break;
123        }
124    } else
125        errno = 0;
126    return ret;
127}
128
129#define pipe(a) pgpipe (a)
130#define pipewrite(a,b,c) send (a, (char*)b,c,0)
131
132#else
133typedef int tr_pipe_end_t;
134#define piperead(a,b,c) read (a,b,c)
135#define pipewrite(a,b,c) write (a,b,c)
136#endif
137
138/***
139****
140***/
141
142typedef struct tr_event_handle
143{
144    uint8_t      die;
145    tr_pipe_end_t fds[2];
146    tr_lock *    lock;
147    tr_session *  session;
148    tr_thread *  thread;
149    struct event_base * base;
150    struct event * pipeEvent;
151}
152tr_event_handle;
153
154struct tr_run_data
155{
156    void  (*func)(void *);
157    void *  user_data;
158};
159
160#define dbgmsg(...) \
161    do { \
162        if (tr_logGetDeepEnabled ()) \
163            tr_logAddDeep (__FILE__, __LINE__, "event", __VA_ARGS__); \
164    } while (0)
165
166static void
167readFromPipe (evutil_socket_t   fd,
168              short             eventType,
169              void            * veh)
170{
171    char              ch;
172    int               ret;
173    tr_event_handle * eh = veh;
174
175    dbgmsg ("readFromPipe: eventType is %hd", eventType);
176
177    /* read the command type */
178    ch = '\0';
179    do
180    {
181        ret = piperead (fd, &ch, 1);
182    }
183    while (!eh->die && ret < 0 && errno == EAGAIN);
184
185    dbgmsg ("command is [%c], ret is %d, errno is %d", ch, ret, (int)errno);
186
187    switch (ch)
188    {
189        case 'r': /* run in libevent thread */
190        {
191            struct tr_run_data data;
192            const size_t       nwant = sizeof (data);
193            const ssize_t      ngot = piperead (fd, &data, nwant);
194            if (!eh->die && (ngot == (ssize_t)nwant))
195            {
196                dbgmsg ("invoking function in libevent thread");
197              (data.func)(data.user_data);
198            }
199            break;
200        }
201
202        case '\0': /* eof */
203        {
204            dbgmsg ("pipe eof reached... removing event listener");
205            event_free (eh->pipeEvent);
206            break;
207        }
208
209        default:
210        {
211            assert (0 && "unhandled command type!");
212            break;
213        }
214    }
215}
216
217static void
218logFunc (int severity, const char * message)
219{
220    if (severity >= _EVENT_LOG_ERR)
221        tr_logAddError ("%s", message);
222    else
223        tr_logAddDebug ("%s", message);
224}
225
226static void
227libeventThreadFunc (void * veh)
228{
229    struct event_base * base;
230    tr_event_handle * eh = veh;
231
232#ifndef _WIN32
233    /* Don't exit when writing on a broken socket */
234    signal (SIGPIPE, SIG_IGN);
235#endif
236
237    /* create the libevent bases */
238    base = event_base_new ();
239
240    /* set the struct's fields */
241    eh->base = base;
242    eh->session->event_base = base;
243    eh->session->evdns_base = evdns_base_new (base, true);
244    eh->session->events = eh;
245
246    /* listen to the pipe's read fd */
247    eh->pipeEvent = event_new (base, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh);
248    event_add (eh->pipeEvent, NULL);
249    event_set_log_callback (logFunc);
250
251    /* loop until all the events are done */
252    while (!eh->die)
253        event_base_dispatch (base);
254
255    /* shut down the thread */
256    tr_lockFree (eh->lock);
257    event_base_free (base);
258    eh->session->events = NULL;
259    tr_free (eh);
260    tr_logAddDebug ("Closing libevent thread");
261}
262
263void
264tr_eventInit (tr_session * session)
265{
266    tr_event_handle * eh;
267
268    session->events = NULL;
269
270    eh = tr_new0 (tr_event_handle, 1);
271    eh->lock = tr_lockNew ();
272    if (pipe (eh->fds) == -1)
273      tr_logAddError ("Unable to write to pipe() in libtransmission: %s", tr_strerror(errno));
274    eh->session = session;
275    eh->thread = tr_threadNew (libeventThreadFunc, eh);
276
277    /* wait until the libevent thread is running */
278    while (session->events == NULL)
279        tr_wait_msec (100);
280}
281
282void
283tr_eventClose (tr_session * session)
284{
285    assert (tr_isSession (session));
286
287    if (session->events == NULL)
288        return;
289
290    session->events->die = true;
291    tr_logAddDeep (__FILE__, __LINE__, NULL, "closing trevent pipe");
292    tr_netCloseSocket (session->events->fds[1]);
293}
294
295/**
296***
297**/
298
299bool
300tr_amInEventThread (const tr_session * session)
301{
302    assert (tr_isSession (session));
303    assert (session->events != NULL);
304
305    return tr_amInThread (session->events->thread);
306}
307
308/**
309***
310**/
311
312void
313tr_runInEventThread (tr_session * session,
314                     void func (void*), void * user_data)
315{
316  assert (tr_isSession (session));
317  assert (session->events != NULL);
318
319  if (tr_amInThread (session->events->thread))
320    {
321      (func)(user_data);
322    }
323  else
324    {
325      tr_pipe_end_t fd;
326      char ch;
327      ssize_t res_1;
328      ssize_t res_2;
329      tr_event_handle * e = session->events;
330      struct tr_run_data data;
331
332      tr_lockLock (e->lock);
333
334      fd = e->fds[1];
335      ch = 'r';
336      res_1 = pipewrite (fd, &ch, 1);
337
338      data.func = func;
339      data.user_data = user_data;
340      res_2 = pipewrite (fd, &data, sizeof (data));
341
342      tr_lockUnlock (e->lock);
343
344      if ((res_1 == -1) || (res_2 == -1))
345        tr_logAddError ("Unable to write to libtransmisison event queue: %s", tr_strerror(errno));
346    }
347}
Note: See TracBrowser for help on using the repository browser.