source: trunk/libtransmission/trevent.c @ 12184

Last change on this file since 12184 was 12184, checked in by jordan, 11 years ago

(trunk libT) better shutdown management of libutp and UDP trackers in tr_sessionClose().

This is a little overlapping since the utp code can be closed more-or-less immediately, but the udp manager needs to stay open in order to process the udp tracker connection requests before sending out event=stopped. Moreover DNS resolver can be shut down after the UDP tracker is shutdown.

  • Property svn:keywords set to Date Rev Author Id
File size: 7.6 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: trevent.c 12184 2011-03-17 18:51:31Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdlib.h>
16#include <string.h>
17#include <stdio.h>
18
19#include <signal.h>
20
21#include <event2/dns.h>
22#include <event2/event.h>
23
24#include "transmission.h"
25#include "net.h"
26#include "session.h"
27
28#ifdef WIN32
29
30#include "utils.h"
31#include <winsock2.h>
32
33static int
34pgpipe( int handles[2] )
35{
36    SOCKET s;
37    struct sockaddr_in serv_addr;
38    int len = sizeof( serv_addr );
39
40    handles[0] = handles[1] = INVALID_SOCKET;
41
42    if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
43    {
44        tr_dbg("pgpipe failed to create socket: %ui", WSAGetLastError());
45        return -1;
46    }
47
48    memset( &serv_addr, 0, sizeof( serv_addr ) );
49    serv_addr.sin_family = AF_INET;
50    serv_addr.sin_port = htons(0);
51    serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
52    if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
53    {
54        tr_dbg("pgpipe failed to bind: %ui", WSAGetLastError());
55        closesocket(s);
56        return -1;
57    }
58    if (listen(s, 1) == SOCKET_ERROR)
59    {
60        tr_ndbg("event","pgpipe failed to listen: %ui", WSAGetLastError());
61        closesocket(s);
62        return -1;
63    }
64    if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
65    {
66        tr_dbg("pgpipe failed to getsockname: %ui", WSAGetLastError());
67        closesocket(s);
68        return -1;
69    }
70    if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
71    {
72        tr_dbg("pgpipe failed to create socket 2: %ui", WSAGetLastError());
73        closesocket(s);
74        return -1;
75    }
76
77    if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
78    {
79        tr_dbg("pgpipe failed to connect socket: %ui", WSAGetLastError());
80        closesocket(s);
81        return -1;
82    }
83    if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
84    {
85        tr_dbg("pgpipe failed to accept socket: %ui", WSAGetLastError());
86        closesocket(handles[1]);
87        handles[1] = INVALID_SOCKET;
88        closesocket(s);
89        return -1;
90    }
91    closesocket(s);
92    return 0;
93}
94
95static int
96piperead( int s, char *buf, int len )
97{
98    int ret = recv(s, buf, len, 0);
99
100    if (ret < 0) {
101        const int werror= WSAGetLastError();
102        switch(werror) {
103          /* simplified error mapping (not valid for connect) */
104            case WSAEWOULDBLOCK:
105                errno = EAGAIN;
106                break;
107            case WSAECONNRESET:
108                /* EOF on the pipe! (win32 socket based implementation) */
109                ret = 0;
110                /* fall through */
111            default:
112                errno = werror;
113                break;
114        }
115    } else
116        errno = 0;
117    return ret;
118}
119
120#define pipe(a) pgpipe(a)
121#define pipewrite(a,b,c) send(a,(char*)b,c,0)
122
123#else
124#define piperead(a,b,c) read(a,b,c)
125#define pipewrite(a,b,c) write(a,b,c)
126#endif
127
128#include <unistd.h>
129
130#include "transmission.h"
131#include "platform.h"
132#include "trevent.h"
133#include "utils.h"
134
135/***
136****
137***/
138
139typedef struct tr_event_handle
140{
141    uint8_t      die;
142    int          fds[2];
143    tr_lock *    lock;
144    tr_session *  session;
145    tr_thread *  thread;
146    struct event_base * base;
147    struct event * pipeEvent;
148}
149tr_event_handle;
150
151struct tr_run_data
152{
153    void    ( *func )( void * );
154    void *  user_data;
155};
156
157#define dbgmsg( ... ) \
158    do { \
159        if( tr_deepLoggingIsActive( ) ) \
160            tr_deepLog( __FILE__, __LINE__, "event", __VA_ARGS__ ); \
161    } while( 0 )
162
163static void
164readFromPipe( int    fd,
165              short  eventType,
166              void * veh )
167{
168    char              ch;
169    int               ret;
170    tr_event_handle * eh = veh;
171
172    dbgmsg( "readFromPipe: eventType is %hd", eventType );
173
174    /* read the command type */
175    ch = '\0';
176    do
177    {
178        ret = piperead( fd, &ch, 1 );
179    }
180    while( !eh->die && ret < 0 && errno == EAGAIN );
181
182    dbgmsg( "command is [%c], ret is %d, errno is %d", ch, ret, (int)errno );
183
184    switch( ch )
185    {
186        case 'r': /* run in libevent thread */
187        {
188            struct tr_run_data data;
189            const size_t       nwant = sizeof( data );
190            const ssize_t      ngot = piperead( fd, &data, nwant );
191            if( !eh->die && ( ngot == (ssize_t)nwant ) )
192            {
193                dbgmsg( "invoking function in libevent thread" );
194                ( data.func )( data.user_data );
195            }
196            break;
197        }
198
199        case '\0': /* eof */
200        {
201            dbgmsg( "pipe eof reached... removing event listener" );
202            event_free( eh->pipeEvent );
203            break;
204        }
205
206        default:
207        {
208            assert( 0 && "unhandled command type!" );
209            break;
210        }
211    }
212}
213
214static void
215logFunc( int severity, const char * message )
216{
217    if( severity >= _EVENT_LOG_ERR )
218        tr_err( "%s", message );
219    else
220        tr_dbg( "%s", message );
221}
222
223static void
224libeventThreadFunc( void * veh )
225{
226    struct event_base * base;
227    tr_event_handle * eh = veh;
228
229#ifndef WIN32
230    /* Don't exit when writing on a broken socket */
231    signal( SIGPIPE, SIG_IGN );
232#endif
233
234    /* create the libevent bases */
235    base = event_base_new( );
236
237    /* set the struct's fields */
238    eh->base = base;
239    eh->session->event_base = base;
240    eh->session->evdns_base = evdns_base_new( base, TRUE );
241    eh->session->events = eh;
242
243    /* listen to the pipe's read fd */
244    eh->pipeEvent = event_new( base, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh );
245    event_add( eh->pipeEvent, NULL );
246    event_set_log_callback( logFunc );
247
248    /* loop until all the events are done */
249    while( !eh->die )
250        event_base_dispatch( base );
251
252    /* shut down the thread */
253    tr_lockFree( eh->lock );
254    event_base_free( base );
255    eh->session->events = NULL;
256    tr_free( eh );
257    tr_dbg( "Closing libevent thread" );
258}
259
260void
261tr_eventInit( tr_session * session )
262{
263    tr_event_handle * eh;
264
265    session->events = NULL;
266
267    eh = tr_new0( tr_event_handle, 1 );
268    eh->lock = tr_lockNew( );
269    pipe( eh->fds );
270    eh->session = session;
271    eh->thread = tr_threadNew( libeventThreadFunc, eh );
272
273    /* wait until the libevent thread is running */
274    while( session->events == NULL )
275        tr_wait_msec( 100 );
276}
277
278void
279tr_eventClose( tr_session * session )
280{
281    assert( tr_isSession( session ) );
282
283    session->events->die = TRUE;
284    tr_deepLog( __FILE__, __LINE__, NULL, "closing trevent pipe" );
285    tr_netCloseSocket( session->events->fds[1] );
286}
287
288/**
289***
290**/
291
292tr_bool
293tr_amInEventThread( const tr_session * session )
294{
295    assert( tr_isSession( session ) );
296    assert( session->events != NULL );
297
298    return tr_amInThread( session->events->thread );
299}
300
301/**
302***
303**/
304
305void
306tr_runInEventThread( tr_session * session,
307                     void func( void* ), void * user_data )
308{
309    assert( tr_isSession( session ) );
310    assert( session->events != NULL );
311
312    if( tr_amInThread( session->events->thread ) )
313    {
314        (func)( user_data );
315    }
316    else
317    {
318        const char         ch = 'r';
319        int                fd = session->events->fds[1];
320        tr_lock *          lock = session->events->lock;
321        struct tr_run_data data;
322
323        tr_lockLock( lock );
324        pipewrite( fd, &ch, 1 );
325        data.func = func;
326        data.user_data = user_data;
327        pipewrite( fd, &data, sizeof( data ) );
328        tr_lockUnlock( lock );
329    }
330}
Note: See TracBrowser for help on using the repository browser.