source: trunk/libtransmission/trevent.c @ 14316

Last change on this file since 14316 was 14316, checked in by livings124, 8 years ago

Use built-in _WIN32 macro instead of WIN32

  • Property svn:keywords set to Date Rev Author Id
File size: 7.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: trevent.c 14316 2014-07-04 00:00:07Z livings124 $
8 */
9
10#include <assert.h>
11#include <errno.h>
12#include <string.h>
13
14#include <signal.h>
15
16#include <event2/dns.h>
17#include <event2/event.h>
18
19#include "transmission.h"
20#include "log.h"
21#include "net.h"
22#include "session.h"
23
24#ifdef _WIN32
25
26#include "utils.h"
27#include <winsock2.h>
28
29static int
30pgpipe (int handles[2])
31{
32    SOCKET s;
33    struct sockaddr_in serv_addr;
34    int len = sizeof (serv_addr);
35
36    handles[0] = handles[1] = INVALID_SOCKET;
37
38    if ((s = socket (AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
39    {
40        tr_logAddDebug ("pgpipe failed to create socket: %ui", WSAGetLastError ());
41        return -1;
42    }
43
44    memset (&serv_addr, 0, sizeof (serv_addr));
45    serv_addr.sin_family = AF_INET;
46    serv_addr.sin_port = htons (0);
47    serv_addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
48    if (bind (s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
49    {
50        tr_logAddDebug ("pgpipe failed to bind: %ui", WSAGetLastError ());
51        closesocket (s);
52        return -1;
53    }
54    if (listen (s, 1) == SOCKET_ERROR)
55    {
56        tr_logAddNamedDbg ("event","pgpipe failed to listen: %ui", WSAGetLastError ());
57        closesocket (s);
58        return -1;
59    }
60    if (getsockname (s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
61    {
62        tr_logAddDebug ("pgpipe failed to getsockname: %ui", WSAGetLastError ());
63        closesocket (s);
64        return -1;
65    }
66    if ((handles[1] = socket (PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
67    {
68        tr_logAddDebug ("pgpipe failed to create socket 2: %ui", WSAGetLastError ());
69        closesocket (s);
70        return -1;
71    }
72
73    if (connect (handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
74    {
75        tr_logAddDebug ("pgpipe failed to connect socket: %ui", WSAGetLastError ());
76        closesocket (s);
77        return -1;
78    }
79    if ((handles[0] = accept (s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
80    {
81        tr_logAddDebug ("pgpipe failed to accept socket: %ui", WSAGetLastError ());
82        closesocket (handles[1]);
83        handles[1] = INVALID_SOCKET;
84        closesocket (s);
85        return -1;
86    }
87    closesocket (s);
88    return 0;
89}
90
91static int
92piperead (int s, char *buf, int len)
93{
94    int ret = recv (s, buf, len, 0);
95
96    if (ret < 0) {
97        const int werror= WSAGetLastError ();
98        switch (werror) {
99          /* simplified error mapping (not valid for connect) */
100            case WSAEWOULDBLOCK:
101                errno = EAGAIN;
102                break;
103            case WSAECONNRESET:
104                /* EOF on the pipe! (win32 socket based implementation) */
105                ret = 0;
106                /* fall through */
107            default:
108                errno = werror;
109                break;
110        }
111    } else
112        errno = 0;
113    return ret;
114}
115
116#define pipe(a) pgpipe (a)
117#define pipewrite(a,b,c) send (a, (char*)b,c,0)
118
119#else
120#define piperead(a,b,c) read (a,b,c)
121#define pipewrite(a,b,c) write (a,b,c)
122#endif
123
124#include <unistd.h> /* read (), write (), pipe () */
125
126#include "transmission.h"
127#include "platform.h" /* tr_lockLock () */
128#include "trevent.h"
129#include "utils.h"
130
131/***
132****
133***/
134
135typedef struct tr_event_handle
136{
137    uint8_t      die;
138    int          fds[2];
139    tr_lock *    lock;
140    tr_session *  session;
141    tr_thread *  thread;
142    struct event_base * base;
143    struct event * pipeEvent;
144}
145tr_event_handle;
146
147struct tr_run_data
148{
149    void  (*func)(void *);
150    void *  user_data;
151};
152
153#define dbgmsg(...) \
154    do { \
155        if (tr_logGetDeepEnabled ()) \
156            tr_logAddDeep (__FILE__, __LINE__, "event", __VA_ARGS__); \
157    } while (0)
158
159static void
160readFromPipe (evutil_socket_t   fd,
161              short             eventType,
162              void            * veh)
163{
164    char              ch;
165    int               ret;
166    tr_event_handle * eh = veh;
167
168    dbgmsg ("readFromPipe: eventType is %hd", eventType);
169
170    /* read the command type */
171    ch = '\0';
172    do
173    {
174        ret = piperead (fd, &ch, 1);
175    }
176    while (!eh->die && ret < 0 && errno == EAGAIN);
177
178    dbgmsg ("command is [%c], ret is %d, errno is %d", ch, ret, (int)errno);
179
180    switch (ch)
181    {
182        case 'r': /* run in libevent thread */
183        {
184            struct tr_run_data data;
185            const size_t       nwant = sizeof (data);
186            const ssize_t      ngot = piperead (fd, &data, nwant);
187            if (!eh->die && (ngot == (ssize_t)nwant))
188            {
189                dbgmsg ("invoking function in libevent thread");
190              (data.func)(data.user_data);
191            }
192            break;
193        }
194
195        case '\0': /* eof */
196        {
197            dbgmsg ("pipe eof reached... removing event listener");
198            event_free (eh->pipeEvent);
199            break;
200        }
201
202        default:
203        {
204            assert (0 && "unhandled command type!");
205            break;
206        }
207    }
208}
209
210static void
211logFunc (int severity, const char * message)
212{
213    if (severity >= _EVENT_LOG_ERR)
214        tr_logAddError ("%s", message);
215    else
216        tr_logAddDebug ("%s", message);
217}
218
219static void
220libeventThreadFunc (void * veh)
221{
222    struct event_base * base;
223    tr_event_handle * eh = veh;
224
225#ifndef _WIN32
226    /* Don't exit when writing on a broken socket */
227    signal (SIGPIPE, SIG_IGN);
228#endif
229
230    /* create the libevent bases */
231    base = event_base_new ();
232
233    /* set the struct's fields */
234    eh->base = base;
235    eh->session->event_base = base;
236    eh->session->evdns_base = evdns_base_new (base, true);
237    eh->session->events = eh;
238
239    /* listen to the pipe's read fd */
240    eh->pipeEvent = event_new (base, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh);
241    event_add (eh->pipeEvent, NULL);
242    event_set_log_callback (logFunc);
243
244    /* loop until all the events are done */
245    while (!eh->die)
246        event_base_dispatch (base);
247
248    /* shut down the thread */
249    tr_lockFree (eh->lock);
250    event_base_free (base);
251    eh->session->events = NULL;
252    tr_free (eh);
253    tr_logAddDebug ("Closing libevent thread");
254}
255
256void
257tr_eventInit (tr_session * session)
258{
259    tr_event_handle * eh;
260
261    session->events = NULL;
262
263    eh = tr_new0 (tr_event_handle, 1);
264    eh->lock = tr_lockNew ();
265    if (pipe (eh->fds) == -1)
266      tr_logAddError ("Unable to write to pipe() in libtransmission: %s", tr_strerror(errno));
267    eh->session = session;
268    eh->thread = tr_threadNew (libeventThreadFunc, eh);
269
270    /* wait until the libevent thread is running */
271    while (session->events == NULL)
272        tr_wait_msec (100);
273}
274
275void
276tr_eventClose (tr_session * session)
277{
278    assert (tr_isSession (session));
279
280    session->events->die = true;
281    tr_logAddDeep (__FILE__, __LINE__, NULL, "closing trevent pipe");
282    tr_netCloseSocket (session->events->fds[1]);
283}
284
285/**
286***
287**/
288
289bool
290tr_amInEventThread (const tr_session * session)
291{
292    assert (tr_isSession (session));
293    assert (session->events != NULL);
294
295    return tr_amInThread (session->events->thread);
296}
297
298/**
299***
300**/
301
302void
303tr_runInEventThread (tr_session * session,
304                     void func (void*), void * user_data)
305{
306  assert (tr_isSession (session));
307  assert (session->events != NULL);
308
309  if (tr_amInThread (session->events->thread))
310    {
311      (func)(user_data);
312    }
313  else
314    {
315      int fd;
316      char ch;
317      ssize_t res_1;
318      ssize_t res_2;
319      tr_event_handle * e = session->events;
320      struct tr_run_data data;
321
322      tr_lockLock (e->lock);
323
324      fd = e->fds[1];
325      ch = 'r';
326      res_1 = pipewrite (fd, &ch, 1);
327
328      data.func = func;
329      data.user_data = user_data;
330      res_2 = pipewrite (fd, &data, sizeof (data));
331
332      tr_lockUnlock (e->lock);
333
334      if ((res_1 == -1) || (res_2 == -1))
335        tr_logAddError ("Unable to write to libtransmisison event queue: %s", tr_strerror(errno));
336    }
337}
Note: See TracBrowser for help on using the repository browser.