Changeset 9470 for trunk/libtransmission/peer-mgr.c
- Timestamp:
- Nov 2, 2009, 12:17:30 AM (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/peer-mgr.c
r9466 r9470 86 86 87 87 /* the minimum we'll wait before attempting to reconnect to a peer */ 88 MINIMUM_RECONNECT_INTERVAL_SECS = 5, 89 90 /* this is how many blocks we'll try to queue up 91 * for the iterator to walk through */ 92 ITERATOR_BLOCK_BUFFER_SIZE = 4096, 93 94 /* if the number of blocks in the iterator queue drops 95 * below this number, fill it up with more */ 96 ITERATOR_LOW_MARK = 256 88 MINIMUM_RECONNECT_INTERVAL_SECS = 5 97 89 }; 90 98 91 99 92 /** … … 144 137 } 145 138 146 struct tr_blockIteratorItem147 {148 tr_block_index_t block;149 uint8_t requestCount;150 };151 152 139 struct tr_blockIterator 153 140 { 154 tr_bool didLoop; 155 int pos; 156 int size; 157 int begin; 158 struct tr_blockIteratorItem items[ITERATOR_BLOCK_BUFFER_SIZE]; 141 time_t expirationDate; 159 142 struct tr_torrent_peers * t; 160 tr_priority_t priority; 143 tr_block_index_t blockIndex, blockCount, *blocks; 144 tr_piece_index_t pieceIndex, pieceCount, *pieces; 161 145 }; 162 146 … … 174 158 struct tr_blockIterator * refillQueue; /* used in refillPulse() */ 175 159 struct tr_peerMgr * manager; 160 int * pendingRequestCount; 176 161 177 162 tr_bool isRunning; … … 186 171 tr_timer * rechokeTimer; 187 172 tr_timer * reconnectTimer; 173 tr_timer * refillUpkeepTimer; 188 174 }; 189 175 … … 431 417 tr_ptrArrayDestruct( &t->peers, NULL ); 432 418 419 tr_free( t->pendingRequestCount ); 433 420 tr_free( t ); 434 421 } … … 472 459 static int rechokePulse ( void * vmgr ); 473 460 static int reconnectPulse ( void * vmgr ); 461 static int refillUpkeep ( void * vmgr ); 474 462 475 463 tr_peerMgr* … … 493 481 if( m->reconnectTimer ) 494 482 tr_timerFree( &m->reconnectTimer ); 483 484 if( m->refillUpkeepTimer ) 485 tr_timerFree( &m->refillUpkeepTimer ); 495 486 } 496 487 … … 549 540 ****/ 550 541 542 static void 543 assertValidPiece( Torrent * t, tr_piece_index_t piece ) 544 { 545 assert( t ); 546 assert( t->tor ); 547 assert( piece < t->tor->info.pieceCount ); 548 } 549 550 static int 551 getPieceRequests( Torrent * t, tr_piece_index_t piece ) 552 { 553 assertValidPiece( t, piece ); 554 555 return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0; 556 } 557 558 static void 559 incrementPieceRequests( Torrent * t, tr_piece_index_t piece ) 560 { 561 assertValidPiece( t, piece ); 562 563 if( t->pendingRequestCount == NULL ) 564 t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount ); 565 t->pendingRequestCount[piece]++; 566 } 567 568 static void 569 decrementPieceRequests( Torrent * t, tr_piece_index_t piece ) 570 { 571 assertValidPiece( t, piece ); 572 573 if( t->pendingRequestCount ) 574 t->pendingRequestCount[piece]--; 575 } 576 577 struct tr_refill_piece 578 { 579 tr_priority_t priority; 580 uint32_t piece; 581 uint32_t peerCount; 582 int random; 583 int pendingRequestCount; 584 int missingBlockCount; 585 }; 586 587 static int 588 compareRefillPiece( const void * aIn, const void * bIn ) 589 { 590 const struct tr_refill_piece * a = aIn; 591 const struct tr_refill_piece * b = bIn; 592 593 /* if one piece has a higher priority, it goes first */ 594 if( a->priority != b->priority ) 595 return a->priority > b->priority ? -1 : 1; 596 597 /* have a per-priority endgame */ 598 if( a->pendingRequestCount != b->pendingRequestCount ) 599 return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1; 600 601 /* fewer missing pieces goes first */ 602 if( a->missingBlockCount != b->missingBlockCount ) 603 return a->missingBlockCount < b->missingBlockCount ? -1 : 1; 604 605 /* otherwise if one has fewer peers, it goes first */ 606 if( a->peerCount != b->peerCount ) 607 return a->peerCount < b->peerCount ? -1 : 1; 608 609 /* otherwise go with our random seed */ 610 if( a->random != b->random ) 611 return a->random < b->random ? -1 : 1; 612 613 return 0; 614 } 615 616 static tr_piece_index_t * 617 getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount ) 618 { 619 const tr_torrent * tor = t->tor; 620 const tr_info * inf = &tor->info; 621 tr_piece_index_t i; 622 tr_piece_index_t poolSize = 0; 623 tr_piece_index_t * pool = tr_new( tr_piece_index_t , inf->pieceCount ); 624 int peerCount; 625 const tr_peer ** peers; 626 627 assert( torrentIsLocked( t ) ); 628 629 peers = (const tr_peer**) tr_ptrArrayBase( &t->peers ); 630 peerCount = tr_ptrArraySize( &t->peers ); 631 632 /* make a list of the pieces that we want but don't have */ 633 for( i = 0; i < inf->pieceCount; ++i ) 634 if( !tor->info.pieces[i].dnd 635 && !tr_cpPieceIsComplete( &tor->completion, i ) ) 636 pool[poolSize++] = i; 637 638 /* sort the pool by which to request next */ 639 if( poolSize > 1 ) 640 { 641 tr_piece_index_t j; 642 struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize ); 643 644 for( j = 0; j < poolSize; ++j ) 645 { 646 int k; 647 const tr_piece_index_t piece = pool[j]; 648 struct tr_refill_piece * setme = p + j; 649 650 setme->piece = piece; 651 setme->priority = inf->pieces[piece].priority; 652 setme->peerCount = 0; 653 setme->random = tr_cryptoWeakRandInt( INT_MAX ); 654 setme->pendingRequestCount = getPieceRequests( t, piece ); 655 setme->missingBlockCount 656 = tr_cpMissingBlocksInPiece( &tor->completion, piece ); 657 658 for( k = 0; k < peerCount; ++k ) 659 { 660 const tr_peer * peer = peers[k]; 661 if( peer->peerIsInterested 662 && !peer->clientIsChoked 663 && tr_bitfieldHas( peer->have, piece ) ) 664 ++setme->peerCount; 665 } 666 } 667 668 qsort( p, poolSize, sizeof( struct tr_refill_piece ), 669 compareRefillPiece ); 670 671 for( j = 0; j < poolSize; ++j ) 672 pool[j] = p[j].piece; 673 674 tr_free( p ); 675 } 676 677 *pieceCount = poolSize; 678 return pool; 679 } 680 551 681 static struct tr_blockIterator* 552 682 blockIteratorNew( Torrent * t ) 553 683 { 554 684 struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 ); 555 i->pos = 0; 556 i->size = 0; 557 i->priority = TR_PRI_HIGH; 685 i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS; 558 686 i->t = t; 687 i->pieces = getPreferredPieces( t, &i->pieceCount ); 688 i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece ); 689 tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount ); 559 690 return i; 560 691 } 561 692 562 static int563 compareIteratorItems( const void * va, const void * vb )564 {565 const struct tr_blockIteratorItem * a = va;566 const struct tr_blockIteratorItem * b = vb;567 if( a->block < b->block ) return -1;568 if( a->block > b->block ) return 1;569 return 0;570 }571 572 static void573 blockIteratorRefill( struct tr_blockIterator * it )574 {575 int pieceCount;576 tr_piece_index_t * pieces;577 const tr_torrent * tor = it->t->tor;578 const tr_info * inf = tr_torrentInfo( tor );579 580 /* build a pool of the pieces we might request blocks from */581 pieces = tr_new( tr_piece_index_t, inf->pieceCount );582 pieceCount = 0;583 {584 tr_piece_index_t i;585 for( i=0; i<inf->pieceCount; ++i )586 if( !inf->pieces[i].dnd )587 if( inf->pieces[i].priority == it->priority )588 if( !tr_cpPieceIsComplete( &tor->completion, i ) )589 pieces[pieceCount++] = i;590 }591 592 /* while we're short on blocks and there are still pieces left... */593 while( ( it->size < ITERATOR_BLOCK_BUFFER_SIZE ) && ( pieceCount > 0 ) )594 {595 tr_block_index_t i;596 tr_block_index_t b;597 tr_block_index_t e;598 599 /* pull a random piece out of the pool */600 const int poolIndex = tr_cryptoRandInt( pieceCount );601 const tr_piece_index_t piece = pieces[poolIndex];602 pieces[poolIndex] = pieces[--pieceCount];603 604 /* add the piece's blocks that we don't have to our iterator */605 b = tr_torPieceFirstBlock( tor, piece );606 e = b + tr_torPieceCountBlocks( tor, piece );607 for( i=b; (i!=e) && (it->size < ITERATOR_BLOCK_BUFFER_SIZE); ++i ) {608 if( !tr_cpBlockIsCompleteFast( &tor->completion, i ) ) {609 int pos;610 struct tr_blockIteratorItem tmp;611 tr_bool match;612 tmp.block = i;613 tmp.requestCount = 0;614 pos = tr_lowerBound( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems, &match );615 if( match )616 continue;617 memmove( it->items+pos+1, it->items+pos, sizeof(struct tr_blockIteratorItem)*(it->size-pos) );618 it->items[pos] = tmp;619 ++it->size;620 }621 }622 }623 624 if( it->pos >= it->size )625 it->pos = 0;626 627 /* cleanup */628 tr_free( pieces );629 }630 631 static void632 blockIteratorRewind( struct tr_blockIterator * it )633 {634 /* if we don't have any blocks in the iterator,635 * try to regenerate a list of blocks in the current priority.636 * if that fails, go to the next lower priority and retry. */637 for( ;; ) {638 if( it->size <= ITERATOR_LOW_MARK )639 blockIteratorRefill( it );640 if( it->size > 0 )641 break;642 if( it->priority == TR_PRI_LOW )643 return;644 --it->priority; /* try a lower priority */645 }646 647 it->begin = it->pos;648 it->didLoop = FALSE;649 }650 651 693 static tr_bool 652 blockIteratorNext( struct tr_blockIterator * it, tr_block_index_t * setme ) 653 { 654 static const int single_req_threshold = 100; 655 static const int double_req_threshold = 50; 656 657 while( !it->didLoop ) 658 { 659 ++it->pos; 660 it->pos %= it->size; 661 it->didLoop |= it->pos == it->begin; 662 663 if( ( it->items[it->pos].requestCount >= 1 ) && ( it->size >= single_req_threshold ) ) 664 continue; 665 if( ( it->items[it->pos].requestCount >= 2 ) && ( it->size >= double_req_threshold ) ) 666 continue; 667 668 *setme = it->items[it->pos].block; 669 return TRUE; 670 } 671 672 return FALSE; 673 } 674 static void 675 blockIteratorInvalidate( struct tr_blockIterator * it ) 676 { 677 it->size = 0; 678 it->priority = TR_PRI_HIGH; 679 } 680 681 void 682 tr_peerMgrFilePrioritiesChanged( tr_torrent * tor ) 683 { 684 if( ( tor != NULL ) && ( tor->torrentPeers != NULL ) && ( tor->torrentPeers->refillQueue != NULL ) ) 685 blockIteratorInvalidate( tor->torrentPeers->refillQueue ); 686 } 687 688 static void 689 blockIteratorRemoveBlock( struct tr_blockIterator * it, tr_block_index_t block ) 690 { 694 blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme ) 695 { 696 tr_bool found; 697 Torrent * t = i->t; 698 tr_torrent * tor = t->tor; 699 700 while( ( i->blockIndex == i->blockCount ) 701 && ( i->pieceIndex < i->pieceCount ) ) 702 { 703 const tr_piece_index_t index = i->pieces[i->pieceIndex++]; 704 const tr_block_index_t b = tr_torPieceFirstBlock( tor, index ); 705 const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index ); 706 tr_block_index_t block; 707 708 assert( index < tor->info.pieceCount ); 709 710 i->blockCount = 0; 711 i->blockIndex = 0; 712 for( block=b; block!=e; ++block ) 713 if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) ) 714 i->blocks[i->blockCount++] = block; 715 } 716 717 assert( i->blockCount <= tor->blockCountInPiece ); 718 719 if(( found = ( i->blockIndex < i->blockCount ))) 720 *setme = i->blocks[i->blockIndex++]; 721 722 return found; 723 } 724 725 static void 726 blockIteratorSkipCurrentPiece( struct tr_blockIterator * i ) 727 { 728 i->blockIndex = i->blockCount; 729 } 730 731 static void 732 blockIteratorFree( struct tr_blockIterator ** inout ) 733 { 734 struct tr_blockIterator * it = *inout; 735 691 736 if( it != NULL ) 692 737 { 693 struct tr_blockIteratorItem tmp, *pos; 694 tmp.block = block; 695 pos = bsearch( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems ); 696 if( pos != NULL ) 697 { 698 const int i = pos - it->items; 699 700 assert( pos->block == block ); 701 assert( i >= 0 ); 702 assert( i < it->size ); 703 704 --it->size; 705 706 memmove( it->items+i, it->items+i+1, sizeof(struct tr_blockIteratorItem) * (it->size-i) ); 707 708 if( it->pos > i ) 709 --it->pos; 710 } 711 } 712 } 713 714 static void 715 blockIteratorFree( struct tr_blockIterator ** inout ) 716 { 717 tr_free( *inout ); 738 tr_free( it->blocks ); 739 tr_free( it->pieces ); 740 tr_free( it ); 741 } 742 718 743 *inout = NULL; 719 744 } 720 745 721 746 static tr_peer** 722 getPeersUploadingToClient( Torrent * t, int * setmeCount ) 747 getPeersUploadingToClient( Torrent * t, 748 int * setmeCount ) 723 749 { 724 750 int j; … … 757 783 } 758 784 785 static int 786 refillUpkeep( void * vmgr ) 787 { 788 tr_torrent * tor = NULL; 789 tr_peerMgr * mgr = vmgr; 790 time_t now; 791 managerLock( mgr ); 792 793 now = time( NULL ); 794 while(( tor = tr_torrentNext( mgr->session, tor ))) { 795 Torrent * t = tor->torrentPeers; 796 if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) { 797 tordbg( t, "refill queue is past its shelf date; discarding." ); 798 blockIteratorFree( &t->refillQueue ); 799 } 800 } 801 802 managerUnlock( mgr ); 803 return TRUE; 804 } 805 759 806 static void 760 807 sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now ); … … 774 821 if( !t->isRunning ) 775 822 return; 776 if( tr_torrentIsSeed( t->tor ) ) { 777 blockIteratorFree( &t->refillQueue ); 823 if( tr_torrentIsSeed( t->tor ) ) 778 824 return; 779 }780 825 781 826 torrentLock( t ); … … 790 835 webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ), 791 836 webseedCount * sizeof( tr_webseed* ) ); 792 793 blockIteratorRewind( t->refillQueue );794 837 795 838 while( ( webseedCount || peerCount ) … … 822 865 823 866 case TR_ADDREQ_OK: 824 ++t->refillQueue->items[t->refillQueue->pos].requestCount;867 incrementPieceRequests( t, index ); 825 868 handled = TRUE; 826 869 break; … … 843 886 844 887 case TR_ADDREQ_OK: 888 incrementPieceRequests( t, index ); 845 889 handled = TRUE; 846 890 break; … … 851 895 } 852 896 } 897 898 if( !handled ) 899 blockIteratorSkipCurrentPiece( t->refillQueue ); 853 900 } 854 901 … … 856 903 tr_free( webseeds ); 857 904 tr_free( peers ); 905 906 if( !hasNext ) { 907 tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount ); 908 blockIteratorFree( &t->refillQueue ); 909 } 858 910 859 911 torrentUnlock( t ); … … 988 1040 989 1041 case TR_PEER_CANCEL: 990 blockIteratorRemoveBlock( t->refillQueue, _tr_block( t->tor, e->pieceIndex, e->offset ));1042 decrementPieceRequests( t, e->pieceIndex ); 991 1043 break; 992 1044 … … 1082 1134 tr_cpBlockAdd( &tor->completion, block ); 1083 1135 tr_torrentSetDirty( tor ); 1084 blockIteratorRemoveBlock( t->refillQueue, block);1136 decrementPieceRequests( t, e->pieceIndex ); 1085 1137 1086 1138 broadcastGotBlock( t, e->pieceIndex, e->offset, e->length ); … … 1569 1621 if( m->reconnectTimer == NULL ) 1570 1622 m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC ); 1623 1624 if( m->refillUpkeepTimer == NULL ) 1625 m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC ); 1571 1626 } 1572 1627
Note: See TracChangeset
for help on using the changeset viewer.