Changeset 7125
- Timestamp:
- Nov 17, 2008, 4:00:57 AM (12 years ago)
- Location:
- trunk/libtransmission
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/handshake.c
r6988 r7125 334 334 /* send it */ 335 335 setReadState( handshake, AWAITING_YB ); 336 tr_peerIoWriteBuf( handshake->io, outbuf );336 tr_peerIoWriteBuf( handshake->io, outbuf, FALSE ); 337 337 338 338 /* cleanup */ … … 486 486 tr_cryptoDecryptInit( handshake->crypto ); 487 487 setReadState( handshake, AWAITING_VC ); 488 tr_peerIoWriteBuf( handshake->io, outbuf );488 tr_peerIoWriteBuf( handshake->io, outbuf, FALSE ); 489 489 490 490 /* cleanup */ … … 712 712 int msgSize; 713 713 uint8_t * msg = buildHandshakeMessage( handshake, &msgSize ); 714 tr_peerIoWrite( handshake->io, msg, msgSize );714 tr_peerIoWrite( handshake->io, msg, msgSize, FALSE ); 715 715 tr_free( msg ); 716 716 handshake->haveSentBitTorrentHandshake = 1; … … 780 780 781 781 setReadState( handshake, AWAITING_PAD_A ); 782 tr_peerIoWrite( handshake->io, outbuf, walk - outbuf );782 tr_peerIoWrite( handshake->io, outbuf, walk - outbuf, FALSE ); 783 783 return READ_NOW; 784 784 } … … 992 992 993 993 /* send it out */ 994 tr_peerIoWriteBuf( handshake->io, outbuf );994 tr_peerIoWriteBuf( handshake->io, outbuf, FALSE ); 995 995 evbuffer_free( outbuf ); 996 996 … … 1157 1157 handshake->haveSentBitTorrentHandshake = 1; 1158 1158 setReadState( handshake, AWAITING_HANDSHAKE ); 1159 tr_peerIoWrite( handshake->io, msg, msgSize );1159 tr_peerIoWrite( handshake->io, msg, msgSize, FALSE ); 1160 1160 tr_free( msg ); 1161 1161 } … … 1204 1204 handshake->haveSentBitTorrentHandshake = 1; 1205 1205 setReadState( handshake, AWAITING_HANDSHAKE ); 1206 tr_peerIoWrite( handshake->io, msg, msgSize );1206 tr_peerIoWrite( handshake->io, msg, msgSize, FALSE ); 1207 1207 tr_free( msg ); 1208 1208 } -
trunk/libtransmission/peer-common.h
r6876 r7125 50 50 { 51 51 PeerEventType eventType; 52 uint32_t pieceIndex; /* for GOT_BLOCK, CANCEL */ 53 uint32_t offset; /* for GOT_BLOCK */ 54 uint32_t length; /* for GOT_BLOCK + GOT_DATA */ 55 float progress; /* for TR_PEER_PEER_PROGRESS */ 56 int err; /* errno for TR_PEER_GOT_ERROR */ 52 uint32_t pieceIndex; /* for GOT_BLOCK, CANCEL */ 53 uint32_t offset; /* for GOT_BLOCK */ 54 uint32_t length; /* for GOT_BLOCK + GOT_DATA */ 55 float progress; /* for PEER_PROGRESS */ 56 int err; /* errno for GOT_ERROR */ 57 int wasPieceData; /* for GOT_DATA */ 57 58 } 58 59 tr_peer_event; -
trunk/libtransmission/peer-io.c
r7069 r7125 28 28 #include "transmission.h" 29 29 #include "crypto.h" 30 #include "list.h" 30 31 #include "net.h" 31 32 #include "peer-io.h" … … 77 78 }; 78 79 80 struct tr_datatype 81 { 82 unsigned int isPieceData : 1; 83 size_t length; 84 }; 85 79 86 struct tr_peerIo 80 87 { … … 93 100 time_t timeCreated; 94 101 95 tr_session *session;102 tr_session * session; 96 103 97 104 struct in_addr in_addr; 98 struct bufferevent * bufev; 99 struct evbuffer * output; 105 struct bufferevent * bufev; 106 struct evbuffer * output; 107 tr_list * output_datatypes; /* struct tr_datatype */ 100 108 101 109 tr_can_read_cb canRead; … … 200 208 if( len < io->bufferSize[TR_UP] ) 201 209 { 202 const size_t payload = io->bufferSize[TR_UP] - len; 203 const size_t n = addPacketOverhead( payload ); 204 struct tr_bandwidth * b = &io->bandwidth[TR_UP]; 205 b->bytesLeft -= MIN( b->bytesLeft, (size_t)n ); 206 b->bytesUsed += n; 207 tr_rcTransferred( io->session->rawSpeed[TR_UP], n ); 208 dbgmsg( io, 209 "wrote %zu bytes to peer... upload bytesLeft is now %zu", 210 n, 211 b->bytesLeft ); 210 size_t payload = io->bufferSize[TR_UP] - len; 211 212 while( payload ) 213 { 214 struct tr_datatype * next = io->output_datatypes->data; 215 const size_t chunk_length = MIN( next->length, payload ); 216 const size_t n = addPacketOverhead( chunk_length ); 217 218 if( next->isPieceData ) 219 { 220 struct tr_bandwidth * b = &io->bandwidth[TR_UP]; 221 b->bytesLeft -= MIN( b->bytesLeft, n ); 222 b->bytesUsed += n; 223 } 224 225 if( io->didWrite ) 226 io->didWrite( io, n, next->isPieceData, io->userData ); 227 228 payload -= chunk_length; 229 next->length -= chunk_length; 230 if( !next->length ) 231 tr_free( tr_list_pop_front( &io->output_datatypes ) ); 232 } 212 233 } 213 234 214 235 adjustOutputBuffer( io ); 215 236 216 if( io->didWrite )217 io->didWrite( e, io->userData );218 237 } 219 238 … … 238 257 b->bytesLeft -= MIN( b->bytesLeft, (size_t)n ); 239 258 b->bytesUsed += n; 240 tr_rcTransferred( io->session->rawSpeed[TR_DOWN], n ); 241 dbgmsg( io, 242 "%zu new input bytes. bytesUsed is %zu, bytesLeft is %zu", 243 n, b->bytesUsed, 244 b->bytesLeft ); 259 dbgmsg( io, "%zu new input bytes. bytesUsed is %zu, bytesLeft is %zu", n, b->bytesUsed, b->bytesLeft ); 245 260 246 261 adjustInputBuffer( io ); … … 387 402 tr_netClose( io->socket ); 388 403 tr_cryptoFree( io->crypto ); 404 tr_list_free( &io->output_datatypes, tr_free ); 389 405 tr_free( io ); 390 406 } … … 718 734 719 735 void 720 tr_peerIoWrite( tr_peerIo * io, 721 const void * writeme, 722 size_t writemeLen ) 723 { 736 tr_peerIoWrite( tr_peerIo * io, 737 const void * writeme, 738 size_t writemeLen, 739 int isPieceData ) 740 { 741 struct tr_datatype * datatype; 724 742 assert( tr_amInEventThread( io->session ) ); 725 743 dbgmsg( io, "adding %zu bytes into io->output", writemeLen ); … … 730 748 evbuffer_add( io->output, writeme, writemeLen ); 731 749 750 datatype = tr_new( struct tr_datatype, 1 ); 751 datatype->isPieceData = isPieceData != 0; 752 datatype->length = writemeLen; 753 tr_list_append( &io->output_datatypes, datatype ); 754 732 755 adjustOutputBuffer( io ); 733 756 } 734 757 735 758 void 736 tr_peerIoWriteBuf( tr_peerIo * io, 737 struct evbuffer * buf ) 759 tr_peerIoWriteBuf( tr_peerIo * io, 760 struct evbuffer * buf, 761 int isPieceData ) 738 762 { 739 763 const size_t n = EVBUFFER_LENGTH( buf ); 740 764 741 tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n );765 tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData ); 742 766 evbuffer_drain( buf, n ); 743 767 } -
trunk/libtransmission/peer-io.h
r7055 r7125 29 29 **/ 30 30 31 tr_peerIo* tr_peerIoNewOutgoing( 32 struct tr_handle * session, 33 const struct in_addr * addr, 34 int port, 35 const uint8_t * 36 torrentHash ); 37 38 tr_peerIo* tr_peerIoNewIncoming( struct tr_handle * session, 31 tr_peerIo* tr_peerIoNewOutgoing( struct tr_handle * session, 32 const struct in_addr * addr, 33 int port, 34 const uint8_t * torrentHash ); 35 36 tr_peerIo* tr_peerIoNewIncoming( struct tr_handle * session, 39 37 const struct in_addr * addr, 40 38 uint16_t port, … … 109 107 ReadState; 110 108 111 typedef ReadState ( *tr_can_read_cb )( struct bufferevent*, void* user_data ); 112 typedef void ( *tr_did_write_cb )( struct bufferevent *, void * ); 113 typedef void ( *tr_net_error_cb )( struct bufferevent *, short what, void * ); 114 115 void tr_peerIoSetIOFuncs( tr_peerIo * io, 116 tr_can_read_cb readcb, 117 tr_did_write_cb writecb, 118 tr_net_error_cb errcb, 119 void * user_data ); 120 121 int tr_peerIoWantsBandwidth( const tr_peerIo * io, 122 tr_direction ); 123 124 #if 0 125 void tr_peerIoTryRead( tr_peerIo * io ); 126 127 #endif 128 129 void tr_peerIoWrite( tr_peerIo * io, 130 const void * writeme, 131 size_t writemeLen ); 132 133 void tr_peerIoWriteBuf( tr_peerIo * io, 134 struct evbuffer * buf ); 135 109 typedef ReadState ( *tr_can_read_cb )( struct bufferevent * ev, 110 void * user_data ); 111 112 typedef void ( *tr_did_write_cb )( tr_peerIo * io, 113 size_t bytesWritten, 114 int wasPieceData, 115 void * userData ); 116 117 typedef void ( *tr_net_error_cb )( struct bufferevent * ev, 118 short what, 119 void * userData ); 120 121 void tr_peerIoSetIOFuncs ( tr_peerIo * io, 122 tr_can_read_cb readcb, 123 tr_did_write_cb writecb, 124 tr_net_error_cb errcb, 125 void * user_data ); 126 127 /** 128 *** 129 **/ 130 131 int tr_peerIoWantsBandwidth ( const tr_peerIo * io, 132 tr_direction direction ); 133 134 void tr_peerIoWrite ( tr_peerIo * io, 135 const void * writeme, 136 size_t writemeLen, 137 int isPieceData ); 138 139 void tr_peerIoWriteBuf ( tr_peerIo * io, 140 struct evbuffer * buf, 141 int isPieceData ); 136 142 137 143 /** -
trunk/libtransmission/peer-mgr-private.h
r7055 r7125 71 71 * protocol overhead is NOT included; this is only the piece data */ 72 72 struct tr_ratecontrol * pieceSpeed[2]; 73 74 /* the rate at which all data is being transferred between client and peer. */ 75 struct tr_ratecontrol * rawSpeed[2]; 73 76 } 74 77 tr_peer; -
trunk/libtransmission/peer-mgr.c
r7121 r7125 331 331 p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( ); 332 332 p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( ); 333 p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( ); 334 p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( ); 333 335 return p; 334 336 } … … 368 370 tr_free( peer->client ); 369 371 372 tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] ); 373 tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] ); 370 374 tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] ); 371 375 tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] ); … … 1017 1021 const time_t now = time( NULL ); 1018 1022 tr_torrent * tor = t->tor; 1023 const tr_direction dir = TR_CLIENT_TO_PEER; 1024 1019 1025 tor->activityDate = now; 1020 tor->uploadedCur += e->length; 1026 1027 if( e->wasPieceData ) 1028 tor->uploadedCur += e->length; 1029 1030 /* add it to the raw upload speed */ 1021 1031 if( peer ) 1022 tr_rcTransferred ( peer->pieceSpeed[TR_CLIENT_TO_PEER], e->length ); 1023 tr_rcTransferred ( tor->pieceSpeed[TR_CLIENT_TO_PEER], e->length ); 1024 tr_rcTransferred ( tor->session->pieceSpeed[TR_CLIENT_TO_PEER], e->length ); 1025 tr_statsAddUploaded( tor->session, e->length ); 1026 if( peer ) 1027 { 1032 tr_rcTransferred ( peer->rawSpeed[dir], e->length ); 1033 tr_rcTransferred ( tor->rawSpeed[dir], e->length ); 1034 tr_rcTransferred ( tor->session->rawSpeed[dir], e->length ); 1035 1036 /* maybe add it to the piece upload speed */ 1037 if( e->wasPieceData ) { 1038 if( peer ) 1039 tr_rcTransferred ( peer->pieceSpeed[dir], e->length ); 1040 tr_rcTransferred ( tor->pieceSpeed[dir], e->length ); 1041 tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length ); 1042 } 1043 1044 /* update the stats */ 1045 if( e->wasPieceData ) 1046 tr_statsAddUploaded( tor->session, e->length ); 1047 1048 /* update our atom */ 1049 if( peer ) { 1028 1050 struct peer_atom * a = getExistingAtom( t, &peer->in_addr ); 1029 1051 a->piece_data_time = now; 1030 1052 } 1053 1031 1054 break; 1032 1055 } … … 1036 1059 const time_t now = time( NULL ); 1037 1060 tr_torrent * tor = t->tor; 1061 const tr_direction dir = TR_PEER_TO_CLIENT; 1062 1038 1063 tor->activityDate = now; 1039 tr_statsAddDownloaded( tor->session, e->length ); 1040 if( peer ) 1041 tr_rcTransferred ( peer->pieceSpeed[TR_PEER_TO_CLIENT], e->length ); 1042 tr_rcTransferred ( tor->pieceSpeed[TR_PEER_TO_CLIENT], e->length ); 1043 tr_rcTransferred ( tor->session->pieceSpeed[TR_PEER_TO_CLIENT], e->length ); 1064 1044 1065 /* only add this to downloadedCur if we got it from a peer -- 1045 1066 * webseeds shouldn't count against our ratio. As one tracker … … 1048 1069 * to manage the swarms, not the web server and does not fit 1049 1070 * into the jurisdiction of the tracker." */ 1071 if( peer && e->wasPieceData ) 1072 tor->downloadedCur += e->length; 1073 1074 /* add it to our raw download speed */ 1050 1075 if( peer ) 1051 tor->downloadedCur += e->length; 1076 tr_rcTransferred ( peer->rawSpeed[dir], e->length ); 1077 tr_rcTransferred ( tor->rawSpeed[dir], e->length ); 1078 tr_rcTransferred ( tor->session->rawSpeed[dir], e->length ); 1079 1080 /* maybe add it to the piece upload speed */ 1081 if( e->wasPieceData ) { 1082 if( peer ) 1083 tr_rcTransferred ( peer->pieceSpeed[dir], e->length ); 1084 tr_rcTransferred ( tor->pieceSpeed[dir], e->length ); 1085 tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length ); 1086 } 1087 1088 /* update the stats */ 1089 if( e->wasPieceData ) 1090 tr_statsAddDownloaded( tor->session, e->length ); 1091 1092 /* update our atom */ 1052 1093 if( peer ) { 1053 1094 struct peer_atom * a = getExistingAtom( t, &peer->in_addr ); 1054 1095 a->piece_data_time = now; 1055 1096 } 1097 1056 1098 break; 1057 1099 } -
trunk/libtransmission/peer-msgs.c
r7119 r7125 430 430 **/ 431 431 432 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };432 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 }; 433 433 434 434 static void … … 484 484 static void 485 485 fireClientGotData( tr_peermsgs * msgs, 486 uint32_t length ) 486 uint32_t length, 487 int wasPieceData ) 487 488 { 488 489 tr_peer_event e = blankEvent; … … 490 491 e.length = length; 491 492 e.eventType = TR_PEER_CLIENT_GOT_DATA; 493 e.wasPieceData = wasPieceData; 492 494 publish( msgs, &e ); 493 495 } 494 496 495 497 static void 496 firePeerGotData( tr_peermsgs * msgs, 497 uint32_t length ) 498 firePeerGotData( tr_peermsgs * msgs, 499 uint32_t length, 500 int wasPieceData ) 498 501 { 499 502 tr_peer_event e = blankEvent; … … 501 504 e.length = length; 502 505 e.eventType = TR_PEER_PEER_GOT_DATA; 506 e.wasPieceData = wasPieceData; 507 503 508 publish( msgs, &e ); 504 509 } … … 1300 1305 const struct peer_request * req ); 1301 1306 1302 static void1303 clientGotBytes( tr_peermsgs * msgs,1304 uint32_t byteCount )1305 {1306 msgs->info->pieceDataActivityDate = time( NULL );1307 fireClientGotData( msgs, byteCount );1308 }1309 1310 1307 static int 1311 1308 readBtPiece( tr_peermsgs * msgs, … … 1343 1340 tr_peerIoReadBytes( msgs->io, inbuf, buf, n ); 1344 1341 evbuffer_add( msgs->incoming.block, buf, n ); 1345 clientGotBytes( msgs, n);1342 fireClientGotData( msgs, n, TRUE ); 1346 1343 tr_free( buf ); 1347 1344 dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain", … … 1540 1537 1541 1538 static void 1542 peerGotBytes( tr_peermsgs * msgs,1543 uint32_t byteCount,1544 const time_t now )1545 {1546 msgs->info->pieceDataActivityDate = now;1547 firePeerGotData( msgs, byteCount );1548 }1549 1550 static void1551 1539 decrementDownloadedCount( tr_peermsgs * msgs, 1552 1540 uint32_t byteCount ) … … 1635 1623 } 1636 1624 1625 static void 1626 didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs ) 1627 { 1628 tr_peermsgs * msgs = vmsgs; 1629 firePeerGotData( msgs, bytesWritten, wasPieceData ); 1630 } 1631 1637 1632 static ReadState 1638 1633 canRead( struct bufferevent * evin, … … 1653 1648 } 1654 1649 else switch( msgs->state ) 1655 { 1656 case AWAITING_BT_LENGTH: 1657 ret = readBtLength ( msgs, in, inlen ); break; 1658 1659 case AWAITING_BT_ID: 1660 ret = readBtId ( msgs, in, inlen ); break; 1661 1662 case AWAITING_BT_MESSAGE: 1663 ret = readBtMessage( msgs, in, inlen ); break; 1664 1665 default: 1666 assert( 0 ); 1667 } 1650 { 1651 case AWAITING_BT_LENGTH: 1652 ret = readBtLength ( msgs, in, inlen ); break; 1653 1654 case AWAITING_BT_ID: 1655 ret = readBtId ( msgs, in, inlen ); break; 1656 1657 case AWAITING_BT_MESSAGE: 1658 ret = readBtMessage( msgs, in, inlen ); break; 1659 1660 default: 1661 assert( 0 ); 1662 } 1663 1664 /* log the raw data that was read */ 1665 if( EVBUFFER_LENGTH( in ) != inlen ) 1666 fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE ); 1668 1667 1669 1668 return ret; … … 1727 1726 if( outlen ) 1728 1727 { 1729 tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );1728 tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen, TRUE ); 1730 1729 evbuffer_drain( msgs->outBlock, outlen ); 1731 peerGotBytes( msgs, outlen, now );1732 1730 1733 1731 len -= outlen; … … 1735 1733 msgs->sendingBlock = len != 0; 1736 1734 1737 dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, 1738 (int)len ); 1735 dbgmsg( msgs, "wrote %zu bytes; %zu left in block", outlen, len ); 1739 1736 } 1740 else dbgmsg( msgs, 1741 "stalled writing block... uploadMax %lu, outlen %lu", 1742 uploadMax, outlen ); 1737 else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen ); 1743 1738 } 1744 1739 … … 1750 1745 if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */ 1751 1746 { 1752 dbgmsg( msgs, "started an outMessages batch (length is %d)", 1753 (int)EVBUFFER_LENGTH( msgs->outMessages ) ); 1747 dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) ); 1754 1748 msgs->outMessagesBatchedAt = now; 1755 1749 } … … 1758 1752 msgs->outMessagesBatchPeriod ) ) 1759 1753 { 1760 dbgmsg( msgs, "flushing outMessages... (length is %d)", 1761 (int)EVBUFFER_LENGTH( 1762 msgs->outMessages ) ); 1763 tr_peerIoWriteBuf( msgs->io, msgs->outMessages ); 1754 dbgmsg( msgs, "flushing outMessages... (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) ); 1755 tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE ); 1764 1756 msgs->clientSentAnythingAt = now; 1765 1757 msgs->outMessagesBatchedAt = 0; … … 2090 2082 tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of 2091 2083 inactivity */ 2092 tr_peerIoSetIOFuncs( m->io, canRead, NULL, gotError, m );2084 tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m ); 2093 2085 ratePulse( m ); 2094 2086 -
trunk/libtransmission/webseed.c
r7078 r7125 52 52 ***/ 53 53 54 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };54 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 }; 55 55 56 56 static void … … 94 94 e.eventType = TR_PEER_CLIENT_GOT_DATA; 95 95 e.length = length; 96 e.wasPieceData = TRUE; 97 96 98 publish( w, &e ); 97 99 }
Note: See TracChangeset
for help on using the changeset viewer.