source: trunk/libtransmission/trevent.c @ 12545

Last change on this file since 12545 was 12228, checked in by jordan, 11 years ago

(trunk libT) still fiddling around with #includes -- this time removing unnecessary libT includes from libT .c files

  • Property svn:keywords set to Date Rev Author Id
File size: 7.6 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: trevent.c 12228 2011-03-25 01:41:57Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <signal.h>
18
19#include <event2/dns.h>
20#include <event2/event.h>
21
22#include "transmission.h"
23#include "net.h"
24#include "session.h"
25
26#ifdef WIN32
27
28#include "utils.h"
29#include <winsock2.h>
30
31static int
32pgpipe( int handles[2] )
33{
34    SOCKET s;
35    struct sockaddr_in serv_addr;
36    int len = sizeof( serv_addr );
37
38    handles[0] = handles[1] = INVALID_SOCKET;
39
40    if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
41    {
42        tr_dbg("pgpipe failed to create socket: %ui", WSAGetLastError());
43        return -1;
44    }
45
46    memset( &serv_addr, 0, sizeof( serv_addr ) );
47    serv_addr.sin_family = AF_INET;
48    serv_addr.sin_port = htons(0);
49    serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
50    if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
51    {
52        tr_dbg("pgpipe failed to bind: %ui", WSAGetLastError());
53        closesocket(s);
54        return -1;
55    }
56    if (listen(s, 1) == SOCKET_ERROR)
57    {
58        tr_ndbg("event","pgpipe failed to listen: %ui", WSAGetLastError());
59        closesocket(s);
60        return -1;
61    }
62    if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
63    {
64        tr_dbg("pgpipe failed to getsockname: %ui", WSAGetLastError());
65        closesocket(s);
66        return -1;
67    }
68    if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
69    {
70        tr_dbg("pgpipe failed to create socket 2: %ui", WSAGetLastError());
71        closesocket(s);
72        return -1;
73    }
74
75    if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
76    {
77        tr_dbg("pgpipe failed to connect socket: %ui", WSAGetLastError());
78        closesocket(s);
79        return -1;
80    }
81    if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
82    {
83        tr_dbg("pgpipe failed to accept socket: %ui", WSAGetLastError());
84        closesocket(handles[1]);
85        handles[1] = INVALID_SOCKET;
86        closesocket(s);
87        return -1;
88    }
89    closesocket(s);
90    return 0;
91}
92
93static int
94piperead( int s, char *buf, int len )
95{
96    int ret = recv(s, buf, len, 0);
97
98    if (ret < 0) {
99        const int werror= WSAGetLastError();
100        switch(werror) {
101          /* simplified error mapping (not valid for connect) */
102            case WSAEWOULDBLOCK:
103                errno = EAGAIN;
104                break;
105            case WSAECONNRESET:
106                /* EOF on the pipe! (win32 socket based implementation) */
107                ret = 0;
108                /* fall through */
109            default:
110                errno = werror;
111                break;
112        }
113    } else
114        errno = 0;
115    return ret;
116}
117
118#define pipe(a) pgpipe(a)
119#define pipewrite(a,b,c) send(a,(char*)b,c,0)
120
121#else
122#define piperead(a,b,c) read(a,b,c)
123#define pipewrite(a,b,c) write(a,b,c)
124#endif
125
126#include <unistd.h> /* read(), write(), pipe() */
127
128#include "transmission.h"
129#include "platform.h" /* tr_lockLock() */
130#include "trevent.h"
131#include "utils.h"
132
133/***
134****
135***/
136
137typedef struct tr_event_handle
138{
139    uint8_t      die;
140    int          fds[2];
141    tr_lock *    lock;
142    tr_session *  session;
143    tr_thread *  thread;
144    struct event_base * base;
145    struct event * pipeEvent;
146}
147tr_event_handle;
148
/* Payload sent through the pipe after an 'r' command byte:
   a function and the argument to call it with in the libevent thread. */
struct tr_run_data
{
    void ( *func )( void * ); /* function to invoke */
    void * user_data;         /* passed to func as its only argument */
};
154
/* Emit a printf-style deep-log message under the "event" name,
   but only while deep logging is active. */
#define dbgmsg( ... ) \
    do \
    { \
        if( tr_deepLoggingIsActive( ) ) \
        { \
            tr_deepLog( __FILE__, __LINE__, "event", __VA_ARGS__ ); \
        } \
    } \
    while( 0 )
160
161static void
162readFromPipe( int    fd,
163              short  eventType,
164              void * veh )
165{
166    char              ch;
167    int               ret;
168    tr_event_handle * eh = veh;
169
170    dbgmsg( "readFromPipe: eventType is %hd", eventType );
171
172    /* read the command type */
173    ch = '\0';
174    do
175    {
176        ret = piperead( fd, &ch, 1 );
177    }
178    while( !eh->die && ret < 0 && errno == EAGAIN );
179
180    dbgmsg( "command is [%c], ret is %d, errno is %d", ch, ret, (int)errno );
181
182    switch( ch )
183    {
184        case 'r': /* run in libevent thread */
185        {
186            struct tr_run_data data;
187            const size_t       nwant = sizeof( data );
188            const ssize_t      ngot = piperead( fd, &data, nwant );
189            if( !eh->die && ( ngot == (ssize_t)nwant ) )
190            {
191                dbgmsg( "invoking function in libevent thread" );
192                ( data.func )( data.user_data );
193            }
194            break;
195        }
196
197        case '\0': /* eof */
198        {
199            dbgmsg( "pipe eof reached... removing event listener" );
200            event_free( eh->pipeEvent );
201            break;
202        }
203
204        default:
205        {
206            assert( 0 && "unhandled command type!" );
207            break;
208        }
209    }
210}
211
212static void
213logFunc( int severity, const char * message )
214{
215    if( severity >= _EVENT_LOG_ERR )
216        tr_err( "%s", message );
217    else
218        tr_dbg( "%s", message );
219}
220
221static void
222libeventThreadFunc( void * veh )
223{
224    struct event_base * base;
225    tr_event_handle * eh = veh;
226
227#ifndef WIN32
228    /* Don't exit when writing on a broken socket */
229    signal( SIGPIPE, SIG_IGN );
230#endif
231
232    /* create the libevent bases */
233    base = event_base_new( );
234
235    /* set the struct's fields */
236    eh->base = base;
237    eh->session->event_base = base;
238    eh->session->evdns_base = evdns_base_new( base, true );
239    eh->session->events = eh;
240
241    /* listen to the pipe's read fd */
242    eh->pipeEvent = event_new( base, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh );
243    event_add( eh->pipeEvent, NULL );
244    event_set_log_callback( logFunc );
245
246    /* loop until all the events are done */
247    while( !eh->die )
248        event_base_dispatch( base );
249
250    /* shut down the thread */
251    tr_lockFree( eh->lock );
252    event_base_free( base );
253    eh->session->events = NULL;
254    tr_free( eh );
255    tr_dbg( "Closing libevent thread" );
256}
257
258void
259tr_eventInit( tr_session * session )
260{
261    tr_event_handle * eh;
262
263    session->events = NULL;
264
265    eh = tr_new0( tr_event_handle, 1 );
266    eh->lock = tr_lockNew( );
267    pipe( eh->fds );
268    eh->session = session;
269    eh->thread = tr_threadNew( libeventThreadFunc, eh );
270
271    /* wait until the libevent thread is running */
272    while( session->events == NULL )
273        tr_wait_msec( 100 );
274}
275
276void
277tr_eventClose( tr_session * session )
278{
279    assert( tr_isSession( session ) );
280
281    session->events->die = true;
282    tr_deepLog( __FILE__, __LINE__, NULL, "closing trevent pipe" );
283    tr_netCloseSocket( session->events->fds[1] );
284}
285
286/**
287***
288**/
289
290bool
291tr_amInEventThread( const tr_session * session )
292{
293    assert( tr_isSession( session ) );
294    assert( session->events != NULL );
295
296    return tr_amInThread( session->events->thread );
297}
298
299/**
300***
301**/
302
303void
304tr_runInEventThread( tr_session * session,
305                     void func( void* ), void * user_data )
306{
307    assert( tr_isSession( session ) );
308    assert( session->events != NULL );
309
310    if( tr_amInThread( session->events->thread ) )
311    {
312        (func)( user_data );
313    }
314    else
315    {
316        const char         ch = 'r';
317        int                fd = session->events->fds[1];
318        tr_lock *          lock = session->events->lock;
319        struct tr_run_data data;
320
321        tr_lockLock( lock );
322        pipewrite( fd, &ch, 1 );
323        data.func = func;
324        data.user_data = user_data;
325        pipewrite( fd, &data, sizeof( data ) );
326        tr_lockUnlock( lock );
327    }
328}
Note: See TracBrowser for help on using the repository browser.