source: trunk/libtransmission/trevent.c

Last change on this file was 14648, checked in by mikedld, 5 years ago

#5891: Fix crash on session shutdown (evdns_getaddrinfo_cancel)

  • Property svn:keywords set to Date Rev Author Id
File size: 8.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: trevent.c 14648 2015-12-31 14:17:37Z mikedld $
8 */
9
10#include <assert.h>
11#include <errno.h>
12#include <string.h>
13
14#include <signal.h>
15
16#ifdef _WIN32
17 #include <winsock2.h>
18#else
19 #include <unistd.h> /* read (), write (), pipe () */
20#endif
21
22#include <event2/dns.h>
23#include <event2/event.h>
24
25#include "transmission.h"
26#include "log.h"
27#include "net.h"
28#include "session.h"
29
30#include "transmission.h"
31#include "platform.h" /* tr_lockLock () */
32#include "trevent.h"
33#include "utils.h"
34
35
36#ifdef _WIN32
37
38typedef SOCKET tr_pipe_end_t;
39
40static int
41pgpipe (tr_pipe_end_t handles[2])
42{
43    SOCKET s;
44    struct sockaddr_in serv_addr;
45    int len = sizeof (serv_addr);
46
47    handles[0] = handles[1] = INVALID_SOCKET;
48
49    if ((s = socket (AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
50    {
51        tr_logAddDebug ("pgpipe failed to create socket: %ui", WSAGetLastError ());
52        return -1;
53    }
54
55    memset (&serv_addr, 0, sizeof (serv_addr));
56    serv_addr.sin_family = AF_INET;
57    serv_addr.sin_port = htons (0);
58    serv_addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
59    if (bind (s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
60    {
61        tr_logAddDebug ("pgpipe failed to bind: %ui", WSAGetLastError ());
62        closesocket (s);
63        return -1;
64    }
65    if (listen (s, 1) == SOCKET_ERROR)
66    {
67        tr_logAddNamedDbg ("event","pgpipe failed to listen: %ui", WSAGetLastError ());
68        closesocket (s);
69        return -1;
70    }
71    if (getsockname (s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
72    {
73        tr_logAddDebug ("pgpipe failed to getsockname: %ui", WSAGetLastError ());
74        closesocket (s);
75        return -1;
76    }
77    if ((handles[1] = socket (PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
78    {
79        tr_logAddDebug ("pgpipe failed to create socket 2: %ui", WSAGetLastError ());
80        closesocket (s);
81        return -1;
82    }
83
84    if (connect (handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
85    {
86        tr_logAddDebug ("pgpipe failed to connect socket: %ui", WSAGetLastError ());
87        closesocket (s);
88        return -1;
89    }
90    if ((handles[0] = accept (s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
91    {
92        tr_logAddDebug ("pgpipe failed to accept socket: %ui", WSAGetLastError ());
93        closesocket (handles[1]);
94        handles[1] = INVALID_SOCKET;
95        closesocket (s);
96        return -1;
97    }
98    closesocket (s);
99    return 0;
100}
101
102static int
103piperead (tr_pipe_end_t   s,
104          void          * buf,
105          int             len)
106{
107    int ret = recv (s, buf, len, 0);
108
109    if (ret < 0) {
110        const int werror= WSAGetLastError ();
111        switch (werror) {
112          /* simplified error mapping (not valid for connect) */
113            case WSAEWOULDBLOCK:
114                errno = EAGAIN;
115                break;
116            case WSAECONNRESET:
117                /* EOF on the pipe! (win32 socket based implementation) */
118                ret = 0;
119                /* fall through */
120            default:
121                errno = werror;
122                break;
123        }
124    } else
125        errno = 0;
126    return ret;
127}
128
/* on Windows the "pipe" is the socket pair built by pgpipe () */
#define pipe(a) pgpipe (a)
/* every argument is parenthesized so that an expression argument
   (e.g. `ptr + 1`) is evaluated as a whole before the cast is applied */
#define pipewrite(a,b,c) send ((a), (char*)(b), (c), 0)
131
132#else
/* elsewhere the pipe ends are plain file descriptors used with read ()/write () */
typedef int tr_pipe_end_t;
#define piperead(fd, buf, len)  read ((fd), (buf), (len))
#define pipewrite(fd, buf, len) write ((fd), (buf), (len))
136#endif
137
138/***
139****
140***/
141
142typedef struct tr_event_handle
143{
144    uint8_t      die;
145    tr_pipe_end_t fds[2];
146    tr_lock *    lock;
147    tr_session *  session;
148    tr_thread *  thread;
149    struct event_base * base;
150    struct event * pipeEvent;
151}
152tr_event_handle;
153
/**
 * Payload that follows an 'r' command byte on the pipe:
 * a callback and the argument to invoke it with.
 */
struct tr_run_data
{
    void (*func)(void *);   /* function to call in the event thread */
    void * user_data;       /* passed to func unchanged */
};
159
/* Emit a printf-style message under the "event" name,
   but only when deep logging is enabled. */
#define dbgmsg(...) \
    do \
    { \
        if (tr_logGetDeepEnabled ()) \
        { \
            tr_logAddDeep (__FILE__, __LINE__, "event", __VA_ARGS__); \
        } \
    } \
    while (0)
165
166static void
167readFromPipe (evutil_socket_t   fd,
168              short             eventType,
169              void            * veh)
170{
171    char              ch;
172    int               ret;
173    tr_event_handle * eh = veh;
174
175    dbgmsg ("readFromPipe: eventType is %hd", eventType);
176
177    /* read the command type */
178    ch = '\0';
179    do
180    {
181        ret = piperead (fd, &ch, 1);
182    }
183    while (!eh->die && ret < 0 && errno == EAGAIN);
184
185    dbgmsg ("command is [%c], ret is %d, errno is %d", ch, ret, (int)errno);
186
187    switch (ch)
188    {
189        case 'r': /* run in libevent thread */
190        {
191            struct tr_run_data data;
192            const size_t       nwant = sizeof (data);
193            const ev_ssize_t   ngot = piperead (fd, &data, nwant);
194            if (!eh->die && (ngot == (ev_ssize_t) nwant))
195            {
196                dbgmsg ("invoking function in libevent thread");
197              (data.func)(data.user_data);
198            }
199            break;
200        }
201
202        case '\0': /* eof */
203        {
204            dbgmsg ("pipe eof reached... removing event listener");
205            event_free (eh->pipeEvent);
206            tr_netCloseSocket (eh->fds[0]);
207            event_base_loopexit (eh->base, NULL);
208            break;
209        }
210
211        default:
212        {
213            assert (0 && "unhandled command type!");
214            break;
215        }
216    }
217}
218
219static void
220logFunc (int severity, const char * message)
221{
222    if (severity >= _EVENT_LOG_ERR)
223        tr_logAddError ("%s", message);
224    else
225        tr_logAddDebug ("%s", message);
226}
227
228static void
229libeventThreadFunc (void * veh)
230{
231    struct event_base * base;
232    tr_event_handle * eh = veh;
233
234#ifndef _WIN32
235    /* Don't exit when writing on a broken socket */
236    signal (SIGPIPE, SIG_IGN);
237#endif
238
239    /* create the libevent bases */
240    base = event_base_new ();
241
242    /* set the struct's fields */
243    eh->base = base;
244    eh->session->event_base = base;
245    eh->session->evdns_base = evdns_base_new (base, true);
246    eh->session->events = eh;
247
248    /* listen to the pipe's read fd */
249    eh->pipeEvent = event_new (base, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh);
250    event_add (eh->pipeEvent, NULL);
251    event_set_log_callback (logFunc);
252
253    /* loop until all the events are done */
254    while (!eh->die)
255        event_base_dispatch (base);
256
257    /* shut down the thread */
258    tr_lockFree (eh->lock);
259    event_base_free (base);
260    eh->session->events = NULL;
261    tr_free (eh);
262    tr_logAddDebug ("Closing libevent thread");
263}
264
265void
266tr_eventInit (tr_session * session)
267{
268    tr_event_handle * eh;
269
270    session->events = NULL;
271
272    eh = tr_new0 (tr_event_handle, 1);
273    eh->lock = tr_lockNew ();
274    if (pipe (eh->fds) == -1)
275      tr_logAddError ("Unable to write to pipe() in libtransmission: %s", tr_strerror(errno));
276    eh->session = session;
277    eh->thread = tr_threadNew (libeventThreadFunc, eh);
278
279    /* wait until the libevent thread is running */
280    while (session->events == NULL)
281        tr_wait_msec (100);
282}
283
284void
285tr_eventClose (tr_session * session)
286{
287    assert (tr_isSession (session));
288
289    if (session->events == NULL)
290        return;
291
292    session->events->die = true;
293    tr_logAddDeep (__FILE__, __LINE__, NULL, "closing trevent pipe");
294    tr_netCloseSocket (session->events->fds[1]);
295}
296
297/**
298***
299**/
300
301bool
302tr_amInEventThread (const tr_session * session)
303{
304    assert (tr_isSession (session));
305    assert (session->events != NULL);
306
307    return tr_amInThread (session->events->thread);
308}
309
310/**
311***
312**/
313
314void
315tr_runInEventThread (tr_session * session,
316                     void func (void*), void * user_data)
317{
318  assert (tr_isSession (session));
319  assert (session->events != NULL);
320
321  if (tr_amInThread (session->events->thread))
322    {
323      (func)(user_data);
324    }
325  else
326    {
327      tr_pipe_end_t fd;
328      char ch;
329      ev_ssize_t res_1;
330      ev_ssize_t res_2;
331      tr_event_handle * e = session->events;
332      struct tr_run_data data;
333
334      tr_lockLock (e->lock);
335
336      fd = e->fds[1];
337      ch = 'r';
338      res_1 = pipewrite (fd, &ch, 1);
339
340      data.func = func;
341      data.user_data = user_data;
342      res_2 = pipewrite (fd, &data, sizeof (data));
343
344      tr_lockUnlock (e->lock);
345
346      if ((res_1 == -1) || (res_2 == -1))
347        tr_logAddError ("Unable to write to libtransmisison event queue: %s", tr_strerror(errno));
348    }
349}
Note: See TracBrowser for help on using the repository browser.