Changeset 7171


Ignore:
Timestamp:
Nov 28, 2008, 4:00:29 PM (10 years ago)
Author:
charles
Message:

(libT) make peer-io's output buffer size more flexible based on the peer's speed

Location:
trunk/libtransmission
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-io.c

    r7159 r7171  
    530530**/
    531531
     532static size_t
     533getDesiredOutputBufferSize( const tr_peerIo * io )
     534{
     535    /* this is all kind of arbitrary, but what seems to work well is
     536     * being large enough to hold the next 15 seconds' worth of input,
     537     * or two and a half blocks, whichever is bigger.
     538     * It's okay to tweak this as needed */
     539    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
     540    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
     541    const double period = 20; /* arbitrary */
     542    return MAX( maxBlockSize*2.5, currentSpeed*1024*period );
     543}
     544
    532545size_t
    533546tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
    534547{
    535     const size_t desiredLen = io->session->so_sndbuf * 2; /* FIXME: bigger? */
     548    const size_t desiredLen = getDesiredOutputBufferSize( io );
    536549    const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) );
    537550    size_t freeSpace = 0;
  • trunk/libtransmission/peer-msgs.c

    r7169 r7171  
    266266    unsigned int    clientSentLtepHandshake : 1;
    267267    unsigned int    peerSentLtepHandshake   : 1;
    268     unsigned int    sendingBlock            : 1;
    269268
    270269    uint8_t         state;
     
    287286    tr_publisher_t *       publisher;
    288287
    289     struct evbuffer *      outBlock; /* buffer of the current piece message */
    290288    struct evbuffer *      outMessages; /* all the non-piece messages */
    291289
     
    16831681}
    16841682
    1685 static void
    1686 sendKeepalive( tr_peermsgs * msgs )
    1687 {
    1688     dbgmsg( msgs, "sending a keepalive message" );
    1689     tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
    1690     pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
    1691 }
    1692 
    16931683/**
    16941684***
     
    17171707}
    17181708
     1709static size_t
     1710fillOutputBuffer( tr_peermsgs * msgs, time_t now )
     1711{
     1712    size_t bytesWritten = 0;
     1713    struct peer_request req;
     1714    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
     1715
     1716    /**
     1717    ***  Protocol messages
     1718    **/
     1719
     1720    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
     1721    {
     1722        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
     1723        msgs->outMessagesBatchedAt = now;
     1724    }
     1725    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
     1726    {
     1727        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
     1728        /* flush the protocol messages */
     1729        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
     1730        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
     1731        msgs->clientSentAnythingAt = now;
     1732        msgs->outMessagesBatchedAt = 0;
     1733        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
     1734        bytesWritten +=  len;
     1735    }
     1736
     1737    /**
     1738    ***  Blocks
     1739    **/
     1740
     1741    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
     1742        && popNextRequest( msgs, &req )
     1743        && requestIsValid( msgs, &req )
     1744        && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
     1745    {
     1746        /* send a block */
     1747        uint8_t * buf = tr_new( uint8_t, req.length );
     1748        const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
     1749        if( err ) {
     1750            fireError( msgs, err );
     1751        } else {
     1752            tr_peerIo * io = msgs->io;
     1753            struct evbuffer * out = evbuffer_new( );
     1754            dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
     1755            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
     1756            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
     1757            tr_peerIoWriteUint32( io, out, req.index );
     1758            tr_peerIoWriteUint32( io, out, req.offset );
     1759            tr_peerIoWriteBytes ( io, out, buf, req.length );
     1760            tr_peerIoWriteBuf( io, out, TRUE );
     1761            bytesWritten += EVBUFFER_LENGTH( out );
     1762            evbuffer_free( out );
     1763            msgs->clientSentAnythingAt = now;
     1764        }
     1765        tr_free( buf );
     1766    }
     1767
     1768    /**
     1769    ***  Keepalive
     1770    **/
     1771
     1772    if( msgs->clientSentAnythingAt
     1773        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
     1774    {
     1775        dbgmsg( msgs, "sending a keepalive message" );
     1776        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
     1777        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
     1778    }
     1779
     1780    return bytesWritten;
     1781}
     1782
    17191783static int
    17201784peerPulse( void * vmsgs )
     
    17251789    ratePulse( msgs );
    17261790
    1727     /*tr_peerIoTryRead( msgs->io );*/
    17281791    pumpRequestQueue( msgs, now );
    17291792    expireOldRequests( msgs, now );
    17301793
    1731     if( msgs->sendingBlock )
    1732     {
    1733         const size_t uploadMax = tr_peerIoGetWriteBufferSpace( msgs->io );
    1734         size_t       len = EVBUFFER_LENGTH( msgs->outBlock );
    1735         const size_t outlen = MIN( len, uploadMax );
    1736 
    1737         assert( len );
    1738 
    1739         if( outlen )
    1740         {
    1741             tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen, TRUE );
    1742             evbuffer_drain( msgs->outBlock, outlen );
    1743 
    1744             len -= outlen;
    1745             msgs->clientSentAnythingAt = now;
    1746             msgs->sendingBlock = len != 0;
    1747 
    1748             dbgmsg( msgs, "wrote %zu bytes; %zu left in block", outlen, len );
    1749         }
    1750         else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
    1751     }
    1752 
    1753     if( !msgs->sendingBlock )
    1754     {
    1755         struct peer_request req;
    1756         const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
    1757 
    1758         if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
    1759         {
    1760             dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
    1761             msgs->outMessagesBatchedAt = now;
    1762         }
    1763         else if( ( haveMessages ) &&
    1764                  ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
    1765         {
    1766             dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, EVBUFFER_LENGTH( msgs->outMessages ) );
    1767             tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
    1768             msgs->clientSentAnythingAt = now;
    1769             msgs->outMessagesBatchedAt = 0;
    1770             msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
    1771         }
    1772         else if( !EVBUFFER_LENGTH( msgs->outBlock )
    1773                && popNextRequest( msgs, &req )
    1774                && requestIsValid( msgs, &req )
    1775                && tr_cpPieceIsComplete( msgs->torrent->completion,
    1776                                         req.index ) )
    1777         {
    1778             uint8_t * buf = tr_new( uint8_t, req.length );
    1779             const int err = tr_ioRead( msgs->torrent,
    1780                                        req.index, req.offset, req.length,
    1781                                        buf );
    1782             if( err )
    1783             {
    1784                 fireError( msgs, err );
    1785             }
    1786             else
    1787             {
    1788                 tr_peerIo *       io = msgs->io;
    1789                 struct evbuffer * out = msgs->outBlock;
    1790 
    1791                 dbgmsg( msgs, "sending block %u:%u->%u", req.index,
    1792                         req.offset,
    1793                         req.length );
    1794                 tr_peerIoWriteUint32(
    1795                     io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) +
    1796                     req.length );
    1797                 tr_peerIoWriteUint8 ( io, out, BT_PIECE );
    1798                 tr_peerIoWriteUint32( io, out, req.index );
    1799                 tr_peerIoWriteUint32( io, out, req.offset );
    1800                 tr_peerIoWriteBytes ( io, out, buf, req.length );
    1801                 msgs->sendingBlock = 1;
    1802             }
    1803 
    1804             tr_free( buf );
    1805         }
    1806         else if( ( !haveMessages )
    1807                && ( now - msgs->clientSentAnythingAt ) >
    1808                 KEEPALIVE_INTERVAL_SECS )
    1809         {
    1810             sendKeepalive( msgs );
    1811         }
    1812     }
     1794    for( ;; )
     1795        if( fillOutputBuffer( msgs, now ) < 1 )
     1796            break;
    18131797
    18141798    return TRUE; /* loop forever */
     
    20822066    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
    20832067    m->incoming.block = evbuffer_new( );
    2084     m->outBlock = evbuffer_new( );
    20852068    m->peerAllowedPieces = NULL;
    20862069    m->peerAskedFor = REQUEST_LIST_INIT;
     
    21172100        evbuffer_free( msgs->incoming.block );
    21182101        evbuffer_free( msgs->outMessages );
    2119         evbuffer_free( msgs->outBlock );
    21202102        tr_free( msgs->pex );
    21212103
Note: See TracChangeset for help on using the changeset viewer.