Changeset 9494 for trunk/libtransmission/peer-mgr.c
- Timestamp:
- Nov 8, 2009, 11:20:00 PM (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/peer-mgr.c
r9484 r9494 137 137 } 138 138 139 struct tr_blockIterator 140 { 141 time_t expirationDate; 142 struct tr_torrent_peers * t; 143 tr_block_index_t blockIndex, blockCount, *blocks; 144 tr_piece_index_t pieceIndex, pieceCount, *pieces; 139 struct block_request 140 { 141 tr_block_index_t block; 142 tr_peer * peer; 143 time_t sentAt; 144 }; 145 146 struct weighted_piece 147 { 148 tr_piece_index_t index; 149 int16_t salt; 150 int16_t requestCount; 145 151 }; 146 152 … … 156 162 tr_torrent * tor; 157 163 tr_peer * optimistic; /* the optimistic peer, or NULL if none */ 158 struct tr_blockIterator * refillQueue; /* used in refillPulse() */159 164 struct tr_peerMgr * manager; 160 int * pendingRequestCount;165 //int * pendingRequestCount; 161 166 162 167 tr_bool isRunning; 168 169 struct block_request * requests; 170 int requestsSort; 171 int requestCount; 172 int requestAlloc; 173 174 struct weighted_piece * pieces; 175 int piecesSort; 176 int pieceCount; 177 178 tr_bool isInEndgame; 163 179 } 164 180 Torrent; … … 352 368 } 353 369 354 static void 355 peerDestructor( tr_peer * peer ) 356 { 357 assert( peer ); 370 static void peerDeclinedAllRequests( Torrent *, const tr_peer * ); 371 372 static void 373 peerDestructor( Torrent * t, tr_peer * peer ) 374 { 375 assert( peer != NULL ); 376 377 peerDeclinedAllRequests( t, peer ); 358 378 359 379 if( peer->msgs != NULL ) … … 386 406 removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare ); 387 407 assert( removed == peer ); 388 peerDestructor( removed );408 peerDestructor( t, removed ); 389 409 } 390 410 … … 395 415 removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) ); 396 416 } 397 398 static void blockIteratorFree( struct tr_blockIterator ** inout );399 417 400 418 static void … … 411 429 evtimer_del( &t->refillTimer ); 412 430 413 blockIteratorFree( &t->refillQueue );414 431 tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree ); 415 432 tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free ); … … 417 434 tr_ptrArrayDestruct( &t->peers, NULL ); 418 435 419 tr_free( t->pendingRequestCount ); 436 tr_free( t->requests ); 437 tr_free( t->pieces ); 420 438 tr_free( t ); 421 439 } 422 440 423 441 424 static void refillPulse( int, short, void* );442 //static void refillPulse( int, short, void* ); 425 443 426 444 static void peerCallbackFunc( void * vpeer, … … 442 460 t->webseeds = TR_PTR_ARRAY_INIT; 443 461 t->outgoingHandshakes = TR_PTR_ARRAY_INIT; 444 evtimer_set( &t->refillTimer, refillPulse, t ); 462 t->requests = 0; 463 //evtimer_set( &t->refillTimer, refillPulse, t ); 445 464 446 465 … … 534 553 } 535 554 536 /**** 537 ***** 538 ***** REFILL 539 ***** 540 ****/ 541 542 static void 543 assertValidPiece( Torrent * t, tr_piece_index_t piece ) 544 { 545 assert( t ); 546 assert( t->tor ); 547 assert( piece < t->tor->info.pieceCount ); 548 } 555 /** 556 *** REQUESTS 557 *** 558 *** There are two data structures associated with managing block requests: 559 *** 560 *** 1. Torrent::requests, an array of "struct block_request" which keeps 561 *** track of which blocks have been requested, and when, and by which peers. 562 *** This is list is used for (a) cancelling requests that have been pending 563 *** for too long and (b) avoiding duplicate requests before endgame. 564 *** 565 *** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the 566 *** pieces that we want to request. It's used to decide which pieces to 567 *** return next when tr_peerMgrGetBlockRequests() is called. 568 **/ 569 570 /** 571 *** struct block_request 572 **/ 573 574 enum 575 { 576 REQ_UNSORTED, 577 REQ_SORTED_BY_BLOCK, 578 REQ_SORTED_BY_TIME 579 }; 549 580 550 581 static int 551 getPieceRequests( Torrent * t, tr_piece_index_t piece ) 552 { 553 assertValidPiece( t, piece ); 554 555 return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0; 556 } 557 558 static void 559 incrementPieceRequests( Torrent * t, tr_piece_index_t piece ) 560 { 561 assertValidPiece( t, piece ); 562 563 if( t->pendingRequestCount == NULL ) 564 t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount ); 565 t->pendingRequestCount[piece]++; 566 } 567 568 static void 569 decrementPieceRequests( Torrent * t, tr_piece_index_t piece ) 570 { 571 assertValidPiece( t, piece ); 572 573 if( t->pendingRequestCount ) 574 t->pendingRequestCount[piece]--; 575 } 576 577 struct tr_refill_piece 578 { 579 tr_priority_t priority; 580 uint32_t piece; 581 uint32_t peerCount; 582 int random; 583 int pendingRequestCount; 584 int missingBlockCount; 582 compareReqByBlock( const void * va, const void * vb ) 583 { 584 const struct block_request * a = va; 585 const struct block_request * b = vb; 586 if( a->block < b->block ) return -1; 587 if( a->block > b->block ) return 1; 588 return 0; 589 } 590 591 static int 592 compareReqByTime( const void * va, const void * vb ) 593 { 594 const struct block_request * a = va; 595 const struct block_request * b = vb; 596 if( a->sentAt < b->sentAt ) return -1; 597 if( a->sentAt > b->sentAt ) return 1; 598 return 0; 599 } 600 601 static void 602 requestListSort( Torrent * t, int mode ) 603 { 604 assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME ); 605 606 if( t->requestsSort != mode ) 607 { 608 int(*compar)(const void *, const void *); 609 610 t->requestsSort = mode; 611 612 switch( mode ) { 613 case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break; 614 case REQ_SORTED_BY_TIME: compar = compareReqByTime; break; 615 default: assert( 0 && "unhandled" ); 616 } 617 618 //fprintf( stderr, "sorting requests by %s\n", (mode==REQ_SORTED_BY_BLOCK)?"block":"time" ); 619 qsort( t->requests, t->requestCount, 620 sizeof( struct block_request ), compar ); 621 } 622 } 623 624 static void 625 requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer ) 626 { 627 struct block_request key; 628 629 /* ensure enough room is available... */ 630 if( t->requestCount + 1 >= t->requestAlloc ) 631 { 632 const int CHUNK_SIZE = 128; 633 t->requestAlloc += CHUNK_SIZE; 634 t->requests = tr_renew( struct block_request, 635 t->requests, t->requestAlloc ); 636 } 637 638 /* populate the record we're inserting */ 639 key.block = block; 640 key.peer = peer; 641 key.sentAt = time( NULL ); 642 643 /* insert the request to our array... */ 644 switch( t->requestsSort ) 645 { 646 case REQ_UNSORTED: 647 case REQ_SORTED_BY_TIME: 648 t->requests[t->requestCount++] = key; 649 break; 650 651 case REQ_SORTED_BY_BLOCK: { 652 tr_bool exact; 653 const int pos = tr_lowerBound( &key, t->requests, t->requestCount, 654 sizeof( struct block_request ), 655 compareReqByBlock, &exact ); 656 assert( !exact ); 657 memmove( t->requests + pos + 1, 658 t->requests + pos, 659 sizeof( struct block_request ) * ( t->requestCount++ - pos ) ); 660 t->requests[pos] = key; 661 break; 662 } 663 } 664 } 665 666 static struct block_request * 667 requestListLookup( Torrent * t, tr_block_index_t block ) 668 { 669 struct block_request key; 670 key.block = block; 671 672 requestListSort( t, REQ_SORTED_BY_BLOCK ); 673 674 return bsearch( &key, t->requests, t->requestCount, 675 sizeof( struct block_request ), 676 compareReqByBlock ); 677 } 678 679 static void 680 requestListRemove( Torrent * t, tr_block_index_t block ) 681 { 682 const struct block_request * b = requestListLookup( t, block ); 683 if( b != NULL ) 684 { 685 const int pos = b - t->requests; 686 assert( pos < t->requestCount ); 687 memmove( t->requests + pos, 688 t->requests + pos + 1, 689 sizeof( struct block_request ) * ( --t->requestCount - pos ) ); 690 } 691 } 692 693 /** 694 *** struct weighted_piece 695 **/ 696 697 enum 698 { 699 PIECES_UNSORTED, 700 PIECES_SORTED_BY_INDEX, 701 PIECES_SORTED_BY_WEIGHT 585 702 }; 586 703 704 const tr_torrent * weightTorrent; 705 706 /* we try to create a "weight" s.t. high-priority pieces come before others, 707 * and that partially-complete pieces come before empty ones. */ 587 708 static int 588 compareRefillPiece( const void * aIn, const void * bIn ) 589 { 590 const struct tr_refill_piece * a = aIn; 591 const struct tr_refill_piece * b = bIn; 592 593 /* if one piece has a higher priority, it goes first */ 594 if( a->priority != b->priority ) 595 return a->priority > b->priority ? -1 : 1; 596 597 /* have a per-priority endgame */ 598 if( a->pendingRequestCount != b->pendingRequestCount ) 599 return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1; 600 601 /* fewer missing pieces goes first */ 602 if( a->missingBlockCount != b->missingBlockCount ) 603 return a->missingBlockCount < b->missingBlockCount ? -1 : 1; 604 605 /* otherwise if one has fewer peers, it goes first */ 606 if( a->peerCount != b->peerCount ) 607 return a->peerCount < b->peerCount ? -1 : 1; 608 609 /* otherwise go with our random seed */ 610 if( a->random != b->random ) 611 return a->random < b->random ? -1 : 1; 612 709 comparePieceByWeight( const void * va, const void * vb ) 710 { 711 const struct weighted_piece * a = va; 712 const struct weighted_piece * b = vb; 713 int ia, ib, missing, pending; 714 const tr_torrent * tor = weightTorrent; 715 716 /* primary key: weight */ 717 missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index ); 718 pending = a->requestCount; 719 ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending); 720 missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index ); 721 pending = b->requestCount; 722 ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending); 723 if( ia < ib ) return -1; 724 if( ia > ib ) return 1; 725 726 /* secondary key: higher priorities go first */ 727 ia = tor->info.pieces[a->index].priority; 728 ib = tor->info.pieces[b->index].priority; 729 if( ia > ib ) return -1; 730 if( ia < ib ) return 1; 731 732 /* tertiary key: random */ 733 return a->salt - b->salt; 734 } 735 736 static int 737 comparePieceByIndex( const void * va, const void * vb ) 738 { 739 const struct weighted_piece * a = va; 740 const struct weighted_piece * b = vb; 741 if( a->index < b->index ) return -1; 742 if( a->index > b->index ) return 1; 613 743 return 0; 614 744 } 615 745 616 static tr_piece_index_t * 617 getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount ) 618 { 619 const tr_torrent * tor = t->tor; 620 const tr_info * inf = &tor->info; 621 tr_piece_index_t i; 622 tr_piece_index_t poolSize = 0; 623 tr_piece_index_t * pool = tr_new( tr_piece_index_t , inf->pieceCount ); 624 int peerCount; 625 const tr_peer ** peers; 626 627 assert( torrentIsLocked( t ) ); 628 629 peers = (const tr_peer**) tr_ptrArrayBase( &t->peers ); 630 peerCount = tr_ptrArraySize( &t->peers ); 631 632 /* make a list of the pieces that we want but don't have */ 633 for( i = 0; i < inf->pieceCount; ++i ) 634 if( !tor->info.pieces[i].dnd 635 && !tr_cpPieceIsComplete( &tor->completion, i ) ) 636 pool[poolSize++] = i; 637 638 /* sort the pool by which to request next */ 639 if( poolSize > 1 ) 640 { 641 tr_piece_index_t j; 642 struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize ); 643 644 for( j = 0; j < poolSize; ++j ) 746 static void 747 pieceListSort( Torrent * t, int mode ) 748 { 749 int(*compar)(const void *, const void *); 750 751 assert( mode==PIECES_SORTED_BY_INDEX 752 || mode==PIECES_SORTED_BY_WEIGHT ); 753 754 if( t->piecesSort != mode ) 755 { 756 //fprintf( stderr, "sort mode was %d, is now %d\n", t->piecesSort, mode ); 757 t->piecesSort = mode; 758 759 switch( mode ) { 760 case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break; 761 case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break; 762 default: assert( 0 && "unhandled" ); break; 763 } 764 765 //fprintf( stderr, "sorting pieces by %s...\n", (mode==PIECES_SORTED_BY_WEIGHT)?"weight":"index" ); 766 weightTorrent = t->tor; 767 qsort( t->pieces, t->pieceCount, 768 sizeof( struct weighted_piece ), compar ); 769 } 770 771 /* Also, as long as we've got the pieces sorted by weight, 772 * let's also update t.isInEndgame */ 773 if( t->piecesSort == PIECES_SORTED_BY_WEIGHT ) 774 { 775 tr_bool endgame = TRUE; 776 777 if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) ) 645 778 { 646 int k; 647 const tr_piece_index_t piece = pool[j]; 648 struct tr_refill_piece * setme = p + j; 649 650 setme->piece = piece; 651 setme->priority = inf->pieces[piece].priority; 652 setme->peerCount = 0; 653 setme->random = tr_cryptoWeakRandInt( INT_MAX ); 654 setme->pendingRequestCount = getPieceRequests( t, piece ); 655 setme->missingBlockCount 656 = tr_cpMissingBlocksInPiece( &tor->completion, piece ); 657 658 for( k = 0; k < peerCount; ++k ) 779 const tr_completion * cp = &t->tor->completion; 780 const struct weighted_piece * p = t->pieces; 781 const int pending = p->requestCount; 782 const int missing = tr_cpMissingBlocksInPiece( cp, p->index ); 783 endgame = pending >= missing; 784 } 785 786 t->isInEndgame = endgame; 787 } 788 } 789 790 static struct weighted_piece * 791 pieceListLookup( Torrent * t, tr_piece_index_t index ) 792 { 793 struct weighted_piece key; 794 key.index = index; 795 796 pieceListSort( t, PIECES_SORTED_BY_INDEX ); 797 798 return bsearch( &key, t->pieces, t->pieceCount, 799 sizeof( struct weighted_piece ), 800 comparePieceByIndex ); 801 } 802 803 static void 804 pieceListRebuild( Torrent * t ) 805 { 806 if( !tr_torrentIsSeed( t->tor ) ) 807 { 808 tr_piece_index_t i; 809 tr_piece_index_t * pool; 810 tr_piece_index_t poolCount = 0; 811 const tr_torrent * tor = t->tor; 812 const tr_info * inf = tr_torrentInfo( tor ); 813 struct weighted_piece * pieces; 814 int pieceCount; 815 816 /* build the new list */ 817 pool = tr_new( tr_piece_index_t, inf->pieceCount ); 818 for( i=0; i<inf->pieceCount; ++i ) 819 if( !inf->pieces[i].dnd ) 820 if( !tr_cpPieceIsComplete( &tor->completion, i ) ) 821 pool[poolCount++] = i; 822 pieceCount = poolCount; 823 pieces = tr_new0( struct weighted_piece, pieceCount ); 824 for( i=0; i<poolCount; ++i ) { 825 struct weighted_piece * piece = pieces + i; 826 piece->index = pool[i]; 827 piece->requestCount = 0; 828 piece->salt = tr_cryptoWeakRandInt( 255 ); 829 } 830 831 /* if we already had a list of pieces, merge it into 832 * the new list so we don't lose its requestCounts */ 833 if( t->pieces != NULL ) 834 { 835 struct weighted_piece * o = t->pieces; 836 struct weighted_piece * oend = o + t->pieceCount; 837 struct weighted_piece * n = pieces; 838 struct weighted_piece * nend = n + pieceCount; 839 840 pieceListSort( t, PIECES_SORTED_BY_INDEX ); 841 842 while( o!=oend && n!=nend ) { 843 if( o->index < n->index ) 844 ++o; 845 else if( o->index > n->index ) 846 ++n; 847 else 848 *n++ = *o++; 849 } 850 851 tr_free( t->pieces ); 852 } 853 854 t->pieces = pieces; 855 t->pieceCount = pieceCount; 856 t->piecesSort = PIECES_SORTED_BY_INDEX; 857 858 /* cleanup */ 859 tr_free( pool ); 860 } 861 } 862 863 static void 864 pieceListRemovePiece( Torrent * t, tr_piece_index_t piece ) 865 { 866 struct weighted_piece * p = pieceListLookup( t, piece ); 867 868 if( p != NULL ) 869 { 870 const int pos = p - t->pieces; 871 872 memmove( t->pieces + pos, 873 t->pieces + pos + 1, 874 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) ); 875 876 if( t->pieceCount == 0 ) 877 { 878 tr_free( t->pieces ); 879 t->pieces = NULL; 880 } 881 } 882 } 883 884 static void 885 pieceListRemoveRequest( Torrent * t, tr_block_index_t block ) 886 { 887 struct weighted_piece * p; 888 const tr_piece_index_t index = tr_torBlockPiece( t->tor, block ); 889 890 if(( p = pieceListLookup( t, index ))) 891 if( p->requestCount > 0 ) 892 --p->requestCount; 893 894 /* note: this invalidates the weighted.piece.weight field, 895 * but that's OK since the call to pieceListLookup ensured 896 * that we were sorted by index anyway.. next time we resort 897 * by weight, pieceListSort() will update the weights */ 898 } 899 900 /** 901 *** 902 **/ 903 904 void 905 tr_peerMgrRebuildRequests( tr_torrent * tor ) 906 { 907 assert( tr_isTorrent( tor ) ); 908 909 pieceListRebuild( tor->torrentPeers ); 910 } 911 912 void 913 tr_peerMgrGetNextRequests( tr_torrent * tor, 914 tr_peer * peer, 915 int numwant, 916 tr_block_index_t * setme, 917 int * numgot ) 918 { 919 int i; 920 int got; 921 Torrent * t; 922 struct weighted_piece * pieces; 923 const tr_bitfield * have = peer->have; 924 925 /* sanity clause */ 926 assert( tr_isTorrent( tor ) ); 927 assert( numwant > 0 ); 928 929 /* walk through the pieces and find blocks that should be requested */ 930 got = 0; 931 t = tor->torrentPeers; 932 933 /* prep the pieces list */ 934 if( t->pieces == NULL ) 935 pieceListRebuild( t ); 936 pieceListSort( t, PIECES_SORTED_BY_WEIGHT ); 937 //if( t->isInEndgame ) fprintf( stderr, "endgame\n" ); 938 939 #if 0 940 { 941 int i=0, n=MIN(10,t->pieceCount); 942 fprintf( stderr, "the next pieces we want to request are " ); 943 for( i=0; i<n; i++ ) fprintf( stderr, "%d(weight:%d) ", (int)t->pieces[i].index, (int)t->pieces[i].weight ); 944 fprintf( stderr, "\n" ); 945 } 946 #endif 947 948 pieces = t->pieces; 949 for( i=0; i<t->pieceCount && got<numwant; ++i ) 950 { 951 struct weighted_piece * p = pieces + i; 952 953 /* if the peer has this piece that we want... */ 954 if( tr_bitfieldHasFast( have, p->index ) ) 955 { 956 tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index ); 957 const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index ); 958 959 for( ; b!=e && got<numwant; ++b ) 659 960 { 660 const tr_peer * peer = peers[k]; 661 if( peer->peerIsInterested 662 && !peer->clientIsChoked 663 && tr_bitfieldHas( peer->have, piece ) ) 664 ++setme->peerCount; 961 struct block_request * breq; 962 963 /* don't request blocks we've already got */ 964 if( tr_cpBlockIsCompleteFast( &tor->completion, b ) ) 965 continue; 966 967 /* don't request blocks we've already requested (FIXME) */ 968 breq = requestListLookup( t, b ); 969 if( breq != NULL ) { 970 assert( breq->peer != NULL ); 971 if( breq->peer == peer ) continue; 972 if( !t->isInEndgame ) continue; 973 } 974 975 setme[got++] = b; 976 //fprintf( stderr, "peer %p is requesting block %"PRIu64"\n", peer, b ); 977 978 /* update our own tables */ 979 if( breq == NULL ) 980 requestListAdd( t, b, peer ); 981 ++p->requestCount; 665 982 } 666 983 } 667 668 qsort( p, poolSize, sizeof( struct tr_refill_piece ), 669 compareRefillPiece ); 670 671 for( j = 0; j < poolSize; ++j ) 672 pool[j] = p[j].piece; 673 674 tr_free( p ); 675 } 676 677 *pieceCount = poolSize; 678 return pool; 679 } 680 681 static struct tr_blockIterator* 682 blockIteratorNew( Torrent * t ) 683 { 684 struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 ); 685 i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS; 686 i->t = t; 687 i->pieces = getPreferredPieces( t, &i->pieceCount ); 688 i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece ); 689 tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount ); 690 return i; 691 } 692 693 static tr_bool 694 blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme ) 695 { 696 tr_bool found; 697 Torrent * t = i->t; 698 tr_torrent * tor = t->tor; 699 700 while( ( i->blockIndex == i->blockCount ) 701 && ( i->pieceIndex < i->pieceCount ) ) 702 { 703 const tr_piece_index_t index = i->pieces[i->pieceIndex++]; 704 const tr_block_index_t b = tr_torPieceFirstBlock( tor, index ); 705 const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index ); 706 tr_block_index_t block; 707 708 assert( index < tor->info.pieceCount ); 709 710 i->blockCount = 0; 711 i->blockIndex = 0; 712 for( block=b; block!=e; ++block ) 713 if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) ) 714 i->blocks[i->blockCount++] = block; 715 } 716 717 assert( i->blockCount <= tor->blockCountInPiece ); 718 719 if(( found = ( i->blockIndex < i->blockCount ))) 720 *setme = i->blocks[i->blockIndex++]; 721 722 return found; 723 } 724 725 static void 726 blockIteratorSkipCurrentPiece( struct tr_blockIterator * i ) 727 { 728 i->blockIndex = i->blockCount; 729 } 730 731 static void 732 blockIteratorFree( struct tr_blockIterator ** inout ) 733 { 734 struct tr_blockIterator * it = *inout; 735 736 if( it != NULL ) 737 { 738 tr_free( it->blocks ); 739 tr_free( it->pieces ); 740 tr_free( it ); 741 } 742 743 *inout = NULL; 744 } 745 746 static tr_peer** 747 getPeersUploadingToClient( Torrent * t, 748 int * setmeCount ) 749 { 750 int j; 751 int peerCount = 0; 752 int retCount = 0; 753 tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount ); 754 tr_peer ** ret = tr_new( tr_peer *, peerCount ); 755 756 j = 0; /* this is a temporary test to make sure we walk through all the peers */ 757 if( peerCount ) 758 { 759 /* Get a list of peers we're downloading from. 760 Pick a different starting point each time so all peers 761 get a chance at being the first in line */ 762 const int fencepost = tr_cryptoWeakRandInt( peerCount ); 763 int i = fencepost; 764 do { 765 if( clientIsDownloadingFrom( peers[i] ) ) 766 ret[retCount++] = peers[i]; 767 i = ( i + 1 ) % peerCount; 768 ++j; 769 } while( i != fencepost ); 770 } 771 assert( j == peerCount ); 772 *setmeCount = retCount; 773 return ret; 774 } 775 776 static uint32_t 777 getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b ) 778 { 779 const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b ); 780 const uint64_t blockPos = tor->blockSize * b; 781 assert( blockPos >= piecePos ); 782 return (uint32_t)( blockPos - piecePos ); 783 } 784 984 } 985 986 /* We almost always change only a handful of pieces in the array. 987 * In these cases, it's cheaper to sort those changed pieces and merge, 988 * than qsort()ing the whole array again */ 989 if( got > 0 ) 990 { 991 struct weighted_piece * p; 992 struct weighted_piece * pieces; 993 struct weighted_piece * a = t->pieces; 994 struct weighted_piece * a_end = t->pieces + i; 995 struct weighted_piece * b = a_end; 996 struct weighted_piece * b_end = t->pieces + t->pieceCount; 997 998 /* rescore the pieces that we changed */ 999 weightTorrent = t->tor; 1000 //fprintf( stderr, "sorting %d changed pieces...\n", (int)(a_end-a) ); 1001 qsort( a, a_end-a, sizeof( struct weighted_piece ), comparePieceByWeight ); 1002 1003 /* allocate a new array */ 1004 p = pieces = tr_new( struct weighted_piece, t->pieceCount ); 1005 1006 /* merge the two sorted arrays into this new array */ 1007 weightTorrent = t->tor; 1008 while( a!=a_end && b!=b_end ) 1009 *p++ = comparePieceByWeight( a, b ) < 0 ? *a++ : *b++; 1010 while( a!=a_end ) *p++ = *a++; 1011 while( b!=b_end ) *p++ = *b++; 1012 1013 #if 0 1014 /* make sure we did it right */ 1015 assert( p - pieces == t->pieceCount ); 1016 for( it=pieces; it+1<p; ++it ) 1017 assert( it->weight <= it[1].weight ); 1018 #endif 1019 1020 /* update */ 1021 tr_free( t->pieces ); 1022 t->pieces = pieces; 1023 } 1024 1025 //fprintf( stderr, "peer %p wanted %d requests; got %d\n", peer, numwant, got ); 1026 *numgot = got; 1027 } 1028 1029 tr_bool 1030 tr_peerMgrDidPeerRequest( const tr_torrent * tor, 1031 const tr_peer * peer, 1032 tr_block_index_t block ) 1033 { 1034 const Torrent * t = tor->torrentPeers; 1035 const struct block_request * b = requestListLookup( (Torrent*)t, block ); 1036 if( b == NULL ) return FALSE; 1037 if( b->peer == peer ) return TRUE; 1038 if( t->isInEndgame ) return TRUE; 1039 return FALSE; 1040 } 1041 1042 /* cancel requests that are too old */ 785 1043 static int 786 1044 refillUpkeep( void * vmgr ) 787 1045 { 788 tr_torrent * tor = NULL; 1046 time_t now; 1047 time_t too_old; 1048 tr_torrent * tor; 789 1049 tr_peerMgr * mgr = vmgr; 790 time_t now;791 1050 managerLock( mgr ); 792 1051 793 1052 now = time( NULL ); 794 while(( tor = tr_torrentNext( mgr->session, tor ))) { 1053 too_old = now - REQUEST_TTL_SECS; 1054 1055 tor = NULL; 1056 while(( tor = tr_torrentNext( mgr->session, tor ))) 1057 { 795 1058 Torrent * t = tor->torrentPeers; 796 if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) { 797 tordbg( t, "refill queue is past its shelf date; discarding." ); 798 blockIteratorFree( &t->refillQueue ); 1059 const int n = t->requestCount; 1060 if( n > 0 ) 1061 { 1062 int keepCount = 0; 1063 int cancelCount = 0; 1064 struct block_request * keep = tr_new( struct block_request, n ); 1065 struct block_request * cancel = tr_new( struct block_request, n ); 1066 const struct block_request * it; 1067 const struct block_request * end; 1068 1069 for( it=t->requests, end=it+n; it!=end; ++it ) 1070 if( it->sentAt <= too_old ) 1071 cancel[cancelCount++] = *it; 1072 else 1073 keep[keepCount++] = *it; 1074 1075 /* prune out the ones we aren't keeping */ 1076 tr_free( t->requests ); 1077 t->requests = keep; 1078 t->requestCount = keepCount; 1079 t->requestAlloc = n; 1080 1081 /* send cancel messages for all the "cancel" ones */ 1082 for( it=cancel, end=it+cancelCount; it!=end; ++it ) 1083 if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) 1084 tr_peerMsgsCancel( it->peer->msgs, it->block ); 1085 1086 /* decrement the pending request counts for the timed-out blocks */ 1087 for( it=cancel, end=it+cancelCount; it!=end; ++it ) 1088 pieceListRemoveRequest( t, it->block ); 1089 1090 /* cleanup loop */ 1091 tr_free( cancel ); 799 1092 } 800 1093 } … … 802 1095 managerUnlock( mgr ); 803 1096 return TRUE; 804 }805 806 static void807 sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now );808 809 static void810 refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )811 {812 tr_block_index_t block;813 int peerCount;814 int webseedCount;815 tr_peer ** peers;816 tr_webseed ** webseeds;817 Torrent * t = vtorrent;818 tr_torrent * tor = t->tor;819 tr_bool hasNext = TRUE;820 821 if( !t->isRunning )822 return;823 if( tr_torrentIsSeed( t->tor ) )824 return;825 826 torrentLock( t );827 tordbg( t, "Refilling Request Buffers..." );828 829 if( t->refillQueue == NULL )830 t->refillQueue = blockIteratorNew( t );831 832 peers = getPeersUploadingToClient( t, &peerCount );833 sortPeersByLiveliness( peers, NULL, peerCount, tr_date( ) );834 webseedCount = tr_ptrArraySize( &t->webseeds );835 webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),836 webseedCount * sizeof( tr_webseed* ) );837 838 while( ( webseedCount || peerCount )839 && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )840 {841 int j;842 tr_bool handled = FALSE;843 844 const tr_piece_index_t index = tr_torBlockPiece( tor, block );845 const uint32_t offset = getBlockOffsetInPiece( tor, block );846 const uint32_t length = tr_torBlockCountBytes( tor, block );847 848 assert( block < tor->blockCount );849 850 /* find a peer who can ask for this block */851 for( j=0; !handled && j<peerCount; )852 {853 const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );854 switch( val )855 {856 case TR_ADDREQ_FULL:857 case TR_ADDREQ_CLIENT_CHOKED:858 peers[j] = peers[--peerCount];859 break;860 861 case TR_ADDREQ_MISSING:862 case TR_ADDREQ_DUPLICATE:863 ++j;864 break;865 866 case TR_ADDREQ_OK:867 incrementPieceRequests( t, index );868 handled = TRUE;869 break;870 871 default:872 assert( 0 && "unhandled value" );873 break;874 }875 }876 877 /* maybe one of the webseeds can do it */878 for( j=0; !handled && j<webseedCount; )879 {880 const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );881 switch( val )882 {883 case TR_ADDREQ_FULL:884 webseeds[j] = webseeds[--webseedCount];885 break;886 887 case TR_ADDREQ_OK:888 incrementPieceRequests( t, index );889 handled = TRUE;890 break;891 892 default:893 assert( 0 && "unhandled value" );894 break;895 }896 }897 898 if( !handled )899 blockIteratorSkipCurrentPiece( t->refillQueue );900 }901 902 /* cleanup */903 tr_free( webseeds );904 tr_free( peers );905 906 if( !hasNext ) {907 tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );908 blockIteratorFree( &t->refillQueue );909 }910 911 torrentUnlock( t );912 }913 914 static void915 broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )916 {917 size_t i;918 size_t peerCount;919 tr_peer ** peers;920 921 assert( torrentIsLocked( t ) );922 923 tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );924 925 peerCount = tr_ptrArraySize( &t->peers );926 peers = (tr_peer**) tr_ptrArrayBase( &t->peers );927 for( i=0; i<peerCount; ++i )928 if( peers[i]->msgs )929 tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );930 1097 } 931 1098 … … 946 1113 947 1114 static void 948 gotBadPiece( Torrent * t, 949 tr_piece_index_t pieceIndex ) 1115 gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex ) 950 1116 { 951 1117 tr_torrent * tor = t->tor; … … 954 1120 tor->corruptCur += byteCount; 955 1121 tor->downloadedCur -= MIN( tor->downloadedCur, byteCount ); 956 }957 958 static void959 refillSoon( Torrent * t )960 {961 if( !evtimer_pending( &t->refillTimer, NULL ) )962 tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );963 1122 } 964 1123 … … 1009 1168 } 1010 1169 #endif 1170 } 1171 1172 static void 1173 decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount ) 1174 { 1175 tor->downloadedCur -= MIN( tor->downloadedCur, byteCount ); 1176 } 1177 1178 static void 1179 clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block ) 1180 { 1181 decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) ); 1182 } 1183 1184 static void 1185 removeRequestFromTables( Torrent * t, tr_block_index_t block ) 1186 { 1187 requestListRemove( t, block ); 1188 pieceListRemoveRequest( t, block ); 1189 } 1190 1191 /* peer choked us, or maybe it disconnected. 1192 either way we need to remove all its requests */ 1193 static void 1194 peerDeclinedAllRequests( Torrent * t, const tr_peer * peer ) 1195 { 1196 int i, n; 1197 tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount ); 1198 1199 for( i=n=0; i<t->requestCount; ++i ) 1200 if( peer == t->requests[i].peer ) 1201 blocks[n++] = t->requests[i].block; 1202 1203 for( i=0; i<n; ++i ) 1204 removeRequestFromTables( t, blocks[i] ); 1205 1206 tr_free( blocks ); 1011 1207 } 1012 1208 … … 1035 1231 break; 1036 1232 1037 case TR_PEER_NEED_REQ:1038 refillSoon( t );1039 break;1040 1041 case TR_PEER_CANCEL:1042 decrementPieceRequests( t, e->pieceIndex );1043 break;1044 1045 1233 case TR_PEER_PEER_GOT_DATA: 1046 1234 { … … 1067 1255 break; 1068 1256 } 1257 1258 case TR_PEER_CLIENT_GOT_REJ: 1259 removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ) ); 1260 break; 1261 1262 case TR_PEER_CLIENT_GOT_CHOKE: 1263 peerDeclinedAllRequests( t, peer ); 1264 break; 1069 1265 1070 1266 case TR_PEER_CLIENT_GOT_PORT: … … 1129 1325 { 1130 1326 tr_torrent * tor = t->tor; 1131 1132 1327 tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset ); 1133 1134 tr_cpBlockAdd( &tor->completion, block ); 1135 tr_torrentSetDirty( tor ); 1136 decrementPieceRequests( t, e->pieceIndex ); 1137 1138 broadcastGotBlock( t, e->pieceIndex, e->offset, e->length ); 1139 1140 if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) ) 1328 //static int numBlocks = 0; 1329 //fprintf( stderr, "got a total of %d blocks\n", ++numBlocks ); 1330 1331 requestListRemove( t, block ); 1332 pieceListRemoveRequest( t, block ); 1333 1334 if( tr_cpBlockIsComplete( &tor->completion, block ) ) 1141 1335 { 1142 const tr_piece_index_t p = e->pieceIndex; 1143 const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 ); 1144 1145 if( !ok ) 1336 tordbg( t, "we have this block already..." ); 1337 clientGotUnwantedBlock( tor, block ); 1338 } 1339 else 1340 { 1341 tr_cpBlockAdd( &tor->completion, block ); 1342 tr_torrentSetDirty( tor ); 1343 1344 if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) ) 1146 1345 { 1147 tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ), 1148 (unsigned long)p ); 1149 } 1150 1151 tr_torrentSetHasPiece( tor, p, ok ); 1152 tr_torrentSetPieceChecked( tor, p, TRUE ); 1153 tr_peerMgrSetBlame( tor, p, ok ); 1154 1155 if( !ok ) 1156 { 1157 gotBadPiece( t, p ); 1158 } 1159 else 1160 { 1161 int i; 1162 int peerCount; 1163 tr_peer ** peers; 1164 tr_file_index_t fileIndex; 1165 1166 peerCount = tr_ptrArraySize( &t->peers ); 1167 peers = (tr_peer**) tr_ptrArrayBase( &t->peers ); 1168 for( i=0; i<peerCount; ++i ) 1169 tr_peerMsgsHave( peers[i]->msgs, p ); 1170 1171 for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) { 1172 const tr_file * file = &tor->info.files[fileIndex]; 1173 if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) 1174 if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) 1175 tr_torrentFileCompleted( tor, fileIndex ); 1346 const tr_piece_index_t p = e->pieceIndex; 1347 const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 ); 1348 //fprintf( stderr, "we now have piece #%d\n", (int)p ); 1349 1350 if( !ok ) 1351 { 1352 tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ), 1353 (unsigned long)p ); 1354 } 1355 1356 tr_torrentSetHasPiece( tor, p, ok ); 1357 tr_torrentSetPieceChecked( tor, p, TRUE ); 1358 tr_peerMgrSetBlame( tor, p, ok ); 1359 1360 if( !ok ) 1361 { 1362 gotBadPiece( t, p ); 1363 } 1364 else 1365 { 1366 int i; 1367 int peerCount; 1368 tr_peer ** peers; 1369 tr_file_index_t fileIndex; 1370 1371 peerCount = tr_ptrArraySize( &t->peers ); 1372 peers = (tr_peer**) tr_ptrArrayBase( &t->peers ); 1373 for( i=0; i<peerCount; ++i ) 1374 tr_peerMsgsHave( peers[i]->msgs, p ); 1375 1376 for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) { 1377 const tr_file * file = &tor->info.files[fileIndex]; 1378 if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) 1379 if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) 1380 tr_torrentFileCompleted( tor, fileIndex ); 1381 } 1382 1383 pieceListRemovePiece( t, p ); 1176 1384 } 1177 1385 } … … 1635 1843 ensureMgrTimersExist( t->manager ); 1636 1844 1637 if( !t->isRunning ) 1638 { 1639 t->isRunning = TRUE; 1640 1641 if( !tr_ptrArrayEmpty( &t->webseeds ) ) 1642 refillSoon( t ); 1643 } 1845 t->isRunning = TRUE; 1644 1846 1645 1847 rechokePulse( t->manager ); … … 1650 1852 stopTorrent( Torrent * t ) 1651 1853 { 1854 int i, n; 1855 1652 1856 assert( torrentIsLocked( t ) ); 1653 1857 … … 1655 1859 1656 1860 /* disconnect the peers. */ 1657 tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor ); 1861 for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i ) 1862 peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) ); 1658 1863 tr_ptrArrayClear( &t->peers ); 1659 1864
Note: See TracChangeset
for help on using the changeset viewer.