Changeset 7588
- Timestamp:
- Jan 2, 2009, 11:28:57 PM (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/peer-msgs.c
r7584 r7588 107 107 **/ 108 108 109 enum 110 { 111 AWAITING_BT_LENGTH, 112 AWAITING_BT_ID, 113 AWAITING_BT_MESSAGE, 114 AWAITING_BT_PIECE 115 }; 116 109 117 struct peer_request 110 118 { … … 245 253 struct tr_incoming 246 254 { 247 uint32_t length; /* includes the +1 for id length */ 248 struct evbuffer * block; /* piece data for incoming blocks */ 255 uint8_t id; 256 uint32_t length; /* includes the +1 for id length */ 257 struct peer_request blockReq; /* metadata for incoming blocks */ 258 struct evbuffer * block; /* piece data for incoming blocks */ 249 259 }; 250 260 … … 270 280 tr_bool haveFastSet; 271 281 282 uint8_t state; 272 283 uint8_t ut_pex_id; 273 284 uint16_t pexCount; … … 1268 1279 1269 1280 tr_peerIoReadUint32( msgs->peer->io, inbuf, &len ); 1270 msgs->incoming.length = len;1271 1281 1272 1282 if( len == 0 ) /* peer sent us a keepalive message */ 1273 1283 dbgmsg( msgs, "got KeepAlive" ); 1284 else 1285 { 1286 msgs->incoming.length = len; 1287 msgs->state = AWAITING_BT_ID; 1288 } 1274 1289 1275 1290 return READ_NOW; 1291 } 1292 1293 static int readBtMessage( tr_peermsgs * msgs, 1294 struct evbuffer * inbuf, 1295 size_t inlen ); 1296 1297 static int 1298 readBtId( tr_peermsgs * msgs, 1299 struct evbuffer * inbuf, 1300 size_t inlen ) 1301 { 1302 uint8_t id; 1303 1304 if( inlen < sizeof( uint8_t ) ) 1305 return READ_LATER; 1306 1307 tr_peerIoReadUint8( msgs->peer->io, inbuf, &id ); 1308 msgs->incoming.id = id; 1309 1310 if( id == BT_PIECE ) 1311 { 1312 msgs->state = AWAITING_BT_PIECE; 1313 return READ_NOW; 1314 } 1315 else if( msgs->incoming.length != 1 ) 1316 { 1317 msgs->state = AWAITING_BT_MESSAGE; 1318 return READ_NOW; 1319 } 1320 else return readBtMessage( msgs, inbuf, inlen - 1 ); 1276 1321 } 1277 1322 … … 1362 1407 size_t * setme_piece_bytes_read ) 1363 1408 { 1364 struct peer_request req;1409 struct peer_request * req = &msgs->incoming.blockReq; 1365 1410 1366 1411 assert( EVBUFFER_LENGTH( inbuf ) >= inlen ); 1367 1412 dbgmsg( msgs, "In readBtPiece" ); 1368 1413 1369 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.index ); 1370 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.offset ); 1371 req.length = msgs->incoming.length - 9; 1372 dbgmsg( msgs, "got incoming block header %u:%u->%u", req.index, req.offset, req.length ); 1373 1374 { 1414 if( !req->length ) 1415 { 1416 if( inlen < 8 ) 1417 return READ_LATER; 1418 1419 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index ); 1420 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset ); 1421 req->length = msgs->incoming.length - 9; 1422 dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length ); 1423 return READ_NOW; 1424 } 1425 else 1426 { 1375 1427 int err; 1376 1428 1377 /* decrypt the whole block in one go */ 1378 evbuffer_expand( msgs->incoming.block, req.length ); 1379 tr_peerIoReadBytes( msgs->peer->io, inbuf, EVBUFFER_DATA( msgs->incoming.block ), req.length ); 1380 EVBUFFER_LENGTH( msgs->incoming.block ) += req.length; 1381 1382 fireClientGotData( msgs, req.length, TRUE ); 1383 *setme_piece_bytes_read += req.length; 1384 dbgmsg( msgs, "got block %u:%u->%u", req.index, req.offset, req.length ); 1385 assert( EVBUFFER_LENGTH( msgs->incoming.block ) == req.length ); 1429 /* read in another chunk of data */ 1430 const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block ); 1431 size_t n = MIN( nLeft, inlen ); 1432 size_t i = n; 1433 1434 while( i > 0 ) 1435 { 1436 uint8_t buf[MAX_STACK_ARRAY_SIZE]; 1437 const size_t thisPass = MIN( i, sizeof( buf ) ); 1438 tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass ); 1439 evbuffer_add( msgs->incoming.block, buf, thisPass ); 1440 i -= thisPass; 1441 } 1442 1443 fireClientGotData( msgs, n, TRUE ); 1444 *setme_piece_bytes_read += n; 1445 dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain", 1446 n, req->index, req->offset, req->length, 1447 (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) ); 1448 if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length ) 1449 return READ_LATER; 1386 1450 1387 1451 /* we've got the whole block ... process it */ 1388 err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), &req );1452 err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req ); 1389 1453 1390 1454 /* cleanup */ 1391 1455 evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) ); 1456 req->length = 0; 1457 msgs->state = AWAITING_BT_LENGTH; 1392 1458 if( !err ) 1393 1459 return READ_NOW; … … 1400 1466 1401 1467 static int 1402 readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen, size_t * piece ) 1403 { 1404 int ret = READ_NOW; 1405 uint8_t id; 1468 readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen ) 1469 { 1406 1470 uint32_t ui32; 1407 1471 uint32_t msglen = msgs->incoming.length; 1472 const uint8_t id = msgs->incoming.id; 1408 1473 const size_t startBufLen = EVBUFFER_LENGTH( inbuf ); 1409 1474 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 1410 1475 1476 --msglen; /* id length */ 1477 1411 1478 if( inlen < msglen ) 1412 1479 return READ_LATER; 1413 1480 1414 tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );1415 1416 1481 dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen ); 1417 1482 1418 if( !messageLengthIsCorrect( msgs, id, msglen ) )1483 if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) ) 1419 1484 { 1420 1485 dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen ); … … 1422 1487 return READ_ERR; 1423 1488 } 1424 1425 --msglen;1426 1489 1427 1490 switch( id ) … … 1495 1558 1496 1559 case BT_PIECE: 1497 ret = readBtPiece( msgs, inbuf, msglen, piece );1560 assert( 0 ); /* handled elsewhere! */ 1498 1561 break; 1499 1562 … … 1574 1637 } 1575 1638 1576 assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen - 1 ); 1577 1578 msgs->incoming.length = 0; 1579 return ret; 1639 assert( msglen + 1 == msgs->incoming.length ); 1640 assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen ); 1641 1642 msgs->state = AWAITING_BT_LENGTH; 1643 return READ_NOW; 1580 1644 } 1581 1645 … … 1678 1742 1679 1743 if( !inlen ) 1680 return READ_LATER; 1681 1682 /* Incoming data is processed in two stages. First the length is read 1683 * and then readBtMessage() waits until all the data has arrived in 1684 * the input buffer before starting to parse it */ 1685 if( msgs->incoming.length == 0 ) 1686 ret = readBtLength ( msgs, in, inlen ); 1687 else 1688 ret = readBtMessage( msgs, in, inlen, piece ); 1744 { 1745 ret = READ_LATER; 1746 } 1747 else if( msgs->state == AWAITING_BT_PIECE ) 1748 { 1749 ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER; 1750 } 1751 else switch( msgs->state ) 1752 { 1753 case AWAITING_BT_LENGTH: 1754 ret = readBtLength ( msgs, in, inlen ); break; 1755 1756 case AWAITING_BT_ID: 1757 ret = readBtId ( msgs, in, inlen ); break; 1758 1759 case AWAITING_BT_MESSAGE: 1760 ret = readBtMessage( msgs, in, inlen ); break; 1761 1762 default: 1763 assert( 0 ); 1764 } 1689 1765 1690 1766 /* log the raw data that was read */ … … 2166 2242 m->peer->peerIsInterested = 0; 2167 2243 m->peer->have = tr_bitfieldNew( torrent->info.pieceCount ); 2244 m->state = AWAITING_BT_LENGTH; 2168 2245 m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL ); 2169 2246 m->outMessages = evbuffer_new( );
Note: See TracChangeset
for help on using the changeset viewer.