source: branches/1.4x/libtransmission/iobuf.c @ 7177

Last change on this file since 7177 was 7177, checked in by charles, 12 years ago

(1.4x libT) backport the bandwidth fixes back to the 1.4x branch, pass #3: add the new files bandwidth.[ch] and iobuf.[ch]

File size: 9.1 KB
Line 
1/*
2 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3 * All rights reserved.
4 *
5 * Transmission modifications and new bugs by Charles Kerr.
6 * Source: libevent's "patches-1.4" branch, svn revision 949
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 * 3. The name of the author may not be used to endorse or promote products
17 *    derived from this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#include <sys/types.h>
32
33#ifdef HAVE_CONFIG_H
34#include "config.h"
35#endif
36
37#include <assert.h>
38#include <errno.h>
39#include <stdio.h>
40#include <stdlib.h>
41#include <string.h>
42#include <unistd.h> /* write */
43
44#include "evutil.h"
45#include "event.h"
46
47#include "transmission.h"
48#include "bandwidth.h"
49#include "iobuf.h"
50#include "session.h"
51#include "utils.h"
52
53#define MAGIC_NUMBER 235705
54
55struct tr_iobuf
56{
57    struct event_base * ev_base;
58
59    struct event ev_read;
60    struct event ev_write;
61
62    struct evbuffer * input;
63    struct evbuffer * output;
64
65    tr_iobuf_cb readcb;
66    tr_iobuf_cb writecb;
67    tr_iobuf_error_cb errorcb;
68    void * cbarg;
69
70    int magicNumber;
71
72    int timeout_read;  /* in seconds */
73    int timeout_write; /* in seconds */
74    short enabled;     /* events that are currently enabled */
75
76    struct tr_handle * session;
77    struct tr_bandwidth * bandwidth;
78};
79
80/***
81****
82***/
83
84static int
85isBuf( const struct tr_iobuf * iobuf )
86{
87    return ( iobuf != NULL ) && ( iobuf->magicNumber == MAGIC_NUMBER );
88}
89
90static int
91tr_evbuffer_write( struct evbuffer *buffer, int fd, size_t maxlen )
92{
93    int n = MIN( EVBUFFER_LENGTH( buffer ), maxlen );
94
95#ifdef WIN32
96    n = send(fd, buffer->buffer, n,  0 );
97#else
98    n = write(fd, buffer->buffer, n );
99#endif
100    if( n == -1 )
101        return -1;
102    if (n == 0)
103        return 0;
104    evbuffer_drain( buffer, n );
105
106    return n;
107}
108
/*
 * Schedule `ev` with libevent.
 *
 * A `timeout` of 0 adds the event with no timeout; any other value is
 * used as the timeout in whole seconds.
 *
 * Returns whatever event_add() returns.
 */
static int
tr_iobuf_add(struct event *ev, int timeout)
{
    struct timeval limit;

    if( timeout == 0 )
        return event_add( ev, NULL );

    evutil_timerclear( &limit );
    limit.tv_sec = timeout;
    return event_add( ev, &limit );
}
122
123static void
124tr_iobuf_readcb( int fd, short event, void * arg )
125{
126    int res;
127    short what = EVBUFFER_READ;
128    struct tr_iobuf * b = arg;
129    const size_t howmuch = tr_bandwidthClamp( b->bandwidth, TR_DOWN, b->session->so_rcvbuf );
130
131    assert( isBuf( b ) );
132
133    if( event == EV_TIMEOUT ) {
134        what |= EVBUFFER_TIMEOUT;
135        goto error;
136    }
137
138    /* if we don't have any bandwidth left, stop reading */
139    if( howmuch < 1 ) {
140        event_del( &b->ev_read );
141        return;
142    }
143
144    res = evbuffer_read( b->input, fd, howmuch );
145    if( res == -1 ) {
146        if( errno == EAGAIN || errno == EINTR )
147            goto reschedule;
148        /* error case */
149        what |= EVBUFFER_ERROR;
150    } else if( res == 0 ) {
151        /* eof case */
152        what |= EVBUFFER_EOF;
153    }
154
155    if( res <= 0 )
156        goto error;
157
158    tr_iobuf_add( &b->ev_read, b->timeout_read );
159
160    /* Invoke the user callback - must always be called last */
161    if( b->readcb != NULL )
162        ( *b->readcb )( b, res, b->cbarg );
163    return;
164
165 reschedule:
166    tr_iobuf_add( &b->ev_read, b->timeout_read );
167    return;
168
169 error:
170    (*b->errorcb)( b, what, b->cbarg );
171}
172
173static void
174tr_iobuf_writecb( int fd, short event, void * arg )
175{
176    int res = 0;
177    short what = EVBUFFER_WRITE;
178    struct tr_iobuf * b = arg;
179    size_t howmuch;
180
181    assert( isBuf( b ) );
182
183    if( event == EV_TIMEOUT ) {
184        what |= EVBUFFER_TIMEOUT;
185        goto error;
186    }
187
188    howmuch = MIN( (size_t)b->session->so_sndbuf, EVBUFFER_LENGTH( b->output ) );
189    howmuch = tr_bandwidthClamp( b->bandwidth, TR_UP, howmuch );
190
191    /* if we don't have any bandwidth left, stop writing */
192    if( howmuch < 1 ) {
193        event_del( &b->ev_write );
194        return;
195    }
196
197    res = tr_evbuffer_write( b->output, fd, howmuch );
198    if (res == -1) {
199#ifndef WIN32
200/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
201 *set errno. thus this error checking is not portable*/
202        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
203            goto reschedule;
204        /* error case */
205        what |= EVBUFFER_ERROR;
206
207#else
208        goto reschedule;
209#endif
210
211    } else if (res == 0) {
212        /* eof case */
213        what |= EVBUFFER_EOF;
214    }
215    if (res <= 0)
216        goto error;
217
218    if( EVBUFFER_LENGTH( b->output ) )
219        tr_iobuf_add( &b->ev_write, b->timeout_write );
220
221    if( b->writecb != NULL )
222        (*b->writecb)( b, res, b->cbarg );
223
224    return;
225
226 reschedule:
227    if( EVBUFFER_LENGTH( b->output ) )
228        tr_iobuf_add( &b->ev_write, b->timeout_write );
229    return;
230
231 error:
232    (*b->errorcb)( b, what, b->cbarg );
233}
234
235
236/*
237 * Create a new buffered event object.
238 *
239 * The read callback is invoked whenever we read new data.
240 * The write callback is invoked whenever the output buffer is drained.
241 * The error callback is invoked on a write/read error or on EOF.
242 *
243 * Both read and write callbacks maybe NULL.  The error callback is not
244 * allowed to be NULL and have to be provided always.
245 */
246
247struct tr_iobuf *
248tr_iobuf_new( struct tr_handle    * session,
249              tr_bandwidth        * bandwidth,
250              int                   fd,
251              short                 event,
252              tr_iobuf_cb           readcb,
253              tr_iobuf_cb           writecb,
254              tr_iobuf_error_cb     errorcb,
255              void                * cbarg )
256{
257    struct tr_iobuf * b;
258
259    b = tr_new0( struct tr_iobuf, 1 );
260    b->magicNumber = MAGIC_NUMBER;
261    b->session = session;
262    b->bandwidth = bandwidth;
263    b->input = evbuffer_new( );
264    b->output = evbuffer_new( );
265
266    event_set( &b->ev_read, fd, EV_READ, tr_iobuf_readcb, b );
267    event_set( &b->ev_write, fd, EV_WRITE, tr_iobuf_writecb, b );
268
269    tr_iobuf_setcb( b, readcb, writecb, errorcb, cbarg );
270    tr_iobuf_enable( b, event );
271
272    return b;
273}
274
275void
276tr_iobuf_setcb( struct tr_iobuf    * b,
277                tr_iobuf_cb          readcb,
278                tr_iobuf_cb          writecb,
279                tr_iobuf_error_cb    errorcb,
280                void               * cbarg )
281{
282    assert( isBuf( b ) );
283
284    b->readcb = readcb;
285    b->writecb = writecb;
286    b->errorcb = errorcb;
287    b->cbarg = cbarg;
288}
289
290/* Closing the file descriptor is the responsibility of the caller */
291
292void
293tr_iobuf_free( struct tr_iobuf * b )
294{
295    assert( isBuf( b ) );
296
297    b->magicNumber = 0xDEAD;
298    event_del( &b->ev_read );
299    event_del( &b->ev_write );
300    evbuffer_free( b->input );
301    evbuffer_free( b->output );
302    tr_free( b );
303}
304
305int
306tr_iobuf_enable(struct tr_iobuf * b, short event )
307{
308    assert( isBuf( b ) );
309
310    if( event & EV_READ )
311        if( tr_iobuf_add( &b->ev_read, b->timeout_read ) == -1 )
312            return -1;
313
314    if( event & EV_WRITE )
315        if ( tr_iobuf_add( &b->ev_write, b->timeout_write ) == -1 )
316            return -1;
317
318    b->enabled |= event;
319    return 0;
320}
321
322int
323tr_iobuf_disable( struct tr_iobuf * b, short event )
324{
325    assert( isBuf( b ) );
326
327    if( event & EV_READ )
328        if( event_del( &b->ev_read ) == -1 )
329            return -1;
330
331    if( event & EV_WRITE )
332        if( event_del( &b->ev_write ) == -1 )
333            return -1;
334
335    b->enabled &= ~event;
336    return 0;
337}
338
339void
340tr_iobuf_settimeout( struct tr_iobuf  * b,
341                     int                timeout_read,
342                     int                timeout_write )
343{
344    assert( isBuf( b ) );
345
346    b->timeout_read = timeout_read;
347    if( event_pending( &b->ev_read, EV_READ, NULL ) )
348        tr_iobuf_add( &b->ev_read, timeout_read );
349
350    b->timeout_write = timeout_write;
351    if( event_pending( &b->ev_write, EV_WRITE, NULL ) )
352        tr_iobuf_add( &b->ev_write, timeout_write );
353}
354
355void
356tr_iobuf_set_bandwidth( struct tr_iobuf      * b,
357                        struct tr_bandwidth  * bandwidth )
358{
359    assert( isBuf( b ) );
360
361    b->bandwidth = bandwidth;
362}
363
364struct evbuffer*
365tr_iobuf_input( struct tr_iobuf * b )
366{
367    assert( isBuf( b ) );
368
369    return b->input;
370}
371
372struct evbuffer*
373tr_iobuf_output( struct tr_iobuf * b )
374{
375    assert( isBuf( b ) );
376
377    return b->output;
378}
Note: See TracBrowser for help on using the repository browser.