source: branches/1.4x/libtransmission/iobuf.c @ 7403

Last change on this file since 7403 was 7403, checked in by charles, 12 years ago

(1.4x libT) probable fix for the "greedy peer" bug

File size: 10.2 KB
Line 
1/*
2 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3 * All rights reserved.
4 *
5 * Transmission modifications and new bugs by Charles Kerr.
6 * Source: libevent's "patches-1.4" branch, svn revision 949
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 * 3. The name of the author may not be used to endorse or promote products
17 *    derived from this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#include <sys/types.h>
32
33#ifdef HAVE_CONFIG_H
34#include "config.h"
35#endif
36
37#include <assert.h>
38#include <errno.h>
39#include <stdio.h>
40#include <stdlib.h>
41#include <string.h>
42#include <unistd.h> /* write */
43
44#include "evutil.h"
45#include "event.h"
46
47#include "transmission.h"
48#include "bandwidth.h"
49#include "iobuf.h"
50#include "session.h"
51#include "utils.h"
52
53#define MAGIC_NUMBER 235705
54
55struct tr_iobuf
56{
57    struct event_base * ev_base;
58
59    struct event ev_read;
60    struct event ev_write;
61
62    struct evbuffer * input;
63    struct evbuffer * output;
64
65    tr_iobuf_cb readcb;
66    tr_iobuf_cb writecb;
67    tr_iobuf_error_cb errorcb;
68    void * cbarg;
69
70    int magicNumber;
71
72    int fd;
73
74    int timeout_read;  /* in seconds */
75    int timeout_write; /* in seconds */
76    short enabled;     /* events that are currently enabled */
77
78    struct tr_handle * session;
79    struct tr_bandwidth * bandwidth;
80};
81
82/***
83****
84***/
85
86static int
87isBuf( const struct tr_iobuf * iobuf )
88{
89    return ( iobuf != NULL ) && ( iobuf->magicNumber == MAGIC_NUMBER );
90}
91
92static int
93tr_evbuffer_write( struct evbuffer *buffer, int fd, size_t howmuch )
94{
95    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
96
97#ifdef WIN32
98    n = send(fd, buffer->buffer, n,  0 );
99#else
100    n = write(fd, buffer->buffer, n );
101#endif
102    if( n == -1 )
103        return -1;
104    if (n == 0)
105        return 0;
106    evbuffer_drain( buffer, n );
107
108    return n;
109}
110
111int
112tr_iobuf_flush_output_buffer( struct tr_iobuf * b, size_t howmuch )
113{
114    int res;
115
116    assert( isBuf( b ) );
117
118    howmuch = tr_bandwidthClamp( b->bandwidth, TR_UP, howmuch );
119    howmuch = MIN( howmuch, EVBUFFER_LENGTH( b->output ) );
120
121    res = howmuch ? tr_evbuffer_write( b->output, b->fd, howmuch ) : 0;
122
123    if( ( res > 0 ) && ( b->writecb != NULL ) )
124        (*b->writecb)( b, (size_t)res, b->cbarg );
125
126    if( ( res < 0 ) && ( b->errorcb != NULL ) && ( errno != EAGAIN && errno != EINTR && errno != EINPROGRESS ) )
127        (*b->errorcb)( b, (short)res, b->cbarg );
128
129    return res;
130}
131
132int
133tr_iobuf_tryread( struct tr_iobuf * b, size_t howmuch )
134{
135    int res;
136
137    assert( isBuf( b ) );
138
139    howmuch = tr_bandwidthClamp( b->bandwidth, TR_DOWN, howmuch );
140
141    res = howmuch ? evbuffer_read( b->input, b->fd, howmuch ) : 0;
142
143    if( ( res > 0 ) && ( b->readcb != NULL ) )
144        (*b->readcb)( b, (size_t)res, b->cbarg );
145
146    if( ( res < 0 ) && ( b->errorcb != NULL ) && ( errno != EAGAIN && errno != EINTR ) )
147        (*b->errorcb)( b, (short)res, b->cbarg );
148
149    return res;
150}
151
152
153static int
154tr_iobuf_add(struct event *ev, int timeout)
155{
156    struct timeval tv, *ptv = NULL;
157
158    if (timeout) {
159        evutil_timerclear(&tv);
160        tv.tv_sec = timeout;
161        ptv = &tv;
162    }
163
164    return event_add( ev, ptv );
165}
166
167static void
168tr_iobuf_readcb( int fd, short event, void * arg )
169{
170    int res;
171    short what = EVBUFFER_READ;
172    struct tr_iobuf * b = arg;
173    const size_t howmuch = tr_bandwidthClamp( b->bandwidth, TR_DOWN, b->session->so_rcvbuf );
174
175    assert( isBuf( b ) );
176
177    if( event == EV_TIMEOUT ) {
178        what |= EVBUFFER_TIMEOUT;
179        goto error;
180    }
181
182    /* if we don't have any bandwidth left, stop reading */
183    if( howmuch < 1 ) {
184        event_del( &b->ev_read );
185        return;
186    }
187
188    res = evbuffer_read( b->input, fd, howmuch );
189    if( res == -1 ) {
190        if( errno == EAGAIN || errno == EINTR )
191            goto reschedule;
192        /* error case */
193        what |= EVBUFFER_ERROR;
194    } else if( res == 0 ) {
195        /* eof case */
196        what |= EVBUFFER_EOF;
197    }
198
199    if( res <= 0 )
200        goto error;
201
202    tr_iobuf_add( &b->ev_read, b->timeout_read );
203
204    /* Invoke the user callback - must always be called last */
205    if( b->readcb != NULL )
206        ( *b->readcb )( b, res, b->cbarg );
207    return;
208
209 reschedule:
210    tr_iobuf_add( &b->ev_read, b->timeout_read );
211    return;
212
213 error:
214    (*b->errorcb)( b, what, b->cbarg );
215}
216
217static void
218tr_iobuf_writecb( int fd, short event, void * arg )
219{
220    int res = 0;
221    short what = EVBUFFER_WRITE;
222    struct tr_iobuf * b = arg;
223    size_t howmuch;
224
225    assert( isBuf( b ) );
226
227    if( event == EV_TIMEOUT ) {
228        what |= EVBUFFER_TIMEOUT;
229        goto error;
230    }
231
232    howmuch = MIN( (size_t)b->session->so_sndbuf, EVBUFFER_LENGTH( b->output ) );
233    howmuch = tr_bandwidthClamp( b->bandwidth, TR_UP, howmuch );
234
235    /* if we don't have any bandwidth left, stop writing */
236    if( howmuch < 1 ) {
237        event_del( &b->ev_write );
238        return;
239    }
240
241    res = tr_evbuffer_write( b->output, fd, howmuch );
242    if (res == -1) {
243#ifndef WIN32
244/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
245 *set errno. thus this error checking is not portable*/
246        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
247            goto reschedule;
248        /* error case */
249        what |= EVBUFFER_ERROR;
250
251#else
252        goto reschedule;
253#endif
254
255    } else if (res == 0) {
256        /* eof case */
257        what |= EVBUFFER_EOF;
258    }
259    if (res <= 0)
260        goto error;
261
262    if( EVBUFFER_LENGTH( b->output ) )
263        tr_iobuf_add( &b->ev_write, b->timeout_write );
264
265    if( b->writecb != NULL )
266        (*b->writecb)( b, res, b->cbarg );
267
268    return;
269
270 reschedule:
271    if( EVBUFFER_LENGTH( b->output ) )
272        tr_iobuf_add( &b->ev_write, b->timeout_write );
273    return;
274
275 error:
276    (*b->errorcb)( b, what, b->cbarg );
277}
278
279
280/*
281 * Create a new buffered event object.
282 *
283 * The read callback is invoked whenever we read new data.
284 * The write callback is invoked whenever the output buffer is drained.
285 * The error callback is invoked on a write/read error or on EOF.
286 *
287 * Both read and write callbacks maybe NULL.  The error callback is not
288 * allowed to be NULL and have to be provided always.
289 */
290
291struct tr_iobuf *
292tr_iobuf_new( struct tr_handle    * session,
293              tr_bandwidth        * bandwidth,
294              int                   fd,
295              short                 event,
296              tr_iobuf_cb           readcb,
297              tr_iobuf_cb           writecb,
298              tr_iobuf_error_cb     errorcb,
299              void                * cbarg )
300{
301    struct tr_iobuf * b;
302
303    b = tr_new0( struct tr_iobuf, 1 );
304    b->magicNumber = MAGIC_NUMBER;
305    b->fd = fd;
306    b->session = session;
307    b->bandwidth = bandwidth;
308    b->input = evbuffer_new( );
309    b->output = evbuffer_new( );
310
311    event_set( &b->ev_read, fd, EV_READ, tr_iobuf_readcb, b );
312    event_set( &b->ev_write, fd, EV_WRITE, tr_iobuf_writecb, b );
313
314    tr_iobuf_setcb( b, readcb, writecb, errorcb, cbarg );
315    tr_iobuf_enable( b, event );
316
317    return b;
318}
319
320void
321tr_iobuf_setcb( struct tr_iobuf    * b,
322                tr_iobuf_cb          readcb,
323                tr_iobuf_cb          writecb,
324                tr_iobuf_error_cb    errorcb,
325                void               * cbarg )
326{
327    assert( isBuf( b ) );
328
329    b->readcb = readcb;
330    b->writecb = writecb;
331    b->errorcb = errorcb;
332    b->cbarg = cbarg;
333}
334
335/* Closing the file descriptor is the responsibility of the caller */
336
337void
338tr_iobuf_free( struct tr_iobuf * b )
339{
340    assert( isBuf( b ) );
341
342    b->magicNumber = 0xDEAD;
343    event_del( &b->ev_read );
344    event_del( &b->ev_write );
345    evbuffer_free( b->input );
346    evbuffer_free( b->output );
347    tr_free( b );
348}
349
350int
351tr_iobuf_enable(struct tr_iobuf * b, short event )
352{
353    assert( isBuf( b ) );
354
355    if( event & EV_READ )
356        if( tr_iobuf_add( &b->ev_read, b->timeout_read ) == -1 )
357            return -1;
358
359    if( event & EV_WRITE )
360        if ( tr_iobuf_add( &b->ev_write, b->timeout_write ) == -1 )
361            return -1;
362
363    b->enabled |= event;
364    return 0;
365}
366
367int
368tr_iobuf_disable( struct tr_iobuf * b, short event )
369{
370    assert( isBuf( b ) );
371
372    if( event & EV_READ )
373        if( event_del( &b->ev_read ) == -1 )
374            return -1;
375
376    if( event & EV_WRITE )
377        if( event_del( &b->ev_write ) == -1 )
378            return -1;
379
380    b->enabled &= ~event;
381    return 0;
382}
383
384void
385tr_iobuf_settimeout( struct tr_iobuf  * b,
386                     int                timeout_read,
387                     int                timeout_write )
388{
389    assert( isBuf( b ) );
390
391    b->timeout_read = timeout_read;
392    if( event_pending( &b->ev_read, EV_READ, NULL ) )
393        tr_iobuf_add( &b->ev_read, timeout_read );
394
395    b->timeout_write = timeout_write;
396    if( event_pending( &b->ev_write, EV_WRITE, NULL ) )
397        tr_iobuf_add( &b->ev_write, timeout_write );
398}
399
400void
401tr_iobuf_set_bandwidth( struct tr_iobuf      * b,
402                        struct tr_bandwidth  * bandwidth )
403{
404    assert( isBuf( b ) );
405
406    b->bandwidth = bandwidth;
407}
408
409struct evbuffer*
410tr_iobuf_input( struct tr_iobuf * b )
411{
412    assert( isBuf( b ) );
413
414    return b->input;
415}
416
417struct evbuffer*
418tr_iobuf_output( struct tr_iobuf * b )
419{
420    assert( isBuf( b ) );
421
422    return b->output;
423}
Note: See TracBrowser for help on using the repository browser.