source: trunk/libtransmission/trevent.c @ 9992

Last change on this file since 9992 was 9992, checked in by charles, 12 years ago

(trunk libT) if we're calling evdns_init(), we probably ought to call evdns_shutdown() on exit...

  • Property svn:keywords set to Date Rev Author Id
File size: 7.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: trevent.c 9992 2010-01-22 03:39:21Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdlib.h>
16#include <string.h>
17#include <stdio.h>
18
19#include <signal.h>
20
21#include <event.h>
22
23#include "transmission.h"
24#include "net.h"
25#include "session.h"
26
27#ifdef WIN32
28
29#include <WinSock2.h>
30
31static int
32pgpipe( int handles[2] )
33{
34        SOCKET s;
35        struct sockaddr_in serv_addr;
36        int len = sizeof( serv_addr );
37
38        handles[0] = handles[1] = INVALID_SOCKET;
39
40        if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
41        {
42/*              ereport(LOG, (errmsg_internal("pgpipe failed to create socket: %ui", WSAGetLastError()))); */
43                return -1;
44        }
45
46        memset( &serv_addr, 0, sizeof( serv_addr ) );
47        serv_addr.sin_family = AF_INET;
48        serv_addr.sin_port = htons(0);
49        serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
50        if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
51        {
52/*              ereport(LOG, (errmsg_internal("pgpipe failed to bind: %ui", WSAGetLastError()))); */
53                closesocket(s);
54                return -1;
55        }
56        if (listen(s, 1) == SOCKET_ERROR)
57        {
58/*              ereport(LOG, (errmsg_internal("pgpipe failed to listen: %ui", WSAGetLastError()))); */
59                closesocket(s);
60                return -1;
61        }
62        if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
63        {
64/*              ereport(LOG, (errmsg_internal("pgpipe failed to getsockname: %ui", WSAGetLastError()))); */
65                closesocket(s);
66                return -1;
67        }
68        if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
69        {
70/*              ereport(LOG, (errmsg_internal("pgpipe failed to create socket 2: %ui", WSAGetLastError()))); */
71                closesocket(s);
72                return -1;
73        }
74
75        if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
76        {
77/*              ereport(LOG, (errmsg_internal("pgpipe failed to connect socket: %ui", WSAGetLastError()))); */
78                closesocket(s);
79                return -1;
80        }
81        if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
82        {
83/*              ereport(LOG, (errmsg_internal("pgpipe failed to accept socket: %ui", WSAGetLastError()))); */
84                closesocket(handles[1]);
85                handles[1] = INVALID_SOCKET;
86                closesocket(s);
87                return -1;
88        }
89        closesocket(s);
90        return 0;
91}
92
93static int
94piperead( int s, char *buf, int len )
95{
96        int ret = recv(s, buf, len, 0);
97
98        if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
99                /* EOF on the pipe! (win32 socket based implementation) */
100                ret = 0;
101        return ret;
102}
103
/* WIN32: the "pipe" is the socket pair built by pgpipe(), so it is
   created with pgpipe() and written to with send().
   Macro arguments are parenthesized so that the cast in pipewrite()
   applies to the whole argument expression. */
#define pipe(a) pgpipe(a)
#define pipewrite(a,b,c) send((a),(char*)(b),(c),0)
106
107#else
/* non-WIN32: the pipe is a regular file descriptor pair, so reading and
   writing map straight onto read() and write() */
#define piperead( fd, buf, len )  read( (fd), (buf), (len) )
#define pipewrite( fd, buf, len ) write( (fd), (buf), (len) )
110#endif
111
112#include <unistd.h>
113
114#include <event.h>
115
116#include "transmission.h"
117#include "platform.h"
118#include "trevent.h"
119#include "utils.h"
120
121/***
122****
123***/
124
125typedef struct tr_event_handle
126{
127    uint8_t      die;
128    int          fds[2];
129    tr_lock *    lock;
130    tr_session *  session;
131    tr_thread *  thread;
132    struct event_base * base;
133    struct event pipeEvent;
134}
135tr_event_handle;
136
/**
 * Payload that follows an 'r' command byte on the event pipe:
 * a callback and the argument it is to be invoked with.
 */
struct tr_run_data
{
    /* function to call in the libevent thread */
    void ( *func )( void * );

    /* passed unchanged to func */
    void * user_data;
};
142
/* Writes a printf-style message to the deep log under the name "event".
   Nothing is formatted or logged unless deep logging is active. */
#define dbgmsg( ... ) \
    do { \
        if( !tr_deepLoggingIsActive( ) ) \
            break; \
        tr_deepLog( __FILE__, __LINE__, "event", __VA_ARGS__ ); \
    } while( 0 )
148
149static void
150readFromPipe( int    fd,
151              short  eventType,
152              void * veh )
153{
154    char              ch;
155    int               ret;
156    tr_event_handle * eh = veh;
157
158    dbgmsg( "readFromPipe: eventType is %hd", eventType );
159
160    /* read the command type */
161    ch = '\0';
162    do
163    {
164        ret = piperead( fd, &ch, 1 );
165    }
166    while( !eh->die && ret < 0 && errno == EAGAIN );
167
168    dbgmsg( "command is [%c], ret is %d, errno is %d", ch, ret, (int)errno );
169
170    switch( ch )
171    {
172        case 'r': /* run in libevent thread */
173        {
174            struct tr_run_data data;
175            const size_t       nwant = sizeof( data );
176            const ssize_t      ngot = piperead( fd, &data, nwant );
177            if( !eh->die && ( ngot == (ssize_t)nwant ) )
178            {
179                dbgmsg( "invoking function in libevent thread" );
180                ( data.func )( data.user_data );
181            }
182            break;
183        }
184
185        case '\0': /* eof */
186        {
187            dbgmsg( "pipe eof reached... removing event listener" );
188            event_del( &eh->pipeEvent );
189            break;
190        }
191
192        default:
193        {
194            assert( 0 && "unhandled command type!" );
195            break;
196        }
197    }
198}
199
200static void
201logFunc( int severity, const char * message )
202{
203    if( severity >= _EVENT_LOG_ERR )
204        tr_err( "%s", message );
205    else
206        tr_dbg( "%s", message );
207}
208
209static void
210libeventThreadFunc( void * veh )
211{
212    tr_event_handle * eh = veh;
213    tr_dbg( "Starting libevent thread" );
214
215#ifndef WIN32
216    /* Don't exit when writing on a broken socket */
217    signal( SIGPIPE, SIG_IGN );
218#endif
219
220    eh->base = event_init( );
221    eh->session->events = eh;
222
223    /* listen to the pipe's read fd */
224    event_set( &eh->pipeEvent, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh );
225    event_add( &eh->pipeEvent, NULL );
226    event_set_log_callback( logFunc );
227
228    /* loop until all the events are done */
229    while( !eh->die )
230        event_dispatch( );
231
232    /* shut down the thread */
233    tr_lockFree( eh->lock );
234    event_base_free( eh->base );
235    eh->session->events = NULL;
236    tr_free( eh );
237    tr_dbg( "Closing libevent thread" );
238}
239
240void
241tr_eventInit( tr_session * session )
242{
243    tr_event_handle * eh;
244
245    session->events = NULL;
246
247    eh = tr_new0( tr_event_handle, 1 );
248    eh->lock = tr_lockNew( );
249    pipe( eh->fds );
250    eh->session = session;
251    eh->thread = tr_threadNew( libeventThreadFunc, eh );
252
253    /* wait until the libevent thread is running */
254    while( session->events == NULL )
255        tr_wait_msec( 100 );
256}
257
258void
259tr_eventClose( tr_session * session )
260{
261    assert( tr_isSession( session ) );
262
263    session->events->die = TRUE;
264    tr_deepLog( __FILE__, __LINE__, NULL, "closing trevent pipe" );
265    tr_netCloseSocket( session->events->fds[1] );
266}
267
268/**
269***
270**/
271
272tr_bool
273tr_amInEventThread( const tr_session * session )
274{
275    assert( tr_isSession( session ) );
276    assert( session->events != NULL );
277
278    return tr_amInThread( session->events->thread );
279}
280
281/**
282***
283**/
284
285void
286tr_runInEventThread( tr_session * session,
287                     void func( void* ), void * user_data )
288{
289    assert( tr_isSession( session ) );
290    assert( session->events != NULL );
291
292    if( tr_amInThread( session->events->thread ) )
293    {
294        (func)( user_data );
295    }
296    else
297    {
298        const char         ch = 'r';
299        int                fd = session->events->fds[1];
300        tr_lock *          lock = session->events->lock;
301        struct tr_run_data data;
302
303        tr_lockLock( lock );
304        pipewrite( fd, &ch, 1 );
305        data.func = func;
306        data.user_data = user_data;
307        pipewrite( fd, &data, sizeof( data ) );
308        tr_lockUnlock( lock );
309    }
310}
311
312struct event_base *
313tr_eventGetBase( tr_session * session )
314{
315    assert( tr_isSession( session ) );
316
317    return session->events->base;
318}
Note: See TracBrowser for help on using the repository browser.