Changeset 9494
- Timestamp:
- Nov 8, 2009, 11:20:00 PM (13 years ago)
- Location:
- trunk/libtransmission
- Files:
-
- 3 deleted
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/Makefile.am
r9328 r9494 40 40 publish.c \ 41 41 ratecontrol.c \ 42 request-list.c \43 42 resume.c \ 44 43 rpcimpl.c \ … … 88 87 publish.h \ 89 88 ratecontrol.h \ 90 request-list.h \91 89 resume.h \ 92 90 rpcimpl.h \ … … 112 110 json-test \ 113 111 peer-msgs-test \ 114 request-list-test \115 112 rpc-test \ 116 113 test-peer-id \ … … 158 155 test_peer_id_LDFLAGS = ${apps_ldflags} 159 156 160 request_list_test_SOURCES = request-list-test.c161 request_list_test_LDADD = ${apps_ldadd}162 request_list_test_LDFLAGS = ${apps_ldflags}163 164 157 peer_msgs_test_SOURCES = peer-msgs-test.c 165 158 peer_msgs_test_LDADD = ${apps_ldadd} -
trunk/libtransmission/peer-common.h
r9434 r9494 29 29 #include "transmission.h" 30 30 31 enum 32 { 33 /** when we're making requests from another peer, 34 batch them together to send enough requests to 35 meet our bandwidth goals for the next N seconds */ 36 REQUEST_BUF_SECS = 10, 37 38 /** how long we'll let requests we've made stay pending 39 before we cancel them */ 40 REQUEST_TTL_SECS = 30 41 }; 42 31 43 typedef enum 32 44 { … … 46 58 { 47 59 TR_PEER_CLIENT_GOT_BLOCK, 60 TR_PEER_CLIENT_GOT_CHOKE, 48 61 TR_PEER_CLIENT_GOT_DATA, 49 62 TR_PEER_CLIENT_GOT_ALLOWED_FAST, 50 63 TR_PEER_CLIENT_GOT_SUGGEST, 51 64 TR_PEER_CLIENT_GOT_PORT, 65 TR_PEER_CLIENT_GOT_REJ, 52 66 TR_PEER_PEER_GOT_DATA, 53 67 TR_PEER_PEER_PROGRESS, 54 68 TR_PEER_ERROR, 55 TR_PEER_CANCEL, 56 TR_PEER_UPLOAD_ONLY, 57 TR_PEER_NEED_REQ 69 TR_PEER_UPLOAD_ONLY 58 70 } 59 71 PeerEventType; -
trunk/libtransmission/peer-mgr.c
r9484 r9494 137 137 } 138 138 139 struct tr_blockIterator 140 { 141 time_t expirationDate; 142 struct tr_torrent_peers * t; 143 tr_block_index_t blockIndex, blockCount, *blocks; 144 tr_piece_index_t pieceIndex, pieceCount, *pieces; 139 struct block_request 140 { 141 tr_block_index_t block; 142 tr_peer * peer; 143 time_t sentAt; 144 }; 145 146 struct weighted_piece 147 { 148 tr_piece_index_t index; 149 int16_t salt; 150 int16_t requestCount; 145 151 }; 146 152 … … 156 162 tr_torrent * tor; 157 163 tr_peer * optimistic; /* the optimistic peer, or NULL if none */ 158 struct tr_blockIterator * refillQueue; /* used in refillPulse() */159 164 struct tr_peerMgr * manager; 160 int * pendingRequestCount;165 //int * pendingRequestCount; 161 166 162 167 tr_bool isRunning; 168 169 struct block_request * requests; 170 int requestsSort; 171 int requestCount; 172 int requestAlloc; 173 174 struct weighted_piece * pieces; 175 int piecesSort; 176 int pieceCount; 177 178 tr_bool isInEndgame; 163 179 } 164 180 Torrent; … … 352 368 } 353 369 354 static void 355 peerDestructor( tr_peer * peer ) 356 { 357 assert( peer ); 370 static void peerDeclinedAllRequests( Torrent *, const tr_peer * ); 371 372 static void 373 peerDestructor( Torrent * t, tr_peer * peer ) 374 { 375 assert( peer != NULL ); 376 377 peerDeclinedAllRequests( t, peer ); 358 378 359 379 if( peer->msgs != NULL ) … … 386 406 removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare ); 387 407 assert( removed == peer ); 388 peerDestructor( removed );408 peerDestructor( t, removed ); 389 409 } 390 410 … … 395 415 removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) ); 396 416 } 397 398 static void blockIteratorFree( struct tr_blockIterator ** inout );399 417 400 418 static void … … 411 429 evtimer_del( &t->refillTimer ); 412 430 413 blockIteratorFree( &t->refillQueue );414 431 tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree ); 415 432 tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free ); … … 417 434 tr_ptrArrayDestruct( &t->peers, NULL ); 418 435 419 tr_free( t->pendingRequestCount ); 436 tr_free( t->requests ); 437 tr_free( t->pieces ); 420 438 tr_free( t ); 421 439 } 422 440 423 441 424 static void refillPulse( int, short, void* );442 //static void refillPulse( int, short, void* ); 425 443 426 444 static void peerCallbackFunc( void * vpeer, … … 442 460 t->webseeds = TR_PTR_ARRAY_INIT; 443 461 t->outgoingHandshakes = TR_PTR_ARRAY_INIT; 444 evtimer_set( &t->refillTimer, refillPulse, t ); 462 t->requests = 0; 463 //evtimer_set( &t->refillTimer, refillPulse, t ); 445 464 446 465 … … 534 553 } 535 554 536 /**** 537 ***** 538 ***** REFILL 539 ***** 540 ****/ 541 542 static void 543 assertValidPiece( Torrent * t, tr_piece_index_t piece ) 544 { 545 assert( t ); 546 assert( t->tor ); 547 assert( piece < t->tor->info.pieceCount ); 548 } 555 /** 556 *** REQUESTS 557 *** 558 *** There are two data structures associated with managing block requests: 559 *** 560 *** 1. Torrent::requests, an array of "struct block_request" which keeps 561 *** track of which blocks have been requested, and when, and by which peers. 562 *** This is list is used for (a) cancelling requests that have been pending 563 *** for too long and (b) avoiding duplicate requests before endgame. 564 *** 565 *** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the 566 *** pieces that we want to request. It's used to decide which pieces to 567 *** return next when tr_peerMgrGetBlockRequests() is called. 568 **/ 569 570 /** 571 *** struct block_request 572 **/ 573 574 enum 575 { 576 REQ_UNSORTED, 577 REQ_SORTED_BY_BLOCK, 578 REQ_SORTED_BY_TIME 579 }; 549 580 550 581 static int 551 getPieceRequests( Torrent * t, tr_piece_index_t piece ) 552 { 553 assertValidPiece( t, piece ); 554 555 return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0; 556 } 557 558 static void 559 incrementPieceRequests( Torrent * t, tr_piece_index_t piece ) 560 { 561 assertValidPiece( t, piece ); 562 563 if( t->pendingRequestCount == NULL ) 564 t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount ); 565 t->pendingRequestCount[piece]++; 566 } 567 568 static void 569 decrementPieceRequests( Torrent * t, tr_piece_index_t piece ) 570 { 571 assertValidPiece( t, piece ); 572 573 if( t->pendingRequestCount ) 574 t->pendingRequestCount[piece]--; 575 } 576 577 struct tr_refill_piece 578 { 579 tr_priority_t priority; 580 uint32_t piece; 581 uint32_t peerCount; 582 int random; 583 int pendingRequestCount; 584 int missingBlockCount; 582 compareReqByBlock( const void * va, const void * vb ) 583 { 584 const struct block_request * a = va; 585 const struct block_request * b = vb; 586 if( a->block < b->block ) return -1; 587 if( a->block > b->block ) return 1; 588 return 0; 589 } 590 591 static int 592 compareReqByTime( const void * va, const void * vb ) 593 { 594 const struct block_request * a = va; 595 const struct block_request * b = vb; 596 if( a->sentAt < b->sentAt ) return -1; 597 if( a->sentAt > b->sentAt ) return 1; 598 return 0; 599 } 600 601 static void 602 requestListSort( Torrent * t, int mode ) 603 { 604 assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME ); 605 606 if( t->requestsSort != mode ) 607 { 608 int(*compar)(const void *, const void *); 609 610 t->requestsSort = mode; 611 612 switch( mode ) { 613 case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break; 614 case REQ_SORTED_BY_TIME: compar = compareReqByTime; break; 615 default: assert( 0 && "unhandled" ); 616 } 617 618 //fprintf( stderr, "sorting requests by %s\n", (mode==REQ_SORTED_BY_BLOCK)?"block":"time" ); 619 qsort( t->requests, t->requestCount, 620 sizeof( struct block_request ), compar ); 621 } 622 } 623 624 static void 625 requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer ) 626 { 627 struct block_request key; 628 629 /* ensure enough room is available... */ 630 if( t->requestCount + 1 >= t->requestAlloc ) 631 { 632 const int CHUNK_SIZE = 128; 633 t->requestAlloc += CHUNK_SIZE; 634 t->requests = tr_renew( struct block_request, 635 t->requests, t->requestAlloc ); 636 } 637 638 /* populate the record we're inserting */ 639 key.block = block; 640 key.peer = peer; 641 key.sentAt = time( NULL ); 642 643 /* insert the request to our array... */ 644 switch( t->requestsSort ) 645 { 646 case REQ_UNSORTED: 647 case REQ_SORTED_BY_TIME: 648 t->requests[t->requestCount++] = key; 649 break; 650 651 case REQ_SORTED_BY_BLOCK: { 652 tr_bool exact; 653 const int pos = tr_lowerBound( &key, t->requests, t->requestCount, 654 sizeof( struct block_request ), 655 compareReqByBlock, &exact ); 656 assert( !exact ); 657 memmove( t->requests + pos + 1, 658 t->requests + pos, 659 sizeof( struct block_request ) * ( t->requestCount++ - pos ) ); 660 t->requests[pos] = key; 661 break; 662 } 663 } 664 } 665 666 static struct block_request * 667 requestListLookup( Torrent * t, tr_block_index_t block ) 668 { 669 struct block_request key; 670 key.block = block; 671 672 requestListSort( t, REQ_SORTED_BY_BLOCK ); 673 674 return bsearch( &key, t->requests, t->requestCount, 675 sizeof( struct block_request ), 676 compareReqByBlock ); 677 } 678 679 static void 680 requestListRemove( Torrent * t, tr_block_index_t block ) 681 { 682 const struct block_request * b = requestListLookup( t, block ); 683 if( b != NULL ) 684 { 685 const int pos = b - t->requests; 686 assert( pos < t->requestCount ); 687 memmove( t->requests + pos, 688 t->requests + pos + 1, 689 sizeof( struct block_request ) * ( --t->requestCount - pos ) ); 690 } 691 } 692 693 /** 694 *** struct weighted_piece 695 **/ 696 697 enum 698 { 699 PIECES_UNSORTED, 700 PIECES_SORTED_BY_INDEX, 701 PIECES_SORTED_BY_WEIGHT 585 702 }; 586 703 704 const tr_torrent * weightTorrent; 705 706 /* we try to create a "weight" s.t. high-priority pieces come before others, 707 * and that partially-complete pieces come before empty ones. */ 587 708 static int 588 compareRefillPiece( const void * aIn, const void * bIn ) 589 { 590 const struct tr_refill_piece * a = aIn; 591 const struct tr_refill_piece * b = bIn; 592 593 /* if one piece has a higher priority, it goes first */ 594 if( a->priority != b->priority ) 595 return a->priority > b->priority ? -1 : 1; 596 597 /* have a per-priority endgame */ 598 if( a->pendingRequestCount != b->pendingRequestCount ) 599 return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1; 600 601 /* fewer missing pieces goes first */ 602 if( a->missingBlockCount != b->missingBlockCount ) 603 return a->missingBlockCount < b->missingBlockCount ? -1 : 1; 604 605 /* otherwise if one has fewer peers, it goes first */ 606 if( a->peerCount != b->peerCount ) 607 return a->peerCount < b->peerCount ? -1 : 1; 608 609 /* otherwise go with our random seed */ 610 if( a->random != b->random ) 611 return a->random < b->random ? -1 : 1; 612 709 comparePieceByWeight( const void * va, const void * vb ) 710 { 711 const struct weighted_piece * a = va; 712 const struct weighted_piece * b = vb; 713 int ia, ib, missing, pending; 714 const tr_torrent * tor = weightTorrent; 715 716 /* primary key: weight */ 717 missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index ); 718 pending = a->requestCount; 719 ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending); 720 missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index ); 721 pending = b->requestCount; 722 ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending); 723 if( ia < ib ) return -1; 724 if( ia > ib ) return 1; 725 726 /* secondary key: higher priorities go first */ 727 ia = tor->info.pieces[a->index].priority; 728 ib = tor->info.pieces[b->index].priority; 729 if( ia > ib ) return -1; 730 if( ia < ib ) return 1; 731 732 /* tertiary key: random */ 733 return a->salt - b->salt; 734 } 735 736 static int 737 comparePieceByIndex( const void * va, const void * vb ) 738 { 739 const struct weighted_piece * a = va; 740 const struct weighted_piece * b = vb; 741 if( a->index < b->index ) return -1; 742 if( a->index > b->index ) return 1; 613 743 return 0; 614 744 } 615 745 616 static tr_piece_index_t * 617 getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount ) 618 { 619 const tr_torrent * tor = t->tor; 620 const tr_info * inf = &tor->info; 621 tr_piece_index_t i; 622 tr_piece_index_t poolSize = 0; 623 tr_piece_index_t * pool = tr_new( tr_piece_index_t , inf->pieceCount ); 624 int peerCount; 625 const tr_peer ** peers; 626 627 assert( torrentIsLocked( t ) ); 628 629 peers = (const tr_peer**) tr_ptrArrayBase( &t->peers ); 630 peerCount = tr_ptrArraySize( &t->peers ); 631 632 /* make a list of the pieces that we want but don't have */ 633 for( i = 0; i < inf->pieceCount; ++i ) 634 if( !tor->info.pieces[i].dnd 635 && !tr_cpPieceIsComplete( &tor->completion, i ) ) 636 pool[poolSize++] = i; 637 638 /* sort the pool by which to request next */ 639 if( poolSize > 1 ) 640 { 641 tr_piece_index_t j; 642 struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize ); 643 644 for( j = 0; j < poolSize; ++j ) 746 static void 747 pieceListSort( Torrent * t, int mode ) 748 { 749 int(*compar)(const void *, const void *); 750 751 assert( mode==PIECES_SORTED_BY_INDEX 752 || mode==PIECES_SORTED_BY_WEIGHT ); 753 754 if( t->piecesSort != mode ) 755 { 756 //fprintf( stderr, "sort mode was %d, is now %d\n", t->piecesSort, mode ); 757 t->piecesSort = mode; 758 759 switch( mode ) { 760 case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break; 761 case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break; 762 default: assert( 0 && "unhandled" ); break; 763 } 764 765 //fprintf( stderr, "sorting pieces by %s...\n", (mode==PIECES_SORTED_BY_WEIGHT)?"weight":"index" ); 766 weightTorrent = t->tor; 767 qsort( t->pieces, t->pieceCount, 768 sizeof( struct weighted_piece ), compar ); 769 } 770 771 /* Also, as long as we've got the pieces sorted by weight, 772 * let's also update t.isInEndgame */ 773 if( t->piecesSort == PIECES_SORTED_BY_WEIGHT ) 774 { 775 tr_bool endgame = TRUE; 776 777 if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) ) 645 778 { 646 int k; 647 const tr_piece_index_t piece = pool[j]; 648 struct tr_refill_piece * setme = p + j; 649 650 setme->piece = piece; 651 setme->priority = inf->pieces[piece].priority; 652 setme->peerCount = 0; 653 setme->random = tr_cryptoWeakRandInt( INT_MAX ); 654 setme->pendingRequestCount = getPieceRequests( t, piece ); 655 setme->missingBlockCount 656 = tr_cpMissingBlocksInPiece( &tor->completion, piece ); 657 658 for( k = 0; k < peerCount; ++k ) 779 const tr_completion * cp = &t->tor->completion; 780 const struct weighted_piece * p = t->pieces; 781 const int pending = p->requestCount; 782 const int missing = tr_cpMissingBlocksInPiece( cp, p->index ); 783 endgame = pending >= missing; 784 } 785 786 t->isInEndgame = endgame; 787 } 788 } 789 790 static struct weighted_piece * 791 pieceListLookup( Torrent * t, tr_piece_index_t index ) 792 { 793 struct weighted_piece key; 794 key.index = index; 795 796 pieceListSort( t, PIECES_SORTED_BY_INDEX ); 797 798 return bsearch( &key, t->pieces, t->pieceCount, 799 sizeof( struct weighted_piece ), 800 comparePieceByIndex ); 801 } 802 803 static void 804 pieceListRebuild( Torrent * t ) 805 { 806 if( !tr_torrentIsSeed( t->tor ) ) 807 { 808 tr_piece_index_t i; 809 tr_piece_index_t * pool; 810 tr_piece_index_t poolCount = 0; 811 const tr_torrent * tor = t->tor; 812 const tr_info * inf = tr_torrentInfo( tor ); 813 struct weighted_piece * pieces; 814 int pieceCount; 815 816 /* build the new list */ 817 pool = tr_new( tr_piece_index_t, inf->pieceCount ); 818 for( i=0; i<inf->pieceCount; ++i ) 819 if( !inf->pieces[i].dnd ) 820 if( !tr_cpPieceIsComplete( &tor->completion, i ) ) 821 pool[poolCount++] = i; 822 pieceCount = poolCount; 823 pieces = tr_new0( struct weighted_piece, pieceCount ); 824 for( i=0; i<poolCount; ++i ) { 825 struct weighted_piece * piece = pieces + i; 826 piece->index = pool[i]; 827 piece->requestCount = 0; 828 piece->salt = tr_cryptoWeakRandInt( 255 ); 829 } 830 831 /* if we already had a list of pieces, merge it into 832 * the new list so we don't lose its requestCounts */ 833 if( t->pieces != NULL ) 834 { 835 struct weighted_piece * o = t->pieces; 836 struct weighted_piece * oend = o + t->pieceCount; 837 struct weighted_piece * n = pieces; 838 struct weighted_piece * nend = n + pieceCount; 839 840 pieceListSort( t, PIECES_SORTED_BY_INDEX ); 841 842 while( o!=oend && n!=nend ) { 843 if( o->index < n->index ) 844 ++o; 845 else if( o->index > n->index ) 846 ++n; 847 else 848 *n++ = *o++; 849 } 850 851 tr_free( t->pieces ); 852 } 853 854 t->pieces = pieces; 855 t->pieceCount = pieceCount; 856 t->piecesSort = PIECES_SORTED_BY_INDEX; 857 858 /* cleanup */ 859 tr_free( pool ); 860 } 861 } 862 863 static void 864 pieceListRemovePiece( Torrent * t, tr_piece_index_t piece ) 865 { 866 struct weighted_piece * p = pieceListLookup( t, piece ); 867 868 if( p != NULL ) 869 { 870 const int pos = p - t->pieces; 871 872 memmove( t->pieces + pos, 873 t->pieces + pos + 1, 874 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) ); 875 876 if( t->pieceCount == 0 ) 877 { 878 tr_free( t->pieces ); 879 t->pieces = NULL; 880 } 881 } 882 } 883 884 static void 885 pieceListRemoveRequest( Torrent * t, tr_block_index_t block ) 886 { 887 struct weighted_piece * p; 888 const tr_piece_index_t index = tr_torBlockPiece( t->tor, block ); 889 890 if(( p = pieceListLookup( t, index ))) 891 if( p->requestCount > 0 ) 892 --p->requestCount; 893 894 /* note: this invalidates the weighted.piece.weight field, 895 * but that's OK since the call to pieceListLookup ensured 896 * that we were sorted by index anyway.. next time we resort 897 * by weight, pieceListSort() will update the weights */ 898 } 899 900 /** 901 *** 902 **/ 903 904 void 905 tr_peerMgrRebuildRequests( tr_torrent * tor ) 906 { 907 assert( tr_isTorrent( tor ) ); 908 909 pieceListRebuild( tor->torrentPeers ); 910 } 911 912 void 913 tr_peerMgrGetNextRequests( tr_torrent * tor, 914 tr_peer * peer, 915 int numwant, 916 tr_block_index_t * setme, 917 int * numgot ) 918 { 919 int i; 920 int got; 921 Torrent * t; 922 struct weighted_piece * pieces; 923 const tr_bitfield * have = peer->have; 924 925 /* sanity clause */ 926 assert( tr_isTorrent( tor ) ); 927 assert( numwant > 0 ); 928 929 /* walk through the pieces and find blocks that should be requested */ 930 got = 0; 931 t = tor->torrentPeers; 932 933 /* prep the pieces list */ 934 if( t->pieces == NULL ) 935 pieceListRebuild( t ); 936 pieceListSort( t, PIECES_SORTED_BY_WEIGHT ); 937 //if( t->isInEndgame ) fprintf( stderr, "endgame\n" ); 938 939 #if 0 940 { 941 int i=0, n=MIN(10,t->pieceCount); 942 fprintf( stderr, "the next pieces we want to request are " ); 943 for( i=0; i<n; i++ ) fprintf( stderr, "%d(weight:%d) ", (int)t->pieces[i].index, (int)t->pieces[i].weight ); 944 fprintf( stderr, "\n" ); 945 } 946 #endif 947 948 pieces = t->pieces; 949 for( i=0; i<t->pieceCount && got<numwant; ++i ) 950 { 951 struct weighted_piece * p = pieces + i; 952 953 /* if the peer has this piece that we want... */ 954 if( tr_bitfieldHasFast( have, p->index ) ) 955 { 956 tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index ); 957 const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index ); 958 959 for( ; b!=e && got<numwant; ++b ) 659 960 { 660 const tr_peer * peer = peers[k]; 661 if( peer->peerIsInterested 662 && !peer->clientIsChoked 663 && tr_bitfieldHas( peer->have, piece ) ) 664 ++setme->peerCount; 961 struct block_request * breq; 962 963 /* don't request blocks we've already got */ 964 if( tr_cpBlockIsCompleteFast( &tor->completion, b ) ) 965 continue; 966 967 /* don't request blocks we've already requested (FIXME) */ 968 breq = requestListLookup( t, b ); 969 if( breq != NULL ) { 970 assert( breq->peer != NULL ); 971 if( breq->peer == peer ) continue; 972 if( !t->isInEndgame ) continue; 973 } 974 975 setme[got++] = b; 976 //fprintf( stderr, "peer %p is requesting block %"PRIu64"\n", peer, b ); 977 978 /* update our own tables */ 979 if( breq == NULL ) 980 requestListAdd( t, b, peer ); 981 ++p->requestCount; 665 982 } 666 983 } 667 668 qsort( p, poolSize, sizeof( struct tr_refill_piece ), 669 compareRefillPiece ); 670 671 for( j = 0; j < poolSize; ++j ) 672 pool[j] = p[j].piece; 673 674 tr_free( p ); 675 } 676 677 *pieceCount = poolSize; 678 return pool; 679 } 680 681 static struct tr_blockIterator* 682 blockIteratorNew( Torrent * t ) 683 { 684 struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 ); 685 i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS; 686 i->t = t; 687 i->pieces = getPreferredPieces( t, &i->pieceCount ); 688 i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece ); 689 tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount ); 690 return i; 691 } 692 693 static tr_bool 694 blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme ) 695 { 696 tr_bool found; 697 Torrent * t = i->t; 698 tr_torrent * tor = t->tor; 699 700 while( ( i->blockIndex == i->blockCount ) 701 && ( i->pieceIndex < i->pieceCount ) ) 702 { 703 const tr_piece_index_t index = i->pieces[i->pieceIndex++]; 704 const tr_block_index_t b = tr_torPieceFirstBlock( tor, index ); 705 const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index ); 706 tr_block_index_t block; 707 708 assert( index < tor->info.pieceCount ); 709 710 i->blockCount = 0; 711 i->blockIndex = 0; 712 for( block=b; block!=e; ++block ) 713 if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) ) 714 i->blocks[i->blockCount++] = block; 715 } 716 717 assert( i->blockCount <= tor->blockCountInPiece ); 718 719 if(( found = ( i->blockIndex < i->blockCount ))) 720 *setme = i->blocks[i->blockIndex++]; 721 722 return found; 723 } 724 725 static void 726 blockIteratorSkipCurrentPiece( struct tr_blockIterator * i ) 727 { 728 i->blockIndex = i->blockCount; 729 } 730 731 static void 732 blockIteratorFree( struct tr_blockIterator ** inout ) 733 { 734 struct tr_blockIterator * it = *inout; 735 736 if( it != NULL ) 737 { 738 tr_free( it->blocks ); 739 tr_free( it->pieces ); 740 tr_free( it ); 741 } 742 743 *inout = NULL; 744 } 745 746 static tr_peer** 747 getPeersUploadingToClient( Torrent * t, 748 int * setmeCount ) 749 { 750 int j; 751 int peerCount = 0; 752 int retCount = 0; 753 tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount ); 754 tr_peer ** ret = tr_new( tr_peer *, peerCount ); 755 756 j = 0; /* this is a temporary test to make sure we walk through all the peers */ 757 if( peerCount ) 758 { 759 /* Get a list of peers we're downloading from. 760 Pick a different starting point each time so all peers 761 get a chance at being the first in line */ 762 const int fencepost = tr_cryptoWeakRandInt( peerCount ); 763 int i = fencepost; 764 do { 765 if( clientIsDownloadingFrom( peers[i] ) ) 766 ret[retCount++] = peers[i]; 767 i = ( i + 1 ) % peerCount; 768 ++j; 769 } while( i != fencepost ); 770 } 771 assert( j == peerCount ); 772 *setmeCount = retCount; 773 return ret; 774 } 775 776 static uint32_t 777 getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b ) 778 { 779 const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b ); 780 const uint64_t blockPos = tor->blockSize * b; 781 assert( blockPos >= piecePos ); 782 return (uint32_t)( blockPos - piecePos ); 783 } 784 984 } 985 986 /* We almost always change only a handful of pieces in the array. 987 * In these cases, it's cheaper to sort those changed pieces and merge, 988 * than qsort()ing the whole array again */ 989 if( got > 0 ) 990 { 991 struct weighted_piece * p; 992 struct weighted_piece * pieces; 993 struct weighted_piece * a = t->pieces; 994 struct weighted_piece * a_end = t->pieces + i; 995 struct weighted_piece * b = a_end; 996 struct weighted_piece * b_end = t->pieces + t->pieceCount; 997 998 /* rescore the pieces that we changed */ 999 weightTorrent = t->tor; 1000 //fprintf( stderr, "sorting %d changed pieces...\n", (int)(a_end-a) ); 1001 qsort( a, a_end-a, sizeof( struct weighted_piece ), comparePieceByWeight ); 1002 1003 /* allocate a new array */ 1004 p = pieces = tr_new( struct weighted_piece, t->pieceCount ); 1005 1006 /* merge the two sorted arrays into this new array */ 1007 weightTorrent = t->tor; 1008 while( a!=a_end && b!=b_end ) 1009 *p++ = comparePieceByWeight( a, b ) < 0 ? *a++ : *b++; 1010 while( a!=a_end ) *p++ = *a++; 1011 while( b!=b_end ) *p++ = *b++; 1012 1013 #if 0 1014 /* make sure we did it right */ 1015 assert( p - pieces == t->pieceCount ); 1016 for( it=pieces; it+1<p; ++it ) 1017 assert( it->weight <= it[1].weight ); 1018 #endif 1019 1020 /* update */ 1021 tr_free( t->pieces ); 1022 t->pieces = pieces; 1023 } 1024 1025 //fprintf( stderr, "peer %p wanted %d requests; got %d\n", peer, numwant, got ); 1026 *numgot = got; 1027 } 1028 1029 tr_bool 1030 tr_peerMgrDidPeerRequest( const tr_torrent * tor, 1031 const tr_peer * peer, 1032 tr_block_index_t block ) 1033 { 1034 const Torrent * t = tor->torrentPeers; 1035 const struct block_request * b = requestListLookup( (Torrent*)t, block ); 1036 if( b == NULL ) return FALSE; 1037 if( b->peer == peer ) return TRUE; 1038 if( t->isInEndgame ) return TRUE; 1039 return FALSE; 1040 } 1041 1042 /* cancel requests that are too old */ 785 1043 static int 786 1044 refillUpkeep( void * vmgr ) 787 1045 { 788 tr_torrent * tor = NULL; 1046 time_t now; 1047 time_t too_old; 1048 tr_torrent * tor; 789 1049 tr_peerMgr * mgr = vmgr; 790 time_t now;791 1050 managerLock( mgr ); 792 1051 793 1052 now = time( NULL ); 794 while(( tor = tr_torrentNext( mgr->session, tor ))) { 1053 too_old = now - REQUEST_TTL_SECS; 1054 1055 tor = NULL; 1056 while(( tor = tr_torrentNext( mgr->session, tor ))) 1057 { 795 1058 Torrent * t = tor->torrentPeers; 796 if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) { 797 tordbg( t, "refill queue is past its shelf date; discarding." ); 798 blockIteratorFree( &t->refillQueue ); 1059 const int n = t->requestCount; 1060 if( n > 0 ) 1061 { 1062 int keepCount = 0; 1063 int cancelCount = 0; 1064 struct block_request * keep = tr_new( struct block_request, n ); 1065 struct block_request * cancel = tr_new( struct block_request, n ); 1066 const struct block_request * it; 1067 const struct block_request * end; 1068 1069 for( it=t->requests, end=it+n; it!=end; ++it ) 1070 if( it->sentAt <= too_old ) 1071 cancel[cancelCount++] = *it; 1072 else 1073 keep[keepCount++] = *it; 1074 1075 /* prune out the ones we aren't keeping */ 1076 tr_free( t->requests ); 1077 t->requests = keep; 1078 t->requestCount = keepCount; 1079 t->requestAlloc = n; 1080 1081 /* send cancel messages for all the "cancel" ones */ 1082 for( it=cancel, end=it+cancelCount; it!=end; ++it ) 1083 if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) 1084 tr_peerMsgsCancel( it->peer->msgs, it->block ); 1085 1086 /* decrement the pending request counts for the timed-out blocks */ 1087 for( it=cancel, end=it+cancelCount; it!=end; ++it ) 1088 pieceListRemoveRequest( t, it->block ); 1089 1090 /* cleanup loop */ 1091 tr_free( cancel ); 799 1092 } 800 1093 } … … 802 1095 managerUnlock( mgr ); 803 1096 return TRUE; 804 }805 806 static void807 sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now );808 809 static void810 refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )811 {812 tr_block_index_t block;813 int peerCount;814 int webseedCount;815 tr_peer ** peers;816 tr_webseed ** webseeds;817 Torrent * t = vtorrent;818 tr_torrent * tor = t->tor;819 tr_bool hasNext = TRUE;820 821 if( !t->isRunning )822 return;823 if( tr_torrentIsSeed( t->tor ) )824 return;825 826 torrentLock( t );827 tordbg( t, "Refilling Request Buffers..." );828 829 if( t->refillQueue == NULL )830 t->refillQueue = blockIteratorNew( t );831 832 peers = getPeersUploadingToClient( t, &peerCount );833 sortPeersByLiveliness( peers, NULL, peerCount, tr_date( ) );834 webseedCount = tr_ptrArraySize( &t->webseeds );835 webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),836 webseedCount * sizeof( tr_webseed* ) );837 838 while( ( webseedCount || peerCount )839 && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )840 {841 int j;842 tr_bool handled = FALSE;843 844 const tr_piece_index_t index = tr_torBlockPiece( tor, block );845 const uint32_t offset = getBlockOffsetInPiece( tor, block );846 const uint32_t length = tr_torBlockCountBytes( tor, block );847 848 assert( block < tor->blockCount );849 850 /* find a peer who can ask for this block */851 for( j=0; !handled && j<peerCount; )852 {853 const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );854 switch( val )855 {856 case TR_ADDREQ_FULL:857 case TR_ADDREQ_CLIENT_CHOKED:858 peers[j] = peers[--peerCount];859 break;860 861 case TR_ADDREQ_MISSING:862 case TR_ADDREQ_DUPLICATE:863 ++j;864 break;865 866 case TR_ADDREQ_OK:867 incrementPieceRequests( t, index );868 handled = TRUE;869 break;870 871 default:872 assert( 0 && "unhandled value" );873 break;874 }875 }876 877 /* maybe one of the webseeds can do it */878 for( j=0; !handled && j<webseedCount; )879 {880 const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );881 switch( val )882 {883 case TR_ADDREQ_FULL:884 webseeds[j] = webseeds[--webseedCount];885 break;886 887 case TR_ADDREQ_OK:888 incrementPieceRequests( t, index );889 handled = TRUE;890 break;891 892 default:893 assert( 0 && "unhandled value" );894 break;895 }896 }897 898 if( !handled )899 blockIteratorSkipCurrentPiece( t->refillQueue );900 }901 902 /* cleanup */903 tr_free( webseeds );904 tr_free( peers );905 906 if( !hasNext ) {907 tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );908 blockIteratorFree( &t->refillQueue );909 }910 911 torrentUnlock( t );912 }913 914 static void915 broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )916 {917 size_t i;918 size_t peerCount;919 tr_peer ** peers;920 921 assert( torrentIsLocked( t ) );922 923 tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );924 925 peerCount = tr_ptrArraySize( &t->peers );926 peers = (tr_peer**) tr_ptrArrayBase( &t->peers );927 for( i=0; i<peerCount; ++i )928 if( peers[i]->msgs )929 tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );930 1097 } 931 1098 … … 946 1113 947 1114 static void 948 gotBadPiece( Torrent * t, 949 tr_piece_index_t pieceIndex ) 1115 gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex ) 950 1116 { 951 1117 tr_torrent * tor = t->tor; … … 954 1120 tor->corruptCur += byteCount; 955 1121 tor->downloadedCur -= MIN( tor->downloadedCur, byteCount ); 956 }957 958 static void959 refillSoon( Torrent * t )960 {961 if( !evtimer_pending( &t->refillTimer, NULL ) )962 tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );963 1122 } 964 1123 … … 1009 1168 } 1010 1169 #endif 1170 } 1171 1172 static void 1173 decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount ) 1174 { 1175 tor->downloadedCur -= MIN( tor->downloadedCur, byteCount ); 1176 } 1177 1178 static void 1179 clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block ) 1180 { 1181 decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) ); 1182 } 1183 1184 static void 1185 removeRequestFromTables( Torrent * t, tr_block_index_t block ) 1186 { 1187 requestListRemove( t, block ); 1188 pieceListRemoveRequest( t, block ); 1189 } 1190 1191 /* peer choked us, or maybe it disconnected. 1192 either way we need to remove all its requests */ 1193 static void 1194 peerDeclinedAllRequests( Torrent * t, const tr_peer * peer ) 1195 { 1196 int i, n; 1197 tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount ); 1198 1199 for( i=n=0; i<t->requestCount; ++i ) 1200 if( peer == t->requests[i].peer ) 1201 blocks[n++] = t->requests[i].block; 1202 1203 for( i=0; i<n; ++i ) 1204 removeRequestFromTables( t, blocks[i] ); 1205 1206 tr_free( blocks ); 1011 1207 } 1012 1208 … … 1035 1231 break; 1036 1232 1037 case TR_PEER_NEED_REQ:1038 refillSoon( t );1039 break;1040 1041 case TR_PEER_CANCEL:1042 decrementPieceRequests( t, e->pieceIndex );1043 break;1044 1045 1233 case TR_PEER_PEER_GOT_DATA: 1046 1234 { … … 1067 1255 break; 1068 1256 } 1257 1258 case TR_PEER_CLIENT_GOT_REJ: 1259 removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ) ); 1260 break; 1261 1262 case TR_PEER_CLIENT_GOT_CHOKE: 1263 peerDeclinedAllRequests( t, peer ); 1264 break; 1069 1265 1070 1266 case TR_PEER_CLIENT_GOT_PORT: … … 1129 1325 { 1130 1326 tr_torrent * tor = t->tor; 1131 1132 1327 tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset ); 1133 1134 tr_cpBlockAdd( &tor->completion, block ); 1135 tr_torrentSetDirty( tor ); 1136 decrementPieceRequests( t, e->pieceIndex ); 1137 1138 broadcastGotBlock( t, e->pieceIndex, e->offset, e->length ); 1139 1140 if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) ) 1328 //static int numBlocks = 0; 1329 //fprintf( stderr, "got a total of %d blocks\n", ++numBlocks ); 1330 1331 requestListRemove( t, block ); 1332 pieceListRemoveRequest( t, block ); 1333 1334 if( tr_cpBlockIsComplete( &tor->completion, block ) ) 1141 1335 { 1142 const tr_piece_index_t p = e->pieceIndex; 1143 const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 ); 1144 1145 if( !ok ) 1336 tordbg( t, "we have this block already..." ); 1337 clientGotUnwantedBlock( tor, block ); 1338 } 1339 else 1340 { 1341 tr_cpBlockAdd( &tor->completion, block ); 1342 tr_torrentSetDirty( tor ); 1343 1344 if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) ) 1146 1345 { 1147 tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ), 1148 (unsigned long)p ); 1149 } 1150 1151 tr_torrentSetHasPiece( tor, p, ok ); 1152 tr_torrentSetPieceChecked( tor, p, TRUE ); 1153 tr_peerMgrSetBlame( tor, p, ok ); 1154 1155 if( !ok ) 1156 { 1157 gotBadPiece( t, p ); 1158 } 1159 else 1160 { 1161 int i; 1162 int peerCount; 1163 tr_peer ** peers; 1164 tr_file_index_t fileIndex; 1165 1166 peerCount = tr_ptrArraySize( &t->peers ); 1167 peers = (tr_peer**) tr_ptrArrayBase( &t->peers ); 1168 for( i=0; i<peerCount; ++i ) 1169 tr_peerMsgsHave( peers[i]->msgs, p ); 1170 1171 for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) { 1172 const tr_file * file = &tor->info.files[fileIndex]; 1173 if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) 1174 if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) 1175 tr_torrentFileCompleted( tor, fileIndex ); 1346 const tr_piece_index_t p = e->pieceIndex; 1347 const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 ); 1348 //fprintf( stderr, "we now have piece #%d\n", (int)p ); 1349 1350 if( !ok ) 1351 { 1352 tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ), 1353 (unsigned long)p ); 1354 } 1355 1356 tr_torrentSetHasPiece( tor, p, ok ); 1357 tr_torrentSetPieceChecked( tor, p, TRUE ); 1358 tr_peerMgrSetBlame( tor, p, ok ); 1359 1360 if( !ok ) 1361 { 1362 gotBadPiece( t, p ); 1363 } 1364 else 1365 { 1366 int i; 1367 int peerCount; 1368 tr_peer ** peers; 1369 tr_file_index_t fileIndex; 1370 1371 peerCount = tr_ptrArraySize( &t->peers ); 1372 peers = (tr_peer**) tr_ptrArrayBase( &t->peers ); 1373 for( i=0; i<peerCount; ++i ) 1374 tr_peerMsgsHave( peers[i]->msgs, p ); 1375 1376 for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) { 1377 const tr_file * file = &tor->info.files[fileIndex]; 1378 if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) 1379 if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) 1380 tr_torrentFileCompleted( tor, fileIndex ); 1381 } 1382 1383 pieceListRemovePiece( t, p ); 1176 1384 } 1177 1385 } … … 1635 1843 ensureMgrTimersExist( t->manager ); 1636 1844 1637 if( !t->isRunning ) 1638 { 1639 t->isRunning = TRUE; 1640 1641 if( !tr_ptrArrayEmpty( &t->webseeds ) ) 1642 refillSoon( t ); 1643 } 1845 t->isRunning = TRUE; 1644 1846 1645 1847 rechokePulse( t->manager ); … … 1650 1852 stopTorrent( Torrent * t ) 1651 1853 { 1854 int i, n; 1855 1652 1856 assert( torrentIsLocked( t ) ); 1653 1857 … … 1655 1859 1656 1860 /* disconnect the peers. */ 1657 tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor ); 1861 for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i ) 1862 peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) ); 1658 1863 tr_ptrArrayClear( &t->peers ); 1659 1864 -
trunk/libtransmission/peer-mgr.h
r9470 r9494 26 26 #include "bitfield.h" 27 27 #include "net.h" 28 #include "peer-common.h" /* struct peer_request */ 28 29 #include "publish.h" /* tr_publisher_tag */ 29 30 #include "utils.h" … … 118 119 tr_bool tr_peerMgrPeerIsSeed( const tr_torrent * tor, 119 120 const tr_address * addr ); 121 122 void tr_peerMgrGetNextRequests( tr_torrent * torrent, 123 tr_peer * peer, 124 int numwant, 125 tr_block_index_t * setme, 126 int * numgot ); 127 128 tr_bool tr_peerMgrDidPeerRequest( const tr_torrent * torrent, 129 const tr_peer * peer, 130 tr_block_index_t block ); 131 132 void tr_peerMgrRebuildRequests( tr_torrent * torrent ); 120 133 121 134 void tr_peerMgrAddIncoming( tr_peerMgr * manager, -
trunk/libtransmission/peer-msgs.c
r9470 r9494 33 33 #include "platform.h" /* MAX_STACK_ARRAY_SIZE */ 34 34 #include "ratecontrol.h" 35 #include "request-list.h"36 35 #include "session.h" 37 36 #include "stats.h" … … 79 78 PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */ 80 79 80 REQQ = 512, 81 81 82 82 MAX_BLOCK_SIZE = ( 1024 * 16 ), 83 84 85 /* how long an unsent request can stay queued before it's returned86 back to the peer-mgr's pool of requests */87 QUEUED_REQUEST_TTL_SECS = 20,88 89 /* how long a sent request can stay queued before it's returned90 back to the peer-mgr's pool of requests */91 SENT_REQUEST_TTL_SECS = 240,92 83 93 84 /* used in lowering the outMessages queue period */ 94 85 IMMEDIATE_PRIORITY_INTERVAL_SECS = 0, 95 86 HIGH_PRIORITY_INTERVAL_SECS = 2, 96 LOW_PRIORITY_INTERVAL_SECS = 20,87 LOW_PRIORITY_INTERVAL_SECS = 10, 97 88 98 89 /* number of pieces to remove from the bitfield when … … 111 102 AWAITING_BT_PIECE 112 103 }; 104 105 /** 106 *** 107 **/ 108 109 struct peer_request 110 { 111 uint32_t index; 112 uint32_t offset; 113 uint32_t length; 114 }; 115 116 static uint32_t 117 getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b ) 118 { 119 const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b ); 120 const uint64_t blockPos = tor->blockSize * b; 121 assert( blockPos >= piecePos ); 122 return (uint32_t)( blockPos - piecePos ); 123 } 124 125 static void 126 blockToReq( const tr_torrent * tor, 127 tr_block_index_t block, 128 struct peer_request * setme ) 129 { 130 assert( setme != NULL ); 131 132 setme->index = tr_torBlockPiece( tor, block ); 133 setme->offset = getBlockOffsetInPiece( tor, block ); 134 setme->length = tr_torBlockCountBytes( tor, block ); 135 } 113 136 114 137 /** … … 147 170 /*tr_bool haveFastSet;*/ 148 171 172 int activeRequestCount; 173 int desiredRequestCount; 174 149 175 /* how long the outMessages batch should be allowed to grow before 150 176 * it's flushed -- some messages (like requests >:) should be sent … … 156 182 uint16_t pexCount; 157 183 uint16_t pexCount6; 158 uint16_t maxActiveRequests;159 184 160 185 #if 0 … … 171 196 struct evbuffer * outMessages; /* all the non-piece messages */ 172 197 173 struct request_list peerAskedFor; 174 struct request_list clientAskedFor; 175 struct request_list clientWillAskFor; 176 198 struct peer_request peerAskedFor[REQQ]; 199 int peerAskedForCount; 200 177 201 tr_pex * pex; 178 202 tr_pex * pex6; … … 443 467 444 468 static void 445 fireNeedReq( tr_peermsgs * msgs )446 {447 tr_peer_event e = blankEvent;448 e.eventType = TR_PEER_NEED_REQ;449 publish( msgs, &e );450 }451 452 static void453 469 firePeerProgress( tr_peermsgs * msgs ) 454 470 { … … 471 487 472 488 static void 489 fireGotRej( tr_peermsgs * msgs, const struct peer_request * req ) 490 { 491 tr_peer_event e = blankEvent; 492 e.eventType = TR_PEER_CLIENT_GOT_REJ; 493 e.pieceIndex = req->index; 494 e.offset = req->offset; 495 e.length = req->length; 496 publish( msgs, &e ); 497 } 498 499 static void 500 fireGotChoke( tr_peermsgs * msgs ) 501 { 502 tr_peer_event e = blankEvent; 503 e.eventType = TR_PEER_CLIENT_GOT_CHOKE; 504 publish( msgs, &e ); 505 } 506 507 static void 473 508 fireClientGotData( tr_peermsgs * msgs, 474 509 uint32_t length, … … 521 556 e.wasPieceData = wasPieceData; 522 557 523 publish( msgs, &e );524 }525 526 static void527 fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )528 {529 tr_peer_event e = blankEvent;530 e.eventType = TR_PEER_CANCEL;531 e.pieceIndex = req->index;532 e.offset = req->offset;533 e.length = req->length;534 558 publish( msgs, &e ); 535 559 } … … 689 713 if( i != msgs->peer->clientIsInterested ) 690 714 sendInterest( msgs, i ); 691 if( i )692 fireNeedReq( msgs );693 715 } 694 716 695 717 static tr_bool 696 popNextRequest( tr_peermsgs * msgs, 697 struct peer_request * setme ) 698 { 699 return reqListPop( &msgs->peerAskedFor, setme ); 718 popNextRequest( tr_peermsgs * msgs, struct peer_request * setme ) 719 { 720 if( msgs->peerAskedForCount == 0 ) 721 return FALSE; 722 723 *setme = msgs->peerAskedFor[0]; 724 memmove( msgs->peerAskedFor, msgs->peerAskedFor + 1, --msgs->peerAskedForCount ); 725 return TRUE; 700 726 } 701 727 … … 706 732 const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io ); 707 733 708 while( popNextRequest( msgs, &req ) 734 while( popNextRequest( msgs, &req )) 709 735 if( mustSendCancel ) 710 736 protocolSendReject( msgs, &req ); … … 769 795 } 770 796 771 static void 772 expireFromList( tr_peermsgs * msgs, 773 struct request_list * list, 774 const time_t oldestAllowed ) 775 { 776 size_t i; 777 struct request_list tmp = REQUEST_LIST_INIT; 778 779 /* since the fifo list is sorted by time, the oldest will be first */ 780 if( !list->len || ( list->fifo[0].time_requested >= oldestAllowed ) ) 781 return; 782 783 /* if we found one too old, start pruning them */ 784 reqListCopy( &tmp, list ); 785 for( i=0; i<tmp.len; ++i ) { 786 const struct peer_request * req = &tmp.fifo[i]; 787 if( req->time_requested >= oldestAllowed ) 788 break; 789 tr_peerMsgsCancel( msgs, req->index, req->offset, req->length ); 790 } 791 reqListClear( &tmp ); 792 } 793 794 static void 795 expireOldRequests( tr_peermsgs * msgs, const time_t now ) 796 { 797 time_t oldestAllowed; 798 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 799 dbgmsg( msgs, "entering `expire old requests' block" ); 800 801 /* cancel requests that have been queued for too long */ 802 oldestAllowed = now - QUEUED_REQUEST_TTL_SECS; 803 expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed ); 804 805 /* if the peer doesn't support "Reject Request", 806 * cancel requests that were sent too long ago. */ 807 if( !fext ) { 808 oldestAllowed = now - SENT_REQUEST_TTL_SECS; 809 expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed ); 810 } 811 812 dbgmsg( msgs, "leaving `expire old requests' block" ); 813 } 814 815 static void 816 pumpRequestQueue( tr_peermsgs * msgs, const time_t now ) 817 { 818 const int max = msgs->maxActiveRequests; 819 int sent = 0; 820 int len = msgs->clientAskedFor.len; 797 798 void 799 tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block ) 800 { 821 801 struct peer_request req; 822 823 dbgmsg( msgs, "clientIsChoked %d, download allowed %d, len %d, max %d, msgs->clientWillAskFor.len %d", 824 (int)msgs->peer->clientIsChoked, 825 (int)tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ), 826 len, max, msgs->clientWillAskFor.len ); 827 828 if( msgs->peer->clientIsChoked ) 829 return; 830 if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) ) 831 return; 832 833 while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) ) 834 { 835 const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset ); 836 837 assert( requestIsValid( msgs, &req ) ); 838 assert( tr_bitfieldHas( msgs->peer->have, req.index ) ); 839 840 /* don't ask for it if we've already got it... this block may have 841 * come in from a different peer after we cancelled a request for it */ 842 if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) ) 843 { 844 protocolSendRequest( msgs, &req ); 845 req.time_requested = now; 846 reqListAppend( &msgs->clientAskedFor, &req ); 847 848 ++len; 849 ++sent; 850 } 851 else dbgmsg( msgs, "not asking for it because we've already got it..." ); 852 } 853 854 if( sent ) 855 dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued", 856 sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len ); 857 858 if( len < max ) 859 fireNeedReq( msgs ); 860 } 861 862 static TR_INLINE tr_bool 863 requestQueueIsFull( const tr_peermsgs * msgs ) 864 { 865 const int req_max = msgs->maxActiveRequests; 866 return msgs->clientWillAskFor.len >= (size_t)req_max; 867 } 868 869 tr_addreq_t 870 tr_peerMsgsAddRequest( tr_peermsgs * msgs, 871 uint32_t index, 872 uint32_t offset, 873 uint32_t length ) 874 { 875 struct peer_request req; 876 877 assert( msgs ); 878 assert( msgs->torrent ); 879 880 /** 881 *** Reasons to decline the request 882 **/ 883 884 /* don't send requests to choked clients */ 885 if( msgs->peer->clientIsChoked ) { 886 dbgmsg( msgs, "declining request because they're choking us" ); 887 return TR_ADDREQ_CLIENT_CHOKED; 888 } 889 890 /* peer's queue is full */ 891 if( requestQueueIsFull( msgs ) ) { 892 dbgmsg( msgs, "declining request because we're full" ); 893 return TR_ADDREQ_FULL; 894 } 895 896 /* peer doesn't have this piece */ 897 if( !tr_bitfieldHas( msgs->peer->have, index ) ) 898 return TR_ADDREQ_MISSING; 899 900 /* have we already asked for this piece? */ 901 req.index = index; 902 req.offset = offset; 903 req.length = length; 904 if( reqListHas( &msgs->clientAskedFor, &req ) ) { 905 dbgmsg( msgs, "declining because it's a duplicate" ); 906 return TR_ADDREQ_DUPLICATE; 907 } 908 if( reqListHas( &msgs->clientWillAskFor, &req ) ) { 909 dbgmsg( msgs, "declining because it's a duplicate" ); 910 return TR_ADDREQ_DUPLICATE; 911 } 912 913 /** 914 *** Accept this request 915 **/ 916 917 dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list", 918 index, offset, length ); 919 req.time_requested = time( NULL ); 920 reqListAppend( &msgs->clientWillAskFor, &req ); 921 return TR_ADDREQ_OK; 922 } 923 924 static void 925 cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel ) 926 { 927 size_t i; 928 struct request_list a = msgs->clientWillAskFor; 929 struct request_list b = msgs->clientAskedFor; 930 dbgmsg( msgs, "cancelling all requests to peer" ); 931 932 msgs->clientAskedFor = REQUEST_LIST_INIT; 933 msgs->clientWillAskFor = REQUEST_LIST_INIT; 934 935 for( i=0; i<a.len; ++i ) 936 fireCancelledReq( msgs, &a.fifo[i] ); 937 938 for( i = 0; i < b.len; ++i ) { 939 fireCancelledReq( msgs, &b.fifo[i] ); 940 if( sendCancel ) 941 protocolSendCancel( msgs, &b.fifo[i] ); 942 } 943 944 reqListClear( &a ); 945 reqListClear( &b ); 946 } 947 948 void 949 tr_peerMsgsCancel( tr_peermsgs * msgs, 950 uint32_t pieceIndex, 951 uint32_t offset, 952 uint32_t length ) 953 { 954 struct peer_request req; 955 956 assert( msgs != NULL ); 957 assert( length > 0 ); 958 959 960 /* have we asked the peer for this piece? */ 961 req.index = pieceIndex; 962 req.offset = offset; 963 req.length = length; 964 965 /* if it's only in the queue and hasn't been sent yet, free it */ 966 if( reqListRemove( &msgs->clientWillAskFor, &req ) ) { 967 dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length ); 968 fireCancelledReq( msgs, &req ); 969 } 970 971 /* if it's already been sent, send a cancel message too */ 972 if( reqListRemove( &msgs->clientAskedFor, &req ) ) { 973 dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length ); 974 protocolSendCancel( msgs, &req ); 975 fireCancelledReq( msgs, &req ); 976 } 977 } 978 802 blockToReq( msgs->torrent, block, &req ); 803 protocolSendCancel( msgs, &req ); 804 } 979 805 980 806 /** … … 1008 834 tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED ); 1009 835 tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) ); 836 tr_bencDictAddInt( &val, "reqq", REQQ ); 1010 837 tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) ); 1011 838 tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX ); … … 1252 1079 else if( peerIsChoked ) 1253 1080 dbgmsg( msgs, "rejecting request from choked peer" ); 1081 else if( msgs->peerAskedForCount + 1 >= REQQ ) 1082 dbgmsg( msgs, "rejecting request ... reqq is full" ); 1254 1083 else 1255 1084 allow = TRUE; 1256 1085 1257 1086 if( allow ) 1258 reqListAppend( &msgs->peerAskedFor, req );1087 msgs->peerAskedFor[msgs->peerAskedForCount++] = *req; 1259 1088 else if( fext ) 1260 1089 protocolSendReject( msgs, req ); … … 1365 1194 } 1366 1195 1196 static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now ); 1197 1198 static void 1199 decrementActiveRequestCount( tr_peermsgs * msgs ) 1200 { 1201 if( msgs->activeRequestCount > 0 ) 1202 msgs->activeRequestCount--; 1203 } 1204 1367 1205 static int 1368 1206 readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen ) … … 1394 1232 msgs->peer->clientIsChoked = 1; 1395 1233 if( !fext ) 1396 cancelAllRequestsToPeer( msgs, FALSE);1234 fireGotChoke( msgs ); 1397 1235 break; 1398 1236 … … 1400 1238 dbgmsg( msgs, "got Unchoke" ); 1401 1239 msgs->peer->clientIsChoked = 0; 1402 fireNeedReq( msgs);1240 updateDesiredRequestCount( msgs, tr_date( ) ); 1403 1241 break; 1404 1242 … … 1426 1264 1427 1265 case BT_BITFIELD: 1428 {1429 1266 dbgmsg( msgs, "got a bitfield" ); 1430 1267 tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen ); 1431 1268 updatePeerProgress( msgs ); 1432 fireNeedReq( msgs );1433 1269 break; 1434 }1435 1270 1436 1271 case BT_REQUEST: … … 1447 1282 case BT_CANCEL: 1448 1283 { 1284 int i; 1449 1285 struct peer_request r; 1450 1286 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index ); … … 1452 1288 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length ); 1453 1289 dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length ); 1454 if( reqListRemove( &msgs->peerAskedFor, &r ) && fext ) 1455 protocolSendReject( msgs, &r ); 1290 for( i=0; i<msgs->peerAskedForCount; ++i ) { 1291 const struct peer_request * req = msgs->peerAskedFor + i; 1292 if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) ) 1293 break; 1294 } 1295 if( i < msgs->peerAskedForCount ) 1296 memmove( msgs->peerAskedFor+i, msgs->peerAskedFor+i+1, --msgs->peerAskedForCount-i ); 1456 1297 break; 1457 1298 } … … 1521 1362 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset ); 1522 1363 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length ); 1523 if( fext ) 1524 reqListRemove( &msgs->clientAskedFor, &r ); 1525 else { 1364 if( fext ) { 1365 decrementActiveRequestCount( msgs ); 1366 fireGotRej( msgs, &r ); 1367 } else { 1526 1368 fireError( msgs, EMSGSIZE ); 1527 1369 return READ_ERR; … … 1592 1434 dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length ); 1593 1435 1594 /** 1595 *** Remove the block from our `we asked for this' list 1596 **/ 1597 1598 if( !reqListRemove( &msgs->clientAskedFor, req ) ) { 1599 clientGotUnwantedBlock( msgs, req ); 1436 if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) { 1600 1437 dbgmsg( msgs, "we didn't ask for this message..." ); 1601 return 0;1602 }1603 1604 dbgmsg( msgs, "peer has %d more blocks we've asked for",1605 msgs->clientAskedFor.len );1606 1607 /**1608 *** Error checks1609 **/1610 1611 if( tr_cpBlockIsComplete( &tor->completion, block ) ) {1612 dbgmsg( msgs, "we have this block already..." );1613 clientGotUnwantedBlock( msgs, req );1614 1438 return 0; 1615 1439 } … … 1623 1447 1624 1448 addPeerToBlamefield( msgs, req->index ); 1449 decrementActiveRequestCount( msgs ); 1625 1450 fireGotBlock( msgs, req ); 1626 1451 return 0; … … 1686 1511 **/ 1687 1512 1688 static int 1689 ratePulse( tr_peermsgs * msgs, uint64_t now ) 1690 { 1691 int irate; 1692 const int floor = 8; 1693 const int seconds = 10; 1694 double rate; 1695 int estimatedBlocksInPeriod; 1513 static void 1514 updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now ) 1515 { 1696 1516 const tr_torrent * const torrent = msgs->torrent; 1697 1517 1698 /* Get the rate limit we should use. 1699 * FIXME: this needs to consider all the other peers as well... */ 1700 rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT ); 1701 if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) ) 1702 rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) ); 1703 if( tr_torrentUsesSessionLimits( torrent ) ) 1704 if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) ) 1705 rate = MIN( rate, irate ); 1706 1707 estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize; 1708 msgs->maxActiveRequests = MAX( floor, estimatedBlocksInPeriod ); 1709 1710 if( msgs->reqq > 0 ) 1711 msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq ); 1712 1713 return TRUE; 1518 if( tr_torrentIsSeed( msgs->torrent ) ) 1519 { 1520 msgs->desiredRequestCount = 0; 1521 } 1522 else if( msgs->peer->clientIsChoked ) 1523 { 1524 msgs->desiredRequestCount = 0; 1525 } 1526 else 1527 { 1528 int irate; 1529 int estimatedBlocksInPeriod; 1530 double rate; 1531 const int floor = 16; 1532 const int seconds = REQUEST_BUF_SECS; 1533 1534 /* Get the rate limit we should use. 1535 * FIXME: this needs to consider all the other peers as well... */ 1536 rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT ); 1537 if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) ) 1538 rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) ); 1539 1540 /* honor the session limits, if enabled */ 1541 if( tr_torrentUsesSessionLimits( torrent ) ) 1542 if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) ) 1543 rate = MIN( rate, irate ); 1544 1545 /* use this desired rate to figure out how 1546 * many requests we should send to this peer */ 1547 estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize; 1548 msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod ); 1549 1550 /* honor the peer's maximum request count, if specified */ 1551 if( msgs->reqq > 0 ) 1552 if( msgs->desiredRequestCount > msgs->reqq ) 1553 msgs->desiredRequestCount = msgs->reqq; 1554 } 1555 } 1556 1557 static void 1558 updateRequests( tr_peermsgs * msgs ) 1559 { 1560 const int MIN_BATCH_SIZE = 4; 1561 const int numwant = msgs->desiredRequestCount - msgs->activeRequestCount; 1562 1563 /* make sure we have enough block requests queued up */ 1564 if( numwant >= MIN_BATCH_SIZE ) 1565 { 1566 int i; 1567 int n; 1568 tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant ); 1569 1570 tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n ); 1571 1572 for( i=0; i<n; ++i ) 1573 { 1574 struct peer_request req; 1575 blockToReq( msgs->torrent, blocks[i], &req ); 1576 protocolSendRequest( msgs, &req ); 1577 } 1578 1579 msgs->activeRequestCount += n; 1580 1581 tr_free( blocks ); 1582 } 1714 1583 } 1715 1584 … … 1819 1688 const time_t now = time( NULL ); 1820 1689 1821 ratePulse( msgs, now );1822 1823 pumpRequestQueue( msgs, now);1824 expireOldRequests( msgs, now );1690 if ( tr_isPeerIo( msgs->peer->io ) ) { 1691 updateDesiredRequestCount( msgs, now ); 1692 updateRequests( msgs ); 1693 } 1825 1694 1826 1695 for( ;; ) … … 2180 2049 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 2181 2050 m->incoming.block = evbuffer_new( ); 2182 m->peerAskedFor = REQUEST_LIST_INIT; 2183 m->clientAskedFor = REQUEST_LIST_INIT; 2184 m->clientWillAskFor = REQUEST_LIST_INIT; 2051 m->peerAskedForCount = 0; 2185 2052 evtimer_set( &m->pexTimer, pexPulse, m ); 2186 2053 tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 ); … … 2198 2065 2199 2066 tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m ); 2200 ratePulse( m, tr_date() );2067 updateDesiredRequestCount( m, tr_date( ) ); 2201 2068 2202 2069 return m; … … 2210 2077 evtimer_del( &msgs->pexTimer ); 2211 2078 tr_publisherDestruct( &msgs->publisher ); 2212 reqListClear( &msgs->clientWillAskFor );2213 reqListClear( &msgs->clientAskedFor );2214 reqListClear( &msgs->peerAskedFor );2215 2079 2216 2080 evbuffer_free( msgs->incoming.block ); -
trunk/libtransmission/peer-msgs.h
r8561 r9494 49 49 50 50 void tr_peerMsgsCancel( tr_peermsgs * msgs, 51 uint32_t pieceIndex, 52 uint32_t offset, 53 uint32_t length ); 54 51 tr_block_index_t block ); 55 52 56 53 void tr_peerMsgsFree( tr_peermsgs* ); 57 58 tr_addreq_t tr_peerMsgsAddRequest( tr_peermsgs * peer,59 uint32_t pieceIndex,60 uint32_t offset,61 uint32_t length );62 54 63 55 void tr_peerMsgsUnsubscribe( tr_peermsgs * peer, -
trunk/libtransmission/torrent.c
r9470 r9494 1678 1678 { 1679 1679 tr_file_index_t i; 1680 1681 assert( tr_isTorrent( tor ) ); 1682 1680 assert( tr_isTorrent( tor ) ); 1683 1681 tr_torrentLock( tor ); 1684 1682 1685 1683 for( i = 0; i < fileCount; ++i ) 1686 1684 tr_torrentInitFilePriority( tor, files[i], priority ); 1687 1688 1685 tr_torrentSetDirty( tor ); 1686 tr_peerMgrRebuildRequests( tor ); 1687 1689 1688 tr_torrentUnlock( tor ); 1690 1689 } … … 1827 1826 { 1828 1827 assert( tr_isTorrent( tor ) ); 1829 1830 1828 tr_torrentLock( tor ); 1829 1831 1830 tr_torrentInitFileDLs( tor, files, fileCount, doDownload ); 1832 1831 tr_torrentSetDirty( tor ); 1832 tr_peerMgrRebuildRequests( tor ); 1833 1833 1834 tr_torrentUnlock( tor ); 1834 1835 } -
trunk/libtransmission/webseed.c
r9434 r9494 65 65 fireNeedReq( tr_webseed * w ) 66 66 { 67 #if 0 67 68 tr_peer_event e = blankEvent; 68 69 e.eventType = TR_PEER_NEED_REQ; 69 70 publish( w, &e ); 71 #endif 70 72 } 71 73
Note: See TracChangeset
for help on using the changeset viewer.