source: trunk/libtransmission/trevent.c @ 14382

Last change on this file since 14382 was 14382, checked in by mikedld, 8 years ago

Fix compilation on Windows

This should not affect non-Win32 platforms in any way.
As for Win32 (both MinGW and MSVC), this should hopefully allow for
unpatched compilation. Correct functioning is not yet guaranteed though.

  • Property svn:keywords set to Date Rev Author Id
File size: 7.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: trevent.c 14382 2014-12-13 15:22:39Z mikedld $
8 */
9
10#include <assert.h>
11#include <errno.h>
12#include <string.h>
13
14#include <signal.h>
15
16#ifdef _WIN32
17 #include <winsock2.h>
18#else
19 #include <unistd.h> /* read (), write (), pipe () */
20#endif
21
22#include <event2/dns.h>
23#include <event2/event.h>
24
25#include "transmission.h"
26#include "log.h"
27#include "net.h"
28#include "session.h"
29
30#include "transmission.h"
31#include "platform.h" /* tr_lockLock () */
32#include "trevent.h"
33#include "utils.h"
34
35
36#ifdef _WIN32
37
38static int
39pgpipe (int handles[2])
40{
41    SOCKET s;
42    struct sockaddr_in serv_addr;
43    int len = sizeof (serv_addr);
44
45    handles[0] = handles[1] = INVALID_SOCKET;
46
47    if ((s = socket (AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
48    {
49        tr_logAddDebug ("pgpipe failed to create socket: %ui", WSAGetLastError ());
50        return -1;
51    }
52
53    memset (&serv_addr, 0, sizeof (serv_addr));
54    serv_addr.sin_family = AF_INET;
55    serv_addr.sin_port = htons (0);
56    serv_addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
57    if (bind (s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
58    {
59        tr_logAddDebug ("pgpipe failed to bind: %ui", WSAGetLastError ());
60        closesocket (s);
61        return -1;
62    }
63    if (listen (s, 1) == SOCKET_ERROR)
64    {
65        tr_logAddNamedDbg ("event","pgpipe failed to listen: %ui", WSAGetLastError ());
66        closesocket (s);
67        return -1;
68    }
69    if (getsockname (s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
70    {
71        tr_logAddDebug ("pgpipe failed to getsockname: %ui", WSAGetLastError ());
72        closesocket (s);
73        return -1;
74    }
75    if ((handles[1] = socket (PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
76    {
77        tr_logAddDebug ("pgpipe failed to create socket 2: %ui", WSAGetLastError ());
78        closesocket (s);
79        return -1;
80    }
81
82    if (connect (handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
83    {
84        tr_logAddDebug ("pgpipe failed to connect socket: %ui", WSAGetLastError ());
85        closesocket (s);
86        return -1;
87    }
88    if ((handles[0] = accept (s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
89    {
90        tr_logAddDebug ("pgpipe failed to accept socket: %ui", WSAGetLastError ());
91        closesocket (handles[1]);
92        handles[1] = INVALID_SOCKET;
93        closesocket (s);
94        return -1;
95    }
96    closesocket (s);
97    return 0;
98}
99
100static int
101piperead (int s, char *buf, int len)
102{
103    int ret = recv (s, buf, len, 0);
104
105    if (ret < 0) {
106        const int werror= WSAGetLastError ();
107        switch (werror) {
108          /* simplified error mapping (not valid for connect) */
109            case WSAEWOULDBLOCK:
110                errno = EAGAIN;
111                break;
112            case WSAECONNRESET:
113                /* EOF on the pipe! (win32 socket based implementation) */
114                ret = 0;
115                /* fall through */
116            default:
117                errno = werror;
118                break;
119        }
120    } else
121        errno = 0;
122    return ret;
123}
124
125#define pipe(a) pgpipe (a)
126#define pipewrite(a,b,c) send (a, (char*)b,c,0)
127
128#else
129#define piperead(a,b,c) read (a,b,c)
130#define pipewrite(a,b,c) write (a,b,c)
131#endif
132
133/***
134****
135***/
136
137typedef struct tr_event_handle
138{
139    uint8_t      die;
140    int          fds[2];
141    tr_lock *    lock;
142    tr_session *  session;
143    tr_thread *  thread;
144    struct event_base * base;
145    struct event * pipeEvent;
146}
147tr_event_handle;
148
149struct tr_run_data
150{
151    void  (*func)(void *);
152    void *  user_data;
153};
154
155#define dbgmsg(...) \
156    do { \
157        if (tr_logGetDeepEnabled ()) \
158            tr_logAddDeep (__FILE__, __LINE__, "event", __VA_ARGS__); \
159    } while (0)
160
161static void
162readFromPipe (evutil_socket_t   fd,
163              short             eventType,
164              void            * veh)
165{
166    char              ch;
167    int               ret;
168    tr_event_handle * eh = veh;
169
170    dbgmsg ("readFromPipe: eventType is %hd", eventType);
171
172    /* read the command type */
173    ch = '\0';
174    do
175    {
176        ret = piperead (fd, &ch, 1);
177    }
178    while (!eh->die && ret < 0 && errno == EAGAIN);
179
180    dbgmsg ("command is [%c], ret is %d, errno is %d", ch, ret, (int)errno);
181
182    switch (ch)
183    {
184        case 'r': /* run in libevent thread */
185        {
186            struct tr_run_data data;
187            const size_t       nwant = sizeof (data);
188            const ssize_t      ngot = piperead (fd, &data, nwant);
189            if (!eh->die && (ngot == (ssize_t)nwant))
190            {
191                dbgmsg ("invoking function in libevent thread");
192              (data.func)(data.user_data);
193            }
194            break;
195        }
196
197        case '\0': /* eof */
198        {
199            dbgmsg ("pipe eof reached... removing event listener");
200            event_free (eh->pipeEvent);
201            break;
202        }
203
204        default:
205        {
206            assert (0 && "unhandled command type!");
207            break;
208        }
209    }
210}
211
212static void
213logFunc (int severity, const char * message)
214{
215    if (severity >= _EVENT_LOG_ERR)
216        tr_logAddError ("%s", message);
217    else
218        tr_logAddDebug ("%s", message);
219}
220
221static void
222libeventThreadFunc (void * veh)
223{
224    struct event_base * base;
225    tr_event_handle * eh = veh;
226
227#ifndef _WIN32
228    /* Don't exit when writing on a broken socket */
229    signal (SIGPIPE, SIG_IGN);
230#endif
231
232    /* create the libevent bases */
233    base = event_base_new ();
234
235    /* set the struct's fields */
236    eh->base = base;
237    eh->session->event_base = base;
238    eh->session->evdns_base = evdns_base_new (base, true);
239    eh->session->events = eh;
240
241    /* listen to the pipe's read fd */
242    eh->pipeEvent = event_new (base, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh);
243    event_add (eh->pipeEvent, NULL);
244    event_set_log_callback (logFunc);
245
246    /* loop until all the events are done */
247    while (!eh->die)
248        event_base_dispatch (base);
249
250    /* shut down the thread */
251    tr_lockFree (eh->lock);
252    event_base_free (base);
253    eh->session->events = NULL;
254    tr_free (eh);
255    tr_logAddDebug ("Closing libevent thread");
256}
257
258void
259tr_eventInit (tr_session * session)
260{
261    tr_event_handle * eh;
262
263    session->events = NULL;
264
265    eh = tr_new0 (tr_event_handle, 1);
266    eh->lock = tr_lockNew ();
267    if (pipe (eh->fds) == -1)
268      tr_logAddError ("Unable to write to pipe() in libtransmission: %s", tr_strerror(errno));
269    eh->session = session;
270    eh->thread = tr_threadNew (libeventThreadFunc, eh);
271
272    /* wait until the libevent thread is running */
273    while (session->events == NULL)
274        tr_wait_msec (100);
275}
276
277void
278tr_eventClose (tr_session * session)
279{
280    assert (tr_isSession (session));
281
282    session->events->die = true;
283    tr_logAddDeep (__FILE__, __LINE__, NULL, "closing trevent pipe");
284    tr_netCloseSocket (session->events->fds[1]);
285}
286
287/**
288***
289**/
290
291bool
292tr_amInEventThread (const tr_session * session)
293{
294    assert (tr_isSession (session));
295    assert (session->events != NULL);
296
297    return tr_amInThread (session->events->thread);
298}
299
300/**
301***
302**/
303
304void
305tr_runInEventThread (tr_session * session,
306                     void func (void*), void * user_data)
307{
308  assert (tr_isSession (session));
309  assert (session->events != NULL);
310
311  if (tr_amInThread (session->events->thread))
312    {
313      (func)(user_data);
314    }
315  else
316    {
317      int fd;
318      char ch;
319      ssize_t res_1;
320      ssize_t res_2;
321      tr_event_handle * e = session->events;
322      struct tr_run_data data;
323
324      tr_lockLock (e->lock);
325
326      fd = e->fds[1];
327      ch = 'r';
328      res_1 = pipewrite (fd, &ch, 1);
329
330      data.func = func;
331      data.user_data = user_data;
332      res_2 = pipewrite (fd, &data, sizeof (data));
333
334      tr_lockUnlock (e->lock);
335
336      if ((res_1 == -1) || (res_2 == -1))
337        tr_logAddError ("Unable to write to libtransmisison event queue: %s", tr_strerror(errno));
338    }
339}
Note: See TracBrowser for help on using the repository browser.