Ignore:
Timestamp:
Nov 2, 2009, 12:17:30 AM (13 years ago)
Author:
charles
Message:

(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-mgr.c

    r9466 r9470  
    8686
    8787    /* the minimum we'll wait before attempting to reconnect to a peer */
    88     MINIMUM_RECONNECT_INTERVAL_SECS = 5,
    89 
    90     /* this is how many blocks we'll try to queue up
    91      * for the iterator to walk through */
    92     ITERATOR_BLOCK_BUFFER_SIZE = 4096,
    93 
    94     /* if the number of blocks in the iterator queue drops
    95      * below this number, fill it up with more */
    96     ITERATOR_LOW_MARK = 256
     88    MINIMUM_RECONNECT_INTERVAL_SECS = 5
    9789};
     90
    9891
    9992/**
     
    144137}
    145138
    146 struct tr_blockIteratorItem
    147 {
    148     tr_block_index_t block;
    149     uint8_t requestCount;
    150 };
    151 
    152139struct tr_blockIterator
    153140{
    154     tr_bool didLoop;
    155     int pos;
    156     int size;
    157     int begin;
    158     struct tr_blockIteratorItem items[ITERATOR_BLOCK_BUFFER_SIZE];
     141    time_t expirationDate;
    159142    struct tr_torrent_peers * t;
    160     tr_priority_t priority;
     143    tr_block_index_t blockIndex, blockCount, *blocks;
     144    tr_piece_index_t pieceIndex, pieceCount, *pieces;
    161145};
    162146
     
    174158    struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
    175159    struct tr_peerMgr        * manager;
     160    int                      * pendingRequestCount;
    176161
    177162    tr_bool                    isRunning;
     
    186171    tr_timer        * rechokeTimer;
    187172    tr_timer        * reconnectTimer;
     173    tr_timer        * refillUpkeepTimer;
    188174};
    189175
     
    431417    tr_ptrArrayDestruct( &t->peers, NULL );
    432418
     419    tr_free( t->pendingRequestCount );
    433420    tr_free( t );
    434421}
     
    472459static int rechokePulse   ( void * vmgr );
    473460static int reconnectPulse ( void * vmgr );
     461static int refillUpkeep   ( void * vmgr );
    474462
    475463tr_peerMgr*
     
    493481    if( m->reconnectTimer )
    494482        tr_timerFree( &m->reconnectTimer );
     483
     484    if( m->refillUpkeepTimer )
     485        tr_timerFree( &m->refillUpkeepTimer );
    495486}
    496487
     
    549540****/
    550541
     542static void
     543assertValidPiece( Torrent * t, tr_piece_index_t piece )
     544{
     545    assert( t );
     546    assert( t->tor );
     547    assert( piece < t->tor->info.pieceCount );
     548}
     549
     550static int
     551getPieceRequests( Torrent * t, tr_piece_index_t piece )
     552{
     553    assertValidPiece( t, piece );
     554
     555    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
     556}
     557
     558static void
     559incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
     560{
     561    assertValidPiece( t, piece );
     562
     563    if( t->pendingRequestCount == NULL )
     564        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
     565    t->pendingRequestCount[piece]++;
     566}
     567
     568static void
     569decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
     570{
     571    assertValidPiece( t, piece );
     572
     573    if( t->pendingRequestCount )
     574        t->pendingRequestCount[piece]--;
     575}
     576
     577struct tr_refill_piece
     578{
     579    tr_priority_t    priority;
     580    uint32_t         piece;
     581    uint32_t         peerCount;
     582    int              random;
     583    int              pendingRequestCount;
     584    int              missingBlockCount;
     585};
     586
     587static int
     588compareRefillPiece( const void * aIn, const void * bIn )
     589{
     590    const struct tr_refill_piece * a = aIn;
     591    const struct tr_refill_piece * b = bIn;
     592
     593    /* if one piece has a higher priority, it goes first */
     594    if( a->priority != b->priority )
     595        return a->priority > b->priority ? -1 : 1;
     596
     597    /* have a per-priority endgame */
     598    if( a->pendingRequestCount != b->pendingRequestCount )
     599        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
     600
     601    /* fewer missing pieces goes first */
     602    if( a->missingBlockCount != b->missingBlockCount )
     603        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
     604
     605    /* otherwise if one has fewer peers, it goes first */
     606    if( a->peerCount != b->peerCount )
     607        return a->peerCount < b->peerCount ? -1 : 1;
     608
     609    /* otherwise go with our random seed */
     610    if( a->random != b->random )
     611        return a->random < b->random ? -1 : 1;
     612
     613    return 0;
     614}
     615
     616static tr_piece_index_t *
     617getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
     618{
     619    const tr_torrent  * tor = t->tor;
     620    const tr_info     * inf = &tor->info;
     621    tr_piece_index_t    i;
     622    tr_piece_index_t    poolSize = 0;
     623    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
     624    int                 peerCount;
     625    const tr_peer    ** peers;
     626
     627    assert( torrentIsLocked( t ) );
     628
     629    peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
     630    peerCount = tr_ptrArraySize( &t->peers );
     631
     632    /* make a list of the pieces that we want but don't have */
     633    for( i = 0; i < inf->pieceCount; ++i )
     634        if( !tor->info.pieces[i].dnd
     635                && !tr_cpPieceIsComplete( &tor->completion, i ) )
     636            pool[poolSize++] = i;
     637
     638    /* sort the pool by which to request next */
     639    if( poolSize > 1 )
     640    {
     641        tr_piece_index_t j;
     642        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
     643
     644        for( j = 0; j < poolSize; ++j )
     645        {
     646            int k;
     647            const tr_piece_index_t piece = pool[j];
     648            struct tr_refill_piece * setme = p + j;
     649
     650            setme->piece = piece;
     651            setme->priority = inf->pieces[piece].priority;
     652            setme->peerCount = 0;
     653            setme->random = tr_cryptoWeakRandInt( INT_MAX );
     654            setme->pendingRequestCount = getPieceRequests( t, piece );
     655            setme->missingBlockCount
     656                         = tr_cpMissingBlocksInPiece( &tor->completion, piece );
     657
     658            for( k = 0; k < peerCount; ++k )
     659            {
     660                const tr_peer * peer = peers[k];
     661                if( peer->peerIsInterested
     662                        && !peer->clientIsChoked
     663                        && tr_bitfieldHas( peer->have, piece ) )
     664                    ++setme->peerCount;
     665            }
     666        }
     667
     668        qsort( p, poolSize, sizeof( struct tr_refill_piece ),
     669               compareRefillPiece );
     670
     671        for( j = 0; j < poolSize; ++j )
     672            pool[j] = p[j].piece;
     673
     674        tr_free( p );
     675    }
     676
     677    *pieceCount = poolSize;
     678    return pool;
     679}
     680
    551681static struct tr_blockIterator*
    552682blockIteratorNew( Torrent * t )
    553683{
    554684    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
    555     i->pos = 0;
    556     i->size = 0;
    557     i->priority = TR_PRI_HIGH;
     685    i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
    558686    i->t = t;
     687    i->pieces = getPreferredPieces( t, &i->pieceCount );
     688    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
     689    tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
    559690    return i;
    560691}
    561692
    562 static int
    563 compareIteratorItems( const void * va, const void * vb )
    564 {
    565     const struct tr_blockIteratorItem * a = va;
    566     const struct tr_blockIteratorItem * b = vb;
    567     if( a->block < b->block ) return -1;
    568     if( a->block > b->block ) return 1;
    569     return 0;
    570 }
    571 
    572 static void
    573 blockIteratorRefill( struct tr_blockIterator * it )
    574 {
    575     int pieceCount;
    576     tr_piece_index_t * pieces;
    577     const tr_torrent * tor = it->t->tor;
    578     const tr_info * inf = tr_torrentInfo( tor );
    579 
    580     /* build a pool of the pieces we might request blocks from */
    581     pieces = tr_new( tr_piece_index_t, inf->pieceCount );
    582     pieceCount = 0;
    583     {
    584         tr_piece_index_t i;
    585         for( i=0; i<inf->pieceCount; ++i )
    586             if( !inf->pieces[i].dnd )
    587                 if( inf->pieces[i].priority == it->priority )
    588                     if( !tr_cpPieceIsComplete( &tor->completion, i ) )
    589                         pieces[pieceCount++] = i;
    590     }
    591 
    592     /* while we're short on blocks and there are still pieces left... */
    593     while( ( it->size < ITERATOR_BLOCK_BUFFER_SIZE ) && ( pieceCount > 0 ) )
    594     {
    595         tr_block_index_t i;
    596         tr_block_index_t b;
    597         tr_block_index_t e;
    598 
    599         /* pull a random piece out of the pool */
    600         const int poolIndex = tr_cryptoRandInt( pieceCount );
    601         const tr_piece_index_t piece = pieces[poolIndex];
    602         pieces[poolIndex] = pieces[--pieceCount];
    603 
    604         /* add the piece's blocks that we don't have to our iterator */
    605         b = tr_torPieceFirstBlock( tor, piece );
    606         e = b + tr_torPieceCountBlocks( tor, piece );
    607         for( i=b; (i!=e) && (it->size < ITERATOR_BLOCK_BUFFER_SIZE); ++i ) {
    608             if( !tr_cpBlockIsCompleteFast( &tor->completion, i ) ) {
    609                 int pos;
    610                 struct tr_blockIteratorItem tmp;
    611                 tr_bool match;
    612                 tmp.block = i;
    613                 tmp.requestCount = 0;
    614                 pos = tr_lowerBound( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems, &match );
    615                 if( match )
    616                     continue;
    617                 memmove( it->items+pos+1, it->items+pos, sizeof(struct tr_blockIteratorItem)*(it->size-pos) );
    618                 it->items[pos] = tmp;
    619                 ++it->size;
    620             }
    621         }
    622     }
    623 
    624     if( it->pos >= it->size )
    625         it->pos = 0;
    626 
    627     /* cleanup */
    628     tr_free( pieces );
    629 }
    630 
    631 static void
    632 blockIteratorRewind( struct tr_blockIterator * it )
    633 {
    634     /* if we don't have any blocks in the iterator,
    635      * try to regenerate a list of blocks in the current priority.
    636      * if that fails, go to the next lower priority and retry. */
    637     for( ;; ) {
    638         if( it->size <= ITERATOR_LOW_MARK )
    639             blockIteratorRefill( it );
    640         if( it->size > 0 )
    641             break;
    642         if( it->priority == TR_PRI_LOW )
    643             return;
    644         --it->priority; /* try a lower priority */
    645     }
    646 
    647     it->begin = it->pos;
    648     it->didLoop = FALSE;
    649 }
    650 
    651693static tr_bool
    652 blockIteratorNext( struct tr_blockIterator * it, tr_block_index_t * setme )
    653 {
    654     static const int single_req_threshold = 100;
    655     static const int double_req_threshold = 50;
    656 
    657     while( !it->didLoop )
    658     {
    659         ++it->pos;
    660         it->pos %= it->size;
    661         it->didLoop |= it->pos == it->begin;
    662 
    663         if( ( it->items[it->pos].requestCount >= 1 ) && ( it->size >= single_req_threshold ) )
    664             continue;
    665         if( ( it->items[it->pos].requestCount >= 2 ) && ( it->size >= double_req_threshold ) )
    666             continue;
    667 
    668         *setme = it->items[it->pos].block;
    669         return TRUE;
    670     }
    671 
    672     return FALSE;
    673 }
    674 static void
    675 blockIteratorInvalidate( struct tr_blockIterator * it )
    676 {
    677     it->size = 0;
    678     it->priority = TR_PRI_HIGH;
    679 }
    680 
    681 void
    682 tr_peerMgrFilePrioritiesChanged( tr_torrent * tor )
    683 {
    684     if( ( tor != NULL ) && ( tor->torrentPeers != NULL ) && ( tor->torrentPeers->refillQueue != NULL ) )
    685         blockIteratorInvalidate( tor->torrentPeers->refillQueue );
    686 }
    687 
    688 static void
    689 blockIteratorRemoveBlock( struct tr_blockIterator * it, tr_block_index_t block )
    690 {
     694blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
     695{
     696    tr_bool found;
     697    Torrent * t = i->t;
     698    tr_torrent * tor = t->tor;
     699
     700    while( ( i->blockIndex == i->blockCount )
     701        && ( i->pieceIndex < i->pieceCount ) )
     702    {
     703        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
     704        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
     705        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
     706        tr_block_index_t block;
     707
     708        assert( index < tor->info.pieceCount );
     709
     710        i->blockCount = 0;
     711        i->blockIndex = 0;
     712        for( block=b; block!=e; ++block )
     713            if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
     714                i->blocks[i->blockCount++] = block;
     715    }
     716
     717    assert( i->blockCount <= tor->blockCountInPiece );
     718
     719    if(( found = ( i->blockIndex < i->blockCount )))
     720        *setme = i->blocks[i->blockIndex++];
     721
     722    return found;
     723}
     724
     725static void
     726blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
     727{
     728    i->blockIndex = i->blockCount;
     729}
     730
     731static void
     732blockIteratorFree( struct tr_blockIterator ** inout )
     733{
     734    struct tr_blockIterator * it = *inout;
     735
    691736    if( it != NULL )
    692737    {
    693         struct tr_blockIteratorItem tmp, *pos;
    694         tmp.block = block;
    695         pos = bsearch( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems );
    696         if( pos != NULL )
    697         {
    698             const int i = pos - it->items;
    699 
    700             assert( pos->block == block );
    701             assert( i >= 0 );
    702             assert( i < it->size );
    703 
    704             --it->size;
    705 
    706             memmove( it->items+i, it->items+i+1, sizeof(struct tr_blockIteratorItem) * (it->size-i) );
    707 
    708             if( it->pos > i )
    709                 --it->pos;
    710         }
    711     }
    712 }
    713 
    714 static void
    715 blockIteratorFree( struct tr_blockIterator ** inout )
    716 {
    717     tr_free( *inout );
     738        tr_free( it->blocks );
     739        tr_free( it->pieces );
     740        tr_free( it );
     741    }
     742
    718743    *inout = NULL;
    719744}
    720745
    721746static tr_peer**
    722 getPeersUploadingToClient( Torrent * t, int * setmeCount )
     747getPeersUploadingToClient( Torrent * t,
     748                           int *     setmeCount )
    723749{
    724750    int j;
     
    757783}
    758784
     785static int
     786refillUpkeep( void * vmgr )
     787{
     788    tr_torrent * tor = NULL;
     789    tr_peerMgr * mgr = vmgr;
     790    time_t now;
     791    managerLock( mgr );
     792
     793    now = time( NULL );
     794    while(( tor = tr_torrentNext( mgr->session, tor ))) {
     795        Torrent * t = tor->torrentPeers;
     796        if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
     797            tordbg( t, "refill queue is past its shelf date; discarding." );
     798            blockIteratorFree( &t->refillQueue );
     799        }
     800    }
     801
     802    managerUnlock( mgr );
     803    return TRUE;
     804}
     805
    759806static void
    760807sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now );
     
    774821    if( !t->isRunning )
    775822        return;
    776     if( tr_torrentIsSeed( t->tor ) ) {
    777         blockIteratorFree( &t->refillQueue );
     823    if( tr_torrentIsSeed( t->tor ) )
    778824        return;
    779     }
    780825
    781826    torrentLock( t );
     
    790835    webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
    791836                          webseedCount * sizeof( tr_webseed* ) );
    792 
    793     blockIteratorRewind( t->refillQueue );
    794837
    795838    while( ( webseedCount || peerCount )
     
    822865
    823866                case TR_ADDREQ_OK:
    824                     ++t->refillQueue->items[t->refillQueue->pos].requestCount;
     867                    incrementPieceRequests( t, index );
    825868                    handled = TRUE;
    826869                    break;
     
    843886
    844887                case TR_ADDREQ_OK:
     888                    incrementPieceRequests( t, index );
    845889                    handled = TRUE;
    846890                    break;
     
    851895            }
    852896        }
     897
     898        if( !handled )
     899            blockIteratorSkipCurrentPiece( t->refillQueue );
    853900    }
    854901
     
    856903    tr_free( webseeds );
    857904    tr_free( peers );
     905
     906    if( !hasNext ) {
     907        tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
     908        blockIteratorFree( &t->refillQueue );
     909    }
    858910
    859911    torrentUnlock( t );
     
    9881040
    9891041        case TR_PEER_CANCEL:
    990             blockIteratorRemoveBlock( t->refillQueue, _tr_block( t->tor, e->pieceIndex, e->offset ) );
     1042            decrementPieceRequests( t, e->pieceIndex );
    9911043            break;
    9921044
     
    10821134            tr_cpBlockAdd( &tor->completion, block );
    10831135            tr_torrentSetDirty( tor );
    1084             blockIteratorRemoveBlock( t->refillQueue, block );
     1136            decrementPieceRequests( t, e->pieceIndex );
    10851137
    10861138            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
     
    15691621    if( m->reconnectTimer == NULL )
    15701622        m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
     1623
     1624    if( m->refillUpkeepTimer == NULL )
     1625        m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
    15711626}
    15721627
Note: See TracChangeset for help on using the changeset viewer.