Changeset 3839
- Timestamp:
- Nov 16, 2007, 8:40:03 PM (15 years ago)
- Location:
- trunk/libtransmission
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/handshake.c
r3785 r3839 1031 1031 handshake->handle = tr_peerIoGetHandle( io ); 1032 1032 1033 tr_peerIoSetIOMode( handshake->io, EV_READ|EV_WRITE, 0 );1034 1033 tr_peerIoSetIOFuncs( handshake->io, canRead, NULL, gotError, handshake ); 1035 1034 -
trunk/libtransmission/peer-io.c
r3830 r3839 99 99 continue; 100 100 case READ_MORE: 101 tr_peerIoSetIOMode( c, EV_READ, 0 );102 101 case READ_DONE: 103 102 done = 1; … … 119 118 *** 120 119 **/ 120 121 void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t); 121 122 122 123 static tr_peerIo* … … 143 144 bufferevent_settimeout( c->bufev, IO_TIMEOUT_SECS, IO_TIMEOUT_SECS ); 144 145 bufferevent_enable( c->bufev, EV_READ|EV_WRITE ); 146 bufferevent_setwatermark( c->bufev, EV_READ, 0, 1024 ); 147 145 148 return c; 146 149 } … … 234 237 } 235 238 239 void 240 tr_peerIoTryRead( tr_peerIo * io ) 241 { 242 if( EVBUFFER_LENGTH( io->bufev->input ) ) 243 canReadWrapper( io->bufev, io ); 244 } 245 236 246 void 237 247 tr_peerIoSetIOFuncs( tr_peerIo * io, … … 246 256 io->userData = userData; 247 257 248 if( EVBUFFER_LENGTH( io->bufev->input ) ) 249 canReadWrapper( io->bufev, io ); 250 } 251 252 void 253 tr_peerIoSetIOMode( tr_peerIo * io, short enable, short disable ) 254 { 255 assert( tr_amInEventThread( io->handle ) ); 256 bufferevent_enable( io->bufev, enable ); 257 bufferevent_disable( io->bufev, disable ); 258 tr_peerIoTryRead( io ); 258 259 } 259 260 … … 285 286 bufferevent_settimeout( io->bufev, IO_TIMEOUT_SECS, IO_TIMEOUT_SECS ); 286 287 bufferevent_enable( io->bufev, EV_READ|EV_WRITE ); 288 bufferevent_setwatermark( io->bufev, EV_READ, 0, 1024 ); 287 289 288 290 return 0; -
trunk/libtransmission/peer-io.h
r3501 r3839 109 109 void * user_data ); 110 110 111 void tr_peerIoSetIOMode ( tr_peerIo * io, 112 short enable_mode, 113 short disable_mode ); 111 size_t tr_peerIoWriteBytesWaiting( const tr_peerIo * io ); 114 112 115 size_t tr_peerIoWriteBytesWaiting( consttr_peerIo * io );113 void tr_peerIoTryRead( tr_peerIo * io ); 116 114 117 115 void tr_peerIoWrite( tr_peerIo * io, -
trunk/libtransmission/peer-msgs.c
r3836 r3839 81 81 { 82 82 AWAITING_BT_LENGTH, 83 AWAITING_BT_MESSAGE 83 AWAITING_BT_ID, 84 AWAITING_BT_MESSAGE, 85 AWAITING_BT_PIECE 84 86 }; 85 87 … … 103 105 return 0; 104 106 } 107 108 /* this is raw, unchanged data from the peer regarding 109 * the current message that it's sending us. */ 110 struct tr_incoming 111 { 112 uint32_t length; /* includes the +1 for id length */ 113 uint8_t id; 114 struct peer_request blockReq; /* metadata for incoming blocks */ 115 struct evbuffer * block; /* piece data for incoming blocks */ 116 }; 105 117 106 118 struct tr_peermsgs … … 142 154 uint8_t ut_pex_id; 143 155 uint16_t pexCount; 144 uint32_t incomingMessageLength;145 156 uint32_t maxActiveRequests; 146 157 uint32_t minActiveRequests; 158 159 struct tr_incoming incoming; 147 160 148 161 tr_pex * pex; … … 865 878 { 866 879 uint32_t len; 867 const size_t needlen = sizeof(uint32_t); 868 869 if( EVBUFFER_LENGTH(inbuf) < needlen ) 880 881 if( EVBUFFER_LENGTH(inbuf) < sizeof(len) ) 870 882 return READ_MORE; 871 883 … … 875 887 dbgmsg( msgs, "got KeepAlive" ); 876 888 else { 877 msgs->incomingMessageLength = len; 889 msgs->incoming.length = len; 890 msgs->state = AWAITING_BT_ID; 891 } 892 dbgmsg( msgs, "readBtLength: got a length of %d, msgs->state is now %d", (int)len, (int)msgs->state ); 893 894 return READ_AGAIN; 895 } 896 897 static int 898 readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf ); 899 900 static int 901 readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf ) 902 { 903 uint8_t id; 904 905 if( EVBUFFER_LENGTH(inbuf) < sizeof(uint8_t) ) 906 return READ_MORE; 907 908 tr_peerIoReadUint8( msgs->io, inbuf, &id ); 909 msgs->incoming.id = id; 910 911 if( id==BT_PIECE ) 912 { 913 msgs->state = AWAITING_BT_PIECE; 914 return READ_AGAIN; 915 } 916 else if( msgs->incoming.length != 1 ) 917 { 878 918 msgs->state = AWAITING_BT_MESSAGE; 879 }880 881 return READ_AGAIN;919 return READ_AGAIN; 920 } 921 else return readBtMessage( msgs, inbuf ); 882 922 } 883 923 … … 990 1030 991 1031 static int 1032 readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf ) 1033 { 1034 struct peer_request * req = &msgs->incoming.blockReq; 1035 dbgmsg( msgs, "In readBtPiece" ); 1036 1037 if( !req->length ) 1038 { 1039 if( EVBUFFER_LENGTH( inbuf ) < 8 ) 1040 return READ_MORE; 1041 1042 tr_peerIoReadUint32( msgs->io, inbuf, &req->index ); 1043 tr_peerIoReadUint32( msgs->io, inbuf, &req->offset ); 1044 req->length = msgs->incoming.length - 9; 1045 dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length ); 1046 return READ_AGAIN; 1047 } 1048 else 1049 { 1050 int err; 1051 1052 /* read in another chunk of data */ 1053 const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block); 1054 size_t n = MIN( nLeft, EVBUFFER_LENGTH(inbuf) ); 1055 uint8_t * buf = tr_new( uint8_t, n ); 1056 tr_peerIoReadBytes( msgs->io, inbuf, buf, n ); 1057 evbuffer_add( msgs->incoming.block, buf, n ); 1058 tr_free( buf ); 1059 dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain", 1060 (int)n, req->index, req->offset, req->length, 1061 (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) ); 1062 if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length ) 1063 return READ_MORE; 1064 1065 /* we've got the whole block ... process it */ 1066 err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req ); 1067 1068 /* cleanup */ 1069 evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) ); 1070 req->length = 0; 1071 msgs->state = AWAITING_BT_LENGTH; 1072 if( !err ) 1073 return READ_AGAIN; 1074 else { 1075 fireGotAssertError( msgs ); 1076 return READ_DONE; 1077 } 1078 } 1079 } 1080 1081 static int 992 1082 readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf ) 993 1083 { 994 int ret;995 uint8_t id;996 1084 uint32_t ui32; 997 uint32_t msglen = msgs->incomingMessageLength; 1085 uint32_t msglen = msgs->incoming.length; 1086 const uint8_t id = msgs->incoming.id; 998 1087 const size_t startBufLen = EVBUFFER_LENGTH( inbuf ); 999 1088 1000 if( EVBUFFER_LENGTH(inbuf) < msglen ) 1089 dbgmsg( msgs, "in readBtMessage" ); 1090 1091 --msglen; // id length 1092 1093 if( EVBUFFER_LENGTH(inbuf) < msglen ) { 1094 dbgmsg( msgs, " too short!!! " ); 1001 1095 return READ_MORE; 1002 1003 tr_peerIoReadUint8( msgs->io, inbuf, &id ); 1096 } 1097 1004 1098 dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)EVBUFFER_LENGTH(inbuf) ); 1005 1099 1006 if( !messageLengthIsCorrect( msgs, id, msglen ) )1100 if( !messageLengthIsCorrect( msgs, id, msglen+1 ) ) 1007 1101 { 1008 1102 dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen ); … … 1011 1105 } 1012 1106 1013 --msglen;1014 ret = 0;1015 1107 switch( id ) 1016 1108 { … … 1081 1173 } 1082 1174 1083 case BT_PIECE: { 1084 uint8_t * block; 1085 struct peer_request req; 1086 tr_peerIoReadUint32( msgs->io, inbuf, &req.index ); 1087 tr_peerIoReadUint32( msgs->io, inbuf, &req.offset ); 1088 req.length = msglen - 8; 1089 block = tr_new( uint8_t, req.length ); 1090 tr_peerIoReadBytes( msgs->io, inbuf, block, req.length ); 1091 dbgmsg( msgs, "got a Block %u:%u->%u", req.index, req.offset, req.length ); 1092 ret = clientGotBlock( msgs, block, &req ); 1093 tr_free( block ); 1175 case BT_PIECE: 1176 assert( 0 ); /* should be handled elsewhere! */ 1094 1177 break; 1095 }1096 1178 1097 1179 case BT_PORT: … … 1147 1229 dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id ); 1148 1230 tr_peerIoDrain( msgs->io, inbuf, msglen ); 1149 assert( 0 );1150 1231 break; 1151 1232 } … … 1153 1234 dbgmsg( msgs, "startBufLen was %d, msglen was %d, current inbuf len is %d", (int)startBufLen, (int)(msglen+1), (int)EVBUFFER_LENGTH(inbuf) ); 1154 1235 1155 if( ret == (int)TR_ERROR_ASSERT ) 1156 { 1157 fireGotAssertError( msgs ); 1158 return READ_DONE; 1159 } 1160 else if( ret == TR_OK ) 1161 { 1162 assert( msglen + 1 == msgs->incomingMessageLength ); 1163 assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msgs->incomingMessageLength ); 1164 1165 msgs->incomingMessageLength = -1; 1166 msgs->state = AWAITING_BT_LENGTH; 1167 return READ_AGAIN; 1168 } 1169 else 1170 { 1171 fireGotError( msgs ); 1172 return READ_DONE; 1173 } 1236 assert( msglen + 1 == msgs->incoming.length ); 1237 assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen ); 1238 1239 msgs->state = AWAITING_BT_LENGTH; 1240 return READ_AGAIN; 1174 1241 } 1175 1242 … … 1341 1408 struct evbuffer * inbuf = EVBUFFER_INPUT ( evin ); 1342 1409 1410 dbgmsg( msgs, "canRead, state is %d", msgs->state ); 1343 1411 if( !canDownload( msgs ) ) 1344 1412 { 1413 dbgmsg( msgs, "oh, but canDownload failed" ); 1345 1414 msgs->notListening = 1; 1346 tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );1347 1415 ret = READ_DONE; 1348 1416 } … … 1350 1418 { 1351 1419 case AWAITING_BT_LENGTH: ret = readBtLength ( msgs, inbuf ); break; 1420 case AWAITING_BT_ID: ret = readBtId ( msgs, inbuf ); break; 1352 1421 case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break; 1422 case AWAITING_BT_PIECE: ret = readBtPiece ( msgs, inbuf ); break; 1353 1423 default: assert( 0 ); 1354 1424 } … … 1431 1501 { 1432 1502 msgs->notListening = 0; 1433 tr_peerIo SetIOMode ( msgs->io, EV_READ, 0);1503 tr_peerIoTryRead( msgs->io ); 1434 1504 } 1435 1505 … … 1690 1760 m->info->peerIsInterested = 0; 1691 1761 m->info->have = tr_bitfieldNew( torrent->info.pieceCount ); 1762 m->state = AWAITING_BT_LENGTH; 1692 1763 m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL ); 1693 1764 m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL ); 1694 1765 m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL ); 1695 1766 m->outMessages = evbuffer_new( ); 1767 m->incoming.block = evbuffer_new( ); 1696 1768 m->outBlock = evbuffer_new( ); 1697 1769 m->peerAllowedPieces = NULL; … … 1719 1791 tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */ 1720 1792 tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m ); 1721 tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );1722 1793 ratePulse( m ); 1723 1794 … … 1763 1834 tr_bitfieldFree( msgs->clientAllowedPieces ); 1764 1835 tr_bitfieldFree( msgs->clientSuggestedPieces ); 1836 evbuffer_free( msgs->incoming.block ); 1765 1837 evbuffer_free( msgs->outMessages ); 1766 1838 evbuffer_free( msgs->outBlock );
Note: See TracChangeset
for help on using the changeset viewer.