source: trunk/libtransmission/trevent.c @ 8050

Last change on this file since 8050 was 7824, checked in by charles, 12 years ago

(trunk libT) #1748: possible fix for the kqueue corruption errors by consolidating the three per-torrent libevent timers into three session-wide timers. Since most people reporting this error have lots of torrents loaded, consider a hypothetical example: if you had 500 torrents, this patch will reduce 1,500 libevent timers down to just three timers. On top of that, those three have simpler life cycles too...

  • Property svn:keywords set to Date Rev Author Id
File size: 9.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: trevent.c 7824 2009-02-04 16:58:52Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdlib.h>
16#include <string.h>
17#include <stdio.h>
18
19#include <signal.h>
20
21#include "transmission.h"
22#include "session.h"
23
24#ifdef WIN32
25
26#include <WinSock2.h> 
27 
28static int 
29pgpipe( int handles[2] ) 
30{
31        SOCKET s;
32        struct sockaddr_in serv_addr;
33        int len = sizeof( serv_addr );
34 
35        handles[0] = handles[1] = INVALID_SOCKET;
36 
37        if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
38        {
39/*              ereport(LOG, (errmsg_internal("pgpipe failed to create socket: %ui", WSAGetLastError()))); */
40                return -1;
41        }
42 
43        memset( &serv_addr, 0, sizeof( serv_addr ) );
44        serv_addr.sin_family = AF_INET;
45        serv_addr.sin_port = htons(0);
46        serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
47        if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
48        {
49/*              ereport(LOG, (errmsg_internal("pgpipe failed to bind: %ui", WSAGetLastError()))); */
50                closesocket(s);
51                return -1;
52        }
53        if (listen(s, 1) == SOCKET_ERROR)
54        {
55/*              ereport(LOG, (errmsg_internal("pgpipe failed to listen: %ui", WSAGetLastError()))); */
56                closesocket(s);
57                return -1;
58        }
59        if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
60        {
61/*              ereport(LOG, (errmsg_internal("pgpipe failed to getsockname: %ui", WSAGetLastError()))); */
62                closesocket(s);
63                return -1;
64        }
65        if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
66        {
67/*              ereport(LOG, (errmsg_internal("pgpipe failed to create socket 2: %ui", WSAGetLastError()))); */
68                closesocket(s);
69                return -1;
70        }
71 
72        if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
73        {
74/*              ereport(LOG, (errmsg_internal("pgpipe failed to connect socket: %ui", WSAGetLastError()))); */
75                closesocket(s);
76                return -1;
77        }
78        if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
79        {
80/*              ereport(LOG, (errmsg_internal("pgpipe failed to accept socket: %ui", WSAGetLastError()))); */
81                closesocket(handles[1]);
82                handles[1] = INVALID_SOCKET;
83                closesocket(s);
84                return -1;
85        }
86        closesocket(s);
87        return 0;
88}
89 
90static int 
91piperead( int s, char *buf, int len ) 
92{ 
93        int ret = recv(s, buf, len, 0); 
94 
95        if (ret < 0 && WSAGetLastError() == WSAECONNRESET) 
96                /* EOF on the pipe! (win32 socket based implementation) */ 
97                ret = 0; 
98        return ret; 
99} 
100 
/* Win32: the "pipe" is the socket pair built by pgpipe(), so it is written
   to with send().  Macro arguments are parenthesized so that any expression
   can be passed safely. */
#define pipe(a) pgpipe(a)
#define pipewrite(a,b,c) send((a),(const char*)(b),(c),0)
103
104#else
/* Non-Win32: the pipe descriptors are used with plain read() and write().
   Macro arguments are parenthesized so that any expression can be passed
   safely. */
#define piperead(a,b,c) read((a),(b),(c))
#define pipewrite(a,b,c) write((a),(b),(c))
107#endif
108
109#include <unistd.h> 
110
111#include <event.h>
112
113#include "transmission.h"
114#include "platform.h"
115#include "trevent.h"
116#include "utils.h"
117
118/***
119****
120***/
121
122typedef struct tr_event_handle
123{
124    uint8_t      die;
125    int          fds[2];
126    tr_lock *    lock;
127    tr_session *  session;
128    tr_thread *  thread;
129    struct event_base * base;
130    struct event pipeEvent;
131}
132tr_event_handle;
133
134typedef int timer_func ( void* );
135
136struct tr_timer
137{
138    tr_bool                   inCallback;
139    timer_func *              func;
140    void *                    user_data;
141    struct tr_event_handle *  eh;
142    struct timeval            tv;
143    struct event              event;
144};
145
/**
 * Payload that follows an 'r' command byte on the event pipe: the function
 * that tr_runInEventThread() wants invoked, and the argument to give it.
 */
struct tr_run_data
{
    void ( *func )( void * );   /* function to invoke in the libevent thread */
    void * user_data;           /* argument passed to func */
};
151
/* Send a message to the deep log under the name "event".  The arguments are
   only passed on when deep logging is active. */
#define dbgmsg( ... ) \
    do \
    { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, "event", __VA_ARGS__ ); \
    } \
    while( 0 )
157
158static void
159readFromPipe( int    fd,
160              short  eventType,
161              void * veh )
162{
163    char              ch;
164    int               ret;
165    tr_event_handle * eh = veh;
166
167    dbgmsg( "readFromPipe: eventType is %hd", eventType );
168
169    /* read the command type */
170    ch = '\0';
171    do
172    {
173        ret = piperead( fd, &ch, 1 );
174    }
175    while( !eh->die && ret < 0 && errno == EAGAIN );
176
177    dbgmsg( "command is [%c], ret is %d, errno is %d", ch, ret, (int)errno );
178
179    switch( ch )
180    {
181        case 'r': /* run in libevent thread */
182        {
183            struct tr_run_data data;
184            const size_t       nwant = sizeof( data );
185            const ssize_t      ngot = piperead( fd, &data, nwant );
186            if( !eh->die && ( ngot == (ssize_t)nwant ) )
187            {
188                dbgmsg( "invoking function in libevent thread" );
189                ( data.func )( data.user_data );
190            }
191            break;
192        }
193
194        case '\0': /* eof */
195        {
196            dbgmsg( "pipe eof reached... removing event listener" );
197            event_del( &eh->pipeEvent );
198            break;
199        }
200
201        default:
202        {
203            assert( 0 && "unhandled command type!" );
204            break;
205        }
206    }
207}
208
209static void
210logFunc( int severity, const char * message )
211{
212    if( severity >= _EVENT_LOG_ERR )
213        tr_err( "%s", message );
214    else
215        tr_dbg( "%s", message );
216}
217
218static void
219libeventThreadFunc( void * veh )
220{
221    tr_event_handle * eh = veh;
222    tr_dbg( "Starting libevent thread" );
223
224#ifndef WIN32
225    /* Don't exit when writing on a broken socket */
226    signal( SIGPIPE, SIG_IGN );
227#endif
228
229    eh->base = event_init( );
230    eh->session->events = eh;
231
232    /* listen to the pipe's read fd */
233    event_set( &eh->pipeEvent, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh );
234    event_add( &eh->pipeEvent, NULL );
235    event_set_log_callback( logFunc );
236
237    /* loop until all the events are done */
238    event_dispatch( );
239
240    /* shut down the thread */
241    tr_lockFree( eh->lock );
242    event_base_free( eh->base );
243    eh->session->events = NULL;
244    tr_free( eh );
245    tr_dbg( "Closing libevent thread" );
246}
247
248void
249tr_eventInit( tr_session * session )
250{
251    tr_event_handle * eh;
252
253    session->events = NULL;
254
255    eh = tr_new0( tr_event_handle, 1 );
256    eh->lock = tr_lockNew( );
257    pipe( eh->fds );
258    eh->session = session;
259    eh->thread = tr_threadNew( libeventThreadFunc, eh );
260
261    /* wait until the libevent thread is running */
262    while( session->events == NULL )
263        tr_wait( 100 );
264}
265
266void
267tr_eventClose( tr_session * session )
268{
269    assert( tr_isSession( session ) );
270
271    session->events->die = TRUE;
272    tr_deepLog( __FILE__, __LINE__, NULL, "closing trevent pipe" );
273    EVUTIL_CLOSESOCKET( session->events->fds[1] );
274}
275
276/**
277***
278**/
279
280tr_bool
281tr_amInEventThread( tr_session * session )
282{
283    assert( tr_isSession( session ) );
284    assert( session->events != NULL );
285
286    return tr_amInThread( session->events->thread );
287}
288
289/**
290***
291**/
292
293static void
294timerCallback( int    fd UNUSED,
295               short  event UNUSED,
296               void * vtimer )
297{
298    int               more;
299    struct tr_timer * timer = vtimer;
300
301    timer->inCallback = 1;
302    more = ( *timer->func )( timer->user_data );
303    timer->inCallback = 0;
304
305    if( more )
306        evtimer_add( &timer->event, &timer->tv );
307    else
308        tr_timerFree( &timer );
309}
310
311void
312tr_timerFree( tr_timer ** ptimer )
313{
314    tr_timer * timer;
315
316    /* zero out the argument passed in */
317    assert( ptimer );
318    timer = *ptimer;
319    *ptimer = NULL;
320
321    /* destroy the timer directly or via the command queue */
322    if( timer && !timer->inCallback )
323    {
324        assert( tr_amInEventThread( timer->eh->session ) );
325        event_del( &timer->event );
326        tr_free( timer );
327    }
328}
329
330tr_timer*
331tr_timerNew( tr_session  * session,
332             timer_func    func,
333             void        * user_data,
334             uint64_t      interval_milliseconds )
335{
336    tr_timer * timer;
337
338    assert( tr_amInEventThread( session ) );
339
340    timer = tr_new0( tr_timer, 1 );
341    timer->func = func;
342    timer->user_data = user_data;
343    timer->eh = session->events;
344
345    tr_timevalMsec( interval_milliseconds, &timer->tv );
346    evtimer_set( &timer->event, timerCallback, timer );
347    evtimer_add( &timer->event,  &timer->tv );
348
349    return timer;
350}
351
352void
353tr_runInEventThread( tr_session * session,
354                     void func( void* ), void * user_data )
355{
356    assert( tr_isSession( session ) );
357    assert( session->events != NULL );
358
359    if( tr_amInThread( session->events->thread ) )
360    {
361        (func)( user_data );
362    }
363    else
364    {
365        const char         ch = 'r';
366        int                fd = session->events->fds[1];
367        tr_lock *          lock = session->events->lock;
368        struct tr_run_data data;
369
370        tr_lockLock( lock );
371        pipewrite( fd, &ch, 1 );
372        data.func = func;
373        data.user_data = user_data;
374        pipewrite( fd, &data, sizeof( data ) );
375        tr_lockUnlock( lock );
376    }
377}
378
379struct event_base *
380tr_eventGetBase( tr_session * session )
381{
382    assert( tr_isSession( session ) );
383
384    return session->events->base;
385}
Note: See TracBrowser for help on using the repository browser.