Changeset 6954


Ignore:
Timestamp:
Oct 25, 2008, 2:20:16 AM (12 years ago)
Author:
charles
Message:

new & improved fix for #617: Transmission goes above the set bandwidth limits

Location:
trunk/libtransmission
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-mgr.c

    r6944 r6954  
    9797    uint16_t          numFails;
    9898    struct in_addr    addr;
    99     time_t            time; /* the last time the peer's connection status
    100                               changed */
     99    time_t            time; /* when the peer's connection status last changed */
    101100    time_t            piece_data_time;
    102101};
     
    104103typedef struct
    105104{
     105    unsigned int    isRunning : 1;
     106
    106107    uint8_t         hash[SHA_DIGEST_LENGTH];
     108    int         *   pendingRequestCount;
    107109    tr_ptrArray *   outgoingHandshakes; /* tr_handshake */
    108110    tr_ptrArray *   pool; /* struct peer_atom */
     
    114116    tr_torrent *    tor;
    115117    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
    116     tr_bitfield *   requestedBlocks;
    117 
    118     unsigned int    isRunning : 1;
    119118
    120119    struct tr_peerMgr * manager;
     
    405404    tr_timerFree( &t->refillTimer );
    406405
    407     tr_bitfieldFree( t->requestedBlocks );
    408406    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
    409407    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
     
    411409    tr_ptrArrayFree( t->peers, NULL );
    412410
     411    tr_free( t->pendingRequestCount );
    413412    tr_free( t );
    414413}
     
    432431    t->webseeds = tr_ptrArrayNew( );
    433432    t->outgoingHandshakes = tr_ptrArrayNew( );
    434     t->requestedBlocks = tr_bitfieldNew( tor->blockCount );
    435433    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
    436434
     
    587585}
    588586
    589 /***
    590 ****  Refill
    591 ***/
     587/****
     588*****
     589*****  REFILL
     590*****
     591****/
     592
     593static void
     594assertValidPiece( Torrent * t, tr_piece_index_t piece )
     595{
     596    assert( t );
     597    assert( t->tor );
     598    assert( piece < t->tor->info.pieceCount );
     599}
     600
     601static int
     602getPieceRequests( Torrent * t, tr_piece_index_t piece )
     603{
     604    assertValidPiece( t, piece );
     605
     606    return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
     607}
     608
     609static void
     610incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
     611{
     612    assertValidPiece( t, piece );
     613
     614    if( t->pendingRequestCount == NULL )
     615        t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
     616    t->pendingRequestCount[piece]++;
     617}
     618
     619static void
     620decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
     621{
     622    assertValidPiece( t, piece );
     623
     624    if( t->pendingRequestCount )
     625        t->pendingRequestCount[piece]--;
     626}
    592627
    593628struct tr_refill_piece
    594629{
    595     int              missingBlockCount;
    596630    tr_priority_t    priority;
    597     int              random;
    598631    uint32_t         piece;
    599632    uint32_t         peerCount;
     633    int              missingBlockCount;
     634    int              random;
     635    int              pendingRequestCount;
    600636};
    601637
     
    606642    const struct tr_refill_piece * a = aIn;
    607643    const struct tr_refill_piece * b = bIn;
     644
     645    /* if one piece has a higher priority, it goes first */
     646    if( a->priority != b->priority )
     647        return a->priority > b->priority ? -1 : 1;
     648
     649    /* have a per-priority endgame */
     650    if( a->pendingRequestCount != b->pendingRequestCount )
     651        return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
    608652
    609653    /* fewer missing pieces goes first */
     
    611655        return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
    612656
    613     /* if one piece has a higher priority, it goes first */
    614     if( a->priority != b->priority )
    615         return a->priority > b->priority ? -1 : 1;
    616 
    617657    /* otherwise if one has fewer peers, it goes first */
    618658    if( a->peerCount != b->peerCount )
     
    626666}
    627667
    628 static int
    629 isPieceInteresting( const tr_torrent * tor,
    630                     tr_piece_index_t   piece )
    631 {
    632     if( tor->info.pieces[piece].dnd ) /* we don't want it */
    633         return 0;
    634 
    635     if( tr_cpPieceIsComplete( tor->completion, piece ) ) /* we have it */
    636         return 0;
    637 
    638     return 1;
    639 }
    640 
    641 static uint32_t*
    642 getPreferredPieces( Torrent *  t,
    643                     uint32_t * pieceCount )
    644 {
    645     const tr_torrent * tor = t->tor;
    646     const tr_info *    inf = &tor->info;
    647     tr_piece_index_t   i;
    648     uint32_t           poolSize = 0;
    649     uint32_t *         pool = tr_new( uint32_t, inf->pieceCount );
    650     int                peerCount;
    651     tr_peer**          peers;
     668static tr_piece_index_t *
     669getPreferredPieces( Torrent           * t,
     670                    tr_piece_index_t  * pieceCount )
     671{
     672    const tr_torrent  * tor = t->tor;
     673    const tr_info     * inf = &tor->info;
     674    tr_piece_index_t    i;
     675    tr_piece_index_t    poolSize = 0;
     676    tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
     677    int                 peerCount;
     678    tr_peer**           peers;
    652679
    653680    assert( torrentIsLocked( t ) );
     
    655682    peers = getConnectedPeers( t, &peerCount );
    656683
     684    /* make a list of the pieces that we want but don't have */
    657685    for( i = 0; i < inf->pieceCount; ++i )
    658         if( isPieceInteresting( tor, i ) )
     686        if( !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete( tor->completion, i ) )
    659687            pool[poolSize++] = i;
    660688
     
    662690    if( poolSize > 1 )
    663691    {
    664         uint32_t                 j;
    665         struct tr_refill_piece * p = tr_new( struct tr_refill_piece,
    666                                              poolSize );
     692        tr_piece_index_t j;
     693        struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
    667694
    668695        for( j = 0; j < poolSize; ++j )
    669696        {
    670             int                      k;
    671             const tr_piece_index_t   piece = pool[j];
     697            int k;
     698            const tr_piece_index_t piece = pool[j];
    672699            struct tr_refill_piece * setme = p + j;
    673700
     
    677704            setme->random = tr_cryptoWeakRandInt( INT_MAX );
    678705            setme->missingBlockCount = tr_cpMissingBlocksInPiece( tor->completion, piece );
     706            setme->pendingRequestCount = getPieceRequests( t, piece );
    679707
    680708            for( k = 0; k < peerCount; ++k )
     
    702730}
    703731
    704 static uint64_t*
    705 getPreferredBlocks( Torrent * t, tr_block_index_t * setmeCount )
    706 {
    707     int s;
    708     uint32_t i;
    709     uint32_t pieceCount;
    710     uint32_t blockCount;
    711     uint32_t unreqCount[3], reqCount[3];
    712     uint32_t * pieces;
    713     uint64_t * ret, * walk;
    714     uint64_t * unreq[3], *req[3];
    715     const tr_torrent * tor = t->tor;
    716 
    717     assert( torrentIsLocked( t ) );
    718 
    719     pieces = getPreferredPieces( t, &pieceCount );
    720 
    721     /**
    722      * Now we walk through those preferred pieces to find all the blocks
    723      * are still missing from them.  We put unrequested blocks first,
    724      * of course, but by including requested blocks afterwards, endgame
    725      * handling happens naturally.
    726      *
    727      * By doing this once per priority we also effectively get an endgame
    728      * mode for each priority level.  The helps keep high priority files
    729      * from getting stuck at 99% due of unresponsive peers.
    730      */
    731 
    732     /* make temporary bins for the four tiers of blocks */
    733     for( i=0; i<3; ++i ) {
    734         req[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
    735         reqCount[i] = 0;
    736         unreq[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
    737         unreqCount[i] = 0;
    738     }
    739 
    740     /* sort the blocks into our temp bins */
    741     for( i=blockCount=0; i<pieceCount; ++i )
    742     {
    743         const tr_piece_index_t index = pieces[i];
    744         const int priorityIndex = tor->info.pieces[index].priority + 1;
    745         const tr_block_index_t begin = tr_torPieceFirstBlock( tor, index );
    746         const tr_block_index_t end = begin + tr_torPieceCountBlocks( tor, index );
     732struct tr_blockIterator
     733{
     734    Torrent * t;
     735    tr_block_index_t blockIndex, blockCount, *blocks;
     736    tr_piece_index_t pieceIndex, pieceCount, *pieces;
     737};
     738
     739static struct tr_blockIterator*
     740blockIteratorNew( Torrent * t )
     741{
     742    struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
     743    i->t = t;
     744    i->pieces = getPreferredPieces( t, &i->pieceCount );
     745    i->blocks = tr_new0( tr_block_index_t, t->tor->blockCount );
     746    return i;
     747}
     748
     749static int
     750blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
     751{
     752    int found;
     753    Torrent * t = i->t;
     754    tr_torrent * tor = t->tor;
     755
     756    while( ( i->blockIndex == i->blockCount )
     757        && ( i->pieceIndex < i->pieceCount ) )
     758    {
     759        const tr_piece_index_t index = i->pieces[i->pieceIndex++];
     760        const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
     761        const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
    747762        tr_block_index_t block;
    748763
    749         assert( tr_bitfieldTestFast( t->requestedBlocks, end-1 ) );
    750 
    751         for( block=begin; block<end; ++block )
    752         {
    753             if( tr_cpBlockIsComplete( tor->completion, block ) )
    754                 continue;
    755 
    756             ++blockCount;
    757 
    758             if( tr_bitfieldHasFast( t->requestedBlocks, block ) )
    759             {
    760                 const uint32_t n = reqCount[priorityIndex]++;
    761                 req[priorityIndex][n] = block;
    762             }
    763             else
    764             {
    765                 const uint32_t n = unreqCount[priorityIndex]++;
    766                 unreq[priorityIndex][n] = block;
    767             }
    768         }
    769     }
    770 
    771     /* join the bins together, going from highest priority to lowest so
    772      * the the blocks we want to request first will be first in the list */
    773     ret = walk = tr_new( uint64_t, blockCount );
    774     for( s=2; s>=0; --s ) {
    775         memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
    776         walk += unreqCount[s];
    777         memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
    778         walk += reqCount[s];
    779     }
    780     assert( ( walk - ret ) == ( int )blockCount );
    781     *setmeCount = blockCount;
    782 
    783     /* cleanup */
    784     tr_free( pieces );
    785     for( i=0; i<3; ++i ) {
    786         tr_free( unreq[i] );
    787         tr_free( req[i] );
    788     }
    789     return ret;
     764        assert( index < tor->info.pieceCount );
     765
     766        i->blockCount = 0;
     767        i->blockIndex = 0;
     768        for( block=b; block!=e; ++block )
     769            if( !tr_cpBlockIsComplete( tor->completion, block ) )
     770                i->blocks[i->blockCount++] = block;
     771    }
     772
     773    if(( found = ( i->blockIndex < i->blockCount )))
     774        *setme = i->blocks[i->blockIndex++];
     775
     776    return found;
     777}
     778
     779static void
     780blockIteratorFree( struct tr_blockIterator * i )
     781{
     782    tr_free( i->pieces );
     783    tr_free( i->blocks );
     784    tr_free( i );
    790785}
    791786
     
    794789                           int *     setmeCount )
    795790{
    796     int        i;
    797     int        peerCount = 0;
    798     int        retCount = 0;
     791    int j;
     792    int peerCount = 0;
     793    int retCount = 0;
    799794    tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
    800795    tr_peer ** ret = tr_new( tr_peer *, peerCount );
    801796
    802     /* get a list of peers we're downloading from */
    803     for( i = 0; i < peerCount; ++i )
    804         if( clientIsDownloadingFrom( peers[i] ) )
    805             ret[retCount++] = peers[i];
    806 
    807     /* pick a different starting point each time so all peers
    808      * get a chance at the first blocks in the queue */
    809     if( retCount )
    810     {
    811         tr_peer ** tmp = tr_new( tr_peer *, retCount );
    812         i = tr_cryptoWeakRandInt( retCount );
    813         memcpy( tmp, ret, sizeof( tr_peer* ) * retCount );
    814         memcpy( ret, tmp + i, sizeof( tr_peer* ) * ( retCount - i ) );
    815         memcpy( ret + ( retCount - i ), tmp, sizeof( tr_peer* ) * i );
    816         tr_free( tmp );
    817     }
    818 
     797    j = 0; /* this is a temporary test to make sure we walk through all the peers */
     798    if( peerCount )
     799    {
     800        /* Get a list of peers we're downloading from.
     801           Pick a different starting point each time so all peers
     802           get a chance at the first blocks in the queue */
     803        const int fencepost = tr_cryptoWeakRandInt( peerCount );
     804        int i = fencepost;
     805        do {
     806            if( clientIsDownloadingFrom( peers[i] ) )
     807                ret[retCount++] = peers[i];
     808            i = ( i + 1 ) % peerCount;
     809            ++j;
     810        } while( i != fencepost );
     811    }
     812    assert( j == peerCount );
    819813    *setmeCount = retCount;
    820814    return ret;
     
    824818refillPulse( void * vtorrent )
    825819{
    826     Torrent *        t = vtorrent;
    827     tr_torrent *     tor = t->tor;
    828     tr_block_index_t i;
    829     int              peerCount;
    830     int              webseedCount;
    831     tr_peer **       peers;
    832     tr_webseed **    webseeds;
    833     tr_block_index_t blockCount;
    834     uint64_t       * blocks;
     820    tr_block_index_t block;
     821    int peerCount;
     822    int webseedCount;
     823    tr_peer ** peers;
     824    tr_webseed ** webseeds;
     825    struct tr_blockIterator * blockIterator;
     826    Torrent * t = vtorrent;
     827    tr_torrent * tor = t->tor;
    835828
    836829    if( !t->isRunning )
     
    842835    tordbg( t, "Refilling Request Buffers..." );
    843836
    844     blocks = getPreferredBlocks( t, &blockCount );
     837    blockIterator = blockIteratorNew( t );
    845838    peers = getPeersUploadingToClient( t, &peerCount );
    846839    webseedCount = tr_ptrArraySize( t->webseeds );
    847     webseeds = tr_memdup( tr_ptrArrayBase(
    848                              t->webseeds ), webseedCount *
    849                          sizeof( tr_webseed* ) );
    850 
    851     for( i = 0; ( webseedCount || peerCount ) && i < blockCount; ++i )
     840    webseeds = tr_memdup( tr_ptrArrayBase( t->webseeds ),
     841                          webseedCount * sizeof( tr_webseed* ) );
     842
     843    while( ( webseedCount || peerCount )
     844        && blockIteratorNext( blockIterator, &block ) )
    852845    {
    853846        int j;
    854847        int handled = FALSE;
    855848
    856         const tr_block_index_t block = blocks[i];
    857849        const tr_piece_index_t index = tr_torBlockPiece( tor, block );
    858850        const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
     
    865857
    866858        /* find a peer who can ask for this block */
    867         for( j = 0; !handled && j < peerCount; )
     859        for( j=0; !handled && j<peerCount; )
    868860        {
    869861            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
     
    881873
    882874                case TR_ADDREQ_OK:
    883                     tr_bitfieldAdd( t->requestedBlocks, block );
     875                    incrementPieceRequests( t, index );
    884876                    handled = TRUE;
    885877                    break;
     
    892884
    893885        /* maybe one of the webseeds can do it */
    894         for( j = 0; !handled && j < webseedCount; )
     886        for( j=0; !handled && j<webseedCount; )
    895887        {
    896888            const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, begin, length );
     
    902894
    903895                case TR_ADDREQ_OK:
    904                     tr_bitfieldAdd( t->requestedBlocks, block );
     896                    incrementPieceRequests( t, index );
    905897                    handled = TRUE;
    906898                    break;
     
    914906
    915907    /* cleanup */
     908    blockIteratorFree( blockIterator );
    916909    tr_free( webseeds );
    917910    tr_free( peers );
    918     tr_free( blocks );
    919911
    920912    t->refillTimer = NULL;
     
    993985
    994986        case TR_PEER_CANCEL:
    995             tr_bitfieldRem( t->requestedBlocks, _tr_block( t->tor, e->pieceIndex, e->offset ) );
     987            decrementPieceRequests( t, e->pieceIndex );
    996988            break;
    997989
     
    1005997            if( peer )
    1006998            {
    1007                 struct peer_atom * atom = getExistingAtom( t,
    1008                                                            &peer->in_addr );
    1009                 atom->piece_data_time = time( NULL );
     999                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
     1000                a->piece_data_time = now;
    10101001            }
    10111002            break;
     
    10271018            tr_statsAddDownloaded( tor->session, e->length );
    10281019            if( peer ) {
    1029                 struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
    1030                 atom->piece_data_time = time( NULL );
     1020                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
     1021                a->piece_data_time = now;
    10311022            }
    10321023            break;
     
    10661057
    10671058            tr_cpBlockAdd( tor->completion, block );
     1059            decrementPieceRequests( t, e->pieceIndex );
    10681060
    10691061            broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
     
    23062298/****
    23072299*****
    2308 *****
     2300*****  BANDWIDTH ALLOCATION
    23092301*****
    23102302****/
     
    23262318        usedBytes += history[i];
    23272319
    2328     n =
    2329         ( desiredAvgKB *
    2330           1024.0 ) *
    2331         ( BANDWIDTH_PULSE_HISTORY +
    2332           1.0 ) / BANDWIDTH_PULSES_PER_SECOND - usedBytes;
     2320    n = ( desiredAvgKB * 1024.0 )
     2321      * ( BANDWIDTH_PULSE_HISTORY + 1.0 )
     2322      / BANDWIDTH_PULSES_PER_SECOND
     2323      - usedBytes;
    23332324
    23342325    /* clamp the return value to lessen oscillation */
  • trunk/libtransmission/peer-msgs.c

    r6897 r6954  
    946946
    947947    /* if it's only in the queue and hasn't been sent yet, free it */
    948     if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
     948    if( reqListRemove( &msgs->clientWillAskFor, &req ) )
    949949        fireCancelledReq( msgs, &req );
    950950
    951951    /* if it's already been sent, send a cancel message too */
    952     if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
     952    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
    953953        protocolSendCancel( msgs, &req );
    954954        fireCancelledReq( msgs, &req );
Note: See TracChangeset for help on using the changeset viewer.