Ignore:
Timestamp:
Dec 20, 2008, 10:19:34 PM (12 years ago)
Author:
charles
Message:

try to rework the bandwidth code yet again s.t. it satisfies all three: (1) fairly distributes bandwidth across all peers, (2) scales well in high-bandwidth situations, (3) is good at hitting and staying at bandwidth limits/goals

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-io.c

    r7435 r7441  
    3535
    3636#define MAGIC_NUMBER 206745
    37 #define IO_TIMEOUT_SECS 8
    3837
    3938static size_t
     
    8887
    8988    uint8_t            encryptionMode;
    90     uint8_t            timeout;
    9189    tr_port            port;
    9290    int                socket;
     
    112110    struct evbuffer  * inbuf;
    113111    struct evbuffer  * outbuf;
     112
     113    struct event       event_read;
     114    struct event       event_write;
    114115};
    115116
     
    119120
    120121static void
    121 didWriteWrapper( void     * unused UNUSED,
    122                  size_t     bytes_transferred,
    123                  void     * vio )
    124 {
    125     tr_peerIo *  io = vio;
    126 
     122didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
     123{
    127124    while( bytes_transferred )
    128125    {
     
    147144
    148145static void
    149 canReadWrapper( void    * unused UNUSED,
    150                 size_t    bytes_transferred UNUSED,
    151                 void    * vio )
    152 {
    153     int          done = 0;
    154     int          err = 0;
    155     tr_peerIo *  io = vio;
     146canReadWrapper( tr_peerIo * io )
     147{
     148    tr_bool done = 0;
     149    tr_bool err = 0;
    156150    tr_session * session = io->session;
    157151
     
    169163            const int ret = io->canRead( io, io->userData, &piece );
    170164
    171             if( ret != READ_ERR )
    172             {
    173                 const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
    174                 if( piece )
    175                     tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
    176                 if( used != piece )
    177                     tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
    178             }
     165            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
     166
     167            if( piece )
     168                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
     169
     170            if( used != piece )
     171                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
    179172
    180173            switch( ret )
     
    200193}
    201194
    202 #if 0
     195#define _isBool(b) (((b)==0 || (b)==1))
     196
     197static int
     198isPeerIo( const tr_peerIo * io )
     199{
     200    return ( io != NULL )
     201        && ( io->magicNumber == MAGIC_NUMBER )
     202        && ( tr_isAddress( &io->addr ) )
     203        && ( _isBool( io->isEncrypted ) )
     204        && ( _isBool( io->isIncoming ) )
     205        && ( _isBool( io->peerIdIsSet ) )
     206        && ( _isBool( io->extendedProtocolSupported ) )
     207        && ( _isBool( io->fastExtensionSupported ) );
     208}
     209
    203210static void
    204 gotErrorWrapper( struct tr_iobuf  * iobuf,
    205                  short              what,
    206                  void             * userData )
    207 {
    208     tr_peerIo * c = userData;
    209 
    210     if( c->gotError )
    211         c->gotError( iobuf, what, c->userData );
    212 }
     211event_read_cb( int fd, short event UNUSED, void * vio )
     212{
     213    int res;
     214    short what = EVBUFFER_READ;
     215    tr_peerIo * io = vio;
     216    const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, io->session->so_rcvbuf );
     217    const tr_direction dir = TR_DOWN;
     218
     219    assert( isPeerIo( io ) );
     220
     221    dbgmsg( io, "libevent says this peer is ready to read" );
     222
     223    /* if we don't have any bandwidth left, stop reading */
     224    if( howmuch < 1 ) {
     225        tr_peerIoSetEnabled( io, dir, FALSE );
     226        return;
     227    }
     228
     229    res = evbuffer_read( io->inbuf, fd, howmuch );
     230    if( res == -1 ) {
     231        if( errno == EAGAIN || errno == EINTR )
     232            goto reschedule;
     233        /* error case */
     234        what |= EVBUFFER_ERROR;
     235    } else if( res == 0 ) {
     236        /* eof case */
     237        what |= EVBUFFER_EOF;
     238    }
     239
     240    if( res <= 0 )
     241        goto error;
     242
     243    tr_peerIoSetEnabled( io, dir, TRUE );
     244
     245    /* Invoke the user callback - must always be called last */
     246    canReadWrapper( io );
     247
     248    return;
     249
     250 reschedule:
     251    tr_peerIoSetEnabled( io, dir, TRUE );
     252    return;
     253
     254 error:
     255    if( io->gotError != NULL )
     256        io->gotError( io, what, io->userData );
     257}
     258
     259static int
     260tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
     261{
     262    struct evbuffer * buffer = io->outbuf;
     263    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
     264
     265#ifdef WIN32
     266    n = send(fd, buffer->buffer, n,  0 );
     267#else
     268    n = write(fd, buffer->buffer, n );
    213269#endif
     270    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
     271
     272    if( n == -1 )
     273        return -1;
     274    if (n == 0)
     275        return 0;
     276    evbuffer_drain( buffer, n );
     277
     278    return n;
     279}
     280
     281static void
     282event_write_cb( int fd, short event UNUSED, void * vio )
     283{
     284    int res = 0;
     285    short what = EVBUFFER_WRITE;
     286    tr_peerIo * io = vio;
     287    size_t howmuch;
     288    const tr_direction dir = TR_UP;
     289
     290    assert( isPeerIo( io ) );
     291
     292    dbgmsg( io, "libevent says this peer is ready to write" );
     293
     294    howmuch = MIN( (size_t)io->session->so_sndbuf, EVBUFFER_LENGTH( io->outbuf ) );
     295    howmuch = tr_bandwidthClamp( io->bandwidth, dir, howmuch );
     296
     297    /* if we don't have any bandwidth left, stop writing */
     298    if( howmuch < 1 ) {
     299        tr_peerIoSetEnabled( io, dir, FALSE );
     300        return;
     301    }
     302
     303    res = tr_evbuffer_write( io, fd, howmuch );
     304    if (res == -1) {
     305#ifndef WIN32
     306/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
     307 *  *set errno. thus this error checking is not portable*/
     308        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
     309            goto reschedule;
     310        /* error case */
     311        what |= EVBUFFER_ERROR;
     312
     313#else
     314        goto reschedule;
     315#endif
     316
     317    } else if (res == 0) {
     318        /* eof case */
     319        what |= EVBUFFER_EOF;
     320    }
     321    if (res <= 0)
     322        goto error;
     323
     324    if( EVBUFFER_LENGTH( io->outbuf ) )
     325        tr_peerIoSetEnabled( io, dir, TRUE );
     326
     327    didWriteWrapper( io, res );
     328    return;
     329
     330 reschedule:
     331    if( EVBUFFER_LENGTH( io->outbuf ) )
     332        tr_peerIoSetEnabled( io, dir, TRUE );
     333    return;
     334
     335 error:
     336    io->gotError( io, what, io->userData );
     337}
    214338
    215339/**
     
    217341**/
    218342
    219 #if 0
    220 static void
    221 bufevNew( tr_peerIo * io )
    222 {
    223     io->iobuf = tr_iobuf_new( io->session,
    224                               io->bandwidth,
    225                               io->socket,
    226                               EV_READ | EV_WRITE,
    227                               canReadWrapper,
    228                               didWriteWrapper,
    229                               gotErrorWrapper,
    230                               io );
    231 
    232     tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
    233 }
    234 #endif
    235 
    236 static int
    237 isPeerIo( const tr_peerIo * io )
    238 {
    239     return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
    240 }
    241343
    242344static int
     
    267369    io->socket = socket;
    268370    io->isIncoming = isIncoming != 0;
    269     io->timeout = IO_TIMEOUT_SECS;
    270371    io->timeCreated = time( NULL );
    271372    io->inbuf = evbuffer_new( );
    272373    io->outbuf = evbuffer_new( );
     374    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
     375    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
    273376#if 0
    274377    bufevNew( io );
     
    315418    tr_peerIo * io = vio;
    316419
     420    event_del( &io->event_read );
     421    event_del( &io->event_write );
    317422    tr_peerIoSetBandwidth( io, NULL );
    318423    evbuffer_free( io->outbuf );
     
    378483}
    379484
    380 #if 0
    381 static void
    382 tr_peerIoTryRead( tr_peerIo * io )
    383 {
    384     if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
    385         (*canReadWrapper)( io->iobuf, ~0, io );
    386 }
    387 #endif
    388 
    389 void
    390 tr_peerIoSetIOFuncs( tr_peerIo *     io,
    391                      tr_can_read_cb  readcb,
    392                      tr_did_write_cb writecb,
    393                      tr_net_error_cb errcb,
    394                      void *          userData )
     485void
     486tr_peerIoSetIOFuncs( tr_peerIo        * io,
     487                     tr_can_read_cb     readcb,
     488                     tr_did_write_cb    writecb,
     489                     tr_net_error_cb    errcb,
     490                     void             * userData )
    395491{
    396492    io->canRead = readcb;
     
    398494    io->gotError = errcb;
    399495    io->userData = userData;
    400 
    401 #if 0
    402     tr_peerIoTryRead( io );
    403 #endif
    404496}
    405497
     
    436528    return -1;
    437529}
    438 
    439 #if 0
    440 void
    441 tr_peerIoSetTimeoutSecs( tr_peerIo * io,
    442                          int         secs )
    443 {
    444     io->timeout = secs;
    445     tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
    446     tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
    447 }
    448 #endif
    449530
    450531/**
     
    564645    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
    565646    const double period = 20; /* arbitrary */
    566     return MAX( maxBlockSize*20.5, currentSpeed*1024*period );
     647    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
     648    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
    567649}
    568650
     
    805887    dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
    806888
    807     if( res > 0 )
    808         canReadWrapper( io, res, io );
     889    if( EVBUFFER_LENGTH( io->inbuf ) )
     890        canReadWrapper( io );
    809891
    810892    if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
     
    827909
    828910    howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
    829     howmuch = MIN( howmuch, EVBUFFER_LENGTH( io->outbuf ) );
    830     n = (int) howmuch;
    831 
    832 #ifdef WIN32
    833     n = send( io->socket, EVBUFFER_DATA( io->outbuf ), n,  0 );
    834 #else
    835     n = write( io->socket, EVBUFFER_DATA( io->outbuf ),  n );
    836 #endif
    837     dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
     911
     912    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
    838913
    839914    if( n > 0 )
    840     {
    841         evbuffer_drain( io->outbuf, n );
    842 
    843         didWriteWrapper( NULL, n, io );
    844     }
    845 
    846     if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
    847     {
     915        didWriteWrapper( io, n );
     916
     917    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
    848918        short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
    849919        io->gotError( io, what, io->userData );
     
    876946    return io->inbuf;
    877947}
     948
     949tr_bool
     950tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
     951{
     952    assert( isPeerIo( io ) );
     953    assert( dir==TR_UP || dir==TR_DOWN );
     954
     955    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
     956}
     957
     958/***
     959****
     960****/
     961
     962static void
     963event_enable( tr_peerIo * io, short event )
     964{
     965    assert( isPeerIo( io ) );
     966
     967    if( event & EV_READ )
     968        event_add( &io->event_read, NULL );
     969
     970    if( event & EV_WRITE )
     971        event_add( &io->event_write, NULL );
     972}
     973
     974static void
     975event_disable( struct tr_peerIo * io, short event )
     976{
     977    assert( isPeerIo( io ) );
     978
     979    if( event & EV_READ )
     980        event_del( &io->event_read );
     981
     982    if( event & EV_WRITE )
     983        event_del( &io->event_write );
     984}
     985
     986
     987void
     988tr_peerIoSetEnabled( tr_peerIo    * io,
     989                     tr_direction   dir,
     990                     tr_bool        isEnabled )
     991{
     992    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
     993
     994    if( isEnabled )
     995        event_enable( io, event );
     996    else
     997        event_disable( io, event );
     998}
Note: See TracChangeset for help on using the changeset viewer.