source: branches/2.0x/libtransmission/trevent.c @ 10886

Last change on this file since 10886 was 10886, checked in by charles, 12 years ago

(2.0x libT) remove an unnecessary debugging message that shows up in the console when starting 2.00

  • Property svn:keywords set to Date Rev Author Id
File size: 7.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: trevent.c 10886 2010-06-26 21:10:05Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdlib.h>
16#include <string.h>
17#include <stdio.h>
18
19#include <signal.h>
20
21#include <event.h>
22
23#include "transmission.h"
24#include "net.h"
25#include "session.h"
26
27#ifdef WIN32
28
29#include <WinSock2.h>
30
31static int
32pgpipe( int handles[2] )
33{
34        SOCKET s;
35        struct sockaddr_in serv_addr;
36        int len = sizeof( serv_addr );
37
38        handles[0] = handles[1] = INVALID_SOCKET;
39
40        if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
41        {
42/*              ereport(LOG, (errmsg_internal("pgpipe failed to create socket: %ui", WSAGetLastError()))); */
43                return -1;
44        }
45
46        memset( &serv_addr, 0, sizeof( serv_addr ) );
47        serv_addr.sin_family = AF_INET;
48        serv_addr.sin_port = htons(0);
49        serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
50        if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
51        {
52/*              ereport(LOG, (errmsg_internal("pgpipe failed to bind: %ui", WSAGetLastError()))); */
53                closesocket(s);
54                return -1;
55        }
56        if (listen(s, 1) == SOCKET_ERROR)
57        {
58/*              ereport(LOG, (errmsg_internal("pgpipe failed to listen: %ui", WSAGetLastError()))); */
59                closesocket(s);
60                return -1;
61        }
62        if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
63        {
64/*              ereport(LOG, (errmsg_internal("pgpipe failed to getsockname: %ui", WSAGetLastError()))); */
65                closesocket(s);
66                return -1;
67        }
68        if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
69        {
70/*              ereport(LOG, (errmsg_internal("pgpipe failed to create socket 2: %ui", WSAGetLastError()))); */
71                closesocket(s);
72                return -1;
73        }
74
75        if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
76        {
77/*              ereport(LOG, (errmsg_internal("pgpipe failed to connect socket: %ui", WSAGetLastError()))); */
78                closesocket(s);
79                return -1;
80        }
81        if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
82        {
83/*              ereport(LOG, (errmsg_internal("pgpipe failed to accept socket: %ui", WSAGetLastError()))); */
84                closesocket(handles[1]);
85                handles[1] = INVALID_SOCKET;
86                closesocket(s);
87                return -1;
88        }
89        closesocket(s);
90        return 0;
91}
92
93static int
94piperead( int s, char *buf, int len )
95{
96        int ret = recv(s, buf, len, 0);
97
98        if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
99                /* EOF on the pipe! (win32 socket based implementation) */
100                ret = 0;
101        return ret;
102}
103
/* WIN32: pipe() is emulated with a socket pair, so writes go through send().
   Arguments are parenthesized so that the cast applies to the whole
   buffer expression rather than only to its first operand. */
#define pipe(a) pgpipe((a))
#define pipewrite(a,b,c) send((a),(char*)(b),(c),0)
106
107#else
/* non-WIN32: the pipe is a real file descriptor pair, so use read() and write() */
#define piperead( fd, buf, len )  read( (fd), (buf), (len) )
#define pipewrite( fd, buf, len ) write( (fd), (buf), (len) )
110#endif
111
112#include <unistd.h>
113
114#include <event.h>
115
116#include "transmission.h"
117#include "platform.h"
118#include "trevent.h"
119#include "utils.h"
120
121/***
122****
123***/
124
125typedef struct tr_event_handle
126{
127    uint8_t      die;
128    int          fds[2];
129    tr_lock *    lock;
130    tr_session *  session;
131    tr_thread *  thread;
132    struct event_base * base;
133    struct event pipeEvent;
134}
135tr_event_handle;
136
/**
 * Payload that follows an 'r' command byte on the pipe:
 * a function and the argument to invoke it with.
 */
struct tr_run_data
{
    /* the function to invoke */
    void ( *func )( void * );

    /* passed to func as its only argument */
    void * user_data;
};
142
/* Send a printf-style message to the deep log under the name "event",
   but only when deep logging is active. */
#define dbgmsg( ... ) \
    do \
    { \
        if( tr_deepLoggingIsActive( ) ) \
        { \
            tr_deepLog( __FILE__, __LINE__, "event", __VA_ARGS__ ); \
        } \
    } \
    while( 0 )
148
149static void
150readFromPipe( int    fd,
151              short  eventType,
152              void * veh )
153{
154    char              ch;
155    int               ret;
156    tr_event_handle * eh = veh;
157
158    dbgmsg( "readFromPipe: eventType is %hd", eventType );
159
160    /* read the command type */
161    ch = '\0';
162    do
163    {
164        ret = piperead( fd, &ch, 1 );
165    }
166    while( !eh->die && ret < 0 && errno == EAGAIN );
167
168    dbgmsg( "command is [%c], ret is %d, errno is %d", ch, ret, (int)errno );
169
170    switch( ch )
171    {
172        case 'r': /* run in libevent thread */
173        {
174            struct tr_run_data data;
175            const size_t       nwant = sizeof( data );
176            const ssize_t      ngot = piperead( fd, &data, nwant );
177            if( !eh->die && ( ngot == (ssize_t)nwant ) )
178            {
179                dbgmsg( "invoking function in libevent thread" );
180                ( data.func )( data.user_data );
181            }
182            break;
183        }
184
185        case '\0': /* eof */
186        {
187            dbgmsg( "pipe eof reached... removing event listener" );
188            event_del( &eh->pipeEvent );
189            break;
190        }
191
192        default:
193        {
194            assert( 0 && "unhandled command type!" );
195            break;
196        }
197    }
198}
199
200static void
201logFunc( int severity, const char * message )
202{
203    if( severity >= _EVENT_LOG_ERR )
204        tr_err( "%s", message );
205    else
206        tr_dbg( "%s", message );
207}
208
209static void
210libeventThreadFunc( void * veh )
211{
212    tr_event_handle * eh = veh;
213
214#ifndef WIN32
215    /* Don't exit when writing on a broken socket */
216    signal( SIGPIPE, SIG_IGN );
217#endif
218
219    eh->base = event_init( );
220    eh->session->events = eh;
221
222    /* listen to the pipe's read fd */
223    event_set( &eh->pipeEvent, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh );
224    event_add( &eh->pipeEvent, NULL );
225    event_set_log_callback( logFunc );
226
227    /* loop until all the events are done */
228    while( !eh->die )
229        event_dispatch( );
230
231    /* shut down the thread */
232    tr_lockFree( eh->lock );
233    event_base_free( eh->base );
234    eh->session->events = NULL;
235    tr_free( eh );
236    tr_dbg( "Closing libevent thread" );
237}
238
239void
240tr_eventInit( tr_session * session )
241{
242    tr_event_handle * eh;
243
244    session->events = NULL;
245
246    eh = tr_new0( tr_event_handle, 1 );
247    eh->lock = tr_lockNew( );
248    pipe( eh->fds );
249    eh->session = session;
250    eh->thread = tr_threadNew( libeventThreadFunc, eh );
251
252    /* wait until the libevent thread is running */
253    while( session->events == NULL )
254        tr_wait_msec( 100 );
255}
256
257void
258tr_eventClose( tr_session * session )
259{
260    assert( tr_isSession( session ) );
261
262    session->events->die = TRUE;
263    tr_deepLog( __FILE__, __LINE__, NULL, "closing trevent pipe" );
264    tr_netCloseSocket( session->events->fds[1] );
265}
266
267/**
268***
269**/
270
271tr_bool
272tr_amInEventThread( const tr_session * session )
273{
274    assert( tr_isSession( session ) );
275    assert( session->events != NULL );
276
277    return tr_amInThread( session->events->thread );
278}
279
280/**
281***
282**/
283
284void
285tr_runInEventThread( tr_session * session,
286                     void func( void* ), void * user_data )
287{
288    assert( tr_isSession( session ) );
289    assert( session->events != NULL );
290
291    if( tr_amInThread( session->events->thread ) )
292    {
293        (func)( user_data );
294    }
295    else
296    {
297        const char         ch = 'r';
298        int                fd = session->events->fds[1];
299        tr_lock *          lock = session->events->lock;
300        struct tr_run_data data;
301
302        tr_lockLock( lock );
303        pipewrite( fd, &ch, 1 );
304        data.func = func;
305        data.user_data = user_data;
306        pipewrite( fd, &data, sizeof( data ) );
307        tr_lockUnlock( lock );
308    }
309}
310
311struct event_base *
312tr_eventGetBase( tr_session * session )
313{
314    assert( tr_isSession( session ) );
315
316    return session->events->base;
317}
Note: See TracBrowser for help on using the repository browser.