Changeset 9465


Ignore:
Timestamp:
Nov 1, 2009, 2:10:47 AM (12 years ago)
Author:
charles
Message:

(trunk libT) #2548: T's request queue can send out too many duplicate requests

Location:
trunk/libtransmission
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-mgr.c

    r9438 r9465  
    8686
    8787    /* the minimum we'll wait before attempting to reconnect to a peer */
    88     MINIMUM_RECONNECT_INTERVAL_SECS = 5
     88    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
     89
     90    /* this is how many blocks we'll try to queue up
     91     * for the iterator to walk through */
     92    ITERATOR_BLOCK_BUFFER_SIZE = 2048,
     93
     94    /* if the number of blocks in the iterator queue drops
     95     * below this number, fill it up with more */
     96    ITERATOR_LOW_MARK = 256
    8997};
    90 
    9198
    9299/**
     
    137144}
    138145
     146struct tr_blockIteratorItem
     147{
     148    tr_block_index_t block;
     149    uint8_t requestCount;
     150};
     151
    139152struct tr_blockIterator
    140153{
    141     time_t expirationDate;
     154    tr_bool didLoop;
     155    int pos;
     156    int size;
     157    int begin;
     158    struct tr_blockIteratorItem items[ITERATOR_BLOCK_BUFFER_SIZE];
    142159    struct tr_torrent_peers * t;
    143     tr_block_index_t blockIndex, blockCount, *blocks;
    144     tr_piece_index_t pieceIndex, pieceCount, *pieces;
     160    tr_priority_t priority;
    145161};
    146162
     
    158174    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
    159175    struct tr_peerMgr        * manager;
    160     int                      * pendingRequestCount;
    161176
    162177    tr_bool                    isRunning;
     
    171186    tr_timer        * rechokeTimer;
    172187    tr_timer        * reconnectTimer;
    173     tr_timer        * refillUpkeepTimer;
    174188};
    175189
     
    417431    tr_ptrArrayDestruct( &t->peers, NULL );
    418432
    419     tr_free( t->pendingRequestCount );
    420433    tr_free( t );
    421434}
     
    459472static int rechokePulse   ( void * vmgr );
    460473static int reconnectPulse ( void * vmgr );
    461 static int refillUpkeep   ( void * vmgr );
    462474
    463475tr_peerMgr*
     
    481493    if( m->reconnectTimer )
    482494        tr_timerFree( &m->reconnectTimer );
    483 
    484     if( m->refillUpkeepTimer )
    485         tr_timerFree( &m->refillUpkeepTimer );
    486495}
    487496
     
    540549****/
    541550
    542 static void
    543 assertValidPiece( Torrent * t, tr_piece_index_t piece )
    544 {
    545     assert( t );
    546     assert( t->tor );
    547     assert( piece < t->tor->info.pieceCount );
    548 }
    549 
    550 static int
    551 getPieceRequests( Torrent * t, tr_piece_index_t piece )
    552 {
    553     assertValidPiece( t, piece );
    554 
    555     return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
    556 }
    557 
    558 static void
    559 incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
    560 {
    561     assertValidPiece( t, piece );
    562 
    563     if( t->pendingRequestCount == NULL )
    564         t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
    565     t->pendingRequestCount[piece]++;
    566 }
    567 
    568 static void
    569 decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
    570 {
    571     assertValidPiece( t, piece );
    572 
    573     if( t->pendingRequestCount )
    574         t->pendingRequestCount[piece]--;
    575 }
    576 
    577 struct tr_refill_piece
    578 {
    579     tr_priority_t    priority;
    580     uint32_t         piece;
    581     uint32_t         peerCount;
    582     int              random;
    583     int              pendingRequestCount;
    584     int              missingBlockCount;
    585 };
    586 
    587 static int
    588 compareRefillPiece( const void * aIn, const void * bIn )
    589 {
    590     const struct tr_refill_piece * a = aIn;
    591     const struct tr_refill_piece * b = bIn;
    592 
    593     /* if one piece has a higher priority, it goes first */
    594     if( a->priority != b->priority )
    595         return a->priority > b->priority ? -1 : 1;
    596 
    597     /* have a per-priority endgame */
    598     if( a->pendingRequestCount != b->pendingRequestCount )
    599         return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
    600 
    601     /* fewer missing pieces goes first */
    602     if( a->missingBlockCount != b->missingBlockCount )
    603         return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
    604 
    605     /* otherwise if one has fewer peers, it goes first */
    606     if( a->peerCount != b->peerCount )
    607         return a->peerCount < b->peerCount ? -1 : 1;
    608 
    609     /* otherwise go with our random seed */
    610     if( a->random != b->random )
    611         return a->random < b->random ? -1 : 1;
    612 
    613     return 0;
    614 }
    615 
    616 static tr_piece_index_t *
    617 getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
    618 {
    619     const tr_torrent  * tor = t->tor;
    620     const tr_info     * inf = &tor->info;
    621     tr_piece_index_t    i;
    622     tr_piece_index_t    poolSize = 0;
    623     tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
    624     int                 peerCount;
    625     const tr_peer    ** peers;
    626 
    627     assert( torrentIsLocked( t ) );
    628 
    629     peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    630     peerCount = tr_ptrArraySize( &t->peers );
    631 
    632     /* make a list of the pieces that we want but don't have */
    633     for( i = 0; i < inf->pieceCount; ++i )
    634         if( !tor->info.pieces[i].dnd
    635                 && !tr_cpPieceIsComplete( &tor->completion, i ) )
    636             pool[poolSize++] = i;
    637 
    638     /* sort the pool by which to request next */
    639     if( poolSize > 1 )
    640     {
    641         tr_piece_index_t j;
    642         struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
    643 
    644         for( j = 0; j < poolSize; ++j )
    645         {
    646             int k;
    647             const tr_piece_index_t piece = pool[j];
    648             struct tr_refill_piece * setme = p + j;
    649 
    650             setme->piece = piece;
    651             setme->priority = inf->pieces[piece].priority;
    652             setme->peerCount = 0;
    653             setme->random = tr_cryptoWeakRandInt( INT_MAX );
    654             setme->pendingRequestCount = getPieceRequests( t, piece );
    655             setme->missingBlockCount
    656                          = tr_cpMissingBlocksInPiece( &tor->completion, piece );
    657 
    658             for( k = 0; k < peerCount; ++k )
    659             {
    660                 const tr_peer * peer = peers[k];
    661                 if( peer->peerIsInterested
    662                         && !peer->clientIsChoked
    663                         && tr_bitfieldHas( peer->have, piece ) )
    664                     ++setme->peerCount;
    665             }
    666         }
    667 
    668         qsort( p, poolSize, sizeof( struct tr_refill_piece ),
    669                compareRefillPiece );
    670 
    671         for( j = 0; j < poolSize; ++j )
    672             pool[j] = p[j].piece;
    673 
    674         tr_free( p );
    675     }
    676 
    677     *pieceCount = poolSize;
    678     return pool;
    679 }
    680 
    681551static struct tr_blockIterator*
    682552blockIteratorNew( Torrent * t )
    683553{
    684554    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
    685     i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
     555    i->pos = 0;
     556    i->size = 0;
     557    i->priority = TR_PRI_HIGH;
    686558    i->t = t;
    687     i->pieces = getPreferredPieces( t, &i->pieceCount );
    688     i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
    689     tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
    690559    return i;
    691560}
    692561
     562static int
     563compareIteratorItems( const void * va, const void * vb )
     564{
     565    const struct tr_blockIteratorItem * a = va;
     566    const struct tr_blockIteratorItem * b = vb;
     567    if( a->block < b->block ) return -1;
     568    if( a->block > b->block ) return 1;
     569    return 0;
     570}
     571
     572static void
     573blockIteratorRefill( struct tr_blockIterator * it )
     574{
     575    int pieceCount;
     576    tr_piece_index_t * pieces;
     577    const tr_torrent * tor = it->t->tor;
     578    const tr_info * inf = tr_torrentInfo( tor );
     579
     580    /* build a pool of the pieces we might request blocks from */
     581    pieces = tr_new( tr_piece_index_t, inf->pieceCount );
     582    pieceCount = 0;
     583    {
     584        tr_piece_index_t i;
     585        for( i=0; i<inf->pieceCount; ++i )
     586            if( !inf->pieces[i].dnd )
     587                if( inf->pieces[i].priority == it->priority )
     588                    if( !tr_cpPieceIsComplete( &tor->completion, i ) )
     589                        pieces[pieceCount++] = i;
     590    }
     591
     592    /* while we're short on blocks and there are still pieces left... */
     593    while( ( it->size < ITERATOR_BLOCK_BUFFER_SIZE ) && ( pieceCount > 0 ) )
     594    {
     595        tr_block_index_t i;
     596        tr_block_index_t b;
     597        tr_block_index_t e;
     598
     599        /* pull a random piece out of the pool */
     600        const int poolIndex = tr_cryptoRandInt( pieceCount );
     601        const tr_piece_index_t piece = pieces[poolIndex];
     602        pieces[poolIndex] = pieces[--pieceCount];
     603
     604        /* add the piece's blocks that we don't have to our iterator */
     605        b = tr_torPieceFirstBlock( tor, piece );
     606        e = b + tr_torPieceCountBlocks( tor, piece );
     607        for( i=b; (i!=e) && (it->size < ITERATOR_BLOCK_BUFFER_SIZE); ++i ) {
     608            if( !tr_cpBlockIsCompleteFast( &tor->completion, i ) ) {
     609                int pos;
     610                struct tr_blockIteratorItem tmp;
     611                tr_bool match;
     612                tmp.block = i;
     613                tmp.requestCount = 0;
     614                pos = tr_lowerBound( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems, &match );
     615                if( match )
     616                    continue;
     617                memmove( it->items+pos+1, it->items+pos, sizeof(struct tr_blockIteratorItem)*(it->size-pos) );
     618                it->items[pos] = tmp;
     619                ++it->size;
     620            }
     621        }
     622    }
     623
     624    if( it->pos >= it->size )
     625        it->pos = 0;
     626
     627    /* cleanup */
     628    tr_free( pieces );
     629}
     630
     631static void
     632blockIteratorRewind( struct tr_blockIterator * it )
     633{
     634    /* if we don't have any blocks in the iterator,
     635     * try to regenerate a list of blocks in the current priority.
     636     * if that fails, go to the next lower priority and retry. */
     637    for( ;; ) {
     638        if( it->size <= ITERATOR_LOW_MARK )
     639            blockIteratorRefill( it );
     640        if( it->size > 0 )
     641            break;
     642        if( it->priority == TR_PRI_LOW )
     643            return;
     644        --it->priority; /* try a lower priority */
     645    }
     646
     647    it->begin = it->pos;
     648    it->didLoop = FALSE;
     649}
     650
    693651static tr_bool
    694 blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
    695 {
    696     tr_bool found;
    697     Torrent * t = i->t;
    698     tr_torrent * tor = t->tor;
    699 
    700     while( ( i->blockIndex == i->blockCount )
    701         && ( i->pieceIndex < i->pieceCount ) )
    702     {
    703         const tr_piece_index_t index = i->pieces[i->pieceIndex++];
    704         const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
    705         const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
    706         tr_block_index_t block;
    707 
    708         assert( index < tor->info.pieceCount );
    709 
    710         i->blockCount = 0;
    711         i->blockIndex = 0;
    712         for( block=b; block!=e; ++block )
    713             if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
    714                 i->blocks[i->blockCount++] = block;
    715     }
    716 
    717     assert( i->blockCount <= tor->blockCountInPiece );
    718 
    719     if(( found = ( i->blockIndex < i->blockCount )))
    720         *setme = i->blocks[i->blockIndex++];
    721 
    722     return found;
    723 }
    724 
    725 static void
    726 blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
    727 {
    728     i->blockIndex = i->blockCount;
     652blockIteratorNext( struct tr_blockIterator * it, tr_block_index_t * setme )
     653{
     654    static const int single_req_threshold = 100;
     655    static const int double_req_threshold = 50;
     656
     657    while( !it->didLoop )
     658    {
     659        ++it->pos;
     660        it->pos %= it->size;
     661        it->didLoop |= it->pos == it->begin;
     662
     663        if( ( it->items[it->pos].requestCount >= 1 ) && ( it->size >= single_req_threshold ) )
     664            continue;
     665        if( ( it->items[it->pos].requestCount >= 2 ) && ( it->size >= double_req_threshold ) )
     666            continue;
     667
     668        *setme = it->items[it->pos].block;
     669        return TRUE;
     670    }
     671
     672    return FALSE;
     673}
     674static void
     675blockIteratorInvalidate( struct tr_blockIterator * it )
     676{
     677    it->size = 0;
     678    it->priority = TR_PRI_HIGH;
     679}
     680
     681void
     682tr_peerMgrFilePrioritiesChanged( tr_torrent * tor )
     683{
     684    if( ( tor != NULL ) && ( tor->torrentPeers != NULL ) && ( tor->torrentPeers->refillQueue != NULL ) )
     685        blockIteratorInvalidate( tor->torrentPeers->refillQueue );
     686}
     687
     688static void
     689blockIteratorRemoveBlock( struct tr_blockIterator * it, tr_block_index_t block )
     690{
     691    if( it != NULL )
     692    {
     693        struct tr_blockIteratorItem tmp, *pos;
     694        tmp.block = block;
     695        pos = bsearch( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems );
     696        if( pos != NULL )
     697        {
     698            const int i = pos - it->items;
     699
     700            assert( pos->block == block );
     701            assert( i >= 0 );
     702            assert( i < it->size );
     703
     704            --it->size;
     705
     706            memmove( it->items+i, it->items+i+1, sizeof(struct tr_blockIteratorItem) * (it->size-i) );
     707
     708            if( it->pos > i )
     709                --it->pos;
     710        }
     711    }
    729712}
    730713
     
    732715blockIteratorFree( struct tr_blockIterator ** inout )
    733716{
    734     struct tr_blockIterator * it = *inout;
    735 
    736     if( it != NULL )
    737     {
    738         tr_free( it->blocks );
    739         tr_free( it->pieces );
    740         tr_free( it );
    741     }
    742 
     717    tr_free( *inout );
    743718    *inout = NULL;
    744719}
    745720
    746721static tr_peer**
    747 getPeersUploadingToClient( Torrent * t,
    748                            int *     setmeCount )
     722getPeersUploadingToClient( Torrent * t, int * setmeCount )
    749723{
    750724    int j;
     
    783757}
    784758
    785 static int
    786 refillUpkeep( void * vmgr )
    787 {
    788     tr_torrent * tor = NULL;
    789     tr_peerMgr * mgr = vmgr;
    790     time_t now;
    791     managerLock( mgr );
    792 
    793     now = time( NULL );
    794     while(( tor = tr_torrentNext( mgr->session, tor ))) {
    795         Torrent * t = tor->torrentPeers;
    796         if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
    797             tordbg( t, "refill queue is past its shelf date; discarding." );
    798             blockIteratorFree( &t->refillQueue );
    799         }
    800     }
    801 
    802     managerUnlock( mgr );
    803     return TRUE;
    804 }
    805 
    806759static void
    807760sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now );
     
    821774    if( !t->isRunning )
    822775        return;
    823     if( tr_torrentIsSeed( t->tor ) )
     776    if( tr_torrentIsSeed( t->tor ) ) {
     777        blockIteratorFree( &t->refillQueue );
    824778        return;
     779    }
    825780
    826781    torrentLock( t );
     
    835790    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
    836791                          webseedCount * sizeof( tr_webseed* ) );
     792
     793    blockIteratorRewind( t->refillQueue );
    837794
    838795    while( ( webseedCount || peerCount )
     
    865822
    866823                case TR_ADDREQ_OK:
    867                     incrementPieceRequests( t, index );
     824                    ++t->refillQueue->items[t->refillQueue->pos].requestCount;
    868825                    handled = TRUE;
    869826                    break;
     
    886843
    887844                case TR_ADDREQ_OK:
    888                     incrementPieceRequests( t, index );
    889845                    handled = TRUE;
    890846                    break;
     
    895851            }
    896852        }
    897 
    898         if( !handled )
    899             blockIteratorSkipCurrentPiece( t->refillQueue );
    900853    }
    901854
     
    903856    tr_free( webseeds );
    904857    tr_free( peers );
    905 
    906     if( !hasNext ) {
    907         tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
    908         blockIteratorFree( &t->refillQueue );
    909     }
    910858
    911859    torrentUnlock( t );
     
    1040988
    1041989        case TR_PEER_CANCEL:
    1042             decrementPieceRequests( t, e->pieceIndex );
     990            blockIteratorRemoveBlock( t->refillQueue, _tr_block( t->tor, e->pieceIndex, e->offset ) );
    1043991            break;
    1044992
     
    11341082            tr_cpBlockAdd( &tor->completion, block );
    11351083            tr_torrentSetDirty( tor );
    1136             decrementPieceRequests( t, e->pieceIndex );
     1084            blockIteratorRemoveBlock( t->refillQueue, block );
    11371085
    11381086            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
     
    16211569    if( m->reconnectTimer == NULL )
    16221570        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
    1623 
    1624     if( m->refillUpkeepTimer == NULL )
    1625         m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
    16261571}
    16271572
  • trunk/libtransmission/peer-mgr.h

    r9434 r9465  
    119119                              const tr_address * addr );
    120120
     121void tr_peerMgrFilePrioritiesChanged( tr_torrent * tor );
     122
    121123void tr_peerMgrAddIncoming( tr_peerMgr  * manager,
    122124                            tr_address  * addr,
  • trunk/libtransmission/peer-msgs.c

    r9434 r9465  
    459459}
    460460
     461static double blocksGotten = 0.0;
     462
    461463static void
    462464fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
    463465{
    464466    tr_peer_event e = blankEvent;
     467++blocksGotten;
    465468    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
    466469    e.pieceIndex = req->index;
     
    15591562clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
    15601563{
     1564static double unwantedGotten = 0.0;
     1565fprintf( stderr, "dupe ratio: %f\n", ++unwantedGotten / blocksGotten );
    15611566    decrementDownloadedCount( msgs, req->length );
    15621567}
  • trunk/libtransmission/torrent.c

    r9455 r9465  
    16861686        tr_torrentInitFilePriority( tor, files[i], priority );
    16871687
     1688    tr_peerMgrFilePrioritiesChanged( tor );
    16881689    tr_torrentSetDirty( tor );
    16891690    tr_torrentUnlock( tor );
Note: See TracChangeset for help on using the changeset viewer.