Ignore:
Timestamp:
Dec 22, 2008, 12:51:14 AM (12 years ago)
Author:
charles
Message:

(1.4x libT) backport handshake, peer, bandwidth, peer-io to 1.4x.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/1.4x/libtransmission/peer-io.c

    r7403 r7455  
    11/*
    2  * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
     2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
    33 *
    44 * This file is licensed by the GPL version 2.  Works owned by the
     
    2020 #include <winsock2.h>
    2121#else
    22  #include <netinet/in.h> /* struct in_addr */
    2322 #include <arpa/inet.h> /* inet_ntoa */
    2423#endif
     
    2928#include "bandwidth.h"
    3029#include "crypto.h"
    31 #include "iobuf.h"
    3230#include "list.h"
    3331#include "net.h"
     
    3735
    3836#define MAGIC_NUMBER 206745
    39 #define IO_TIMEOUT_SECS 8
    4037
    4138static size_t
     
    8178struct tr_peerIo
    8279{
    83     tr_bool                  isEncrypted;
    84     tr_bool                  isIncoming;
    85     tr_bool                  peerIdIsSet;
    86     tr_bool                  extendedProtocolSupported;
    87     tr_bool                  fastPeersSupported;
    88 
    89     int                      magicNumber;
    90 
    91     uint8_t                  encryptionMode;
    92     uint8_t                  timeout;
    93     uint16_t                 port;
    94     int                      socket;
    95 
    96     uint8_t                  peerId[20];
    97     time_t                   timeCreated;
    98 
    99     tr_session             * session;
    100 
    101     struct in_addr           in_addr;
    102     struct tr_iobuf        * iobuf;
    103     tr_list                * output_datatypes; /* struct tr_datatype */
    104 
    105     tr_can_read_cb           canRead;
    106     tr_did_write_cb          didWrite;
    107     tr_net_error_cb          gotError;
    108     void *                   userData;
    109 
    110     size_t                   bufferSize[2];
    111 
    112     tr_bandwidth           * bandwidth;
    113     tr_crypto              * crypto;
     80    tr_bool            isEncrypted;
     81    tr_bool            isIncoming;
     82    tr_bool            peerIdIsSet;
     83    tr_bool            extendedProtocolSupported;
     84    tr_bool            fastExtensionSupported;
     85
     86    int                magicNumber;
     87
     88    uint8_t            encryptionMode;
     89    tr_port            port;
     90    int                socket;
     91
     92    uint8_t            peerId[20];
     93    time_t             timeCreated;
     94
     95    tr_session       * session;
     96
     97    tr_address         addr;
     98    tr_list          * output_datatypes; /* struct tr_datatype */
     99
     100    tr_can_read_cb     canRead;
     101    tr_did_write_cb    didWrite;
     102    tr_net_error_cb    gotError;
     103    void *             userData;
     104
     105    size_t             bufferSize[2];
     106
     107    tr_bandwidth     * bandwidth;
     108    tr_crypto        * crypto;
     109
     110    struct evbuffer  * inbuf;
     111    struct evbuffer  * outbuf;
     112
     113    struct event       event_read;
     114    struct event       event_write;
    114115};
    115116
     
    119120
    120121static void
    121 didWriteWrapper( struct tr_iobuf  * iobuf,
    122                  size_t             bytes_transferred,
    123                  void             * vio )
    124 {
    125     tr_peerIo *  io = vio;
    126 
     122didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
     123{
    127124    while( bytes_transferred )
    128125    {
     
    144141            tr_free( tr_list_pop_front( &io->output_datatypes ) );
    145142    }
    146 
    147     if( EVBUFFER_LENGTH( tr_iobuf_output( iobuf ) ) )
    148         tr_iobuf_enable( io->iobuf, EV_WRITE );
    149143}
    150144
    151145static void
    152 canReadWrapper( struct tr_iobuf  * iobuf,
    153                 size_t             bytes_transferred UNUSED,
    154                 void              * vio )
    155 {
    156     int          done = 0;
    157     int          err = 0;
    158     tr_peerIo *  io = vio;
     146canReadWrapper( tr_peerIo * io )
     147{
     148    tr_bool done = 0;
     149    tr_bool err = 0;
    159150    tr_session * session = io->session;
    160151
     
    169160        {
    170161            size_t piece = 0;
    171             const size_t oldLen = EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
    172             const int ret = io->canRead( iobuf, io->userData, &piece );
    173 
    174             if( ret != READ_ERR )
    175             {
    176                 const size_t used = oldLen - EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
    177                 if( piece )
    178                     tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
    179                 if( used != piece )
    180                     tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
    181             }
     162            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
     163            const int ret = io->canRead( io, io->userData, &piece );
     164
     165            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
     166
     167            if( piece )
     168                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
     169
     170            if( used != piece )
     171                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
    182172
    183173            switch( ret )
    184174            {
    185175                case READ_NOW:
    186                     if( EVBUFFER_LENGTH( tr_iobuf_input( iobuf )))
     176                    if( EVBUFFER_LENGTH( io->inbuf ) )
    187177                        continue;
    188178                    done = 1;
     
    203193}
    204194
     195#define _isBool(b) (((b)==0 || (b)==1))
     196
     197tr_bool
     198tr_isPeerIo( const tr_peerIo * io )
     199{
     200    return ( io != NULL )
     201        && ( io->magicNumber == MAGIC_NUMBER )
     202        && ( _isBool( io->isEncrypted ) )
     203        && ( _isBool( io->isIncoming ) )
     204        && ( _isBool( io->peerIdIsSet ) )
     205        && ( _isBool( io->extendedProtocolSupported ) )
     206        && ( _isBool( io->fastExtensionSupported ) );
     207}
     208
    205209static void
    206 gotErrorWrapper( struct tr_iobuf  * iobuf,
    207                  short              what,
    208                  void             * userData )
    209 {
    210     tr_peerIo * c = userData;
    211 
    212     if( c->gotError )
    213         c->gotError( iobuf, what, c->userData );
     210event_read_cb( int fd, short event UNUSED, void * vio )
     211{
     212    int res;
     213    short what = EVBUFFER_READ;
     214    tr_peerIo * io = vio;
     215    const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, io->session->so_rcvbuf );
     216    const tr_direction dir = TR_DOWN;
     217
     218    assert( tr_isPeerIo( io ) );
     219
     220    dbgmsg( io, "libevent says this peer is ready to read" );
     221
     222    /* if we don't have any bandwidth left, stop reading */
     223    if( howmuch < 1 ) {
     224        tr_peerIoSetEnabled( io, dir, FALSE );
     225        return;
     226    }
     227
     228    res = evbuffer_read( io->inbuf, fd, howmuch );
     229    if( res == -1 ) {
     230        if( errno == EAGAIN || errno == EINTR )
     231            goto reschedule;
     232        /* error case */
     233        what |= EVBUFFER_ERROR;
     234    } else if( res == 0 ) {
     235        /* eof case */
     236        what |= EVBUFFER_EOF;
     237    }
     238
     239    if( res <= 0 )
     240        goto error;
     241
     242    tr_peerIoSetEnabled( io, dir, TRUE );
     243
     244    /* Invoke the user callback - must always be called last */
     245    canReadWrapper( io );
     246
     247    return;
     248
     249 reschedule:
     250    tr_peerIoSetEnabled( io, dir, TRUE );
     251    return;
     252
     253 error:
     254    if( io->gotError != NULL )
     255        io->gotError( io, what, io->userData );
     256}
     257
     258static int
     259tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
     260{
     261    struct evbuffer * buffer = io->outbuf;
     262    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
     263
     264#ifdef WIN32
     265    n = send(fd, buffer->buffer, n,  0 );
     266#else
     267    n = write(fd, buffer->buffer, n );
     268#endif
     269    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
     270
     271    if( n == -1 )
     272        return -1;
     273    if (n == 0)
     274        return 0;
     275    evbuffer_drain( buffer, n );
     276
     277    return n;
     278}
     279
     280static void
     281event_write_cb( int fd, short event UNUSED, void * vio )
     282{
     283    int res = 0;
     284    short what = EVBUFFER_WRITE;
     285    tr_peerIo * io = vio;
     286    size_t howmuch;
     287    const tr_direction dir = TR_UP;
     288
     289    assert( tr_isPeerIo( io ) );
     290
     291    dbgmsg( io, "libevent says this peer is ready to write" );
     292
     293    howmuch = MIN( (size_t)io->session->so_sndbuf, EVBUFFER_LENGTH( io->outbuf ) );
     294    howmuch = tr_bandwidthClamp( io->bandwidth, dir, howmuch );
     295
     296    /* if we don't have any bandwidth left, stop writing */
     297    if( howmuch < 1 ) {
     298        tr_peerIoSetEnabled( io, dir, FALSE );
     299        return;
     300    }
     301
     302    res = tr_evbuffer_write( io, fd, howmuch );
     303    if (res == -1) {
     304#ifndef WIN32
     305/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
     306 *  *set errno. thus this error checking is not portable*/
     307        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
     308            goto reschedule;
     309        /* error case */
     310        what |= EVBUFFER_ERROR;
     311
     312#else
     313        goto reschedule;
     314#endif
     315
     316    } else if (res == 0) {
     317        /* eof case */
     318        what |= EVBUFFER_EOF;
     319    }
     320    if (res <= 0)
     321        goto error;
     322
     323    if( EVBUFFER_LENGTH( io->outbuf ) )
     324        tr_peerIoSetEnabled( io, dir, TRUE );
     325
     326    didWriteWrapper( io, res );
     327    return;
     328
     329 reschedule:
     330    if( EVBUFFER_LENGTH( io->outbuf ) )
     331        tr_peerIoSetEnabled( io, dir, TRUE );
     332    return;
     333
     334 error:
     335    if( io->gotError != NULL )
     336        io->gotError( io, what, io->userData );
    214337}
    215338
     
    218341**/
    219342
    220 static void
    221 bufevNew( tr_peerIo * io )
    222 {
    223     io->iobuf = tr_iobuf_new( io->session,
    224                               io->bandwidth,
    225                               io->socket,
    226                               EV_READ | EV_WRITE,
    227                               canReadWrapper,
    228                               didWriteWrapper,
    229                               gotErrorWrapper,
    230                               io );
    231 
    232     tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
    233 }
    234343
    235344static int
    236 isPeerIo( const tr_peerIo * io )
    237 {
    238     return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
     345isFlag( int flag )
     346{
     347    return( ( flag == 0 ) || ( flag == 1 ) );
    239348}
    240349
    241350static tr_peerIo*
    242 tr_peerIoNew( tr_session *          session,
    243               const struct in_addr * in_addr,
    244               uint16_t               port,
    245               const uint8_t *        torrentHash,
    246               int                    isIncoming,
    247               int                    socket )
     351tr_peerIoNew( tr_session       * session,
     352              const tr_address * addr,
     353              tr_port            port,
     354              const uint8_t    * torrentHash,
     355              int                isIncoming,
     356              int                socket )
    248357{
    249358    tr_peerIo * io;
     
    256365    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
    257366    io->session = session;
    258     io->in_addr = *in_addr;
     367    io->addr = *addr;
    259368    io->port = port;
    260369    io->socket = socket;
    261370    io->isIncoming = isIncoming != 0;
    262     io->timeout = IO_TIMEOUT_SECS;
    263371    io->timeCreated = time( NULL );
     372    io->inbuf = evbuffer_new( );
     373    io->outbuf = evbuffer_new( );
     374    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
     375    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
     376#if 0
    264377    bufevNew( io );
     378#endif
    265379    tr_peerIoSetBandwidth( io, session->bandwidth );
    266380    return io;
     
    268382
    269383tr_peerIo*
    270 tr_peerIoNewIncoming( tr_session *          session,
    271                       const struct in_addr * in_addr,
    272                       uint16_t               port,
    273                       int                    socket )
     384tr_peerIoNewIncoming( tr_session       * session,
     385                      const tr_address * addr,
     386                      tr_port            port,
     387                      int                socket )
    274388{
    275389    assert( session );
    276     assert( in_addr );
     390    assert( addr );
    277391    assert( socket >= 0 );
    278392
    279     return tr_peerIoNew( session, in_addr, port,
    280                          NULL, 1,
    281                          socket );
     393    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
    282394}
    283395
    284396tr_peerIo*
    285 tr_peerIoNewOutgoing( tr_session *          session,
    286                       const struct in_addr * in_addr,
    287                       int                    port,
    288                       const uint8_t *        torrentHash )
     397tr_peerIoNewOutgoing( tr_session       * session,
     398                      const tr_address * addr,
     399                      tr_port            port,
     400                      const uint8_t    * torrentHash )
    289401{
    290402    int socket;
    291403
    292404    assert( session );
    293     assert( in_addr );
    294     assert( port >= 0 );
     405    assert( addr );
    295406    assert( torrentHash );
    296407
    297     socket = tr_netOpenTCP( session, in_addr, port );
     408    socket = tr_netOpenTCP( session, addr, port );
    298409
    299410    return socket < 0
    300411           ? NULL
    301            : tr_peerIoNew( session, in_addr, port, torrentHash, 0, socket );
     412           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
    302413}
    303414
     
    307418    tr_peerIo * io = vio;
    308419
     420    event_del( &io->event_read );
     421    event_del( &io->event_write );
    309422    tr_peerIoSetBandwidth( io, NULL );
    310     tr_iobuf_free( io->iobuf );
     423    evbuffer_free( io->outbuf );
     424    evbuffer_free( io->inbuf );
    311425    tr_netClose( io->socket );
    312426    tr_cryptoFree( io->crypto );
     
    332446tr_peerIoGetSession( tr_peerIo * io )
    333447{
    334     assert( isPeerIo( io ) );
     448    assert( tr_isPeerIo( io ) );
    335449    assert( io->session );
    336450
     
    338452}
    339453
    340 const struct in_addr*
     454const tr_address*
    341455tr_peerIoGetAddress( const tr_peerIo * io,
    342                            uint16_t * port )
    343 {
    344     assert( isPeerIo( io ) );
     456                           tr_port  * port )
     457{
     458    assert( tr_isPeerIo( io ) );
    345459
    346460    if( port )
    347461        *port = io->port;
    348462
    349     return &io->in_addr;
     463    return &io->addr;
    350464}
    351465
    352466const char*
    353 tr_peerIoAddrStr( const struct in_addr * addr,
    354                   uint16_t               port )
     467tr_peerIoAddrStr( const tr_address * addr, tr_port port )
    355468{
    356469    static char buf[512];
    357 
    358     tr_snprintf( buf, sizeof( buf ), "%s:%u", inet_ntoa( *addr ),
    359                 ntohs( port ) );
     470    tr_snprintf( buf, sizeof( buf ), "%s:%u", inet_ntoa( *addr ), ntohs( port ) );
    360471    return buf;
    361472}
     
    364475tr_peerIoGetAddrStr( const tr_peerIo * io )
    365476{
    366     return tr_peerIoAddrStr( &io->in_addr, io->port );
    367 }
    368 
    369 static void
    370 tr_peerIoTryRead( tr_peerIo * io )
    371 {
    372     if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
    373         (*canReadWrapper)( io->iobuf, ~0, io );
    374 }
    375 
    376 void
    377 tr_peerIoSetIOFuncs( tr_peerIo *     io,
    378                      tr_can_read_cb  readcb,
    379                      tr_did_write_cb writecb,
    380                      tr_net_error_cb errcb,
    381                      void *          userData )
     477    return tr_peerIoAddrStr( &io->addr, io->port );
     478}
     479
     480void
     481tr_peerIoSetIOFuncs( tr_peerIo        * io,
     482                     tr_can_read_cb     readcb,
     483                     tr_did_write_cb    writecb,
     484                     tr_net_error_cb    errcb,
     485                     void             * userData )
    382486{
    383487    io->canRead = readcb;
     
    385489    io->gotError = errcb;
    386490    io->userData = userData;
    387 
    388     tr_peerIoTryRead( io );
    389 }
    390 
    391 int
     491}
     492
     493tr_bool
    392494tr_peerIoIsIncoming( const tr_peerIo * c )
    393495{
    394     return c->isIncoming ? 1 : 0;
     496    return c->isIncoming != 0;
    395497}
    396498
     
    403505        tr_netClose( io->socket );
    404506
    405     io->socket = tr_netOpenTCP( io->session, &io->in_addr, io->port );
     507    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
    406508
    407509    if( io->socket >= 0 )
     
    411513
    412514        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
    413         tr_iobuf_free( io->iobuf );
     515#if 0
    414516        bufevNew( io );
     517#endif
    415518
    416519        tr_peerIoSetBandwidth( io, bandwidth );
     
    419522
    420523    return -1;
    421 }
    422 
    423 void
    424 tr_peerIoSetTimeoutSecs( tr_peerIo * io,
    425                          int         secs )
    426 {
    427     io->timeout = secs;
    428     tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
    429     tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
    430524}
    431525
     
    438532                         const uint8_t * hash )
    439533{
    440     assert( isPeerIo( io ) );
     534    assert( tr_isPeerIo( io ) );
    441535
    442536    tr_cryptoSetTorrentHash( io->crypto, hash );
     
    446540tr_peerIoGetTorrentHash( tr_peerIo * io )
    447541{
    448     assert( isPeerIo( io ) );
     542    assert( tr_isPeerIo( io ) );
    449543    assert( io->crypto );
    450544
     
    455549tr_peerIoHasTorrentHash( const tr_peerIo * io )
    456550{
    457     assert( isPeerIo( io ) );
     551    assert( tr_isPeerIo( io ) );
    458552    assert( io->crypto );
    459553
     
    469563                     const uint8_t * peer_id )
    470564{
    471     assert( isPeerIo( io ) );
     565    assert( tr_isPeerIo( io ) );
    472566
    473567    if( ( io->peerIdIsSet = peer_id != NULL ) )
     
    480574tr_peerIoGetPeersId( const tr_peerIo * io )
    481575{
    482     assert( isPeerIo( io ) );
     576    assert( tr_isPeerIo( io ) );
    483577    assert( io->peerIdIsSet );
    484578
     
    491585
    492586void
    493 tr_peerIoEnableLTEP( tr_peerIo * io,
    494                      int         flag )
    495 {
    496     assert( isPeerIo( io ) );
    497     assert( flag == 0 || flag == 1 );
    498 
     587tr_peerIoEnableFEXT( tr_peerIo * io,
     588                     tr_bool     flag )
     589{
     590    assert( tr_isPeerIo( io ) );
     591    assert( isFlag( flag ) );
     592
     593    dbgmsg( io, "setting FEXT support flag to %d", (flag?1:0) );
     594    io->fastExtensionSupported = flag;
     595}
     596
     597tr_bool
     598tr_peerIoSupportsFEXT( const tr_peerIo * io )
     599{
     600    assert( tr_isPeerIo( io ) );
     601
     602    return io->fastExtensionSupported;
     603}
     604
     605/**
     606***
     607**/
     608
     609void
     610tr_peerIoEnableLTEP( tr_peerIo  * io,
     611                     tr_bool      flag )
     612{
     613    assert( tr_isPeerIo( io ) );
     614    assert( isFlag( flag ) );
     615
     616    dbgmsg( io, "setting LTEP support flag to %d", (flag?1:0) );
    499617    io->extendedProtocolSupported = flag;
    500618}
    501619
    502 void
    503 tr_peerIoEnableFEXT( tr_peerIo * io,
    504                      int         flag )
    505 {
    506     assert( isPeerIo( io ) );
    507     assert( flag == 0 || flag == 1 );
    508 
    509     io->fastPeersSupported = flag;
    510 }
    511 
    512 int
     620tr_bool
    513621tr_peerIoSupportsLTEP( const tr_peerIo * io )
    514622{
    515     assert( isPeerIo( io ) );
     623    assert( tr_isPeerIo( io ) );
    516624
    517625    return io->extendedProtocolSupported;
    518 }
    519 
    520 int
    521 tr_peerIoSupportsFEXT( const tr_peerIo * io )
    522 {
    523     assert( isPeerIo( io ) );
    524 
    525     return io->fastPeersSupported;
    526626}
    527627
     
    540640    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
    541641    const double period = 20; /* arbitrary */
    542     return MAX( maxBlockSize*5.5, currentSpeed*1024*period );
     642    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
     643    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
    543644}
    544645
     
    547648{
    548649    const size_t desiredLen = getDesiredOutputBufferSize( io );
    549     const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) );
     650    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
    550651    size_t freeSpace = 0;
    551652
     
    560661                       tr_bandwidth  * bandwidth )
    561662{
    562     assert( isPeerIo( io ) );
     663    assert( tr_isPeerIo( io ) );
    563664
    564665    if( io->bandwidth )
    565         tr_bandwidthRemoveBuffer( io->bandwidth, io->iobuf );
     666        tr_bandwidthRemovePeer( io->bandwidth, io );
    566667
    567668    io->bandwidth = bandwidth;
    568     tr_iobuf_set_bandwidth( io->iobuf, bandwidth );
    569669
    570670    if( io->bandwidth )
    571         tr_bandwidthAddBuffer( io->bandwidth, io->iobuf );
    572 
    573     tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
     671        tr_bandwidthAddPeer( io->bandwidth, io );
    574672}
    575673
     
    588686                        int         encryptionMode )
    589687{
    590     assert( isPeerIo( io ) );
     688    assert( tr_isPeerIo( io ) );
    591689    assert( encryptionMode == PEER_ENCRYPTION_NONE
    592690          || encryptionMode == PEER_ENCRYPTION_RC4 );
     
    620718    tr_list_append( &io->output_datatypes, datatype );
    621719
    622     evbuffer_add( tr_iobuf_output( io->iobuf ), writeme, writemeLen );
    623     tr_iobuf_enable( io->iobuf, EV_WRITE );
     720    evbuffer_add( io->outbuf, writeme, writemeLen );
    624721}
    625722
     
    736833    uint16_t tmp;
    737834
     835    assert( tr_isPeerIo( io ) );
     836
    738837    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
    739838    *setme = ntohs( tmp );
     
    747846    uint32_t tmp;
    748847
     848    assert( tr_isPeerIo( io ) );
     849
    749850    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
    750851    *setme = ntohl( tmp );
     
    756857                size_t            byteCount )
    757858{
    758     uint8_t * tmp = tr_new( uint8_t, byteCount );
    759 
     859    uint8_t * tmp;
     860
     861    assert( tr_isPeerIo( io ) );
     862
     863    tmp = tr_new( uint8_t, byteCount );
    760864    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
    761865    tr_free( tmp );
     
    767871    return time( NULL ) - io->timeCreated;
    768872}
     873
     874/***
     875****
     876***/
     877
     878static int
     879tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
     880{
     881    int res;
     882
     883    assert( tr_isPeerIo( io ) );
     884
     885    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
     886
     887    res = howmuch ? evbuffer_read( io->inbuf, io->socket, howmuch ) : 0;
     888
     889    dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
     890
     891    if( EVBUFFER_LENGTH( io->inbuf ) )
     892        canReadWrapper( io );
     893
     894    if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
     895    {
     896        short what = EVBUFFER_READ | EVBUFFER_ERROR;
     897        if( res == 0 )
     898            what |= EVBUFFER_EOF;
     899        io->gotError( io, what, io->userData );
     900    }
     901
     902    return res;
     903}
     904
     905static int
     906tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
     907{
     908    int n;
     909
     910    assert( tr_isPeerIo( io ) );
     911
     912    howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
     913
     914    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
     915
     916    if( n > 0 )
     917        didWriteWrapper( io, n );
     918
     919    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
     920        short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
     921        io->gotError( io, what, io->userData );
     922    }
     923
     924    return n;
     925}
     926
     927int
     928tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
     929{
     930    int ret;
     931
     932    assert( tr_isPeerIo( io ) );
     933    assert( tr_isDirection( dir ) );
     934
     935    if( dir==TR_DOWN )
     936        ret = tr_peerIoTryRead( io, limit );
     937    else
     938        ret = tr_peerIoTryWrite( io, limit );
     939
     940    return ret;
     941}
     942
     943struct evbuffer *
     944tr_peerIoGetReadBuffer( tr_peerIo * io )
     945{
     946    assert( tr_isPeerIo( io ) );
     947
     948    return io->inbuf;
     949}
     950
     951tr_bool
     952tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
     953{
     954    assert( tr_isPeerIo( io ) );
     955    assert( tr_isDirection( dir ) );
     956
     957    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
     958}
     959
     960/***
     961****
     962****/
     963
     964static void
     965event_enable( tr_peerIo * io, short event )
     966{
     967    assert( tr_isPeerIo( io ) );
     968
     969    if( event & EV_READ )
     970        event_add( &io->event_read, NULL );
     971
     972    if( event & EV_WRITE )
     973        event_add( &io->event_write, NULL );
     974}
     975
     976static void
     977event_disable( struct tr_peerIo * io, short event )
     978{
     979    assert( tr_isPeerIo( io ) );
     980
     981    if( event & EV_READ )
     982        event_del( &io->event_read );
     983
     984    if( event & EV_WRITE )
     985        event_del( &io->event_write );
     986}
     987
     988
     989void
     990tr_peerIoSetEnabled( tr_peerIo    * io,
     991                     tr_direction   dir,
     992                     tr_bool        isEnabled )
     993{
     994    short event;
     995
     996    assert( tr_isPeerIo( io ) );
     997    assert( tr_isDirection( dir ) );
     998
     999    event = dir == TR_UP ? EV_WRITE : EV_READ;
     1000
     1001    if( isEnabled )
     1002        event_enable( io, event );
     1003    else
     1004        event_disable( io, event );
     1005}
Note: See TracChangeset for help on using the changeset viewer.