Changeset 7176 for branches/1.4x/libtransmission/peer-msgs.c
- Timestamp:
- Nov 29, 2008, 4:44:24 PM (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/1.4x/libtransmission/peer-msgs.c
r7168 r7176 25 25 #include "crypto.h" 26 26 #include "inout.h" 27 #include "iobuf.h" 27 28 #ifdef WIN32 28 29 #include "net.h" /* for ECONN */ … … 261 262 struct tr_peermsgs 262 263 { 263 unsigned int peerSentBitfield : 1; 264 unsigned int peerSupportsPex : 1; 265 unsigned int clientSentLtepHandshake : 1; 266 unsigned int peerSentLtepHandshake : 1; 267 unsigned int sendingBlock : 1; 264 tr_bool peerSentBitfield; 265 tr_bool peerSupportsPex; 266 tr_bool clientSentLtepHandshake; 267 tr_bool peerSentLtepHandshake; 268 268 269 269 uint8_t state; … … 286 286 tr_publisher_t * publisher; 287 287 288 struct evbuffer * outBlock; /* buffer of the current piece message */289 288 struct evbuffer * outMessages; /* all the non-piece messages */ 290 289 … … 345 344 } 346 345 347 #define dbgmsg( msgs, ... ) myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ) 346 #define dbgmsg( msgs, ... ) \ 347 do { \ 348 if( tr_deepLoggingIsActive( ) ) \ 349 myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \ 350 } while( 0 ) 348 351 349 352 /** … … 430 433 **/ 431 434 432 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };435 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 }; 433 436 434 437 static void … … 484 487 static void 485 488 fireClientGotData( tr_peermsgs * msgs, 486 uint32_t length ) 489 uint32_t length, 490 int wasPieceData ) 487 491 { 488 492 tr_peer_event e = blankEvent; … … 490 494 e.length = length; 491 495 e.eventType = TR_PEER_CLIENT_GOT_DATA; 496 e.wasPieceData = wasPieceData; 492 497 publish( msgs, &e ); 493 498 } 494 499 495 500 static void 496 firePeerGotData( tr_peermsgs * msgs, 497 uint32_t length ) 501 firePeerGotData( tr_peermsgs * msgs, 502 uint32_t length, 503 int wasPieceData ) 498 504 { 499 505 tr_peer_event e = blankEvent; … … 501 507 e.length = length; 502 508 e.eventType = TR_PEER_PEER_GOT_DATA; 509 e.wasPieceData = wasPieceData; 510 503 511 publish( msgs, &e ); 504 512 } … … 906 914 **/ 907 915 908 dbgmsg( msgs, "added req for piece %lu", (unsigned long)index ); 916 dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list", 917 index, offset, length ); 909 918 req.time_requested = time( NULL ); 910 919 reqListAppend( &msgs->clientWillAskFor, &req ); … … 918 927 struct request_list a = msgs->clientWillAskFor; 919 928 struct request_list b = msgs->clientAskedFor; 929 dbgmsg( msgs, "cancelling all requests to peer" ); 920 930 921 931 msgs->clientAskedFor = REQUEST_LIST_INIT; … … 945 955 assert( length > 0 ); 946 956 957 947 958 /* have we asked the peer for this piece? */ 948 959 req.index = pieceIndex; … … 951 962 952 963 /* if it's only in the queue and hasn't been sent yet, free it */ 953 if( reqListRemove( &msgs->clientWillAskFor, &req ) ) 964 if( reqListRemove( &msgs->clientWillAskFor, &req ) ) { 965 dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length ); 954 966 fireCancelledReq( msgs, &req ); 967 } 955 968 956 969 /* if it's already been sent, send a cancel message too */ 957 970 if( reqListRemove( &msgs->clientAskedFor, &req ) ) { 971 dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length ); 958 972 protocolSendCancel( msgs, &req ); 959 973 fireCancelledReq( msgs, &req ); … … 1029 1043 } 1030 1044 1031 dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, 1032 tmp ); 1045 dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len, tmp ); 1033 1046 1034 1047 /* does the peer prefer encrypted connections? */ … … 1300 1313 const struct peer_request * req ); 1301 1314 1302 static void1303 clientGotBytes( tr_peermsgs * msgs,1304 uint32_t byteCount )1305 {1306 msgs->info->pieceDataActivityDate = time( NULL );1307 fireClientGotData( msgs, byteCount );1308 }1309 1310 1315 static int 1311 readBtPiece( tr_peermsgs * msgs, 1312 struct evbuffer * inbuf, 1313 size_t inlen ) 1316 readBtPiece( tr_peermsgs * msgs, 1317 struct evbuffer * inbuf, 1318 size_t inlen, 1319 size_t * setme_piece_bytes_read ) 1314 1320 { 1315 1321 struct peer_request * req = &msgs->incoming.blockReq; … … 1343 1349 tr_peerIoReadBytes( msgs->io, inbuf, buf, n ); 1344 1350 evbuffer_add( msgs->incoming.block, buf, n ); 1345 clientGotBytes( msgs, n ); 1351 fireClientGotData( msgs, n, TRUE ); 1352 *setme_piece_bytes_read += n; 1346 1353 tr_free( buf ); 1347 1354 dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain", … … 1540 1547 1541 1548 static void 1542 peerGotBytes( tr_peermsgs * msgs,1543 uint32_t byteCount,1544 const time_t now )1545 {1546 msgs->info->pieceDataActivityDate = now;1547 firePeerGotData( msgs, byteCount );1548 }1549 1550 static void1551 1549 decrementDownloadedCount( tr_peermsgs * msgs, 1552 1550 uint32_t byteCount ) … … 1635 1633 } 1636 1634 1635 static int peerPulse( void * vmsgs ); 1636 1637 static void 1638 didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs ) 1639 { 1640 tr_peermsgs * msgs = vmsgs; 1641 firePeerGotData( msgs, bytesWritten, wasPieceData ); 1642 peerPulse( msgs ); 1643 } 1644 1637 1645 static ReadState 1638 canRead( struct bufferevent * evin, 1639 void * vmsgs ) 1646 canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece ) 1640 1647 { 1641 1648 ReadState ret; 1642 1649 tr_peermsgs * msgs = vmsgs; 1643 struct evbuffer * in = EVBUFFER_INPUT ( evin);1650 struct evbuffer * in = tr_iobuf_input( iobuf ); 1644 1651 const size_t inlen = EVBUFFER_LENGTH( in ); 1645 1652 … … 1650 1657 else if( msgs->state == AWAITING_BT_PIECE ) 1651 1658 { 1652 ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER;1659 ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER; 1653 1660 } 1654 1661 else switch( msgs->state ) 1655 { 1656 case AWAITING_BT_LENGTH: 1657 ret = readBtLength ( msgs, in, inlen ); break; 1658 1659 case AWAITING_BT_ID: 1660 ret = readBtId ( msgs, in, inlen ); break; 1661 1662 case AWAITING_BT_MESSAGE: 1663 ret = readBtMessage( msgs, in, inlen ); break; 1664 1665 default: 1666 assert( 0 ); 1667 } 1662 { 1663 case AWAITING_BT_LENGTH: 1664 ret = readBtLength ( msgs, in, inlen ); break; 1665 1666 case AWAITING_BT_ID: 1667 ret = readBtId ( msgs, in, inlen ); break; 1668 1669 case AWAITING_BT_MESSAGE: 1670 ret = readBtMessage( msgs, in, inlen ); break; 1671 1672 default: 1673 assert( 0 ); 1674 } 1675 1676 /* log the raw data that was read */ 1677 if( EVBUFFER_LENGTH( in ) != inlen ) 1678 fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE ); 1668 1679 1669 1680 return ret; 1670 }1671 1672 static void1673 sendKeepalive( tr_peermsgs * msgs )1674 {1675 dbgmsg( msgs, "sending a keepalive message" );1676 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );1677 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );1678 1681 } 1679 1682 … … 1691 1694 ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize; 1692 1695 1693 peer->minActiveRequests = 4; 1694 peer->maxActiveRequests = peer->minActiveRequests + 1695 estimatedBlocksInNext30Seconds; 1696 peer->minActiveRequests = 8; 1697 peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds; 1696 1698 return TRUE; 1697 1699 } … … 1705 1707 } 1706 1708 1709 static size_t 1710 fillOutputBuffer( tr_peermsgs * msgs, time_t now ) 1711 { 1712 size_t bytesWritten = 0; 1713 struct peer_request req; 1714 const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0; 1715 1716 /** 1717 *** Protocol messages 1718 **/ 1719 1720 if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */ 1721 { 1722 dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) ); 1723 msgs->outMessagesBatchedAt = now; 1724 } 1725 else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) ) 1726 { 1727 const size_t len = EVBUFFER_LENGTH( msgs->outMessages ); 1728 /* flush the protocol messages */ 1729 dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len ); 1730 tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE ); 1731 msgs->clientSentAnythingAt = now; 1732 msgs->outMessagesBatchedAt = 0; 1733 msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 1734 bytesWritten += len; 1735 } 1736 1737 /** 1738 *** Blocks 1739 **/ 1740 1741 if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize ) 1742 && popNextRequest( msgs, &req ) 1743 && requestIsValid( msgs, &req ) 1744 && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) ) 1745 { 1746 /* send a block */ 1747 uint8_t * buf = tr_new( uint8_t, req.length ); 1748 const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ); 1749 if( err ) { 1750 fireError( msgs, err ); 1751 } else { 1752 tr_peerIo * io = msgs->io; 1753 struct evbuffer * out = evbuffer_new( ); 1754 dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length ); 1755 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length ); 1756 tr_peerIoWriteUint8 ( io, out, BT_PIECE ); 1757 tr_peerIoWriteUint32( io, out, req.index ); 1758 tr_peerIoWriteUint32( io, out, req.offset ); 1759 tr_peerIoWriteBytes ( io, out, buf, req.length ); 1760 tr_peerIoWriteBuf( io, out, TRUE ); 1761 bytesWritten += EVBUFFER_LENGTH( out ); 1762 evbuffer_free( out ); 1763 msgs->clientSentAnythingAt = now; 1764 } 1765 tr_free( buf ); 1766 } 1767 1768 /** 1769 *** Keepalive 1770 **/ 1771 1772 if( msgs->clientSentAnythingAt 1773 && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) ) 1774 { 1775 dbgmsg( msgs, "sending a keepalive message" ); 1776 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 ); 1777 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 1778 } 1779 1780 return bytesWritten; 1781 } 1782 1707 1783 static int 1708 1784 peerPulse( void * vmsgs ) … … 1713 1789 ratePulse( msgs ); 1714 1790 1715 /*tr_peerIoTryRead( msgs->io );*/1716 1791 pumpRequestQueue( msgs, now ); 1717 1792 expireOldRequests( msgs, now ); 1718 1793 1719 if( msgs->sendingBlock ) 1720 { 1721 const size_t uploadMax = tr_peerIoGetWriteBufferSpace( msgs->io ); 1722 size_t len = EVBUFFER_LENGTH( msgs->outBlock ); 1723 const size_t outlen = MIN( len, uploadMax ); 1724 1725 assert( len ); 1726 1727 if( outlen ) 1728 { 1729 tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen ); 1730 evbuffer_drain( msgs->outBlock, outlen ); 1731 peerGotBytes( msgs, outlen, now ); 1732 1733 len -= outlen; 1734 msgs->clientSentAnythingAt = now; 1735 msgs->sendingBlock = len != 0; 1736 1737 dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, 1738 (int)len ); 1739 } 1740 else dbgmsg( msgs, 1741 "stalled writing block... uploadMax %lu, outlen %lu", 1742 uploadMax, outlen ); 1743 } 1744 1745 if( !msgs->sendingBlock ) 1746 { 1747 struct peer_request req; 1748 const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0; 1749 1750 if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */ 1751 { 1752 dbgmsg( msgs, "started an outMessages batch (length is %d)", 1753 (int)EVBUFFER_LENGTH( msgs->outMessages ) ); 1754 msgs->outMessagesBatchedAt = now; 1755 } 1756 else if( haveMessages 1757 && ( ( now - msgs->outMessagesBatchedAt ) > 1758 msgs->outMessagesBatchPeriod ) ) 1759 { 1760 dbgmsg( msgs, "flushing outMessages... (length is %d)", 1761 (int)EVBUFFER_LENGTH( 1762 msgs->outMessages ) ); 1763 tr_peerIoWriteBuf( msgs->io, msgs->outMessages ); 1764 msgs->clientSentAnythingAt = now; 1765 msgs->outMessagesBatchedAt = 0; 1766 msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 1767 } 1768 else if( !EVBUFFER_LENGTH( msgs->outBlock ) 1769 && popNextRequest( msgs, &req ) 1770 && requestIsValid( msgs, &req ) 1771 && tr_cpPieceIsComplete( msgs->torrent->completion, 1772 req.index ) ) 1773 { 1774 uint8_t * buf = tr_new( uint8_t, req.length ); 1775 const int err = tr_ioRead( msgs->torrent, 1776 req.index, req.offset, req.length, 1777 buf ); 1778 if( err ) 1779 { 1780 fireError( msgs, err ); 1781 } 1782 else 1783 { 1784 tr_peerIo * io = msgs->io; 1785 struct evbuffer * out = msgs->outBlock; 1786 1787 dbgmsg( msgs, "sending block %u:%u->%u", req.index, 1788 req.offset, 1789 req.length ); 1790 tr_peerIoWriteUint32( 1791 io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + 1792 req.length ); 1793 tr_peerIoWriteUint8 ( io, out, BT_PIECE ); 1794 tr_peerIoWriteUint32( io, out, req.index ); 1795 tr_peerIoWriteUint32( io, out, req.offset ); 1796 tr_peerIoWriteBytes ( io, out, buf, req.length ); 1797 msgs->sendingBlock = 1; 1798 } 1799 1800 tr_free( buf ); 1801 } 1802 else if( ( !haveMessages ) 1803 && ( now - msgs->clientSentAnythingAt ) > 1804 KEEPALIVE_INTERVAL_SECS ) 1805 { 1806 sendKeepalive( msgs ); 1807 } 1808 } 1794 for( ;; ) 1795 if( fillOutputBuffer( msgs, now ) < 1 ) 1796 break; 1809 1797 1810 1798 return TRUE; /* loop forever */ … … 1819 1807 1820 1808 static void 1821 gotError( struct bufferevent * evbuf UNUSED,1822 short 1823 void *vmsgs )1809 gotError( struct tr_iobuf * iobuf UNUSED, 1810 short what, 1811 void * vmsgs ) 1824 1812 { 1825 1813 if( what & EVBUFFER_TIMEOUT ) 1826 dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what, 1827 evbuf->timeout_read ); 1814 dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what ); 1828 1815 if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) 1829 1816 dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)", … … 1843 1830 field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) ); 1844 1831 1832 #if 0 1845 1833 if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) ) 1846 1834 { … … 1848 1836 speed over a truly random sample -- let's limit the pool size to 1849 1837 the first 1000 pieces so large torrents don't bog things down */ 1850 size_t poolSize = MIN( msgs->torrent->info.pieceCount,1851 1852 tr_piece_index_t * pool = tr_new( tr_piece_index_t, poolSize );1838 size_t poolSize; 1839 const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 ); 1840 tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize ); 1853 1841 1854 1842 /* build the pool */ 1855 for( i = 0; i < poolSize; ++i ) 1856 pool[i] = i; 1843 for( i=poolSize=0; i<maxPoolSize; ++i ) 1844 if( tr_bitfieldHas( field, i ) ) 1845 pool[poolSize++] = i; 1857 1846 1858 1847 /* pull random piece indices from the pool */ 1859 1848 while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) ) 1860 1849 { 1861 const int 1850 const int pos = tr_cryptoWeakRandInt( poolSize ); 1862 1851 const tr_piece_index_t piece = pool[pos]; 1863 1852 tr_bitfieldRem( field, piece ); … … 1869 1858 tr_free( pool ); 1870 1859 } 1860 #endif 1871 1861 1872 1862 tr_peerIoWriteUint32( msgs->io, out, … … 1947 1937 if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) ) 1948 1938 { 1949 PexDiffs diffs; 1950 tr_pex * newPex = NULL; 1951 const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr, 1952 msgs->torrent->info.hash, 1953 &newPex ); 1939 PexDiffs diffs; 1940 tr_pex * newPex = NULL; 1941 const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr, 1942 msgs->torrent->info.hash, 1943 &newPex ); 1944 1954 1945 /* build the diffs */ 1955 1946 diffs.added = tr_new( tr_pex, newCount ); … … 1970 1961 if( diffs.addedCount || diffs.droppedCount ) 1971 1962 { 1972 int 1973 tr_benc 1974 uint8_t * tmp, *walk;1975 char * benc;1976 int bencLen;1963 int i; 1964 tr_benc val; 1965 char * benc; 1966 int bencLen; 1967 uint8_t * tmp, *walk; 1977 1968 struct evbuffer * out = msgs->outMessages; 1978 1969 … … 1987 1978 /* "added" */ 1988 1979 tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 ); 1989 for( i = 0; i < diffs.addedCount; ++i ) 1990 { 1980 for( i = 0; i < diffs.addedCount; ++i ) { 1991 1981 memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4; 1992 1982 memcpy( walk, &diffs.added[i].port, 2 ); walk += 2; … … 2006 1996 /* "dropped" */ 2007 1997 tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 ); 2008 for( i = 0; i < diffs.droppedCount; ++i ) 2009 { 1998 for( i = 0; i < diffs.droppedCount; ++i ) { 2010 1999 memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4; 2011 2000 memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2; … … 2021 2010 tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id ); 2022 2011 tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen ); 2023 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 2024 dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( out ) ); 2025 2026 /* cleanup */ 2012 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS ); 2013 dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) ); 2014 2027 2015 tr_free( benc ); 2028 2016 tr_bencFree( &val ); 2029 2017 } 2030 2018 2019 /* cleanup */ 2031 2020 tr_free( diffs.added ); 2032 2021 tr_free( diffs.dropped ); … … 2077 2066 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 2078 2067 m->incoming.block = evbuffer_new( ); 2079 m->outBlock = evbuffer_new( );2080 2068 m->peerAllowedPieces = NULL; 2081 2069 m->peerAskedFor = REQUEST_LIST_INIT; … … 2092 2080 tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of 2093 2081 inactivity */ 2094 tr_peerIoSetIOFuncs( m->io, canRead, NULL, gotError, m );2082 tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m ); 2095 2083 ratePulse( m ); 2096 2084 … … 2112 2100 evbuffer_free( msgs->incoming.block ); 2113 2101 evbuffer_free( msgs->outMessages ); 2114 evbuffer_free( msgs->outBlock );2115 2102 tr_free( msgs->pex ); 2116 2103
Note: See TracChangeset
for help on using the changeset viewer.