source: trunk/libtransmission/trevent.c @ 11599

Last change on this file since 11599 was 11599, checked in by charles, 11 years ago

(trunk) Join the 21st century and use only 1 space at the end sentences. This commit is nearly as important as the semi-annual ones that remove trailing spaces from the ends of lines of code... :)

  • Property svn:keywords set to Date Rev Author Id
File size: 7.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: trevent.c 11599 2010-12-27 19:18:17Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdlib.h>
16#include <string.h>
17#include <stdio.h>
18
19#include <signal.h>
20
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "net.h"
25#include "session.h"
26
27#ifdef WIN32
28
29#include "utils.h"
30#include <winsock2.h>
31
32static int
33pgpipe( int handles[2] )
34{
35    SOCKET s;
36    struct sockaddr_in serv_addr;
37    int len = sizeof( serv_addr );
38
39    handles[0] = handles[1] = INVALID_SOCKET;
40
41    if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
42    {
43        tr_dbg("pgpipe failed to create socket: %ui", WSAGetLastError());
44        return -1;
45    }
46
47    memset( &serv_addr, 0, sizeof( serv_addr ) );
48    serv_addr.sin_family = AF_INET;
49    serv_addr.sin_port = htons(0);
50    serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
51    if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
52    {
53        tr_dbg("pgpipe failed to bind: %ui", WSAGetLastError());
54        closesocket(s);
55        return -1;
56    }
57    if (listen(s, 1) == SOCKET_ERROR)
58    {
59        tr_ndbg("event","pgpipe failed to listen: %ui", WSAGetLastError());
60        closesocket(s);
61        return -1;
62    }
63    if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
64    {
65        tr_dbg("pgpipe failed to getsockname: %ui", WSAGetLastError());
66        closesocket(s);
67        return -1;
68    }
69    if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
70    {
71        tr_dbg("pgpipe failed to create socket 2: %ui", WSAGetLastError());
72        closesocket(s);
73        return -1;
74    }
75
76    if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
77    {
78        tr_dbg("pgpipe failed to connect socket: %ui", WSAGetLastError());
79        closesocket(s);
80        return -1;
81    }
82    if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
83    {
84        tr_dbg("pgpipe failed to accept socket: %ui", WSAGetLastError());
85        closesocket(handles[1]);
86        handles[1] = INVALID_SOCKET;
87        closesocket(s);
88        return -1;
89    }
90    closesocket(s);
91    return 0;
92}
93
94static int
95piperead( int s, char *buf, int len )
96{
97    int ret = recv(s, buf, len, 0);
98
99    if (ret < 0) {
100        const int werror= WSAGetLastError();
101        switch(werror) {
102          /* simplified error mapping (not valid for connect) */
103            case WSAEWOULDBLOCK:
104                errno = EAGAIN;
105                break;
106            case WSAECONNRESET:
107                /* EOF on the pipe! (win32 socket based implementation) */
108                ret = 0;
109                /* fall through */
110            default:
111                errno = werror;
112                break;
113        }
114    } else
115        errno = 0;
116    return ret;
117}
118
119#define pipe(a) pgpipe(a)
120#define pipewrite(a,b,c) send(a,(char*)b,c,0)
121
122#else
123#define piperead(a,b,c) read(a,b,c)
124#define pipewrite(a,b,c) write(a,b,c)
125#endif
126
127#include <unistd.h>
128
129#include "transmission.h"
130#include "platform.h"
131#include "trevent.h"
132#include "utils.h"
133
134/***
135****
136***/
137
138typedef struct tr_event_handle
139{
140    uint8_t      die;
141    int          fds[2];
142    tr_lock *    lock;
143    tr_session *  session;
144    tr_thread *  thread;
145    struct event_base * base;
146    struct event * pipeEvent;
147}
148tr_event_handle;
149
150struct tr_run_data
151{
152    void    ( *func )( void * );
153    void *  user_data;
154};
155
156#define dbgmsg( ... ) \
157    do { \
158        if( tr_deepLoggingIsActive( ) ) \
159            tr_deepLog( __FILE__, __LINE__, "event", __VA_ARGS__ ); \
160    } while( 0 )
161
162static void
163readFromPipe( int    fd,
164              short  eventType,
165              void * veh )
166{
167    char              ch;
168    int               ret;
169    tr_event_handle * eh = veh;
170
171    dbgmsg( "readFromPipe: eventType is %hd", eventType );
172
173    /* read the command type */
174    ch = '\0';
175    do
176    {
177        ret = piperead( fd, &ch, 1 );
178    }
179    while( !eh->die && ret < 0 && errno == EAGAIN );
180
181    dbgmsg( "command is [%c], ret is %d, errno is %d", ch, ret, (int)errno );
182
183    switch( ch )
184    {
185        case 'r': /* run in libevent thread */
186        {
187            struct tr_run_data data;
188            const size_t       nwant = sizeof( data );
189            const ssize_t      ngot = piperead( fd, &data, nwant );
190            if( !eh->die && ( ngot == (ssize_t)nwant ) )
191            {
192                dbgmsg( "invoking function in libevent thread" );
193                ( data.func )( data.user_data );
194            }
195            break;
196        }
197
198        case '\0': /* eof */
199        {
200            dbgmsg( "pipe eof reached... removing event listener" );
201            event_free( eh->pipeEvent );
202            break;
203        }
204
205        default:
206        {
207            assert( 0 && "unhandled command type!" );
208            break;
209        }
210    }
211}
212
213static void
214logFunc( int severity, const char * message )
215{
216    if( severity >= _EVENT_LOG_ERR )
217        tr_err( "%s", message );
218    else
219        tr_dbg( "%s", message );
220}
221
222static void
223libeventThreadFunc( void * veh )
224{
225    struct event_base * base;
226    tr_event_handle * eh = veh;
227
228#ifndef WIN32
229    /* Don't exit when writing on a broken socket */
230    signal( SIGPIPE, SIG_IGN );
231#endif
232
233    base = event_base_new( );
234    eh->base = base;
235    eh->session->event_base = base;
236    eh->session->events = eh;
237
238    /* listen to the pipe's read fd */
239    eh->pipeEvent = event_new( base, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh );
240    event_add( eh->pipeEvent, NULL );
241    event_set_log_callback( logFunc );
242
243    /* loop until all the events are done */
244    while( !eh->die )
245        event_base_dispatch( base );
246
247    /* shut down the thread */
248    tr_lockFree( eh->lock );
249    event_base_free( base );
250    eh->session->events = NULL;
251    tr_free( eh );
252    tr_dbg( "Closing libevent thread" );
253}
254
255void
256tr_eventInit( tr_session * session )
257{
258    tr_event_handle * eh;
259
260    session->events = NULL;
261
262    eh = tr_new0( tr_event_handle, 1 );
263    eh->lock = tr_lockNew( );
264    pipe( eh->fds );
265    eh->session = session;
266    eh->thread = tr_threadNew( libeventThreadFunc, eh );
267
268    /* wait until the libevent thread is running */
269    while( session->events == NULL )
270        tr_wait_msec( 100 );
271}
272
273void
274tr_eventClose( tr_session * session )
275{
276    assert( tr_isSession( session ) );
277
278    session->events->die = TRUE;
279    tr_deepLog( __FILE__, __LINE__, NULL, "closing trevent pipe" );
280    tr_netCloseSocket( session->events->fds[1] );
281}
282
283/**
284***
285**/
286
287tr_bool
288tr_amInEventThread( const tr_session * session )
289{
290    assert( tr_isSession( session ) );
291    assert( session->events != NULL );
292
293    return tr_amInThread( session->events->thread );
294}
295
296/**
297***
298**/
299
300void
301tr_runInEventThread( tr_session * session,
302                     void func( void* ), void * user_data )
303{
304    assert( tr_isSession( session ) );
305    assert( session->events != NULL );
306
307    if( tr_amInThread( session->events->thread ) )
308    {
309        (func)( user_data );
310    }
311    else
312    {
313        const char         ch = 'r';
314        int                fd = session->events->fds[1];
315        tr_lock *          lock = session->events->lock;
316        struct tr_run_data data;
317
318        tr_lockLock( lock );
319        pipewrite( fd, &ch, 1 );
320        data.func = func;
321        data.user_data = user_data;
322        pipewrite( fd, &data, sizeof( data ) );
323        tr_lockUnlock( lock );
324    }
325}
Note: See TracBrowser for help on using the repository browser.