Changeset 7455 for branches/1.4x/libtransmission/peer-msgs.c
- Timestamp:
- Dec 22, 2008, 12:51:14 AM (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/1.4x/libtransmission/peer-msgs.c
r7354 r7455 25 25 #include "crypto.h" 26 26 #include "inout.h" 27 #include "iobuf.h"28 27 #ifdef WIN32 29 28 #include "net.h" /* for ECONN */ … … 55 54 BT_CANCEL = 8, 56 55 BT_PORT = 9, 57 BT_SUGGEST = 13, 58 BT_HAVE_ALL = 14, 59 BT_HAVE_NONE = 15, 60 BT_REJECT = 16, 61 BT_ALLOWED_FAST = 17, 56 57 BT_FEXT_SUGGEST = 13, 58 BT_FEXT_HAVE_ALL = 14, 59 BT_FEXT_HAVE_NONE = 15, 60 BT_FEXT_REJECT = 16, 61 BT_FEXT_ALLOWED_FAST = 17, 62 62 63 BT_LTEP = 20, 63 64 … … 65 66 66 67 TR_LTEP_PEX = 1, 68 69 67 70 68 71 MIN_CHOKE_PERIOD_SEC = ( 10 ), … … 72 75 73 76 PEX_INTERVAL = ( 90 * 1000 ), /* msec between sendPex() calls */ 74 PEER_PULSE_INTERVAL = ( 250 ), /* msec between peerPulse() calls75 */76 77 77 78 MAX_QUEUE_SIZE = ( 100 ), 78 79 /* (fast peers) max number of pieces we fast-allow to another peer */80 MAX_FAST_ALLOWED_COUNT = 10,81 82 /* (fast peers) max threshold for allowing fast-pieces requests */83 MAX_FAST_ALLOWED_THRESHOLD = 10,84 79 85 80 /* how long an unsent request can stay queued before it's returned … … 98 93 /* number of pieces to remove from the bitfield when 99 94 * lazy bitfields are turned on */ 100 LAZY_PIECE_COUNT = 26 95 LAZY_PIECE_COUNT = 26, 96 97 /* number of pieces we'll allow in our fast set */ 98 MAX_FAST_SET_SIZE = 3 101 99 }; 102 100 … … 122 120 123 121 static int 124 compareRequest( const void * va, 125 const void * vb ) 122 compareRequest( const void * va, const void * vb ) 126 123 { 127 124 const struct peer_request * a = va; … … 170 167 171 168 static void 172 reqListCopy( struct request_list * dest, 173 const struct request_list * src ) 169 reqListCopy( struct request_list * dest, const struct request_list * src ) 174 170 { 175 171 dest->count = dest->max = src->count; 176 dest->requests = 177 tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) ); 172 dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) ); 178 173 } 179 174 … … 185 180 186 181 memmove( &list->requests[i], 187 &list->requests[i + 1],188 sizeof( struct peer_request ) * ( --list->count - i ) );182 &list->requests[i + 1], 183 sizeof( struct peer_request ) * ( --list->count - i ) ); 189 184 } 190 185 … … 260 255 }; 261 256 257 /** 258 * Low-level communication state information about a connected peer. 259 * 260 * This structure remembers the low-level protocol states that we're 261 * in with this peer, such as active requests, pex messages, and so on. 262 * Its fields are all private to peer-msgs.c. 263 * 264 * Data not directly involved with sending & receiving messages is 265 * stored in tr_peer, where it can be accessed by both peermsgs and 266 * the peer manager. 267 * 268 * @see struct peer_atom 269 * @see tr_peer 270 */ 262 271 struct tr_peermsgs 263 272 { … … 266 275 tr_bool clientSentLtepHandshake; 267 276 tr_bool peerSentLtepHandshake; 277 tr_bool haveFastSet; 268 278 269 279 uint8_t state; 270 280 uint8_t ut_pex_id; 271 281 uint16_t pexCount; 272 uint16_t minActiveRequests;273 282 uint16_t maxActiveRequests; 283 284 size_t fastsetSize; 285 tr_piece_index_t fastset[MAX_FAST_SET_SIZE]; 274 286 275 287 /* how long the outMessages batch should be allowed to grow before … … 278 290 int outMessagesBatchPeriod; 279 291 280 tr_peer * info;292 tr_peer * peer; 281 293 282 294 tr_session * session; 283 295 tr_torrent * torrent; 284 tr_peerIo * io;285 296 286 297 tr_publisher_t * publisher; … … 289 300 290 301 struct request_list peerAskedFor; 291 struct request_list peerAskedForFast;292 302 struct request_list clientAskedFor; 293 303 struct request_list clientWillAskFor; 294 304 295 tr_timer * pexTimer; 305 tr_timer * pexTimer; 306 tr_pex * pex; 296 307 297 308 time_t clientSentPexAt; … … 301 312 time_t outMessagesBatchedAt; 302 313 303 tr_bitfield * peerAllowedPieces;304 305 314 struct tr_incoming incoming; 306 307 tr_pex * pex;308 315 309 316 /* if the peer supports the Extension Protocol in BEP 10 and … … 318 325 319 326 static void 320 myDebug( const char * file, 321 int line, 327 myDebug( const char * file, int line, 322 328 const struct tr_peermsgs * msgs, 323 const char * fmt, 324 ... ) 329 const char * fmt, ... ) 325 330 { 326 331 FILE * fp = tr_getLog( ); … … 336 341 tr_getLogTimeStr( timestr, sizeof( timestr ) ), 337 342 msgs->torrent->info.name, 338 tr_peerIoGetAddrStr( msgs-> io ),339 msgs-> info->client );343 tr_peerIoGetAddrStr( msgs->peer->io ), 344 msgs->peer->client ); 340 345 va_start( args, fmt ); 341 346 evbuffer_add_vprintf( buf, fmt, args ); … … 371 376 372 377 static void 373 protocolSendRequest( tr_peermsgs * msgs, 378 dbgOutMessageLen( tr_peermsgs * msgs ) 379 { 380 dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) ); 381 } 382 383 static void 384 protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req ) 385 { 386 tr_peerIo * io = msgs->peer->io; 387 struct evbuffer * out = msgs->outMessages; 388 389 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) ); 390 391 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) ); 392 tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT ); 393 tr_peerIoWriteUint32( io, out, req->index ); 394 tr_peerIoWriteUint32( io, out, req->offset ); 395 tr_peerIoWriteUint32( io, out, req->length ); 396 397 dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length ); 398 dbgOutMessageLen( msgs ); 399 } 400 401 static void 402 protocolSendRequest( tr_peermsgs * msgs, 374 403 const struct peer_request * req ) 375 404 { 376 tr_peerIo * io = msgs->io;405 tr_peerIo * io = msgs->peer->io; 377 406 struct evbuffer * out = msgs->outMessages; 378 407 … … 382 411 tr_peerIoWriteUint32( io, out, req->offset ); 383 412 tr_peerIoWriteUint32( io, out, req->length ); 384 dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d", 385 req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) ); 413 414 dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length ); 415 dbgOutMessageLen( msgs ); 386 416 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 387 417 } 388 418 389 419 static void 390 protocolSendCancel( tr_peermsgs *msgs,420 protocolSendCancel( tr_peermsgs * msgs, 391 421 const struct peer_request * req ) 392 422 { 393 tr_peerIo * io = msgs->io;423 tr_peerIo * io = msgs->peer->io; 394 424 struct evbuffer * out = msgs->outMessages; 395 425 … … 399 429 tr_peerIoWriteUint32( io, out, req->offset ); 400 430 tr_peerIoWriteUint32( io, out, req->length ); 401 dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d", 402 req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) ); 431 432 dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length ); 433 dbgOutMessageLen( msgs ); 403 434 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 404 435 } … … 408 439 uint32_t index ) 409 440 { 410 tr_peerIo * io = msgs->io;441 tr_peerIo * io = msgs->peer->io; 411 442 struct evbuffer * out = msgs->outMessages; 412 443 413 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t) );444 tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) ); 414 445 tr_peerIoWriteUint8 ( io, out, BT_HAVE ); 415 446 tr_peerIoWriteUint32( io, out, index ); 416 dbgmsg( msgs, "sending Have %u.. outMessage size is now %d", 417 index, (int)EVBUFFER_LENGTH( out ) ); 447 448 dbgmsg( msgs, "sending Have %u", index ); 449 dbgOutMessageLen( msgs ); 418 450 pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS ); 419 451 } 452 453 #if 0 454 static void 455 protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex ) 456 { 457 tr_peerIo * io = msgs->peer->io; 458 struct evbuffer * out = msgs->outMessages; 459 460 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) ); 461 462 tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) ); 463 tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST ); 464 tr_peerIoWriteUint32( io, out, pieceIndex ); 465 466 dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex ); 467 dbgOutMessageLen( msgs ); 468 } 469 #endif 420 470 421 471 static void … … 423 473 int choke ) 424 474 { 425 tr_peerIo * io = msgs->io;475 tr_peerIo * io = msgs->peer->io; 426 476 struct evbuffer * out = msgs->outMessages; 427 477 428 478 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) ); 429 479 tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE ); 430 dbgmsg( msgs, "sending %s... outMessage size is now %d", 431 ( choke ? "Choke" : "Unchoke" ), 432 (int)EVBUFFER_LENGTH( out ) ); 480 481 dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" ); 482 dbgOutMessageLen( msgs ); 483 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 484 } 485 486 static void 487 protocolSendHaveAll( tr_peermsgs * msgs ) 488 { 489 tr_peerIo * io = msgs->peer->io; 490 struct evbuffer * out = msgs->outMessages; 491 492 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) ); 493 494 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) ); 495 tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL ); 496 497 dbgmsg( msgs, "sending HAVE_ALL..." ); 498 dbgOutMessageLen( msgs ); 499 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 500 } 501 502 static void 503 protocolSendHaveNone( tr_peermsgs * msgs ) 504 { 505 tr_peerIo * io = msgs->peer->io; 506 struct evbuffer * out = msgs->outMessages; 507 508 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) ); 509 510 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) ); 511 tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE ); 512 513 dbgmsg( msgs, "sending HAVE_NONE..." ); 514 dbgOutMessageLen( msgs ); 433 515 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 434 516 } … … 438 520 **/ 439 521 440 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 }; 441 442 static void 443 publish( tr_peermsgs * msgs, 444 tr_peer_event * e ) 445 { 446 tr_publisherPublish( msgs->publisher, msgs->info, e ); 447 } 448 449 static void 450 fireError( tr_peermsgs * msgs, 451 int err ) 522 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 }; 523 524 static void 525 publish( tr_peermsgs * msgs, tr_peer_event * e ) 526 { 527 assert( msgs->peer ); 528 assert( msgs->peer->msgs == msgs ); 529 530 tr_publisherPublish( msgs->publisher, msgs->peer, e ); 531 } 532 533 static void 534 fireError( tr_peermsgs * msgs, int err ) 452 535 { 453 536 tr_peer_event e = blankEvent; 454 455 537 e.eventType = TR_PEER_ERROR; 456 538 e.err = err; … … 459 541 460 542 static void 543 fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly ) 544 { 545 tr_peer_event e = blankEvent; 546 e.eventType = TR_PEER_UPLOAD_ONLY; 547 e.uploadOnly = uploadOnly; 548 publish( msgs, &e ); 549 } 550 551 static void 461 552 fireNeedReq( tr_peermsgs * msgs ) 462 553 { 463 554 tr_peer_event e = blankEvent; 464 465 555 e.eventType = TR_PEER_NEED_REQ; 466 556 publish( msgs, &e ); … … 471 561 { 472 562 tr_peer_event e = blankEvent; 473 474 563 e.eventType = TR_PEER_PEER_PROGRESS; 475 e.progress = msgs-> info->progress;564 e.progress = msgs->peer->progress; 476 565 publish( msgs, &e ); 477 566 } 478 567 479 568 static void 480 fireGotBlock( tr_peermsgs * msgs, 481 const struct peer_request * req ) 569 fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req ) 482 570 { 483 571 tr_peer_event e = blankEvent; 484 485 572 e.eventType = TR_PEER_CLIENT_GOT_BLOCK; 486 573 e.pieceIndex = req->index; … … 500 587 e.eventType = TR_PEER_CLIENT_GOT_DATA; 501 588 e.wasPieceData = wasPieceData; 589 publish( msgs, &e ); 590 } 591 592 static void 593 fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex ) 594 { 595 tr_peer_event e = blankEvent; 596 e.eventType = TR_PEER_CLIENT_GOT_SUGGEST; 597 e.pieceIndex = pieceIndex; 598 publish( msgs, &e ); 599 } 600 601 static void 602 fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex ) 603 { 604 tr_peer_event e = blankEvent; 605 e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST; 606 e.pieceIndex = pieceIndex; 502 607 publish( msgs, &e ); 503 608 } … … 529 634 530 635 /** 636 *** ALLOWED FAST SET 637 *** For explanation, see http://www.bittorrent.org/beps/bep_0006.html 638 **/ 639 640 size_t 641 tr_generateAllowedSet( tr_piece_index_t * setmePieces, 642 size_t desiredSetSize, 643 size_t pieceCount, 644 const uint8_t * infohash, 645 const tr_address * addr ) 646 { 647 size_t setSize = 0; 648 649 assert( setmePieces ); 650 assert( desiredSetSize <= pieceCount ); 651 assert( desiredSetSize ); 652 assert( pieceCount ); 653 assert( infohash ); 654 assert( addr ); 655 656 if( 1 ) 657 { 658 uint8_t w[SHA_DIGEST_LENGTH + 4]; 659 uint8_t x[SHA_DIGEST_LENGTH]; 660 661 *(uint32_t*)w = ntohl( htonl( addr->s_addr ) & 0xffffff00 ); /* (1) */ 662 memcpy( w + 4, infohash, SHA_DIGEST_LENGTH ); /* (2) */ 663 tr_sha1( x, w, sizeof( w ), NULL ); /* (3) */ 664 665 while( setSize<desiredSetSize ) 666 { 667 int i; 668 for( i=0; i<5 && setSize<desiredSetSize; ++i ) /* (4) */ 669 { 670 size_t k; 671 uint32_t j = i * 4; /* (5) */ 672 uint32_t y = ntohl( *( uint32_t* )( x + j ) ); /* (6) */ 673 uint32_t index = y % pieceCount; /* (7) */ 674 675 for( k=0; k<setSize; ++k ) /* (8) */ 676 if( setmePieces[k] == index ) 677 break; 678 679 if( k == setSize ) 680 setmePieces[setSize++] = index; /* (9) */ 681 } 682 683 tr_sha1( x, x, sizeof( x ), NULL ); /* (3) */ 684 } 685 } 686 687 return setSize; 688 } 689 690 static void 691 updateFastSet( tr_peermsgs * msgs UNUSED ) 692 { 693 #if 0 694 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 695 const int peerIsNeedy = msgs->peer->progress < 0.10; 696 697 if( fext && peerIsNeedy && !msgs->haveFastSet ) 698 { 699 size_t i; 700 const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL ); 701 const tr_info * inf = &msgs->torrent->info; 702 const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount ); 703 704 /* build the fast set */ 705 msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr ); 706 msgs->haveFastSet = 1; 707 708 /* send it to the peer */ 709 for( i=0; i<msgs->fastsetSize; ++i ) 710 protocolSendAllowedFast( msgs, msgs->fastset[i] ); 711 } 712 #endif 713 } 714 715 /** 531 716 *** INTEREST 532 717 **/ 533 718 534 719 static int 535 isPieceInteresting( const tr_peermsgs * peer,720 isPieceInteresting( const tr_peermsgs * msgs, 536 721 tr_piece_index_t piece ) 537 722 { 538 const tr_torrent * torrent = peer->torrent;723 const tr_torrent * torrent = msgs->torrent; 539 724 540 725 return ( !torrent->info.pieces[piece].dnd ) /* we want it */ 541 && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have 542 */ 543 && ( tr_bitfieldHas( peer->info->have, piece ) ); /* peer has it */ 726 && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */ 727 && ( tr_bitfieldHas( msgs->peer->have, piece ) ); /* peer has it */ 544 728 } 545 729 … … 562 746 bitfield = tr_cpPieceBitfield( torrent->completion ); 563 747 564 if( !msgs-> info->have )748 if( !msgs->peer->have ) 565 749 return TRUE; 566 750 567 assert( bitfield->byteCount == msgs-> info->have->byteCount );751 assert( bitfield->byteCount == msgs->peer->have->byteCount ); 568 752 569 753 for( i = 0; i < torrent->info.pieceCount; ++i ) … … 583 767 assert( weAreInterested == 0 || weAreInterested == 1 ); 584 768 585 msgs->info->clientIsInterested = weAreInterested; 586 dbgmsg( msgs, "Sending %s", 587 weAreInterested ? "Interested" : "Not Interested" ); 588 tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) ); 589 tr_peerIoWriteUint8 ( 590 msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED ); 769 msgs->peer->clientIsInterested = weAreInterested; 770 dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" ); 771 tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) ); 772 tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED ); 773 591 774 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS ); 592 dbg msg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ));775 dbgOutMessageLen( msgs ); 593 776 } 594 777 … … 598 781 const int i = isPeerInteresting( msgs ); 599 782 600 if( i != msgs-> info->clientIsInterested )783 if( i != msgs->peer->clientIsInterested ) 601 784 sendInterest( msgs, i ); 602 785 if( i ) … … 604 787 } 605 788 606 static void 607 cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs ) 608 { 609 reqListClear( &msgs->peerAskedFor ); 789 static int 790 popNextRequest( tr_peermsgs * msgs, 791 struct peer_request * setme ) 792 { 793 return reqListPop( &msgs->peerAskedFor, setme ); 794 } 795 796 static void 797 cancelAllRequestsToClient( tr_peermsgs * msgs ) 798 { 799 struct peer_request req; 800 const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io ); 801 802 while( popNextRequest( msgs, &req ) ) 803 if( mustSendCancel ) 804 protocolSendReject( msgs, &req ); 610 805 } 611 806 … … 618 813 619 814 assert( msgs ); 620 assert( msgs-> info);815 assert( msgs->peer ); 621 816 assert( choke == 0 || choke == 1 ); 622 817 623 if( msgs->info->chokeChangedAt > fibrillationTime ) 624 { 625 dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", 626 choke ); 627 } 628 else if( msgs->info->peerIsChoked != choke ) 629 { 630 msgs->info->peerIsChoked = choke; 818 if( msgs->peer->chokeChangedAt > fibrillationTime ) 819 { 820 dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke ); 821 } 822 else if( msgs->peer->peerIsChoked != choke ) 823 { 824 msgs->peer->peerIsChoked = choke; 631 825 if( choke ) 632 cancelAllRequestsToClient ExceptFast( msgs );826 cancelAllRequestsToClient( msgs ); 633 827 protocolSendChoke( msgs, choke ); 634 msgs-> info->chokeChangedAt = now;828 msgs->peer->chokeChangedAt = now; 635 829 } 636 830 } … … 648 842 /* since we have more pieces now, we might not be interested in this peer */ 649 843 updateInterest( msgs ); 650 }651 652 #if 0653 static void654 sendFastSuggest( tr_peermsgs * msgs,655 uint32_t pieceIndex )656 {657 assert( msgs );658 659 if( tr_peerIoSupportsFEXT( msgs->io ) )660 {661 tr_peerIoWriteUint32( msgs->io, msgs->outMessages,662 sizeof( uint8_t ) + sizeof( uint32_t ) );663 tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );664 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );665 }666 }667 668 static void669 sendFastHave( tr_peermsgs * msgs,670 int all )671 {672 assert( msgs );673 674 if( tr_peerIoSupportsFEXT( msgs->io ) )675 {676 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof( uint8_t ) );677 tr_peerIoWriteUint8( msgs->io, msgs->outMessages,678 ( all ? BT_HAVE_ALL679 : BT_HAVE_NONE ) );680 updateInterest( msgs );681 }682 }683 684 #endif685 686 static void687 sendFastReject( tr_peermsgs * msgs,688 uint32_t pieceIndex,689 uint32_t offset,690 uint32_t length )691 {692 assert( msgs );693 694 if( tr_peerIoSupportsFEXT( msgs->io ) )695 {696 struct evbuffer * out = msgs->outMessages;697 const uint32_t len = sizeof( uint8_t ) + 3 * sizeof( uint32_t );698 dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset,699 length );700 tr_peerIoWriteUint32( msgs->io, out, len );701 tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );702 tr_peerIoWriteUint32( msgs->io, out, pieceIndex );703 tr_peerIoWriteUint32( msgs->io, out, offset );704 tr_peerIoWriteUint32( msgs->io, out, length );705 pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );706 dbgmsg( msgs, "outMessage size is now %d",707 (int)EVBUFFER_LENGTH( out ) );708 }709 }710 711 static tr_bitfield*712 getPeerAllowedPieces( tr_peermsgs * msgs )713 {714 if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )715 {716 msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(717 MAX_FAST_ALLOWED_COUNT,718 msgs->torrent->info.pieceCount,719 msgs->torrent->info.hash,720 tr_peerIoGetAddress( msgs->io, NULL ) );721 }722 723 return msgs->peerAllowedPieces;724 }725 726 static void727 sendFastAllowed( tr_peermsgs * msgs,728 uint32_t pieceIndex )729 {730 assert( msgs );731 732 if( tr_peerIoSupportsFEXT( msgs->io ) )733 {734 struct evbuffer * out = msgs->outMessages;735 dbgmsg( msgs, "sending fast allowed" );736 tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) +737 sizeof( uint32_t ) );738 tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );739 tr_peerIoWriteUint32( msgs->io, out, pieceIndex );740 pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );741 dbgmsg( msgs, "outMessage size is now %d",742 (int)EVBUFFER_LENGTH( out ) );743 }744 }745 746 static void747 sendFastAllowedSet( tr_peermsgs * msgs )748 {749 tr_piece_index_t i = 0;750 751 while( i <= msgs->torrent->info.pieceCount )752 {753 if( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i ) )754 sendFastAllowed( msgs, i );755 i++;756 }757 }758 759 static void760 maybeSendFastAllowedSet( tr_peermsgs * msgs )761 {762 if( tr_bitfieldCountTrueBits( msgs->info->have ) <=763 MAX_FAST_ALLOWED_THRESHOLD )764 sendFastAllowedSet( msgs );765 844 } 766 845 … … 787 866 expireOldRequests( tr_peermsgs * msgs, const time_t now ) 788 867 { 789 int 790 time_t 868 int i; 869 time_t oldestAllowed; 791 870 struct request_list tmp = REQUEST_LIST_INIT; 871 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 872 dbgmsg( msgs, "entering `expire old requests' block" ); 792 873 793 874 /* cancel requests that have been queued for too long */ 794 875 oldestAllowed = now - QUEUED_REQUEST_TTL_SECS; 795 876 reqListCopy( &tmp, &msgs->clientWillAskFor ); 796 for( i = 0; i < tmp.count; ++i ) 797 { 877 for( i=0; i<tmp.count; ++i ) { 798 878 const struct peer_request * req = &tmp.requests[i]; 799 879 if( req->time_requested < oldestAllowed ) … … 802 882 reqListClear( &tmp ); 803 883 804 /* cancel requests that were sent too long ago */ 805 oldestAllowed = now - SENT_REQUEST_TTL_SECS; 806 reqListCopy( &tmp, &msgs->clientAskedFor ); 807 for( i = 0; i < tmp.count; ++i ) 808 { 809 const struct peer_request * req = &tmp.requests[i]; 810 if( req->time_requested < oldestAllowed ) 811 tr_peerMsgsCancel( msgs, req->index, req->offset, req->length ); 812 } 813 reqListClear( &tmp ); 884 /* if the peer doesn't support "Reject Request", 885 * cancel requests that were sent too long ago. */ 886 if( !fext ) { 887 oldestAllowed = now - SENT_REQUEST_TTL_SECS; 888 reqListCopy( &tmp, &msgs->clientAskedFor ); 889 for( i=0; i<tmp.count; ++i ) { 890 const struct peer_request * req = &tmp.requests[i]; 891 if( req->time_requested < oldestAllowed ) 892 tr_peerMsgsCancel( msgs, req->index, req->offset, req->length ); 893 } 894 reqListClear( &tmp ); 895 } 896 897 dbgmsg( msgs, "leaving `expire old requests' block" ); 814 898 } 815 899 … … 818 902 { 819 903 const int max = msgs->maxActiveRequests; 820 const int min = msgs->minActiveRequests;821 904 int sent = 0; 822 905 int count = msgs->clientAskedFor.count; 823 906 struct peer_request req; 824 907 825 if( count > min ) 826 return; 827 if( msgs->info->clientIsChoked ) 908 if( msgs->peer->clientIsChoked ) 828 909 return; 829 910 if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) ) … … 832 913 while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) ) 833 914 { 834 const tr_block_index_t block = 835 _tr_block( msgs->torrent, req.index, req.offset ); 915 const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset ); 836 916 837 917 assert( requestIsValid( msgs, &req ) ); 838 assert( tr_bitfieldHas( msgs-> info->have, req.index ) );918 assert( tr_bitfieldHas( msgs->peer->have, req.index ) ); 839 919 840 920 /* don't ask for it if we've already got it... this block may have … … 852 932 853 933 if( sent ) 854 dbgmsg( msgs, 855 "pump sent %d requests, now have %d active and %d queued", 856 sent, 857 msgs->clientAskedFor.count, 858 msgs->clientWillAskFor.count ); 934 dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued", 935 sent, msgs->clientAskedFor.count, msgs->clientWillAskFor.count ); 859 936 860 937 if( count < max ) … … 886 963 887 964 /* don't send requests to choked clients */ 888 if( msgs->info->clientIsChoked ) 889 { 965 if( msgs->peer->clientIsChoked ) { 890 966 dbgmsg( msgs, "declining request because they're choking us" ); 891 967 return TR_ADDREQ_CLIENT_CHOKED; … … 893 969 894 970 /* peer doesn't have this piece */ 895 if( !tr_bitfieldHas( msgs-> info->have, index ) )971 if( !tr_bitfieldHas( msgs->peer->have, index ) ) 896 972 return TR_ADDREQ_MISSING; 897 973 … … 929 1005 cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel ) 930 1006 { 931 int 1007 int i; 932 1008 struct request_list a = msgs->clientWillAskFor; 933 1009 struct request_list b = msgs->clientAskedFor; … … 969 1045 /* if it's only in the queue and hasn't been sent yet, free it */ 970 1046 if( reqListRemove( &msgs->clientWillAskFor, &req ) ) { 971 dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32 "\n", pieceIndex, offset, length );1047 dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length ); 972 1048 fireCancelledReq( msgs, &req ); 973 1049 } … … 975 1051 /* if it's already been sent, send a cancel message too */ 976 1052 if( reqListRemove( &msgs->clientAskedFor, &req ) ) { 977 dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32 "\n", pieceIndex, offset, length );1053 dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length ); 978 1054 protocolSendCancel( msgs, &req ); 979 1055 fireCancelledReq( msgs, &req ); 980 1056 } 981 1057 } 1058 982 1059 983 1060 /** … … 988 1065 sendLtepHandshake( tr_peermsgs * msgs ) 989 1066 { 990 tr_benc 991 char * 992 int 993 int 1067 tr_benc val, *m; 1068 char * buf; 1069 int len; 1070 int pex; 994 1071 struct evbuffer * out = msgs->outMessages; 995 1072 … … 1008 1085 pex = 1; 1009 1086 1010 tr_bencInitDict( &val, 4 ); 1011 tr_bencDictAddInt( &val, "e", 1012 msgs->session->encryptionMode != TR_CLEAR_PREFERRED ); 1087 tr_bencInitDict( &val, 5 ); 1088 tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED ); 1013 1089 tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) ); 1090 tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) ); 1014 1091 tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX ); 1015 1092 m = tr_bencDictAddDict( &val, "m", 1 ); … … 1018 1095 buf = tr_bencSave( &val, &len ); 1019 1096 1020 tr_peerIoWriteUint32( msgs-> io, out, 2 * sizeof( uint8_t ) + len );1021 tr_peerIoWriteUint8 ( msgs-> io, out, BT_LTEP );1022 tr_peerIoWriteUint8 ( msgs-> io, out, LTEP_HANDSHAKE );1023 tr_peerIoWriteBytes ( msgs-> io, out, buf, len );1097 tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len ); 1098 tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP ); 1099 tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE ); 1100 tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len ); 1024 1101 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 1025 dbg msg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ));1102 dbgOutMessageLen( msgs ); 1026 1103 1027 1104 /* cleanup */ … … 1039 1116 uint8_t * tmp = tr_new( uint8_t, len ); 1040 1117 1041 tr_peerIoReadBytes( msgs-> io, inbuf, tmp, len );1118 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len ); 1042 1119 msgs->peerSentLtepHandshake = 1; 1043 1120 1044 if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT)1121 if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) ) 1045 1122 { 1046 1123 dbgmsg( msgs, "GET extended-handshake, couldn't get dictionary" ); … … 1053 1130 /* does the peer prefer encrypted connections? */ 1054 1131 if( tr_bencDictFindInt( &val, "e", &i ) ) 1055 msgs-> info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES1056 : ENCRYPTION_PREFERENCE_NO;1132 msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES 1133 : ENCRYPTION_PREFERENCE_NO; 1057 1134 1058 1135 /* check supported messages for utorrent pex */ 1059 1136 msgs->peerSupportsPex = 0; 1060 if( tr_bencDictFindDict( &val, "m", &sub ) ) 1061 { 1062 if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) 1063 { 1137 if( tr_bencDictFindDict( &val, "m", &sub ) ) { 1138 if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) { 1064 1139 msgs->ut_pex_id = (uint8_t) i; 1065 1140 msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1; … … 1068 1143 } 1069 1144 1145 /* look for upload_only (BEP 21) */ 1146 if( tr_bencDictFindInt( &val, "upload_only", &i ) ) 1147 fireUploadOnly( msgs, i!=0 ); 1148 1070 1149 /* get peer's listening port */ 1071 if( tr_bencDictFindInt( &val, "p", &i ) ) 1072 { 1073 msgs->info->port = htons( (uint16_t)i ); 1074 dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port ); 1150 if( tr_bencDictFindInt( &val, "p", &i ) ) { 1151 msgs->peer->port = htons( (uint16_t)i ); 1152 dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port ); 1075 1153 } 1076 1154 … … 1084 1162 1085 1163 static void 1086 parseUtPex( tr_peermsgs * msgs, 1087 int msglen, 1088 struct evbuffer * inbuf ) 1089 { 1090 int loaded = 0; 1091 uint8_t * tmp = tr_new( uint8_t, msglen ); 1092 tr_benc val; 1164 parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf ) 1165 { 1166 int loaded = 0; 1167 uint8_t * tmp = tr_new( uint8_t, msglen ); 1168 tr_benc val; 1093 1169 const tr_torrent * tor = msgs->torrent; 1094 const uint8_t * 1095 size_t 1096 1097 tr_peerIoReadBytes( msgs-> io, inbuf, tmp, msglen );1170 const uint8_t * added; 1171 size_t added_len; 1172 1173 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen ); 1098 1174 1099 1175 if( tr_torrentAllowsPex( tor ) … … 1129 1205 uint8_t ltep_msgid; 1130 1206 1131 tr_peerIoReadUint8( msgs-> io, inbuf, <ep_msgid );1207 tr_peerIoReadUint8( msgs->peer->io, inbuf, <ep_msgid ); 1132 1208 msglen--; 1133 1209 … … 1136 1212 dbgmsg( msgs, "got ltep handshake" ); 1137 1213 parseLtepHandshake( msgs, msglen, inbuf ); 1138 if( tr_peerIoSupportsLTEP( msgs-> io ) )1214 if( tr_peerIoSupportsLTEP( msgs->peer->io ) ) 1139 1215 { 1140 1216 sendLtepHandshake( msgs ); … … 1165 1241 return READ_LATER; 1166 1242 1167 tr_peerIoReadUint32( msgs-> io, inbuf, &len );1243 tr_peerIoReadUint32( msgs->peer->io, inbuf, &len ); 1168 1244 1169 1245 if( len == 0 ) /* peer sent us a keepalive message */ … … 1192 1268 return READ_LATER; 1193 1269 1194 tr_peerIoReadUint8( msgs-> io, inbuf, &id );1270 tr_peerIoReadUint8( msgs->peer->io, inbuf, &id ); 1195 1271 msgs->incoming.id = id; 1196 1272 … … 1211 1287 updatePeerProgress( tr_peermsgs * msgs ) 1212 1288 { 1213 msgs-> info->progress = tr_bitfieldCountTrueBits( msgs->info->have )1214 / (float)msgs->torrent->info.pieceCount;1215 dbgmsg( msgs, "peer progress is %f", msgs->info->progress );1289 msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount; 1290 dbgmsg( msgs, "peer progress is %f", msgs->peer->progress ); 1291 updateFastSet( msgs ); 1216 1292 updateInterest( msgs ); 1217 1293 firePeerProgress( msgs ); 1218 1294 } 1219 1295 1220 static int1221 clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )1222 {1223 /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */1224 if( tr_bitfieldCountTrueBits( msgs->info->have ) >1225 MAX_FAST_ALLOWED_THRESHOLD )1226 return FALSE;1227 1228 /* ...or if we don't have ourself enough pieces */1229 if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->1230 completion ) ) <1231 MAX_FAST_ALLOWED_THRESHOLD )1232 return FALSE;1233 1234 /* Maybe a bandwidth limit ? */1235 return TRUE;1236 }1237 1238 1296 static void 1239 1297 peerMadeRequest( tr_peermsgs * msgs, 1240 1298 const struct peer_request * req ) 1241 1299 { 1300 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 1242 1301 const int reqIsValid = requestIsValid( msgs, req ); 1243 const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( 1244 msgs->torrent->completion, req->index ); 1245 const int peerIsChoked = msgs->info->peerIsChoked; 1246 const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io ); 1247 const int pieceIsFast = reqIsValid && tr_bitfieldHas( 1248 getPeerAllowedPieces( msgs ), req->index ); 1249 const int canSendFast = clientCanSendFastBlock( msgs ); 1250 1251 if( !reqIsValid ) /* bad request */ 1252 { 1302 const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index ); 1303 const int peerIsChoked = msgs->peer->peerIsChoked; 1304 1305 int allow = FALSE; 1306 1307 if( !reqIsValid ) 1253 1308 dbgmsg( msgs, "rejecting an invalid request." ); 1254 sendFastReject( msgs, req->index, req->offset, req->length ); 1255 } 1256 else if( !clientHasPiece ) /* we don't have it */ 1257 { 1309 else if( !clientHasPiece ) 1258 1310 dbgmsg( msgs, "rejecting request for a piece we don't have." ); 1259 sendFastReject( msgs, req->index, req->offset, req->length ); 1260 } 1261 else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */ 1262 { 1263 tr_peerMsgsSetChoke( msgs, 1 ); 1264 sendFastReject( msgs, req->index, req->offset, req->length ); 1265 } 1266 else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) ) 1267 { 1268 sendFastReject( msgs, req->index, req->offset, req->length ); 1269 } 1270 else /* YAY */ 1271 { 1272 if( peerIsFast && pieceIsFast ) 1273 reqListAppend( &msgs->peerAskedForFast, req ); 1274 else 1275 reqListAppend( &msgs->peerAskedFor, req ); 1276 } 1311 else if( peerIsChoked ) 1312 dbgmsg( msgs, "rejecting request from choked peer" ); 1313 else 1314 allow = TRUE; 1315 1316 if( allow ) 1317 reqListAppend( &msgs->peerAskedFor, req ); 1318 else if( fext ) 1319 protocolSendReject( msgs, req ); 1277 1320 } 1278 1321 1279 1322 static int 1280 messageLengthIsCorrect( const tr_peermsgs * msg, 1281 uint8_t id, 1282 uint32_t len ) 1323 messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len ) 1283 1324 { 1284 1325 switch( id ) … … 1288 1329 case BT_INTERESTED: 1289 1330 case BT_NOT_INTERESTED: 1290 case BT_ HAVE_ALL:1291 case BT_ HAVE_NONE:1331 case BT_FEXT_HAVE_ALL: 1332 case BT_FEXT_HAVE_NONE: 1292 1333 return len == 1; 1293 1334 1294 1335 case BT_HAVE: 1295 case BT_ SUGGEST:1296 case BT_ ALLOWED_FAST:1336 case BT_FEXT_SUGGEST: 1337 case BT_FEXT_ALLOWED_FAST: 1297 1338 return len == 5; 1298 1339 … … 1302 1343 case BT_REQUEST: 1303 1344 case BT_CANCEL: 1304 case BT_ REJECT:1345 case BT_FEXT_REJECT: 1305 1346 return len == 13; 1306 1347 … … 1339 1380 return READ_LATER; 1340 1381 1341 tr_peerIoReadUint32( msgs-> io, inbuf, &req->index );1342 tr_peerIoReadUint32( msgs-> io, inbuf, &req->offset );1382 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index ); 1383 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset ); 1343 1384 req->length = msgs->incoming.length - 9; 1344 dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, 1345 req->offset, 1346 req->length ); 1385 dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length ); 1347 1386 return READ_NOW; 1348 1387 } 1349 1388 else 1350 1389 { 1351 int 1390 int err; 1352 1391 1353 1392 /* read in another chunk of data */ 1354 const size_t nLeft = req->length - EVBUFFER_LENGTH( 1355 msgs->incoming.block ); 1356 size_t n = MIN( nLeft, inlen ); 1357 uint8_t * buf = tr_new( uint8_t, n ); 1393 const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block ); 1394 size_t n = MIN( nLeft, inlen ); 1395 uint8_t * buf = tr_new( uint8_t, n ); 1358 1396 assert( EVBUFFER_LENGTH( inbuf ) >= n ); 1359 tr_peerIoReadBytes( msgs-> io, inbuf, buf, n );1397 tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n ); 1360 1398 evbuffer_add( msgs->incoming.block, buf, n ); 1361 1399 fireClientGotData( msgs, n, TRUE ); 1362 1400 *setme_piece_bytes_read += n; 1363 1401 tr_free( buf ); 1364 dbgmsg( msgs, "got % dbytes for block %u:%u->%u ... %d remain",1365 (int)n, req->index, req->offset, req->length,1402 dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain", 1403 n, req->index, req->offset, req->length, 1366 1404 (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) ); 1367 1405 if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length ) … … 1369 1407 1370 1408 /* we've got the whole block ... process it */ 1371 err = clientGotBlock( msgs, EVBUFFER_DATA( 1372 msgs->incoming.block ), req ); 1409 err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req ); 1373 1410 1374 1411 /* cleanup */ 1375 evbuffer_drain( msgs->incoming.block, 1376 EVBUFFER_LENGTH( msgs->incoming.block ) ); 1412 evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) ); 1377 1413 req->length = 0; 1378 1414 msgs->state = AWAITING_BT_LENGTH; 1379 1415 if( !err ) 1380 1416 return READ_NOW; 1381 else 1382 { 1417 else { 1383 1418 fireError( msgs, err ); 1384 1419 return READ_ERR; … … 1388 1423 1389 1424 static int 1390 readBtMessage( tr_peermsgs * msgs, 1391 struct evbuffer * inbuf, 1392 size_t inlen ) 1425 readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen ) 1393 1426 { 1394 1427 uint32_t ui32; … … 1396 1429 const uint8_t id = msgs->incoming.id; 1397 1430 const size_t startBufLen = EVBUFFER_LENGTH( inbuf ); 1431 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 1398 1432 1399 1433 --msglen; /* id length */ … … 1402 1436 return READ_LATER; 1403 1437 1404 dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, 1405 (int)msglen, 1406 (int)inlen ); 1438 dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen ); 1407 1439 1408 1440 if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) ) 1409 1441 { 1410 dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", 1411 (int)id, (int)msglen ); 1442 dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen ); 1412 1443 fireError( msgs, EMSGSIZE ); 1413 1444 return READ_ERR; … … 1418 1449 case BT_CHOKE: 1419 1450 dbgmsg( msgs, "got Choke" ); 1420 msgs->info->clientIsChoked = 1; 1421 cancelAllRequestsToPeer( msgs, FALSE ); 1451 msgs->peer->clientIsChoked = 1; 1452 if( !fext ) 1453 cancelAllRequestsToPeer( msgs, FALSE ); 1422 1454 break; 1423 1455 1424 1456 case BT_UNCHOKE: 1425 1457 dbgmsg( msgs, "got Unchoke" ); 1426 msgs-> info->clientIsChoked = 0;1458 msgs->peer->clientIsChoked = 0; 1427 1459 fireNeedReq( msgs ); 1428 1460 break; … … 1430 1462 case BT_INTERESTED: 1431 1463 dbgmsg( msgs, "got Interested" ); 1432 msgs-> info->peerIsInterested = 1;1464 msgs->peer->peerIsInterested = 1; 1433 1465 break; 1434 1466 1435 1467 case BT_NOT_INTERESTED: 1436 1468 dbgmsg( msgs, "got Not Interested" ); 1437 msgs-> info->peerIsInterested = 0;1469 msgs->peer->peerIsInterested = 0; 1438 1470 break; 1439 1471 1440 1472 case BT_HAVE: 1441 tr_peerIoReadUint32( msgs-> io, inbuf, &ui32 );1473 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 ); 1442 1474 dbgmsg( msgs, "got Have: %u", ui32 ); 1443 if( tr_bitfieldAdd( msgs->info->have, ui32 ) ) 1444 { 1475 if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) { 1445 1476 fireError( msgs, ERANGE ); 1446 1477 return READ_ERR; … … 1455 1486 dbgmsg( msgs, "got a bitfield" ); 1456 1487 msgs->peerSentBitfield = 1; 1457 tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, 1458 msglen ); 1488 tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen ); 1459 1489 updatePeerProgress( msgs ); 1460 maybeSendFastAllowedSet( msgs );1461 1490 fireNeedReq( msgs ); 1462 1491 break; … … 1466 1495 { 1467 1496 struct peer_request r; 1468 tr_peerIoReadUint32( msgs->io, inbuf, &r.index ); 1469 tr_peerIoReadUint32( msgs->io, inbuf, &r.offset ); 1470 tr_peerIoReadUint32( msgs->io, inbuf, &r.length ); 1471 dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, 1472 r.length ); 1497 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index ); 1498 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset ); 1499 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length ); 1500 dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length ); 1473 1501 peerMadeRequest( msgs, &r ); 1474 1502 break; … … 1478 1506 { 1479 1507 struct peer_request r; 1480 tr_peerIoReadUint32( msgs->io, inbuf, &r.index ); 1481 tr_peerIoReadUint32( msgs->io, inbuf, &r.offset ); 1482 tr_peerIoReadUint32( msgs->io, inbuf, &r.length ); 1483 dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, 1484 r.length ); 1485 reqListRemove( &msgs->peerAskedForFast, &r ); 1486 reqListRemove( &msgs->peerAskedFor, &r ); 1508 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index ); 1509 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset ); 1510 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length ); 1511 dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length ); 1512 if( reqListRemove( &msgs->peerAskedFor, &r ) && fext ) 1513 protocolSendReject( msgs, &r ); 1487 1514 break; 1488 1515 } … … 1494 1521 case BT_PORT: 1495 1522 dbgmsg( msgs, "Got a BT_PORT" ); 1496 tr_peerIoReadUint16( msgs-> io, inbuf, &msgs->info->port );1523 tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port ); 1497 1524 break; 1498 1525 1499 case BT_SUGGEST: 1500 { 1501 dbgmsg( msgs, "Got a BT_SUGGEST" ); 1502 tr_peerIoReadUint32( msgs->io, inbuf, &ui32 ); 1503 /* we don't do anything with this yet */ 1526 case BT_FEXT_SUGGEST: 1527 dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" ); 1528 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 ); 1529 if( fext ) 1530 fireClientGotSuggest( msgs, ui32 ); 1531 else { 1532 fireError( msgs, EMSGSIZE ); 1533 return READ_ERR; 1534 } 1504 1535 break; 1505 } 1506 1507 case BT_HAVE_ALL: 1508 dbgmsg( msgs, "Got a BT_HAVE_ALL" ); 1509 tr_bitfieldAddRange( msgs->info->have, 0, 1510 msgs->torrent->info.pieceCount ); 1511 updatePeerProgress( msgs ); 1512 maybeSendFastAllowedSet( msgs ); 1536 1537 case BT_FEXT_ALLOWED_FAST: 1538 dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" ); 1539 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 ); 1540 if( fext ) 1541 fireClientGotAllowedFast( msgs, ui32 ); 1542 else { 1543 fireError( msgs, EMSGSIZE ); 1544 return READ_ERR; 1545 } 1513 1546 break; 1514 1547 1515 1516 case BT_HAVE_NONE: 1517 dbgmsg( msgs, "Got a BT_HAVE_NONE" ); 1518 tr_bitfieldClear( msgs->info->have ); 1519 updatePeerProgress( msgs ); 1520 maybeSendFastAllowedSet( msgs ); 1548 case BT_FEXT_HAVE_ALL: 1549 dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" ); 1550 if( fext ) { 1551 tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount ); 1552 updatePeerProgress( msgs ); 1553 } else { 1554 fireError( msgs, EMSGSIZE ); 1555 return READ_ERR; 1556 } 1521 1557 break; 1522 1558 1523 case BT_REJECT: 1559 case BT_FEXT_HAVE_NONE: 1560 dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" ); 1561 if( fext ) { 1562 tr_bitfieldClear( msgs->peer->have ); 1563 updatePeerProgress( msgs ); 1564 } else { 1565 fireError( msgs, EMSGSIZE ); 1566 return READ_ERR; 1567 } 1568 break; 1569 1570 case BT_FEXT_REJECT: 1524 1571 { 1525 1572 struct peer_request r; 1526 dbgmsg( msgs, "Got a BT_REJECT" ); 1527 tr_peerIoReadUint32( msgs->io, inbuf, &r.index ); 1528 tr_peerIoReadUint32( msgs->io, inbuf, &r.offset ); 1529 tr_peerIoReadUint32( msgs->io, inbuf, &r.length ); 1530 reqListRemove( &msgs->clientAskedFor, &r ); 1531 break; 1532 } 1533 1534 case BT_ALLOWED_FAST: 1535 { 1536 dbgmsg( msgs, "Got a BT_ALLOWED_FAST" ); 1537 tr_peerIoReadUint32( msgs->io, inbuf, &ui32 ); 1538 /* we don't do anything with this yet */ 1573 dbgmsg( msgs, "Got a BT_FEXT_REJECT" ); 1574 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index ); 1575 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset ); 1576 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length ); 1577 if( fext ) 1578 reqListRemove( &msgs->clientAskedFor, &r ); 1579 else { 1580 fireError( msgs, EMSGSIZE ); 1581 return READ_ERR; 1582 } 1539 1583 break; 1540 1584 } … … 1547 1591 default: 1548 1592 dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id ); 1549 tr_peerIoDrain( msgs-> io, inbuf, msglen );1593 tr_peerIoDrain( msgs->peer->io, inbuf, msglen ); 1550 1594 break; 1551 1595 } … … 1559 1603 1560 1604 static void 1561 decrementDownloadedCount( tr_peermsgs * msgs, 1562 uint32_t byteCount ) 1605 decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount ) 1563 1606 { 1564 1607 tr_torrent * tor = msgs->torrent; … … 1568 1611 1569 1612 static void 1570 clientGotUnwantedBlock( tr_peermsgs * msgs, 1571 const struct peer_request * req ) 1613 clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req ) 1572 1614 { 1573 1615 decrementDownloadedCount( msgs, req->length ); … … 1575 1617 1576 1618 static void 1577 addPeerToBlamefield( tr_peermsgs * msgs, 1578 uint32_t index ) 1579 { 1580 if( !msgs->info->blame ) 1581 msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount ); 1582 tr_bitfieldAdd( msgs->info->blame, index ); 1619 addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index ) 1620 { 1621 if( !msgs->peer->blame ) 1622 msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount ); 1623 tr_bitfieldAdd( msgs->peer->blame, index ); 1583 1624 } 1584 1625 … … 1589 1630 const struct peer_request * req ) 1590 1631 { 1591 int 1592 tr_torrent * 1632 int err; 1633 tr_torrent * tor = msgs->torrent; 1593 1634 const tr_block_index_t block = _tr_block( tor, req->index, req->offset ); 1594 1635 … … 1596 1637 assert( req ); 1597 1638 1598 if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) 1599 { 1639 if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) { 1600 1640 dbgmsg( msgs, "wrong block size -- expected %u, got %d", 1601 1641 tr_torBlockCountBytes( msgs->torrent, block ), req->length ); … … 1604 1644 1605 1645 /* save the block */ 1606 dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, 1607 req->length ); 1646 dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length ); 1608 1647 1609 1648 /** … … 1611 1650 **/ 1612 1651 1613 if( !reqListRemove( &msgs->clientAskedFor, req ) ) 1614 { 1652 if( !reqListRemove( &msgs->clientAskedFor, req ) ) { 1615 1653 clientGotUnwantedBlock( msgs, req ); 1616 1654 dbgmsg( msgs, "we didn't ask for this message..." ); … … 1625 1663 **/ 1626 1664 1627 if( tr_cpBlockIsComplete( tor->completion, block ) ) 1628 { 1665 if( tr_cpBlockIsComplete( tor->completion, block ) ) { 1629 1666 dbgmsg( msgs, "we have this block already..." ); 1630 1667 clientGotUnwantedBlock( msgs, req ); … … 1655 1692 1656 1693 static ReadState 1657 canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )1694 canRead( tr_peerIo * io, void * vmsgs, size_t * piece ) 1658 1695 { 1659 1696 ReadState ret; 1660 1697 tr_peermsgs * msgs = vmsgs; 1661 struct evbuffer * in = tr_ iobuf_input( iobuf);1698 struct evbuffer * in = tr_peerIoGetReadBuffer( io ); 1662 1699 const size_t inlen = EVBUFFER_LENGTH( in ); 1663 1700 … … 1700 1737 { 1701 1738 tr_peermsgs * msgs = vmsgs; 1702 const double rateToClient = tr_peerGetPieceSpeed( msgs->info, TR_PEER_TO_CLIENT ); 1703 const int estimatedBlocksInNext30Seconds = 1704 ( rateToClient * 30 * 1024 ) / msgs->torrent->blockSize; 1705 msgs->minActiveRequests = 8; 1706 msgs->maxActiveRequests = msgs->minActiveRequests + estimatedBlocksInNext30Seconds; 1739 const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT ); 1740 const int seconds = 10; 1741 const int floor = 8; 1742 const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize; 1743 1744 msgs->maxActiveRequests = floor + estimatedBlocksInPeriod; 1745 1707 1746 if( msgs->reqq > 0 ) 1708 1747 msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq ); 1748 1709 1749 return TRUE; 1710 }1711 1712 static int1713 popNextRequest( tr_peermsgs * msgs,1714 struct peer_request * setme )1715 {1716 return reqListPop( &msgs->peerAskedForFast, setme )1717 || reqListPop( &msgs->peerAskedFor, setme );1718 1750 } 1719 1751 … … 1723 1755 size_t bytesWritten = 0; 1724 1756 struct peer_request req; 1725 const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0; 1757 const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0; 1758 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 1726 1759 1727 1760 /** … … 1738 1771 const size_t len = EVBUFFER_LENGTH( msgs->outMessages ); 1739 1772 /* flush the protocol messages */ 1740 dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs-> io, len );1741 tr_peerIoWriteBuf( msgs-> io, msgs->outMessages, FALSE );1773 dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len ); 1774 tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE ); 1742 1775 msgs->clientSentAnythingAt = now; 1743 1776 msgs->outMessagesBatchedAt = 0; … … 1750 1783 **/ 1751 1784 1752 if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize ) 1753 && popNextRequest( msgs, &req ) 1754 && requestIsValid( msgs, &req ) 1755 && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) ) 1756 { 1757 /* send a block */ 1758 uint8_t * buf = tr_new( uint8_t, req.length ); 1759 const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ); 1760 if( err ) { 1761 fireError( msgs, err ); 1762 bytesWritten = 0; 1763 msgs = NULL; 1764 } else { 1765 tr_peerIo * io = msgs->io; 1766 struct evbuffer * out = evbuffer_new( ); 1767 dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length ); 1768 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length ); 1769 tr_peerIoWriteUint8 ( io, out, BT_PIECE ); 1770 tr_peerIoWriteUint32( io, out, req.index ); 1771 tr_peerIoWriteUint32( io, out, req.offset ); 1772 tr_peerIoWriteBytes ( io, out, buf, req.length ); 1773 tr_peerIoWriteBuf( io, out, TRUE ); 1774 bytesWritten += EVBUFFER_LENGTH( out ); 1775 evbuffer_free( out ); 1776 msgs->clientSentAnythingAt = now; 1785 if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize ) 1786 && popNextRequest( msgs, &req ) ) 1787 { 1788 if( requestIsValid( msgs, &req ) 1789 && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) ) 1790 { 1791 /* send a block */ 1792 uint8_t * buf = tr_new( uint8_t, req.length ); 1793 const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ); 1794 if( err ) { 1795 fireError( msgs, err ); 1796 bytesWritten = 0; 1797 msgs = NULL; 1798 } else { 1799 tr_peerIo * io = msgs->peer->io; 1800 struct evbuffer * out = evbuffer_new( ); 1801 dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length ); 1802 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length ); 1803 tr_peerIoWriteUint8 ( io, out, BT_PIECE ); 1804 tr_peerIoWriteUint32( io, out, req.index ); 1805 tr_peerIoWriteUint32( io, out, req.offset ); 1806 tr_peerIoWriteBytes ( io, out, buf, req.length ); 1807 tr_peerIoWriteBuf( io, out, TRUE ); 1808 bytesWritten += EVBUFFER_LENGTH( out ); 1809 evbuffer_free( out ); 1810 msgs->clientSentAnythingAt = now; 1811 } 1812 tr_free( buf ); 1777 1813 } 1778 tr_free( buf ); 1814 else if( fext ) /* peer needs a reject message */ 1815 { 1816 protocolSendReject( msgs, &req ); 1817 } 1779 1818 } 1780 1819 … … 1788 1827 { 1789 1828 dbgmsg( msgs, "sending a keepalive message" ); 1790 tr_peerIoWriteUint32( msgs-> io, msgs->outMessages, 0 );1829 tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 ); 1791 1830 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 1792 1831 } … … 1821 1860 1822 1861 static void 1823 gotError( struct tr_iobuf * iobufUNUSED,1824 short 1825 void 1862 gotError( tr_peerIo * io UNUSED, 1863 short what, 1864 void * vmsgs ) 1826 1865 { 1827 1866 if( what & EVBUFFER_TIMEOUT ) … … 1872 1911 } 1873 1912 1874 tr_peerIoWriteUint32( msgs-> io, out,1913 tr_peerIoWriteUint32( msgs->peer->io, out, 1875 1914 sizeof( uint8_t ) + field->byteCount ); 1876 tr_peerIoWriteUint8 ( msgs-> io, out, BT_BITFIELD );1877 tr_peerIoWriteBytes ( msgs-> io, out, field->bits, field->byteCount );1878 dbgmsg( msgs, "sending bitfield... outMessage size is now % d",1879 (int)EVBUFFER_LENGTH( out ) );1915 tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD ); 1916 tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount ); 1917 dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", 1918 EVBUFFER_LENGTH( out ) ); 1880 1919 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 1881 1920 … … 1884 1923 1885 1924 tr_bitfieldFree( field ); 1925 } 1926 1927 static void 1928 tellPeerWhatWeHave( tr_peermsgs * msgs ) 1929 { 1930 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 1931 1932 if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_CP_COMPLETE ) ) 1933 { 1934 protocolSendHaveAll( msgs ); 1935 } 1936 else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) ) 1937 { 1938 protocolSendHaveNone( msgs ); 1939 } 1940 else 1941 { 1942 sendBitfield( msgs ); 1943 } 1886 1944 } 1887 1945 … … 1995 2053 tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 ); 1996 2054 for( i = 0; i < diffs.addedCount; ++i ) { 1997 memcpy( walk, &diffs.added[i]. in_addr, 4 ); walk += 4;2055 memcpy( walk, &diffs.added[i].addr, 4 ); walk += 4; 1998 2056 memcpy( walk, &diffs.added[i].port, 2 ); walk += 2; 1999 2057 } … … 2013 2071 tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 ); 2014 2072 for( i = 0; i < diffs.droppedCount; ++i ) { 2015 memcpy( walk, &diffs.dropped[i]. in_addr, 4 ); walk += 4;2073 memcpy( walk, &diffs.dropped[i].addr, 4 ); walk += 4; 2016 2074 memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2; 2017 2075 } … … 2022 2080 /* write the pex message */ 2023 2081 benc = tr_bencSave( &val, &bencLen ); 2024 tr_peerIoWriteUint32( msgs-> io, out, 2 * sizeof( uint8_t ) + bencLen );2025 tr_peerIoWriteUint8 ( msgs-> io, out, BT_LTEP );2026 tr_peerIoWriteUint8 ( msgs-> io, out, msgs->ut_pex_id );2027 tr_peerIoWriteBytes ( msgs-> io, out, benc, bencLen );2082 tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen ); 2083 tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP ); 2084 tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id ); 2085 tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen ); 2028 2086 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS ); 2029 2087 dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) ); … … 2055 2113 tr_peermsgs* 2056 2114 tr_peerMsgsNew( struct tr_torrent * torrent, 2057 struct tr_peer * info,2115 struct tr_peer * peer, 2058 2116 tr_delivery_func func, 2059 void *userData,2060 tr_publisher_tag *setme )2117 void * userData, 2118 tr_publisher_tag * setme ) 2061 2119 { 2062 2120 tr_peermsgs * m; 2063 2121 2064 assert( info);2065 assert( info->io );2122 assert( peer ); 2123 assert( peer->io ); 2066 2124 2067 2125 m = tr_new0( tr_peermsgs, 1 ); 2068 2126 m->publisher = tr_publisherNew( ); 2069 m-> info = info;2127 m->peer = peer; 2070 2128 m->session = torrent->session; 2071 2129 m->torrent = torrent; 2072 m->io = info->io; 2073 m->info->clientIsChoked = 1; 2074 m->info->peerIsChoked = 1; 2075 m->info->clientIsInterested = 0; 2076 m->info->peerIsInterested = 0; 2077 m->info->have = tr_bitfieldNew( torrent->info.pieceCount ); 2130 m->peer->clientIsChoked = 1; 2131 m->peer->peerIsChoked = 1; 2132 m->peer->clientIsInterested = 0; 2133 m->peer->peerIsInterested = 0; 2134 m->peer->have = tr_bitfieldNew( torrent->info.pieceCount ); 2078 2135 m->state = AWAITING_BT_LENGTH; 2079 2136 m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL ); … … 2082 2139 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 2083 2140 m->incoming.block = evbuffer_new( ); 2084 m->peerAllowedPieces = NULL;2085 2141 m->peerAskedFor = REQUEST_LIST_INIT; 2086 m->peerAskedForFast = REQUEST_LIST_INIT;2087 2142 m->clientAskedFor = REQUEST_LIST_INIT; 2088 2143 m->clientWillAskFor = REQUEST_LIST_INIT; 2144 peer->msgs = m; 2145 2089 2146 *setme = tr_publisherSubscribe( m->publisher, func, userData ); 2090 2147 2091 if( tr_peerIoSupportsLTEP( m->io ) )2148 if( tr_peerIoSupportsLTEP( peer->io ) ) 2092 2149 sendLtepHandshake( m ); 2093 2150 2094 sendBitfield( m ); 2095 2096 tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of 2097 inactivity */ 2098 tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m ); 2151 tellPeerWhatWeHave( m ); 2152 2153 tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m ); 2099 2154 ratePulse( m ); 2100 2155 … … 2111 2166 reqListClear( &msgs->clientWillAskFor ); 2112 2167 reqListClear( &msgs->clientAskedFor ); 2113 reqListClear( &msgs->peerAskedForFast );2114 2168 reqListClear( &msgs->peerAskedFor ); 2115 tr_bitfieldFree( msgs->peerAllowedPieces ); 2169 2116 2170 evbuffer_free( msgs->incoming.block ); 2117 2171 evbuffer_free( msgs->outMessages );
Note: See TracChangeset
for help on using the changeset viewer.