source: trunk/libtransmission/trevent.c @ 10858

Last change on this file since 10858 was 10858, checked in by charles, 12 years ago

(trunk libT) #3311 "MingW build of Transmission" -- apply further win32 diffs from rb07

  • Property svn:keywords set to Date Rev Author Id
File size: 7.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: trevent.c 10858 2010-06-25 20:36:10Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdlib.h>
16#include <string.h>
17#include <stdio.h>
18
19#include <signal.h>
20
21#include <event.h>
22
23#include "transmission.h"
24#include "net.h"
25#include "session.h"
26
27#ifdef WIN32
28
29#include "utils.h"
30#include <winsock2.h>
31
32static int
33pgpipe( int handles[2] )
34{
35    SOCKET s;
36    struct sockaddr_in serv_addr;
37    int len = sizeof( serv_addr );
38
39    handles[0] = handles[1] = INVALID_SOCKET;
40
41    if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
42    {
43        tr_dbg("pgpipe failed to create socket: %ui", WSAGetLastError());
44        return -1;
45    }
46
47    memset( &serv_addr, 0, sizeof( serv_addr ) );
48    serv_addr.sin_family = AF_INET;
49    serv_addr.sin_port = htons(0);
50    serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
51    if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
52    {
53        tr_dbg("pgpipe failed to bind: %ui", WSAGetLastError());
54        closesocket(s);
55        return -1;
56    }
57    if (listen(s, 1) == SOCKET_ERROR)
58    {
59        tr_ndbg("event","pgpipe failed to listen: %ui", WSAGetLastError());
60        closesocket(s);
61        return -1;
62    }
63    if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
64    {
65        tr_dbg("pgpipe failed to getsockname: %ui", WSAGetLastError());
66        closesocket(s);
67        return -1;
68    }
69    if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
70    {
71        tr_dbg("pgpipe failed to create socket 2: %ui", WSAGetLastError());
72        closesocket(s);
73        return -1;
74    }
75
76    if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
77    {
78        tr_dbg("pgpipe failed to connect socket: %ui", WSAGetLastError());
79        closesocket(s);
80        return -1;
81    }
82    if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
83    {
84        tr_dbg("pgpipe failed to accept socket: %ui", WSAGetLastError());
85        closesocket(handles[1]);
86        handles[1] = INVALID_SOCKET;
87        closesocket(s);
88        return -1;
89    }
90    closesocket(s);
91    return 0;
92}
93
94static int
95piperead( int s, char *buf, int len )
96{
97    int ret = recv(s, buf, len, 0);
98    int werror = 0;
99
100    if (ret < 0) {
101        werror= WSAGetLastError();
102        switch(werror) {
103          /* simplified error mapping (not valid for connect) */
104            case WSAEWOULDBLOCK:
105                errno = EAGAIN;
106                break;
107            case WSAECONNRESET:
108                /* EOF on the pipe! (win32 socket based implementation) */
109                ret = 0;
110                /* fall through */
111            default:
112                errno = werror;
113                break;
114        }
115    } else
116        errno = 0;
117    return ret;
118}
119
/* win32: route the POSIX pipe calls to the socket-based emulation.
   Macro arguments are parenthesized so that the (char*) cast applies
   to the whole buffer expression rather than to its first operand. */
#define pipe(a) pgpipe(a)
#define pipewrite(a,b,c) send((a),(char*)(b),(c),0)
122
123#else
/* non-win32: the pipe helpers are plain read() and write() */
#define piperead(fd,buf,len)  read((fd),(buf),(len))
#define pipewrite(fd,buf,len) write((fd),(buf),(len))
126#endif
127
128#include <unistd.h>
129
130#include <event.h>
131
132#include "transmission.h"
133#include "platform.h"
134#include "trevent.h"
135#include "utils.h"
136
137/***
138****
139***/
140
141typedef struct tr_event_handle
142{
143    uint8_t      die;
144    int          fds[2];
145    tr_lock *    lock;
146    tr_session *  session;
147    tr_thread *  thread;
148    struct event_base * base;
149    struct event pipeEvent;
150}
151tr_event_handle;
152
/* Payload of an 'r' command: a callback and the argument to pass to it. */
struct tr_run_data
{
    /* function to invoke in the libevent thread */
    void ( *func )( void * );

    /* opaque argument handed to func */
    void * user_data;
};
158
/* Write a printf-style message to the deep log under the "event" name,
   but only when deep logging is active. */
#define dbgmsg( ... ) \
    do \
    { \
        if( tr_deepLoggingIsActive( ) ) \
        { \
            tr_deepLog( __FILE__, __LINE__, "event", __VA_ARGS__ ); \
        } \
    } \
    while( 0 )
164
165static void
166readFromPipe( int    fd,
167              short  eventType,
168              void * veh )
169{
170    char              ch;
171    int               ret;
172    tr_event_handle * eh = veh;
173
174    dbgmsg( "readFromPipe: eventType is %hd", eventType );
175
176    /* read the command type */
177    ch = '\0';
178    do
179    {
180        ret = piperead( fd, &ch, 1 );
181    }
182    while( !eh->die && ret < 0 && errno == EAGAIN );
183
184    dbgmsg( "command is [%c], ret is %d, errno is %d", ch, ret, (int)errno );
185
186    switch( ch )
187    {
188        case 'r': /* run in libevent thread */
189        {
190            struct tr_run_data data;
191            const size_t       nwant = sizeof( data );
192            const ssize_t      ngot = piperead( fd, &data, nwant );
193            if( !eh->die && ( ngot == (ssize_t)nwant ) )
194            {
195                dbgmsg( "invoking function in libevent thread" );
196                ( data.func )( data.user_data );
197            }
198            break;
199        }
200
201        case '\0': /* eof */
202        {
203            dbgmsg( "pipe eof reached... removing event listener" );
204            event_del( &eh->pipeEvent );
205            break;
206        }
207
208        default:
209        {
210            assert( 0 && "unhandled command type!" );
211            break;
212        }
213    }
214}
215
216static void
217logFunc( int severity, const char * message )
218{
219    if( severity >= _EVENT_LOG_ERR )
220        tr_err( "%s", message );
221    else
222        tr_dbg( "%s", message );
223}
224
225static void
226libeventThreadFunc( void * veh )
227{
228    tr_event_handle * eh = veh;
229
230#ifndef WIN32
231    /* Don't exit when writing on a broken socket */
232    signal( SIGPIPE, SIG_IGN );
233#endif
234
235    eh->base = event_init( );
236    eh->session->events = eh;
237
238    /* listen to the pipe's read fd */
239    event_set( &eh->pipeEvent, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh );
240    event_add( &eh->pipeEvent, NULL );
241    event_set_log_callback( logFunc );
242
243    /* loop until all the events are done */
244    while( !eh->die )
245        event_dispatch( );
246
247    /* shut down the thread */
248    tr_lockFree( eh->lock );
249    event_base_free( eh->base );
250    eh->session->events = NULL;
251    tr_free( eh );
252    tr_dbg( "Closing libevent thread" );
253}
254
255void
256tr_eventInit( tr_session * session )
257{
258    tr_event_handle * eh;
259
260    session->events = NULL;
261
262    eh = tr_new0( tr_event_handle, 1 );
263    eh->lock = tr_lockNew( );
264    pipe( eh->fds );
265    eh->session = session;
266    eh->thread = tr_threadNew( libeventThreadFunc, eh );
267
268    /* wait until the libevent thread is running */
269    while( session->events == NULL )
270        tr_wait_msec( 100 );
271}
272
273void
274tr_eventClose( tr_session * session )
275{
276    assert( tr_isSession( session ) );
277
278    session->events->die = TRUE;
279    tr_deepLog( __FILE__, __LINE__, NULL, "closing trevent pipe" );
280    tr_netCloseSocket( session->events->fds[1] );
281}
282
283/**
284***
285**/
286
287tr_bool
288tr_amInEventThread( const tr_session * session )
289{
290    assert( tr_isSession( session ) );
291    assert( session->events != NULL );
292
293    return tr_amInThread( session->events->thread );
294}
295
296/**
297***
298**/
299
300void
301tr_runInEventThread( tr_session * session,
302                     void func( void* ), void * user_data )
303{
304    assert( tr_isSession( session ) );
305    assert( session->events != NULL );
306
307    if( tr_amInThread( session->events->thread ) )
308    {
309        (func)( user_data );
310    }
311    else
312    {
313        const char         ch = 'r';
314        int                fd = session->events->fds[1];
315        tr_lock *          lock = session->events->lock;
316        struct tr_run_data data;
317
318        tr_lockLock( lock );
319        pipewrite( fd, &ch, 1 );
320        data.func = func;
321        data.user_data = user_data;
322        pipewrite( fd, &data, sizeof( data ) );
323        tr_lockUnlock( lock );
324    }
325}
326
327struct event_base *
328tr_eventGetBase( tr_session * session )
329{
330    assert( tr_isSession( session ) );
331
332    return session->events->base;
333}
Note: See TracBrowser for help on using the repository browser.