Changeset 7171 for trunk/libtransmission/peer-msgs.c
- Timestamp:
- Nov 28, 2008, 4:00:29 PM (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/peer-msgs.c
r7169 r7171 266 266 unsigned int clientSentLtepHandshake : 1; 267 267 unsigned int peerSentLtepHandshake : 1; 268 unsigned int sendingBlock : 1;269 268 270 269 uint8_t state; … … 287 286 tr_publisher_t * publisher; 288 287 289 struct evbuffer * outBlock; /* buffer of the current piece message */290 288 struct evbuffer * outMessages; /* all the non-piece messages */ 291 289 … … 1683 1681 } 1684 1682 1685 static void1686 sendKeepalive( tr_peermsgs * msgs )1687 {1688 dbgmsg( msgs, "sending a keepalive message" );1689 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );1690 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );1691 }1692 1693 1683 /** 1694 1684 *** … … 1717 1707 } 1718 1708 1709 static size_t 1710 fillOutputBuffer( tr_peermsgs * msgs, time_t now ) 1711 { 1712 size_t bytesWritten = 0; 1713 struct peer_request req; 1714 const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0; 1715 1716 /** 1717 *** Protocol messages 1718 **/ 1719 1720 if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */ 1721 { 1722 dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) ); 1723 msgs->outMessagesBatchedAt = now; 1724 } 1725 else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) ) 1726 { 1727 const size_t len = EVBUFFER_LENGTH( msgs->outMessages ); 1728 /* flush the protocol messages */ 1729 dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len ); 1730 tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE ); 1731 msgs->clientSentAnythingAt = now; 1732 msgs->outMessagesBatchedAt = 0; 1733 msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 1734 bytesWritten += len; 1735 } 1736 1737 /** 1738 *** Blocks 1739 **/ 1740 1741 if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize ) 1742 && popNextRequest( msgs, &req ) 1743 && requestIsValid( msgs, &req ) 1744 && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) ) 1745 { 1746 /* send a block */ 1747 uint8_t * buf = tr_new( uint8_t, req.length ); 1748 const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ); 1749 if( err ) { 1750 fireError( msgs, err ); 1751 } else { 1752 tr_peerIo * io = msgs->io; 1753 struct evbuffer * out = evbuffer_new( ); 1754 dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length ); 1755 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length ); 1756 tr_peerIoWriteUint8 ( io, out, BT_PIECE ); 1757 tr_peerIoWriteUint32( io, out, req.index ); 1758 tr_peerIoWriteUint32( io, out, req.offset ); 1759 tr_peerIoWriteBytes ( io, out, buf, req.length ); 1760 tr_peerIoWriteBuf( io, out, TRUE ); 1761 bytesWritten += EVBUFFER_LENGTH( out ); 1762 evbuffer_free( out ); 1763 msgs->clientSentAnythingAt = now; 1764 } 1765 tr_free( buf ); 1766 } 1767 1768 /** 1769 *** Keepalive 1770 **/ 1771 1772 if( msgs->clientSentAnythingAt 1773 && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) ) 1774 { 1775 dbgmsg( msgs, "sending a keepalive message" ); 1776 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 ); 1777 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 1778 } 1779 1780 return bytesWritten; 1781 } 1782 1719 1783 static int 1720 1784 peerPulse( void * vmsgs ) … … 1725 1789 ratePulse( msgs ); 1726 1790 1727 /*tr_peerIoTryRead( msgs->io );*/1728 1791 pumpRequestQueue( msgs, now ); 1729 1792 expireOldRequests( msgs, now ); 1730 1793 1731 if( msgs->sendingBlock ) 1732 { 1733 const size_t uploadMax = tr_peerIoGetWriteBufferSpace( msgs->io ); 1734 size_t len = EVBUFFER_LENGTH( msgs->outBlock ); 1735 const size_t outlen = MIN( len, uploadMax ); 1736 1737 assert( len ); 1738 1739 if( outlen ) 1740 { 1741 tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen, TRUE ); 1742 evbuffer_drain( msgs->outBlock, outlen ); 1743 1744 len -= outlen; 1745 msgs->clientSentAnythingAt = now; 1746 msgs->sendingBlock = len != 0; 1747 1748 dbgmsg( msgs, "wrote %zu bytes; %zu left in block", outlen, len ); 1749 } 1750 else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen ); 1751 } 1752 1753 if( !msgs->sendingBlock ) 1754 { 1755 struct peer_request req; 1756 const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0; 1757 1758 if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */ 1759 { 1760 dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) ); 1761 msgs->outMessagesBatchedAt = now; 1762 } 1763 else if( ( haveMessages ) && 1764 ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) ) 1765 { 1766 dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, EVBUFFER_LENGTH( msgs->outMessages ) ); 1767 tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE ); 1768 msgs->clientSentAnythingAt = now; 1769 msgs->outMessagesBatchedAt = 0; 1770 msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 1771 } 1772 else if( !EVBUFFER_LENGTH( msgs->outBlock ) 1773 && popNextRequest( msgs, &req ) 1774 && requestIsValid( msgs, &req ) 1775 && tr_cpPieceIsComplete( msgs->torrent->completion, 1776 req.index ) ) 1777 { 1778 uint8_t * buf = tr_new( uint8_t, req.length ); 1779 const int err = tr_ioRead( msgs->torrent, 1780 req.index, req.offset, req.length, 1781 buf ); 1782 if( err ) 1783 { 1784 fireError( msgs, err ); 1785 } 1786 else 1787 { 1788 tr_peerIo * io = msgs->io; 1789 struct evbuffer * out = msgs->outBlock; 1790 1791 dbgmsg( msgs, "sending block %u:%u->%u", req.index, 1792 req.offset, 1793 req.length ); 1794 tr_peerIoWriteUint32( 1795 io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + 1796 req.length ); 1797 tr_peerIoWriteUint8 ( io, out, BT_PIECE ); 1798 tr_peerIoWriteUint32( io, out, req.index ); 1799 tr_peerIoWriteUint32( io, out, req.offset ); 1800 tr_peerIoWriteBytes ( io, out, buf, req.length ); 1801 msgs->sendingBlock = 1; 1802 } 1803 1804 tr_free( buf ); 1805 } 1806 else if( ( !haveMessages ) 1807 && ( now - msgs->clientSentAnythingAt ) > 1808 KEEPALIVE_INTERVAL_SECS ) 1809 { 1810 sendKeepalive( msgs ); 1811 } 1812 } 1794 for( ;; ) 1795 if( fillOutputBuffer( msgs, now ) < 1 ) 1796 break; 1813 1797 1814 1798 return TRUE; /* loop forever */ … … 2082 2066 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 2083 2067 m->incoming.block = evbuffer_new( ); 2084 m->outBlock = evbuffer_new( );2085 2068 m->peerAllowedPieces = NULL; 2086 2069 m->peerAskedFor = REQUEST_LIST_INIT; … … 2117 2100 evbuffer_free( msgs->incoming.block ); 2118 2101 evbuffer_free( msgs->outMessages ); 2119 evbuffer_free( msgs->outBlock );2120 2102 tr_free( msgs->pex ); 2121 2103
Note: See TracChangeset
for help on using the changeset viewer.