Ignore:
Timestamp:
Sep 17, 2008, 7:44:24 PM (14 years ago)
Author:
charles
Message:

first draft at having more accurate speed controls

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-io.c

    r6453 r6782  
    1212
    1313#include <assert.h>
     14#include <limits.h> /* INT_MAX */
    1415#include <string.h>
    1516#include <stdio.h>
     
    3536#define IO_TIMEOUT_SECS 8
    3637
    37 /* the size of a typical request message */
    38 #define TR_RDBUF ((1024*16) + 13)
    39 
    40 /**
    41 ***
    42 **/
     38/**
     39***
     40**/
     41
     42#define dbgmsg(io, fmt...) \
     43    tr_deepLog( __FILE__,__LINE__, tr_peerIoGetAddrStr(io), ##fmt )
     44
     45struct tr_bandwidth
     46{
     47    unsigned int          isUnlimited : 1;
     48    size_t                bytesUsed;
     49    size_t                bytesLeft;
     50};
    4351
    4452struct tr_peerIo
     
    5866    time_t                timeCreated;
    5967
    60     struct tr_handle    * handle;
     68    tr_session          * session;
    6169
    6270    struct in_addr        in_addr;
    6371    struct bufferevent  * bufev;
     72    struct evbuffer     * output;
    6473
    6574    tr_can_read_cb        canRead;
     
    6877    void                * userData;
    6978
     79    size_t                bufferSize[2];
     80
     81    struct tr_bandwidth   bandwidth[2];
     82    tr_ratecontrol      * speedometer[2];
     83
    7084    tr_crypto           * crypto;
    7185};
     
    7690
    7791static void
    78 didWriteWrapper( struct bufferevent * e, void * userData )
    79 {
    80     tr_peerIo * c = (tr_peerIo *) userData;
    81     if( c->didWrite )
    82         c->didWrite( e, c->userData );
     92adjustOutputBuffer( tr_peerIo * io )
     93{
     94    struct evbuffer * live = EVBUFFER_OUTPUT( io->bufev );
     95
     96    if( io->bandwidth[TR_UP].isUnlimited )
     97    {
     98        bufferevent_write_buffer( io->bufev, io->output );
     99    }
     100    else if( io->bandwidth[TR_UP].bytesLeft > EVBUFFER_LENGTH( live ) )
     101    {
     102        /* there's free space in bufev's output buffer;
     103           try to fill it up */
     104        const size_t desiredLength = io->bandwidth[TR_UP].bytesLeft;
     105        const size_t under = desiredLength - EVBUFFER_LENGTH( live );
     106        const size_t n = MIN( under, EVBUFFER_LENGTH( io->output ) );
     107        bufferevent_write( io->bufev, EVBUFFER_DATA( io->output ), n );
     108        evbuffer_drain( io->output, n );
     109    }
     110    else if( io->bandwidth[TR_UP].bytesLeft < EVBUFFER_LENGTH( live ) )
     111    {
     112        /* bufev's output buffer exceeds our bandwidth allocation;
     113           move the excess out of bufev so it can't be sent yet */
     114        const size_t desiredLength = io->bandwidth[TR_UP].bytesLeft;
     115        const size_t over = EVBUFFER_LENGTH( live ) - desiredLength;
     116        struct evbuffer * buf = evbuffer_new( );
     117        evbuffer_add( buf, EVBUFFER_DATA( live ) + desiredLength, over );
     118        evbuffer_add_buffer( buf, io->output );
     119        evbuffer_free( io->output );
     120        io->output = buf;
     121        EVBUFFER_LENGTH( live ) = desiredLength;
     122    }
     123    else if( EVBUFFER_LENGTH( live ) )
     124    {
     125        bufferevent_enable( io->bufev, EV_WRITE );
     126    }
     127
     128    io->bufferSize[TR_UP] = EVBUFFER_LENGTH( live );
     129
     130    dbgmsg( io, "after adjusting the output buffer, its size is now %zu",
     131            io->bufferSize[TR_UP] );
    83132}
    84133
    85134static void
    86 canReadWrapper( struct bufferevent * e, void * userData )
     135adjustInputBuffer( tr_peerIo * io )
     136{
     137    if( io->bandwidth[TR_DOWN].isUnlimited )
     138    {
     139        dbgmsg( io, "unlimited reading..." );
     140        bufferevent_setwatermark( io->bufev, EV_READ, 0, 0 );
     141        bufferevent_enable( io->bufev, EV_READ );
     142    }
     143    else
     144    {
     145        const size_t n = io->bandwidth[TR_DOWN].bytesLeft;
     146        if( n == 0 )
     147        {
     148            dbgmsg( io, "disabling reads because we've hit our limit" );
     149            bufferevent_disable( io->bufev, EV_READ );
     150        }
     151        else
     152        {
     153            dbgmsg( io, "enabling reading of %zu more bytes", n );
     154            bufferevent_setwatermark( io->bufev, EV_READ, 0, n );
     155            bufferevent_enable( io->bufev, EV_READ );
     156        }
     157    }
     158}
     159
     160/***
     161****
     162***/
     163
     164static void
     165didWriteWrapper( struct bufferevent * e, void * vio )
     166{
     167    tr_peerIo * io = vio;
     168    const size_t len = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( e ) );
     169
     170    dbgmsg( io, "didWrite... io->outputBufferSize was %zu, is now %zu",
     171            io->bufferSize[TR_UP], len );
     172
     173    if( len < io->bufferSize[TR_UP] )
     174    {
     175        const size_t n = io->bufferSize[TR_UP] - len;
     176        struct tr_bandwidth * b = &io->bandwidth[TR_UP];
     177        b->bytesLeft -= MIN( b->bytesLeft, (size_t)n );
     178        b->bytesUsed += n;
     179        tr_rcTransferred( io->speedometer[TR_UP], n );
     180        dbgmsg( io, "wrote %zu bytes to peer... upload bytesLeft is now %zu",
     181                n, b->bytesLeft );
     182    }
     183
     184    adjustOutputBuffer( io );
     185
     186    if( io->didWrite )
     187        io->didWrite( e, io->userData );
     188}
     189
     190static void
     191canReadWrapper( struct bufferevent * e, void * vio )
    87192{
    88193    int done = 0;
    89     tr_peerIo * c = userData;
    90     tr_handle * handle = c->handle;
    91 
    92     if( c->canRead == NULL )
    93         return;
    94 
    95     tr_globalLock( handle );
    96 
    97     while( !done )
    98     {
    99         const int ret = c->canRead( e, c->userData );
    100 
    101         switch( ret )
     194    tr_peerIo * io = vio;
     195    tr_session * session = io->session;
     196    const size_t len = EVBUFFER_LENGTH( EVBUFFER_INPUT( e ) );
     197
     198    dbgmsg( io, "canRead" );
     199
     200    /* if the input buffer has grown, record the bytes that were read */
     201    if( len > io->bufferSize[TR_DOWN] )
     202    {
     203        const size_t n = len - io->bufferSize[TR_DOWN];
     204        struct tr_bandwidth * b = io->bandwidth + TR_DOWN;
     205        b->bytesLeft -= MIN( b->bytesLeft, (size_t)n );
     206        b->bytesUsed += n;
     207        tr_rcTransferred( io->speedometer[TR_DOWN], n );
     208        dbgmsg( io, "%zu new input bytes. bytesUsed is %zu, bytesLeft is %zu",
     209                n, b->bytesUsed, b->bytesLeft );
     210
     211        adjustInputBuffer( io );
     212    }
     213
     214    /* try to consume the input buffer */
     215    if( io->canRead )
     216    {
     217        tr_globalLock( session );
     218
     219        while( !done )
    102220        {
    103             case READ_AGAIN:
    104                 if( EVBUFFER_LENGTH( e->input ) )
    105                     continue;
    106             case READ_MORE:
    107             case READ_DONE:
    108                 done = 1;
     221            const int ret = io->canRead( e, io->userData );
     222
     223            switch( ret )
     224            {
     225                case READ_AGAIN:
     226                    if( EVBUFFER_LENGTH( e->input ) )
     227                        continue;
     228                case READ_MORE:
     229                case READ_DONE:
     230                    done = 1;
     231            }
    109232        }
    110     }
    111 
    112     tr_globalUnlock( handle );
     233
     234        tr_globalUnlock( session );
     235    }
     236
     237    io->bufferSize[TR_DOWN] = EVBUFFER_LENGTH( EVBUFFER_INPUT( e ) );
    113238}
    114239
     
    125250**/
    126251
    127 void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t);
     252static void
     253bufevNew( tr_peerIo * io )
     254{
     255    io->bufev = bufferevent_new( io->socket,
     256                                 canReadWrapper,
     257                                 didWriteWrapper,
     258                                 gotErrorWrapper,
     259                                 io );
     260
     261    /* tell libevent to call didWriteWrapper after every write,
     262     * not just when the write buffer is empty */
     263    bufferevent_setwatermark( io->bufev, EV_WRITE, INT_MAX, 0 );
     264
     265    bufferevent_settimeout( io->bufev, io->timeout, io->timeout );
     266
     267    bufferevent_enable( io->bufev, EV_READ|EV_WRITE );
     268}
    128269
    129270static tr_peerIo*
    130 tr_peerIoNew( struct tr_handle     * handle,
    131               const struct in_addr * in_addr,
    132               uint16_t               port,
    133               const uint8_t        * torrentHash,
    134               int                    isIncoming,
    135               int                    socket )
    136 {
    137     tr_peerIo * c;
     271tr_peerIoNew( tr_session             * session,
     272              const struct in_addr   * in_addr,
     273              uint16_t                 port,
     274              const uint8_t          * torrentHash,
     275              int                      isIncoming,
     276              int                      socket )
     277{
     278    tr_peerIo * io;
    138279
    139280    if( socket >= 0 )
    140         tr_netSetTOS( socket, handle->peerSocketTOS );
    141 
    142     c = tr_new0( tr_peerIo, 1 );
    143     c->crypto = tr_cryptoNew( torrentHash, isIncoming );
    144     c->handle = handle;
    145     c->in_addr = *in_addr;
    146     c->port = port;
    147     c->socket = socket;
    148     c->isIncoming = isIncoming ? 1 : 0;
    149     c->timeout = IO_TIMEOUT_SECS;
    150     c->timeCreated = time( NULL );
    151     c->bufev = bufferevent_new( c->socket,
    152                                 canReadWrapper,
    153                                 didWriteWrapper,
    154                                 gotErrorWrapper,
    155                                 c );
    156     bufferevent_settimeout( c->bufev, c->timeout, c->timeout );
    157     bufferevent_enable( c->bufev, EV_READ|EV_WRITE );
    158     bufferevent_setwatermark( c->bufev, EV_READ, 0, TR_RDBUF );
    159 
    160     return c;
     281        tr_netSetTOS( socket, session->peerSocketTOS );
     282
     283    io = tr_new0( tr_peerIo, 1 );
     284    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
     285    io->session = session;
     286    io->in_addr = *in_addr;
     287    io->port = port;
     288    io->socket = socket;
     289    io->isIncoming = isIncoming != 0;
     290    io->timeout = IO_TIMEOUT_SECS;
     291    io->timeCreated = time( NULL );
     292    io->output = evbuffer_new( );
     293    io->bandwidth[TR_UP].isUnlimited = 1;
     294    io->bandwidth[TR_DOWN].isUnlimited = 1;
     295    io->speedometer[TR_UP] = tr_rcInit( );
     296    io->speedometer[TR_DOWN] = tr_rcInit( );
     297    bufevNew( io );
     298    return io;
    161299}
    162300
    163301tr_peerIo*
    164 tr_peerIoNewIncoming( struct tr_handle      * handle,
     302tr_peerIoNewIncoming( tr_session            * session,
    165303                      const struct in_addr  * in_addr,
    166304                      uint16_t                port,
    167305                      int                     socket )
    168306{
    169     assert( handle );
     307    assert( session );
    170308    assert( in_addr );
    171309    assert( socket >= 0 );
    172310
    173     return tr_peerIoNew( handle, in_addr, port,
     311    return tr_peerIoNew( session, in_addr, port,
    174312                         NULL, 1,
    175313                         socket );
     
    177315
    178316tr_peerIo*
    179 tr_peerIoNewOutgoing( struct tr_handle      * handle,
     317tr_peerIoNewOutgoing( tr_session            * session,
    180318                      const struct in_addr  * in_addr,
    181319                      int                     port,
     
    184322    int socket;
    185323
    186     assert( handle );
     324    assert( session );
    187325    assert( in_addr );
    188326    assert( port >= 0 );
     
    193331    return socket < 0
    194332        ? NULL
    195         : tr_peerIoNew( handle, in_addr, port, torrentHash, 0, socket );
     333        : tr_peerIoNew( session, in_addr, port, torrentHash, 0, socket );
    196334}
    197335
     
    201339    tr_peerIo * io = vio;
    202340
     341    tr_rcClose( io->speedometer[TR_DOWN] );
     342    tr_rcClose( io->speedometer[TR_UP] );
     343    evbuffer_free( io->output );
    203344    bufferevent_free( io->bufev );
    204345    tr_netClose( io->socket );
     
    215356        io->didWrite = NULL;
    216357        io->gotError = NULL;
    217         tr_runInEventThread( io->handle, io_dtor, io );
    218     }
    219 }
    220 
    221 tr_handle*
    222 tr_peerIoGetHandle( tr_peerIo * io )
    223 {
    224     assert( io );
    225     assert( io->handle );
    226 
    227     return io->handle;
     358        tr_runInEventThread( io->session, io_dtor, io );
     359    }
     360}
     361
     362tr_session*
     363tr_peerIoGetSession( tr_peerIo * io )
     364{
     365    assert( io );
     366    assert( io->session );
     367
     368    return io->session;
    228369}
    229370
     
    253394}
    254395
    255 void
     396static void
    256397tr_peerIoTryRead( tr_peerIo * io )
    257398{
     
    293434    if( io->socket >= 0 )
    294435    {
    295         tr_netSetTOS( io->socket, io->handle->peerSocketTOS );
     436        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
    296437
    297438        bufferevent_free( io->bufev );
    298 
    299         io->bufev = bufferevent_new( io->socket,
    300                                      canReadWrapper,
    301                                      didWriteWrapper,
    302                                      gotErrorWrapper,
    303                                      io );
    304         bufferevent_settimeout( io->bufev, io->timeout, io->timeout );
    305         bufferevent_enable( io->bufev, EV_READ|EV_WRITE );
    306         bufferevent_setwatermark( io->bufev, EV_READ, 0, TR_RDBUF );
    307 
     439        bufevNew( io );
    308440        return 0;
    309441    }
     
    413545    return io->fastPeersSupported;
    414546}
     547
    415548/**
    416549***
     
    418551
    419552size_t
    420 tr_peerIoWriteBytesWaiting( const tr_peerIo * io )
    421 {
    422     return EVBUFFER_LENGTH( EVBUFFER_OUTPUT( io->bufev ) );
    423 }
    424  
    425 void
    426 tr_peerIoWrite( tr_peerIo   * io,
    427                 const void  * writeme,
    428                 int           writeme_len )
    429 {
    430     assert( tr_amInEventThread( io->handle ) );
    431     bufferevent_write( io->bufev, writeme, writeme_len );
    432 }
    433 
    434 void
    435 tr_peerIoWriteBuf( tr_peerIo       * io,
    436                    struct evbuffer * buf )
    437 {
    438     const size_t n = EVBUFFER_LENGTH( buf );
    439     tr_peerIoWrite( io, EVBUFFER_DATA(buf), n );
    440     evbuffer_drain( buf, n );
    441 }
     553tr_peerIoGetBandwidthUsed( const tr_peerIo  * io,
     554                           tr_direction       direction )
     555{
     556    assert( io );
     557    assert( direction==TR_UP || direction==TR_DOWN );
     558    return io->bandwidth[direction].bytesUsed;
     559}
     560
     561size_t
     562tr_peerIoGetBandwidthLeft( const tr_peerIo  * io,
     563                           tr_direction       direction )
     564{
     565    assert( io );
     566    assert( direction==TR_UP || direction==TR_DOWN );
     567    return io->bandwidth[direction].bytesLeft;
     568}
     569
     570size_t
     571tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
     572{
     573    const size_t desiredBufferLen = 4096;
     574    const size_t currentLiveLen = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( io->bufev ) );
     575
     576    const size_t desiredLiveLen = tr_peerIoGetBandwidthLeft( io, TR_UP );
     577    const size_t currentLbufLen = EVBUFFER_LENGTH( io->output );
     578
     579    const size_t desiredLen = desiredBufferLen + desiredLiveLen;
     580    const size_t currentLen = currentLiveLen + currentLbufLen;
     581
     582    size_t freeSpace = 0;
     583
     584    if( desiredLen > currentLen )
     585        freeSpace = desiredLen - currentLen;
     586    else
     587        freeSpace = 0;
     588
     589    return freeSpace;
     590}
     591
     592void
     593tr_peerIoSetBandwidth( tr_peerIo     * io,
     594                       tr_direction    direction,
     595                       size_t          bytesLeft )
     596{
     597    struct tr_bandwidth * b;
     598
     599    assert( io );
     600    assert( direction==TR_UP || direction==TR_DOWN );
     601
     602    b = io->bandwidth + direction;   
     603    b->isUnlimited = 0;
     604    b->bytesUsed = 0;
     605    b->bytesLeft = bytesLeft;
     606
     607    adjustOutputBuffer( io );
     608    adjustInputBuffer( io );
     609}
     610
     611void
     612tr_peerIoSetBandwidthUnlimited( tr_peerIo     * io,
     613                                tr_direction    direction )
     614{
     615    struct tr_bandwidth * b;
     616
     617    assert( io );
     618    assert( direction==TR_UP || direction==TR_DOWN );
     619
     620    b = io->bandwidth + direction;
     621    b->isUnlimited = 1;
     622    b->bytesUsed = 0;
     623    b->bytesLeft = 0;
     624
     625    adjustInputBuffer( io );
     626    adjustOutputBuffer( io );
     627}
     628
     629double
     630tr_peerIoGetRateToClient( const tr_peerIo * io )
     631{
     632    return tr_rcRate( io->speedometer[TR_DOWN] );
     633}
     634
     635double
     636tr_peerIoGetRateToPeer( const tr_peerIo * io )
     637{
     638    return tr_rcRate( io->speedometer[TR_UP] );
     639}
     640
    442641
    443642/**
     
    456655{
    457656    assert( io );
    458     assert( encryptionMode==PEER_ENCRYPTION_NONE || encryptionMode==PEER_ENCRYPTION_RC4 );
     657    assert( encryptionMode==PEER_ENCRYPTION_NONE
     658         || encryptionMode==PEER_ENCRYPTION_RC4 );
    459659
    460660    io->encryptionMode = encryptionMode;
     
    465665{
    466666    return io!=NULL && io->encryptionMode==PEER_ENCRYPTION_RC4;
     667}
     668
     669/**
     670***
     671**/
     672
     673int
     674tr_peerIoWantsBandwidth( const tr_peerIo * io, tr_direction direction )
     675{
     676    assert( direction==TR_UP || direction==TR_DOWN );
     677
     678    if( direction == TR_DOWN )
     679    {
     680        return TRUE; /* FIXME -- is there a good way to test for this? */
     681    }
     682    else
     683    {
     684        return EVBUFFER_LENGTH( EVBUFFER_OUTPUT( io->bufev ) )
     685            || EVBUFFER_LENGTH( io->output );
     686    }
     687}
     688
     689void
     690tr_peerIoWrite( tr_peerIo   * io,
     691                const void  * writeme,
     692                size_t        writemeLen )
     693{
     694    assert( tr_amInEventThread( io->session ) );
     695    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
     696
     697    if( io->bandwidth[TR_UP].isUnlimited )
     698        bufferevent_write( io->bufev, writeme, writemeLen );
     699    else
     700        evbuffer_add( io->output, writeme, writemeLen );
     701
     702    adjustOutputBuffer( io );
     703}
     704
     705void
     706tr_peerIoWriteBuf( tr_peerIo       * io,
     707                   struct evbuffer * buf )
     708{
     709    const size_t n = EVBUFFER_LENGTH( buf );
     710    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n );
     711    evbuffer_drain( buf, n );
    467712}
    468713
Note: See TracChangeset for help on using the changeset viewer.