source: trunk/libtransmission/trevent.c @ 3217

Last change on this file since 3217 was 3217, checked in by charles, 15 years ago

fix a couple of memory corruption errors while trying to track down tiennou's report on peer-msgs.c:pulse() -> inout.c:163 assertion failure.

  • Property svn:keywords set to Date Rev Author Id
File size: 10.9 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: trevent.c 3217 2007-09-28 14:27:56Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdlib.h>
16#include <string.h>
17#include <stdio.h>
18
19#include <signal.h>
20#include <sys/queue.h> /* for evhttp */
21#include <sys/types.h> /* for evhttp */
22
23#include <event.h>
24#include <evdns.h>
25#include <evhttp.h>
26
27#include "transmission.h"
28#include "list.h"
29#include "platform.h"
30#include "trevent.h"
31#include "utils.h"
32
33/* #define DEBUG */
34#ifdef DEBUG
35#include <stdio.h>
36#undef tr_dbg
37#define tr_dbg( a, b... ) fprintf(stderr, a "\n", ##b )
38#endif
39
40/***
41****
42***/
43
44typedef struct tr_event_handle
45{
46    tr_lock * lock;
47    tr_handle * h;
48    tr_thread * thread;
49    tr_list * commands;
50    struct event_base * base;
51    struct event pulse;
52    struct timeval pulseInterval;
53    uint8_t die;
54
55    int timerCount; 
56}
57tr_event_handle;
58
59#ifdef DEBUG
60static int reads = 0;
61static int writes = 0;
62#endif
63
64enum mode
65{
66    TR_EV_EVHTTP_MAKE_REQUEST,
67    TR_EV_BUFFEREVENT_SET,
68    TR_EV_BUFFEREVENT_WRITE,
69    TR_EV_BUFFEREVENT_FREE,
70    TR_EV_TIMER_ADD,
71    TR_EV_TIMER_DEL,
72    TR_EV_EXEC
73};
74
75typedef int timer_func(void*);
76
77struct tr_timer
78{
79    struct event event;
80    struct timeval tv;
81    timer_func * func;
82    void * user_data;
83    struct tr_event_handle * eh;
84    uint8_t inCallback;
85};
86
87struct tr_event_command
88{
89    int mode;
90
91    struct tr_timer * timer;
92
93    struct evhttp_connection * evcon;
94    struct evhttp_request * req;
95    enum evhttp_cmd_type evtype;
96    char * uri;
97
98    struct bufferevent * bufev;
99    short enable;
100    short disable;
101    char * buf;
102    size_t buflen;
103
104    void (*func)( void* );
105    void * user_data;
106};
107
108static void
109pumpList( int i UNUSED, short s UNUSED, void * veh )
110{
111    tr_event_handle * eh = veh;
112
113    while( !eh->die )
114    {
115        struct tr_event_command * cmd;
116
117        /* get the next command */
118        tr_lockLock( eh->lock );
119        cmd = tr_list_pop_front( &eh->commands );
120        tr_lockUnlock( eh->lock );
121        if( cmd == NULL )
122            break;
123
124        /* process the command */
125        switch( cmd->mode )
126        {
127            case TR_EV_TIMER_ADD:
128                timeout_add( &cmd->timer->event, &cmd->timer->tv );
129                ++eh->timerCount;
130                break;
131
132            case TR_EV_TIMER_DEL:
133                event_del( &cmd->timer->event );
134                tr_free( cmd->timer );
135                --eh->timerCount;
136                break;
137
138            case TR_EV_EVHTTP_MAKE_REQUEST:
139                evhttp_make_request( cmd->evcon, cmd->req, cmd->evtype, cmd->uri );
140                tr_free( cmd->uri );
141                break;
142
143           case TR_EV_BUFFEREVENT_SET:
144                bufferevent_enable( cmd->bufev, cmd->enable );
145                bufferevent_disable( cmd->bufev, cmd->disable );
146                break;
147
148            case TR_EV_BUFFEREVENT_WRITE:
149                bufferevent_write( cmd->bufev, cmd->buf, cmd->buflen );
150                tr_free( cmd->buf );
151                break;
152
153            case TR_EV_BUFFEREVENT_FREE:
154                bufferevent_free( cmd->bufev );
155                break;
156
157            case TR_EV_EXEC:
158                (cmd->func)( cmd->user_data );
159                break;
160
161            default:
162                assert( 0 && "unhandled command type!" );
163        }
164
165        /* cleanup */
166        tr_free( cmd );
167    }
168
169    if( !eh->die )
170        timeout_add( &eh->pulse, &eh->pulseInterval );
171    else {
172        assert( eh->timerCount ==  0 );
173        evdns_shutdown( FALSE );
174        event_del( &eh->pulse );
175    }
176}
177
178static void
179logFunc( int severity, const char * message )
180{
181    switch( severity )
182    {
183        case _EVENT_LOG_DEBUG: 
184            tr_dbg( "%s", message );
185            break;
186        case _EVENT_LOG_ERR:
187            tr_err( "%s", message );
188            break;
189        default:
190            tr_inf( "%s", message );
191            break;
192    }
193}
194
195static void
196libeventThreadFunc( void * veh )
197{
198    tr_event_handle * eh = (tr_event_handle *) veh;
199    tr_dbg( "Starting libevent thread" );
200
201#ifndef WIN32
202    /* Don't exit when writing on a broken socket */
203    signal( SIGPIPE, SIG_IGN );
204#endif
205
206    eh->base = event_init( );
207    //event_set_log_callback( logFunc );
208    evdns_init( );
209    timeout_set( &eh->pulse, pumpList, veh );
210    timeout_add( &eh->pulse, &eh->pulseInterval );
211    eh->h->events = eh;
212
213    event_dispatch( );
214
215    tr_lockFree( eh->lock );
216    event_base_free( eh->base );
217
218    eh->h->events = NULL;
219
220    tr_free( eh );
221    tr_dbg( "Closing libevent thread" );
222}
223
224void
225tr_eventInit( tr_handle * handle )
226{
227    tr_event_handle * eh;
228
229    eh = tr_new0( tr_event_handle, 1 );
230    eh->lock = tr_lockNew( );
231    eh->h = handle;
232    eh->pulseInterval = timevalMsec( 50 );
233    eh->thread = tr_threadNew( libeventThreadFunc, eh, "libeventThreadFunc" );
234}
235
236void
237tr_eventClose( tr_handle * handle )
238{
239    tr_event_handle * eh = handle->events;
240
241    tr_lockLock( eh->lock );
242    tr_list_free( &eh->commands, tr_free );
243    eh->die = TRUE;
244    tr_lockUnlock( eh->lock );
245
246    //event_base_loopexit( eh->base, NULL );
247}
248
249/**
250***
251**/
252
253static void
254pushList( struct tr_event_handle * eh, struct tr_event_command * command )
255{
256    tr_lockLock( eh->lock );
257    tr_list_append( &eh->commands, command );
258    tr_lockUnlock( eh->lock );
259}
260
261void
262tr_evhttp_make_request (tr_handle                 * handle,
263                        struct evhttp_connection  * evcon,
264                        struct evhttp_request     * req,
265                        enum   evhttp_cmd_type      type,
266                        char                      * uri)
267{
268    if( tr_amInThread( handle->events->thread ) ) {
269        evhttp_make_request( evcon, req, type, uri );
270        tr_free( uri );
271    } else {
272        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
273        cmd->mode = TR_EV_EVHTTP_MAKE_REQUEST;
274        cmd->evcon = evcon;
275        cmd->req = req;
276        cmd->evtype = type;
277        cmd->uri = uri;
278        pushList( handle->events, cmd );
279    }
280}
281
282void
283tr_bufferevent_write( tr_handle             * handle,
284                      struct bufferevent    * bufev,
285                      const void            * buf,
286                      size_t                  buflen )
287{
288    if( tr_amInThread( handle->events->thread ) )
289        bufferevent_write( bufev, (void*)buf, buflen );
290    else {
291        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
292        cmd->mode = TR_EV_BUFFEREVENT_WRITE;
293        cmd->bufev = bufev;
294        cmd->buf = tr_strndup( buf, buflen );
295        cmd->buflen = buflen;
296        pushList( handle->events, cmd );
297    }
298}
299
300void
301tr_setBufferEventMode( struct tr_handle   * handle,
302                       struct bufferevent * bufev,
303                       short                mode_enable,
304                       short                mode_disable )
305{
306    if( tr_amInThread( handle->events->thread ) ) {
307        bufferevent_enable( bufev, mode_enable );
308        bufferevent_disable( bufev, mode_disable );
309    } else {
310        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
311        cmd->mode = TR_EV_BUFFEREVENT_SET;
312        cmd->bufev = bufev;
313        cmd->enable = mode_enable;
314        cmd->disable = mode_disable;
315        pushList( handle->events, cmd );
316    }
317}
318
319/**
320***
321**/
322
323static int
324timerCompareFunc( const void * va, const void * vb )
325{
326    const struct tr_event_command * a = va;
327    const struct tr_timer * b = vb;
328    return a->timer == b ? 0 : 1;
329}
330
331static void
332timerCallback( int fd UNUSED, short event UNUSED, void * vtimer )
333{
334    int more;
335    struct tr_timer * timer = vtimer;
336    void * del;
337
338    del = tr_list_remove( &timer->eh->commands, timer, timerCompareFunc );
339
340    if( del != NULL ) /* there's a TIMER_DEL command queued for this timer... */
341        more = FALSE;
342    else {
343        timer->inCallback = 1;
344        more = (*timer->func)( timer->user_data );
345        timer->inCallback = 0;
346    }
347
348    if( more )
349        timeout_add( &timer->event, &timer->tv );
350    else
351        tr_timerFree( &timer );
352}
353
354void
355tr_timerFree( tr_timer ** ptimer )
356{
357    tr_timer * timer;
358
359    /* zero out the argument passed in */
360    assert( ptimer );
361    timer = *ptimer;
362    *ptimer = NULL;
363
364    /* destroy the timer directly or via the command queue */
365    if( timer!=NULL && !timer->inCallback ) {
366        if( tr_amInThread( timer->eh->thread ) ) {
367            --timer->eh->timerCount;
368            event_del( &timer->event );
369            tr_free( timer );
370        } else {
371            struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
372            cmd->mode = TR_EV_TIMER_DEL;
373            cmd->timer = timer;
374            pushList( timer->eh, cmd );
375        }
376    }
377}
378
379tr_timer*
380tr_timerNew( struct tr_handle * handle,
381             timer_func         func,
382             void             * user_data,
383             int                timeout_milliseconds )
384{
385    tr_timer * timer = tr_new0( tr_timer, 1 );
386    timer->tv = timevalMsec( timeout_milliseconds );
387    timer->func = func;
388    timer->user_data = user_data;
389    timer->eh = handle->events;
390    timeout_set( &timer->event, timerCallback, timer );
391
392    if( tr_amInThread( handle->events->thread ) ) {
393        timeout_add( &timer->event,  &timer->tv );
394        ++handle->events->timerCount;
395    } else {
396        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
397        cmd->mode = TR_EV_TIMER_ADD;
398        cmd->timer = timer;
399        pushList( handle->events, cmd );
400    }
401
402    return timer;
403}
404
405void
406tr_runInEventThread( struct tr_handle * handle,
407                     void               func( void* ),
408                     void             * user_data )
409{
410    if( tr_amInThread( handle->events->thread ) )
411        (func)( user_data );
412    else {
413        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
414        cmd->mode = TR_EV_EXEC;
415        cmd->func = func;
416        cmd->user_data = user_data;
417        pushList( handle->events, cmd );
418    }
419}
420
421
422/**
423***
424**/
425
426static int
427bufCompareFunc( const void * va, const void * vb )
428{
429    const struct tr_event_command * a = va;
430    const struct bufferevent * b = vb;
431    return a->bufev == b ? 0 : 1;
432}
433
434void
435tr_bufferevent_free( struct tr_handle   * handle,
436                     struct bufferevent * bufev )
437{
438    void * v;
439    tr_event_handle * eh = handle->events;
440
441    /* purge pending commands from the list */
442    tr_lockLock( eh->lock );
443    while(( v = tr_list_remove( &eh->commands, bufev, bufCompareFunc ) ))
444        tr_free( v );
445    tr_lockUnlock( eh->lock );
446
447    if( tr_amInThread( handle->events->thread ) )
448        bufferevent_free( bufev );
449    else {
450        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
451        cmd->mode = TR_EV_BUFFEREVENT_FREE;
452        cmd->bufev = bufev;
453        pushList( handle->events, cmd );
454    }
455}
Note: See TracBrowser for help on using the repository browser.