source: trunk/libtransmission/trevent.c @ 3349

Last change on this file since 3349 was 3349, checked in by charles, 15 years ago

commit more of tiennou's fastpeers patch

  • Property svn:keywords set to Date Rev Author Id
File size: 10.1 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: trevent.c 3349 2007-10-10 16:39:12Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdlib.h>
16#include <string.h>
17#include <stdio.h>
18
19#include <signal.h>
20#include <sys/queue.h> /* for evhttp */
21#include <sys/types.h> /* for evhttp */
22
23#include <event.h>
24#include <evdns.h>
25#include <evhttp.h>
26
27#include "transmission.h"
28#include "list.h"
29#include "platform.h"
30#include "trevent.h"
31#include "utils.h"
32
33/* #define DEBUG */
34#ifdef DEBUG
35#include <stdio.h>
36#undef tr_dbg
37#define tr_dbg( a, b... ) fprintf(stderr, a "\n", ##b )
38#endif
39
40/***
41****
42***/
43
44typedef struct tr_event_handle
45{
46    tr_lock * lock;
47    tr_handle * h;
48    tr_thread * thread;
49    tr_list * commands;
50    struct event_base * base;
51    struct event pulse;
52    struct timeval pulseInterval;
53    uint8_t die;
54
55    int timerCount; 
56}
57tr_event_handle;
58
59#ifdef DEBUG
60static int reads = 0;
61static int writes = 0;
62#endif
63
64enum mode
65{
66    TR_EV_EVHTTP_MAKE_REQUEST,
67    TR_EV_BUFFEREVENT_SET,
68    TR_EV_BUFFEREVENT_WRITE,
69    TR_EV_TIMER_ADD,
70    TR_EV_TIMER_DEL,
71    TR_EV_EXEC
72};
73
74typedef int timer_func(void*);
75
76struct tr_timer
77{
78    struct event event;
79    struct timeval tv;
80    timer_func * func;
81    void * user_data;
82    struct tr_event_handle * eh;
83    uint8_t inCallback;
84};
85
86struct tr_event_command
87{
88    int mode;
89
90    struct tr_timer * timer;
91
92    struct evhttp_connection * evcon;
93    struct evhttp_request * req;
94    enum evhttp_cmd_type evtype;
95    char * uri;
96
97    struct bufferevent * bufev;
98    short enable;
99    short disable;
100    char * buf;
101    size_t buflen;
102
103    void (*func)( void* );
104    void * user_data;
105};
106
107static void
108pumpList( int i UNUSED, short s UNUSED, void * veh )
109{
110    tr_event_handle * eh = veh;
111    int doDie;
112
113    for( ;; )
114    {
115        struct tr_event_command * cmd;
116
117        doDie = eh->die && !eh->timerCount;
118        if( doDie )
119            break;
120
121        /* get the next command */
122        tr_lockLock( eh->lock );
123        cmd = tr_list_pop_front( &eh->commands );
124        tr_lockUnlock( eh->lock );
125        if( cmd == NULL )
126            break;
127
128        /* process the command */
129        switch( cmd->mode )
130        {
131            case TR_EV_TIMER_ADD:
132                timeout_add( &cmd->timer->event, &cmd->timer->tv );
133                ++eh->timerCount;
134                break;
135
136            case TR_EV_TIMER_DEL:
137                event_del( &cmd->timer->event );
138                tr_free( cmd->timer );
139                --eh->timerCount;
140                break;
141
142            case TR_EV_EVHTTP_MAKE_REQUEST:
143                evhttp_make_request( cmd->evcon, cmd->req, cmd->evtype, cmd->uri );
144                tr_free( cmd->uri );
145                break;
146
147           case TR_EV_BUFFEREVENT_SET:
148                bufferevent_enable( cmd->bufev, cmd->enable );
149                bufferevent_disable( cmd->bufev, cmd->disable );
150                break;
151
152            case TR_EV_BUFFEREVENT_WRITE:
153                bufferevent_write( cmd->bufev, cmd->buf, cmd->buflen );
154                tr_free( cmd->buf );
155                break;
156
157            case TR_EV_EXEC:
158                (cmd->func)( cmd->user_data );
159                break;
160
161            default:
162                assert( 0 && "unhandled command type!" );
163        }
164
165        /* cleanup */
166        tr_free( cmd );
167    }
168
169    if( !doDie )
170        timeout_add( &eh->pulse, &eh->pulseInterval );
171    else {
172        assert( eh->timerCount ==  0 );
173        evdns_shutdown( FALSE );
174        event_del( &eh->pulse );
175    }
176}
177
178static void
179logFunc( int severity, const char * message )
180{
181    switch( severity )
182    {
183        case _EVENT_LOG_DEBUG: 
184            tr_dbg( "%s", message );
185            break;
186        case _EVENT_LOG_ERR:
187            tr_err( "%s", message );
188            break;
189        default:
190            tr_inf( "%s", message );
191            break;
192    }
193}
194
195static void
196libeventThreadFunc( void * veh )
197{
198    tr_event_handle * eh = (tr_event_handle *) veh;
199    tr_dbg( "Starting libevent thread" );
200
201#ifndef WIN32
202    /* Don't exit when writing on a broken socket */
203    signal( SIGPIPE, SIG_IGN );
204#endif
205
206    eh->base = event_init( );
207    evdns_init( );
208    timeout_set( &eh->pulse, pumpList, veh );
209    timeout_add( &eh->pulse, &eh->pulseInterval );
210    eh->h->events = eh;
211
212    event_dispatch( );
213
214    tr_lockFree( eh->lock );
215    event_base_free( eh->base );
216
217    eh->h->events = NULL;
218
219    tr_free( eh );
220    tr_dbg( "Closing libevent thread" );
221}
222
223void
224tr_eventInit( tr_handle * handle )
225{
226    tr_event_handle * eh;
227
228    eh = tr_new0( tr_event_handle, 1 );
229    eh->lock = tr_lockNew( );
230    eh->h = handle;
231    eh->pulseInterval = timevalMsec( 50 );
232    eh->thread = tr_threadNew( libeventThreadFunc, eh, "libeventThreadFunc" );
233}
234
235void
236tr_eventClose( tr_handle * handle )
237{
238    tr_event_handle * eh = handle->events;
239
240    tr_lockLock( eh->lock );
241    tr_list_free( &eh->commands, tr_free );
242    eh->die = TRUE;
243    tr_lockUnlock( eh->lock );
244}
245
246/**
247***
248**/
249
250static void
251pushList( struct tr_event_handle * eh, struct tr_event_command * command )
252{
253    tr_lockLock( eh->lock );
254    tr_list_append( &eh->commands, command );
255    tr_lockUnlock( eh->lock );
256}
257
258void
259tr_evhttp_make_request (tr_handle                 * handle,
260                        struct evhttp_connection  * evcon,
261                        struct evhttp_request     * req,
262                        enum   evhttp_cmd_type      type,
263                        char                      * uri)
264{
265    if( tr_amInThread( handle->events->thread ) ) {
266        evhttp_make_request( evcon, req, type, uri );
267        tr_free( uri );
268    } else {
269        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
270        cmd->mode = TR_EV_EVHTTP_MAKE_REQUEST;
271        cmd->evcon = evcon;
272        cmd->req = req;
273        cmd->evtype = type;
274        cmd->uri = uri;
275        pushList( handle->events, cmd );
276    }
277}
278
279void
280tr_bufferevent_write( tr_handle             * handle,
281                      struct bufferevent    * bufev,
282                      const void            * buf,
283                      size_t                  buflen )
284{
285    if( tr_amInThread( handle->events->thread ) )
286        bufferevent_write( bufev, (void*)buf, buflen );
287    else {
288        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
289        cmd->mode = TR_EV_BUFFEREVENT_WRITE;
290        cmd->bufev = bufev;
291        cmd->buf = tr_strndup( buf, buflen );
292        cmd->buflen = buflen;
293        pushList( handle->events, cmd );
294    }
295}
296
297void
298tr_setBufferEventMode( struct tr_handle   * handle,
299                       struct bufferevent * bufev,
300                       short                mode_enable,
301                       short                mode_disable )
302{
303    if( tr_amInThread( handle->events->thread ) ) {
304        bufferevent_enable( bufev, mode_enable );
305        bufferevent_disable( bufev, mode_disable );
306    } else {
307        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
308        cmd->mode = TR_EV_BUFFEREVENT_SET;
309        cmd->bufev = bufev;
310        cmd->enable = mode_enable;
311        cmd->disable = mode_disable;
312        pushList( handle->events, cmd );
313    }
314}
315
316/**
317***
318**/
319
320static int
321timerCompareFunc( const void * va, const void * vb )
322{
323    const struct tr_event_command * a = va;
324    const struct tr_timer * b = vb;
325    return a->timer == b ? 0 : 1;
326}
327
328static void
329timerCallback( int fd UNUSED, short event UNUSED, void * vtimer )
330{
331    int more;
332    struct tr_timer * timer = vtimer;
333    void * del;
334
335    del = tr_list_remove( &timer->eh->commands, timer, timerCompareFunc );
336
337    if( del != NULL ) /* there's a TIMER_DEL command queued for this timer... */
338        more = FALSE;
339    else {
340        timer->inCallback = 1;
341        more = (*timer->func)( timer->user_data );
342        timer->inCallback = 0;
343    }
344
345    if( more )
346        timeout_add( &timer->event, &timer->tv );
347    else
348        tr_timerFree( &timer );
349
350    tr_free( del );
351}
352
353void
354tr_timerFree( tr_timer ** ptimer )
355{
356    tr_timer * timer;
357
358    /* zero out the argument passed in */
359    assert( ptimer );
360    timer = *ptimer;
361    *ptimer = NULL;
362
363    /* destroy the timer directly or via the command queue */
364    if( timer!=NULL && !timer->inCallback ) {
365        if( tr_amInThread( timer->eh->thread ) ) {
366            void * del = tr_list_remove( &timer->eh->commands, timer, timerCompareFunc );
367            --timer->eh->timerCount;
368            event_del( &timer->event );
369            tr_free( timer );
370            tr_free( del );
371        } else {
372            struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
373            cmd->mode = TR_EV_TIMER_DEL;
374            cmd->timer = timer;
375            pushList( timer->eh, cmd );
376        }
377    }
378}
379
380tr_timer*
381tr_timerNew( struct tr_handle * handle,
382             timer_func         func,
383             void             * user_data,
384             int                timeout_milliseconds )
385{
386    tr_timer * timer = tr_new0( tr_timer, 1 );
387    timer->tv = timevalMsec( timeout_milliseconds );
388    timer->func = func;
389    timer->user_data = user_data;
390    timer->eh = handle->events;
391    timeout_set( &timer->event, timerCallback, timer );
392
393    if( tr_amInThread( handle->events->thread ) ) {
394        timeout_add( &timer->event,  &timer->tv );
395        ++handle->events->timerCount;
396    } else {
397        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
398        cmd->mode = TR_EV_TIMER_ADD;
399        cmd->timer = timer;
400        pushList( handle->events, cmd );
401    }
402
403    return timer;
404}
405
406void
407tr_runInEventThread( struct tr_handle * handle,
408                     void               func( void* ),
409                     void             * user_data )
410{
411    if( tr_amInThread( handle->events->thread ) )
412        (func)( user_data );
413    else {
414        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
415        cmd->mode = TR_EV_EXEC;
416        cmd->func = func;
417        cmd->user_data = user_data;
418        pushList( handle->events, cmd );
419    }
420}
Note: See TracBrowser for help on using the repository browser.