source: trunk/libtransmission/iobuf.c @ 7154

Last change on this file since 7154 was 7154, checked in by charles, 12 years ago

(libT) yet another stab at getting bandwidth management under control. this version may suck less than previous attempts. It also breaks the mac build until someone adds iobuf.[ch] to xcode...

File size: 9.1 KB
Line 
1/*
2 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3 * All rights reserved.
4 *
5 * Transmission modifications and new bugs by Charles Kerr.
6 * Source: libevent's "patches-1.4" branch, svn revision 949
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 * 3. The name of the author may not be used to endorse or promote products
17 *    derived from this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#include <sys/types.h>
32
33#ifdef HAVE_CONFIG_H
34#include "config.h"
35#endif
36
37#include <assert.h>
38#include <errno.h>
39#include <stdio.h>
40#include <stdlib.h>
41#include <string.h>
42#include <unistd.h> /* write */
43
44#include "evutil.h"
45#include "event.h"
46
47#include "transmission.h"
48#include "bandwidth.h"
49#include "iobuf.h"
50#include "session.h"
51#include "utils.h"
52
53#define MAGIC_NUMBER 235705
54
55struct tr_iobuf
56{
57    struct event_base * ev_base;
58
59    struct event ev_read;
60    struct event ev_write;
61
62    struct evbuffer * input;
63    struct evbuffer * output;
64
65    tr_iobuf_cb readcb;
66    tr_iobuf_cb writecb;
67    tr_iobuf_error_cb errorcb;
68    void * cbarg;
69
70    int magicNumber;
71
72    int timeout_read;  /* in seconds */
73    int timeout_write; /* in seconds */
74    short enabled;     /* events that are currently enabled */
75
76    struct tr_handle * session;
77    struct tr_bandwidth * bandwidth;
78};
79
80/***
81****
82***/
83
84static int
85isBuf( const struct tr_iobuf * iobuf )
86{
87    return ( iobuf != NULL ) && ( iobuf->magicNumber == MAGIC_NUMBER );
88}
89
90static int
91tr_evbuffer_write( struct evbuffer *buffer, int fd, size_t maxlen )
92{
93    int n = MIN( EVBUFFER_LENGTH( buffer ), maxlen );
94
95#ifdef WIN32
96    n = send(fd, buffer->buffer, n,  0 );
97#else
98    n = write(fd, buffer->buffer, n );
99#endif
100    if( n == -1 )
101        return -1;
102    if (n == 0)
103        return 0;
104    evbuffer_drain( buffer, n );
105
106    return n;
107}
108
109static int
110tr_iobuf_add(struct event *ev, int timeout)
111{
112    struct timeval tv, *ptv = NULL;
113
114    if (timeout) {
115        evutil_timerclear(&tv);
116        tv.tv_sec = timeout;
117        ptv = &tv;
118    }
119
120    return event_add( ev, ptv );
121}
122
123static void
124tr_iobuf_readcb( int fd, short event, void * arg )
125{
126    int res;
127    short what = EVBUFFER_READ;
128    struct tr_iobuf * b = arg;
129    const size_t howmuch = tr_bandwidthClamp( b->bandwidth, TR_DOWN, 2048 );
130
131    assert( isBuf( b ) );
132
133    if( event == EV_TIMEOUT ) {
134        what |= EVBUFFER_TIMEOUT;
135        goto error;
136    }
137
138    /* if we don't have any bandwidth left, stop reading */
139    if( howmuch < 1 ) {
140        event_del( &b->ev_read );
141        return;
142    }
143
144    res = evbuffer_read( b->input, fd, howmuch );
145    if( res == -1 ) {
146        if( errno == EAGAIN || errno == EINTR )
147            goto reschedule;
148        /* error case */
149        what |= EVBUFFER_ERROR;
150    } else if( res == 0 ) {
151        /* eof case */
152        what |= EVBUFFER_EOF;
153    }
154
155    if( res <= 0 )
156        goto error;
157
158    tr_iobuf_add( &b->ev_read, b->timeout_read );
159
160    /* Invoke the user callback - must always be called last */
161    if( b->readcb != NULL )
162        ( *b->readcb )( b, res, b->cbarg );
163    return;
164
165 reschedule:
166    tr_iobuf_add( &b->ev_read, b->timeout_read );
167    return;
168
169 error:
170    (*b->errorcb)( b, what, b->cbarg );
171}
172
173static void
174tr_iobuf_writecb( int fd, short event, void * arg )
175{
176    int res = 0;
177    short what = EVBUFFER_WRITE;
178    struct tr_iobuf * b = arg;
179    size_t howmuch;
180
181    assert( isBuf( b ) );
182
183    if( event == EV_TIMEOUT ) {
184        what |= EVBUFFER_TIMEOUT;
185        goto error;
186    }
187
188    howmuch = MIN( (size_t)b->session->so_sndbuf, EVBUFFER_LENGTH( b->output ) );
189    howmuch = tr_bandwidthClamp( b->bandwidth, TR_UP, howmuch );
190
191    /* if we don't have any bandwidth left, stop writing */
192    if( howmuch < 1 ) {
193        event_del( &b->ev_write );
194        return;
195    }
196
197    res = tr_evbuffer_write( b->output, fd, howmuch );
198    if (res == -1) {
199#ifndef WIN32
200/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
201 *set errno. thus this error checking is not portable*/
202        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
203            goto reschedule;
204        /* error case */
205        what |= EVBUFFER_ERROR;
206
207#else
208        goto reschedule;
209#endif
210
211    } else if (res == 0) {
212        /* eof case */
213        what |= EVBUFFER_EOF;
214    }
215    if (res <= 0)
216        goto error;
217
218    if( EVBUFFER_LENGTH( b->output ) )
219        tr_iobuf_add( &b->ev_write, b->timeout_write );
220
221    if( b->writecb != NULL )
222        (*b->writecb)( b, res, b->cbarg );
223
224    return;
225
226 reschedule:
227    if( EVBUFFER_LENGTH( b->output ) )
228        tr_iobuf_add( &b->ev_write, b->timeout_write );
229    return;
230
231 error:
232    (*b->errorcb)( b, what, b->cbarg );
233}
234
235
236/*
237 * Create a new buffered event object.
238 *
239 * The read callback is invoked whenever we read new data.
240 * The write callback is invoked whenever the output buffer is drained.
241 * The error callback is invoked on a write/read error or on EOF.
242 *
243 * Both read and write callbacks maybe NULL.  The error callback is not
244 * allowed to be NULL and have to be provided always.
245 */
246
247struct tr_iobuf *
248tr_iobuf_new( struct tr_handle    * session,
249              tr_bandwidth        * bandwidth,
250              int                   fd,
251              short                 event,
252              tr_iobuf_cb           readcb,
253              tr_iobuf_cb           writecb,
254              tr_iobuf_error_cb     errorcb,
255              void                * cbarg )
256{
257    struct tr_iobuf * b;
258
259    b = tr_new0( struct tr_iobuf, 1 );
260    b->magicNumber = MAGIC_NUMBER;
261    b->session = session;
262    b->bandwidth = bandwidth;
263    b->input = evbuffer_new( );
264    b->output = evbuffer_new( );
265
266    event_set( &b->ev_read, fd, EV_READ, tr_iobuf_readcb, b );
267    event_set( &b->ev_write, fd, EV_WRITE, tr_iobuf_writecb, b );
268
269    tr_iobuf_setcb( b, readcb, writecb, errorcb, cbarg );
270    tr_iobuf_enable( b, event );
271
272    return b;
273}
274
275void
276tr_iobuf_setcb( struct tr_iobuf    * b,
277                tr_iobuf_cb          readcb,
278                tr_iobuf_cb          writecb,
279                tr_iobuf_error_cb    errorcb,
280                void               * cbarg )
281{
282    assert( isBuf( b ) );
283
284    b->readcb = readcb;
285    b->writecb = writecb;
286    b->errorcb = errorcb;
287    b->cbarg = cbarg;
288}
289
290/* Closing the file descriptor is the responsibility of the caller */
291
292void
293tr_iobuf_free( struct tr_iobuf * b )
294{
295    assert( isBuf( b ) );
296
297    b->magicNumber = 0xDEAD;
298    event_del( &b->ev_read );
299    event_del( &b->ev_write );
300    evbuffer_free( b->input );
301    evbuffer_free( b->output );
302    tr_free( b );
303}
304
305int
306tr_iobuf_enable(struct tr_iobuf * b, short event )
307{
308    assert( isBuf( b ) );
309
310    if( event & EV_READ )
311        if( tr_iobuf_add( &b->ev_read, b->timeout_read ) == -1 )
312            return -1;
313
314    if( event & EV_WRITE )
315        if ( tr_iobuf_add( &b->ev_write, b->timeout_write ) == -1 )
316            return -1;
317
318    b->enabled |= event;
319    return 0;
320}
321
322int
323tr_iobuf_disable( struct tr_iobuf * b, short event )
324{
325    assert( isBuf( b ) );
326
327    if( event & EV_READ )
328        if( event_del( &b->ev_read ) == -1 )
329            return -1;
330
331    if( event & EV_WRITE )
332        if( event_del( &b->ev_write ) == -1 )
333            return -1;
334
335    b->enabled &= ~event;
336    return 0;
337}
338
339void
340tr_iobuf_settimeout( struct tr_iobuf  * b,
341                     int                timeout_read,
342                     int                timeout_write )
343{
344    assert( isBuf( b ) );
345
346    b->timeout_read = timeout_read;
347    if( event_pending( &b->ev_read, EV_READ, NULL ) )
348        tr_iobuf_add( &b->ev_read, timeout_read );
349
350    b->timeout_write = timeout_write;
351    if( event_pending( &b->ev_write, EV_WRITE, NULL ) )
352        tr_iobuf_add( &b->ev_write, timeout_write );
353}
354
355void
356tr_iobuf_set_bandwidth( struct tr_iobuf      * b,
357                        struct tr_bandwidth  * bandwidth )
358{
359    assert( isBuf( b ) );
360
361    b->bandwidth = bandwidth;
362}
363
364struct evbuffer*
365tr_iobuf_input( struct tr_iobuf * b )
366{
367    assert( isBuf( b ) );
368
369    return b->input;
370}
371
372struct evbuffer*
373tr_iobuf_output( struct tr_iobuf * b )
374{
375    assert( isBuf( b ) );
376
377    return b->output;
378}
Note: See TracBrowser for help on using the repository browser.