Changeset 3020


Ignore:
Timestamp:
Sep 10, 2007, 5:21:54 AM (14 years ago)
Author:
charles
Message:

reimplement trevent using a list, rather than a pipe, so that we can destroy enqueued commands when we destroy the io channel that goes with them.

Location:
branches/encryption/libtransmission
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • branches/encryption/libtransmission/list.c

    r2987 r3020  
    121121
    122122void*
     123tr_list_pop_front( tr_list ** list )
     124{
     125    void * ret = NULL;
     126    if( *list != NULL )
     127    {
     128        ret = (*list)->data;
     129        tr_list_remove_node( list, *list );
     130    }
     131    return ret;
     132}
     133
     134void*
    123135tr_list_remove_data ( tr_list ** list, const void * data )
    124136{
  • branches/encryption/libtransmission/list.h

    r2987 r3020  
    3535                                     void             * data );
    3636
     37void*       tr_list_pop_front      ( tr_list         ** list );
     38
    3739void*       tr_list_remove_data    ( tr_list         ** list,
    3840                                     const void       * data );
  • branches/encryption/libtransmission/trevent.c

    r3018 r3020  
    2020#include <sys/types.h> /* for evhttp */
    2121
    22 #ifdef WIN32
    23   #include <fcntl.h>
    24   #define pipe(f) _pipe(f, 1000, _O_BINARY)
    25 #else
    26   #include <unistd.h>
    27 #endif
    28 
    2922#include <event.h>
    3023#include <evdns.h>
     
    3225
    3326#include "transmission.h"
     27#include "list.h"
    3428#include "platform.h"
    3529#include "utils.h"
     
    4842typedef struct tr_event_handle
    4943{
    50     int fds[2];
    5144    tr_lock_t * lock;
    5245    tr_handle_t * h;
    5346    tr_thread_t * thread;
     47    tr_list * commands;
    5448    struct event_base * base;
    55     struct event pipeEvent;
     49    struct event pulse;
     50    struct timeval pulseInterval;
    5651}
    5752tr_event_handle;
     
    6257#endif
    6358
    64 
    65 void
    66 readFromPipe( int fd, short eventType UNUSED, void * unused UNUSED )
    67 {
    68     char ch;
    69     int ret;
     59enum mode
     60{
     61   TR_EV_EVENT_DEL,
     62   TR_EV_EVENT_ADD,
     63   TR_EV_EVHTTP_MAKE_REQUEST,
     64   TR_EV_BUFFEREVENT_SET,
     65   TR_EV_BUFFEREVENT_WRITE,
     66   TR_EV_BUFFEREVENT_FREE
     67};
     68
     69struct tr_event_command
     70{
     71    int mode;
     72
    7073    struct event * event;
    7174    struct timeval interval;
     75
    7276    struct evhttp_connection * evcon;
    7377    struct evhttp_request * req;
    74     enum evhttp_cmd_type type;
     78    enum evhttp_cmd_type evtype;
    7579    char * uri;
     80
     81    struct bufferevent * bufev;
     82    short enable;
     83    short disable;
    7684    char * buf;
    7785    size_t buflen;
    78     struct bufferevent * bufev;
    79     short mode;
    80 
    81 #ifdef DEBUG
    82     fprintf( stderr, "reading...reads: [%d] writes: [%d]\n", ++reads, writes );
    83 #endif
    84 
    85     ch = '\0';
    86     do {
    87         ret = read( fd, &ch, 1 );
    88     } while ( ret<0 && errno==EAGAIN );
    89 
    90     if( ret < 0 )
     86};
     87
     88static void
     89pumpList( int i UNUSED, short s UNUSED, void * veh )
     90{
     91    tr_event_handle * eh = veh;
     92
     93    for( ;; )
    9194    {
    92         tr_err( "Couldn't read from libevent pipe: %s", strerror(errno) );
    93     }
    94     else switch( ch )
    95     {
    96         case 'd': /* event_del */
    97             read( fd, &event, sizeof(struct event*) );
    98             event_del( event );
    99             tr_free( event );
     95        struct tr_event_command * cmd;
     96
     97        /* get the next command */
     98        tr_lockLock( eh->lock );
     99        cmd = tr_list_pop_front( &eh->commands );
     100        tr_lockUnlock( eh->lock );
     101        if( cmd == NULL )
    100102            break;
    101103
    102         case 'e': /* event_add */
    103             read( fd, &event, sizeof(struct event*) );
    104             read( fd, &interval, sizeof(struct timeval) );
    105             event_add( event, &interval );
    106             break;
    107 
    108         case 'h': /* http_make_request */
    109             ret = read( fd, &evcon, sizeof(struct evhttp_connection*) );
    110             read( fd, &req, sizeof(struct evhttp_request*) );
    111             read( fd, &type, sizeof(enum evhttp_cmd_type) );
    112             read( fd, &uri, sizeof(char*) );
    113             evhttp_make_request( evcon, req, type, uri );
    114             tr_free( uri );
    115             break;
    116 
    117         case 'm': /* set bufferevent mode */
    118             read( fd, &bufev, sizeof(struct evhttp_request*) );
    119             mode = 0;
    120             read( fd, &mode, sizeof(short) );
    121             bufferevent_enable( bufev, mode );
    122             mode = 0;
    123             read( fd, &mode, sizeof(short) );
    124             bufferevent_disable( bufev, mode );
    125 fprintf( stderr, "after enable/disable, the mode is %hd\n", bufev->enabled );
    126             break;
    127 
    128         case 'w': /* bufferevent_write */
    129             read( fd, &bufev, sizeof(struct bufferevent*) );
    130             read( fd, &buf, sizeof(char*) );
    131             read( fd, &buflen, sizeof(size_t) );
    132             bufferevent_write( bufev, buf, buflen );
    133             tr_free( buf );
    134             break;
    135 
    136         default:
    137             assert( 0 && "unhandled event pipe condition!" );
    138     }
     104        /* process the command */
     105        switch( cmd->mode )
     106        {
     107            case TR_EV_EVENT_DEL:
     108                event_del( cmd->event );
     109                tr_free( cmd->event );
     110                break;
     111
     112            case TR_EV_EVENT_ADD:
     113                event_add( cmd->event, &cmd->interval );
     114                break;
     115
     116            case TR_EV_EVHTTP_MAKE_REQUEST:
     117                evhttp_make_request( cmd->evcon, cmd->req, cmd->evtype, cmd->uri );
     118                tr_free( cmd->uri );
     119                break;
     120
     121           case TR_EV_BUFFEREVENT_SET:
     122                bufferevent_enable( cmd->bufev, cmd->enable );
     123                bufferevent_disable( cmd->bufev, cmd->disable );
     124                break;
     125
     126            case TR_EV_BUFFEREVENT_WRITE:
     127                bufferevent_write( cmd->bufev, cmd->buf, cmd->buflen );
     128                tr_free( cmd->buf );
     129                break;
     130
     131            case TR_EV_BUFFEREVENT_FREE:
     132                bufferevent_free( cmd->bufev );
     133                break;
     134
     135            default:
     136                assert( 0 && "unhandled command type!" );
     137        }
     138
     139        /* cleanup */
     140        tr_free( cmd );
     141    }
     142
     143    timeout_add( &eh->pulse, &eh->pulseInterval );
    139144}
    140145
     
    165170    event_set_log_callback( logFunc );
    166171    evdns_init( );
    167 
    168     /* listen to the pipe's read fd */
    169     event_set( &eh->pipeEvent, eh->fds[0], EV_READ|EV_PERSIST, readFromPipe, NULL );
    170     event_add( &eh->pipeEvent, NULL );
     172    timeout_set( &eh->pulse, pumpList, veh );
     173    timeout_add( &eh->pulse, &eh->pulseInterval );
    171174
    172175    event_dispatch( );
    173176
    174177    evdns_shutdown( FALSE );
    175     event_del( &eh->pipeEvent );
    176178    tr_lockFree( eh->lock );
    177179    event_base_free( eh->base );
     
    188190    eh = tr_new0( tr_event_handle, 1 );
    189191    eh->lock = tr_lockNew( );
    190     pipe( eh->fds );
    191192    eh->h = handle;
     193    eh->pulseInterval = timevalMsec( 20 );
     194    eh->thread = tr_threadNew( libeventThreadFunc, eh, "libeventThreadFunc" );
     195
    192196    handle->events = eh;
    193     eh->thread = tr_threadNew( libeventThreadFunc, eh, "libeventThreadFunc" );
    194197}
    195198
     
    200203
    201204    event_base_loopexit( eh->base, NULL );
     205}
     206
     207/**
     208***
     209**/
     210
     211static void
     212pushList( struct tr_event_handle * eh, struct tr_event_command * command )
     213{
     214    tr_lockLock( eh->lock );
     215    tr_list_append( &eh->commands, command );
     216    tr_lockUnlock( eh->lock );
    202217}
    203218
     
    208223{
    209224    if( tr_amInThread( handle->events->thread ) )
    210     {
    211225        event_add( event, interval );
    212     }
    213     else
    214     {
    215         const char ch = 'e';
    216         int fd = handle->events->fds[1];
    217         tr_lock_t * lock = handle->events->lock;
    218 
    219         tr_lockLock( lock );
    220         tr_dbg( "writing event to pipe: event.ev_arg is %p", event->ev_arg );
    221         write( fd, &ch, 1 );
    222         write( fd, &event, sizeof(struct event*) );
    223         write( fd, interval, sizeof(struct timeval) );
    224         tr_lockUnlock( lock );
     226    else {
     227        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
     228        cmd->mode = TR_EV_EVENT_ADD;
     229        cmd->event = event;
     230        cmd->interval = *interval;
     231        pushList( handle->events, cmd );
    225232    }
    226233}
     
    230237              struct event   * event )
    231238{
    232     if( tr_amInThread( handle->events->thread ) )
    233     {
     239    if( tr_amInThread( handle->events->thread ) ) {
    234240        event_del( event );
    235241        tr_free( event );
    236     }
    237     else
    238     {
    239         const char ch = 'd';
    240         int fd = handle->events->fds[1];
    241         tr_lock_t * lock = handle->events->lock;
    242 
    243         tr_lockLock( lock );
    244         tr_dbg( "writing event to pipe: del event %p", event );
    245         write( fd, &ch, 1 );
    246         write( fd, &event, sizeof(struct event*) );
    247         tr_lockUnlock( lock );
     242    } else {
     243        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
     244        cmd->mode = TR_EV_EVENT_DEL;
     245        cmd->event = event;
     246        pushList( handle->events, cmd );
    248247    }
    249248}
     
    256255                        char                      * uri)
    257256{
    258     if( tr_amInThread( handle->events->thread ) )
    259     {
     257    if( tr_amInThread( handle->events->thread ) ) {
    260258        evhttp_make_request( evcon, req, type, uri );
    261259        tr_free( uri );
    262     }
    263     else
    264     {
    265         const char ch = 'h';
    266         int fd = handle->events->fds[1];
    267         tr_lock_t * lock = handle->events->lock;
    268 
    269         tr_lockLock( lock );
    270         tr_dbg( "writing HTTP req to pipe: req.cb_arg is %p", req->cb_arg );
    271         write( fd, &ch, 1 );
    272         write( fd, &evcon, sizeof(struct evhttp_connection*) );
    273         write( fd, &req, sizeof(struct evhttp_request*) );
    274         write( fd, &type, sizeof(enum evhttp_cmd_type) );
    275         write( fd, &uri, sizeof(char*) );
    276         tr_lockUnlock( lock );
     260    } else {
     261        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
     262        cmd->mode = TR_EV_EVHTTP_MAKE_REQUEST;
     263        cmd->evcon = evcon;
     264        cmd->req = req;
     265        cmd->evtype = type;
     266        cmd->uri = uri;
     267        pushList( handle->events, cmd );
    277268    }
    278269}
     
    285276{
    286277    if( tr_amInThread( handle->events->thread ) )
    287     {
    288278        bufferevent_write( bufev, (void*)buf, buflen );
    289     }
    290     else
    291     {
    292         const char ch = 'w';
    293         int fd = handle->events->fds[1];
    294         tr_lock_t * lock = handle->events->lock;
    295         char * local = tr_strndup( buf, buflen );
    296 
    297         tr_lockLock( lock );
    298         tr_dbg( "writing bufferevent_write pipe" );
    299         write( fd, &ch, 1 );
    300         write( fd, &bufev, sizeof(struct bufferevent*) );
    301         write( fd, &local, sizeof(char*) );
    302         write( fd, &buflen, sizeof(size_t) );
    303         tr_lockUnlock( lock );
     279    else {
     280        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
     281        cmd->mode = TR_EV_BUFFEREVENT_WRITE;
     282        cmd->bufev = bufev;
     283        cmd->buf = tr_strndup( buf, buflen );
     284        cmd->buflen = buflen;
     285        pushList( handle->events, cmd );
    304286    }
    305287}
     
    311293                       short                mode_disable )
    312294{
    313     if( tr_amInThread( handle->events->thread ) )
    314     {
     295    if( tr_amInThread( handle->events->thread ) ) {
    315296        bufferevent_enable( bufev, mode_enable );
    316297        bufferevent_disable( bufev, mode_disable );
    317     }
    318     else
    319     {
    320         const char ch = 'm';
    321         int fd = handle->events->fds[1];
    322         tr_lock_t * lock = handle->events->lock;
    323 
    324         tr_lockLock( lock );
    325         write( fd, &ch, 1 );
    326         write( fd, &bufev, sizeof(struct bufferevent*) );
    327         write( fd, &mode_enable, sizeof(short) );
    328         write( fd, &mode_disable, sizeof(short) );
    329         tr_lockUnlock( lock );
    330     }
    331 }
     298    } else {
     299        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
     300        cmd->mode = TR_EV_BUFFEREVENT_SET;
     301        cmd->bufev = bufev;
     302        cmd->enable = mode_enable;
     303        cmd->disable = mode_disable;
     304        pushList( handle->events, cmd );
     305    }
     306}
     307
     308static int
     309compareFunc( const void * va, const void * vb )
     310{
     311    const struct tr_event_command * a = va;
     312    const struct bufferevent * b = vb;
     313    return a->bufev == b ? 0 : 1;
     314}
     315
     316void
     317tr_bufferevent_free( struct tr_handle   * handle,
     318                     struct bufferevent * bufev )
     319{
     320    void * v;
     321    tr_event_handle * eh = handle->events;
     322
     323    /* purge pending commands from the list */
     324    tr_lockLock( eh->lock );
     325    while(( v = tr_list_remove( &eh->commands, bufev, compareFunc ) )) {
     326        fprintf( stderr, "---> I AM PURGING A QUEUED COMMAND BECAUSE ITS BUFEV IS GOING AWAY <--\n" );
     327        tr_free( v );
     328    }
     329    tr_lockUnlock( eh->lock );
     330
     331    if( tr_amInThread( handle->events->thread ) )
     332        bufferevent_free( bufev );
     333    else {
     334        struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
     335        cmd->mode = TR_EV_BUFFEREVENT_FREE;
     336        cmd->bufev = bufev;
     337        pushList( handle->events, cmd );
     338    }
     339}
     340                 
Note: See TracChangeset for help on using the changeset viewer.