Ignore:
Timestamp:
Nov 8, 2009, 11:20:00 PM (13 years ago)
Author:
charles
Message:

(trunk libT) #2548: T's request queue can send out too many duplicate requests

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-mgr.c

    r9484 r9494  
    137137}
    138138
    139 struct tr_blockIterator
    140 {
    141     time_t expirationDate;
    142     struct tr_torrent_peers * t;
    143     tr_block_index_t blockIndex, blockCount, *blocks;
    144     tr_piece_index_t pieceIndex, pieceCount, *pieces;
     139struct block_request
     140{
     141    tr_block_index_t block;
     142    tr_peer * peer;
     143    time_t sentAt;
     144};
     145
     146struct weighted_piece
     147{
     148    tr_piece_index_t index;
     149    int16_t salt;
     150    int16_t requestCount;
    145151};
    146152
     
    156162    tr_torrent               * tor;
    157163    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
    158     struct tr_blockIterator  * refillQueue; /* used in refillPulse() */
    159164    struct tr_peerMgr        * manager;
    160     int                      * pendingRequestCount;
     165    //int                      * pendingRequestCount;
    161166
    162167    tr_bool                    isRunning;
     168
     169    struct block_request     * requests;
     170    int                        requestsSort;
     171    int                        requestCount;
     172    int                        requestAlloc;
     173
     174    struct weighted_piece    * pieces;
     175    int                        piecesSort;
     176    int                        pieceCount;
     177
     178    tr_bool                    isInEndgame;
    163179}
    164180Torrent;
     
    352368}
    353369
    354 static void
    355 peerDestructor( tr_peer * peer )
    356 {
    357     assert( peer );
     370static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
     371
     372static void
     373peerDestructor( Torrent * t, tr_peer * peer )
     374{
     375    assert( peer != NULL );
     376
     377    peerDeclinedAllRequests( t, peer );
    358378
    359379    if( peer->msgs != NULL )
     
    386406    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
    387407    assert( removed == peer );
    388     peerDestructor( removed );
     408    peerDestructor( t, removed );
    389409}
    390410
     
    395415        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
    396416}
    397 
    398 static void blockIteratorFree( struct tr_blockIterator ** inout );
    399417
    400418static void
     
    411429    evtimer_del( &t->refillTimer );
    412430
    413     blockIteratorFree( &t->refillQueue );
    414431    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
    415432    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
     
    417434    tr_ptrArrayDestruct( &t->peers, NULL );
    418435
    419     tr_free( t->pendingRequestCount );
     436    tr_free( t->requests );
     437    tr_free( t->pieces );
    420438    tr_free( t );
    421439}
    422440
    423441
    424 static void refillPulse( int, short, void* );
     442//static void refillPulse( int, short, void* );
    425443
    426444static void peerCallbackFunc( void * vpeer,
     
    442460    t->webseeds = TR_PTR_ARRAY_INIT;
    443461    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
    444     evtimer_set( &t->refillTimer, refillPulse, t );
     462    t->requests = 0;
     463    //evtimer_set( &t->refillTimer, refillPulse, t );
    445464
    446465
     
    534553}
    535554
    536 /****
    537 *****
    538 *****  REFILL
    539 *****
    540 ****/
    541 
    542 static void
    543 assertValidPiece( Torrent * t, tr_piece_index_t piece )
    544 {
    545     assert( t );
    546     assert( t->tor );
    547     assert( piece < t->tor->info.pieceCount );
    548 }
     555/**
     556***  REQUESTS
     557***
     558*** There are two data structures associated with managing block requests:
     559***
     560*** 1. Torrent::requests, an array of "struct block_request" which keeps
     561***    track of which blocks have been requested, and when, and by which peers.
     562***    This is list is used for (a) cancelling requests that have been pending
     563***    for too long and (b) avoiding duplicate requests before endgame.
     564***
     565*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
     566***    pieces that we want to request.  It's used to decide which pieces to
     567***    return next when tr_peerMgrGetBlockRequests() is called.
     568**/
     569
     570/**
     571*** struct block_request
     572**/
     573
     574enum
     575{
     576    REQ_UNSORTED,
     577    REQ_SORTED_BY_BLOCK,
     578    REQ_SORTED_BY_TIME
     579};
    549580
    550581static int
    551 getPieceRequests( Torrent * t, tr_piece_index_t piece )
    552 {
    553     assertValidPiece( t, piece );
    554 
    555     return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
    556 }
    557 
    558 static void
    559 incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
    560 {
    561     assertValidPiece( t, piece );
    562 
    563     if( t->pendingRequestCount == NULL )
    564         t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
    565     t->pendingRequestCount[piece]++;
    566 }
    567 
    568 static void
    569 decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
    570 {
    571     assertValidPiece( t, piece );
    572 
    573     if( t->pendingRequestCount )
    574         t->pendingRequestCount[piece]--;
    575 }
    576 
    577 struct tr_refill_piece
    578 {
    579     tr_priority_t    priority;
    580     uint32_t         piece;
    581     uint32_t         peerCount;
    582     int              random;
    583     int              pendingRequestCount;
    584     int              missingBlockCount;
     582compareReqByBlock( const void * va, const void * vb )
     583{
     584    const struct block_request * a = va;
     585    const struct block_request * b = vb;
     586    if( a->block < b->block ) return -1;
     587    if( a->block > b->block ) return 1;
     588    return 0;
     589}
     590
     591static int
     592compareReqByTime( const void * va, const void * vb )
     593{
     594    const struct block_request * a = va;
     595    const struct block_request * b = vb;
     596    if( a->sentAt < b->sentAt ) return -1;
     597    if( a->sentAt > b->sentAt ) return 1;
     598    return 0;
     599}
     600
     601static void
     602requestListSort( Torrent * t, int mode )
     603{
     604    assert( mode==REQ_SORTED_BY_BLOCK || mode==REQ_SORTED_BY_TIME );
     605
     606    if( t->requestsSort != mode )
     607    {
     608        int(*compar)(const void *, const void *);
     609
     610        t->requestsSort = mode;
     611
     612        switch( mode ) {
     613            case REQ_SORTED_BY_BLOCK: compar = compareReqByBlock; break;
     614            case REQ_SORTED_BY_TIME: compar = compareReqByTime; break;
     615            default: assert( 0 && "unhandled" );
     616        }
     617
     618//fprintf( stderr, "sorting requests by %s\n", (mode==REQ_SORTED_BY_BLOCK)?"block":"time" );
     619        qsort( t->requests, t->requestCount,
     620               sizeof( struct block_request ), compar );
     621    }
     622}
     623
     624static void
     625requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
     626{
     627    struct block_request key;
     628
     629    /* ensure enough room is available... */
     630    if( t->requestCount + 1 >= t->requestAlloc )
     631    {
     632        const int CHUNK_SIZE = 128;
     633        t->requestAlloc += CHUNK_SIZE;
     634        t->requests = tr_renew( struct block_request,
     635                                t->requests, t->requestAlloc );
     636    }
     637
     638    /* populate the record we're inserting */
     639    key.block = block;
     640    key.peer = peer;
     641    key.sentAt = time( NULL );
     642
     643    /* insert the request to our array... */
     644    switch( t->requestsSort )
     645    {
     646        case REQ_UNSORTED:
     647        case REQ_SORTED_BY_TIME:
     648            t->requests[t->requestCount++] = key;
     649            break;
     650
     651        case REQ_SORTED_BY_BLOCK: {
     652            tr_bool exact;
     653            const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
     654                                           sizeof( struct block_request ),
     655                                           compareReqByBlock, &exact );
     656            assert( !exact );
     657            memmove( t->requests + pos + 1,
     658                     t->requests + pos,
     659                     sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
     660            t->requests[pos] = key;
     661            break;
     662        }
     663    }
     664}
     665
     666static struct block_request *
     667requestListLookup( Torrent * t, tr_block_index_t block )
     668{
     669    struct block_request key;
     670    key.block = block;
     671
     672    requestListSort( t, REQ_SORTED_BY_BLOCK );
     673
     674    return bsearch( &key, t->requests, t->requestCount,
     675                    sizeof( struct block_request ),
     676                    compareReqByBlock );
     677}
     678
     679static void
     680requestListRemove( Torrent * t, tr_block_index_t block )
     681{
     682    const struct block_request * b = requestListLookup( t, block );
     683    if( b != NULL )
     684    {
     685        const int pos = b - t->requests;
     686        assert( pos < t->requestCount );
     687        memmove( t->requests + pos,
     688                 t->requests + pos + 1,
     689                 sizeof( struct block_request ) * ( --t->requestCount - pos ) );
     690    }
     691}
     692
     693/**
     694*** struct weighted_piece
     695**/
     696
     697enum
     698{
     699    PIECES_UNSORTED,
     700    PIECES_SORTED_BY_INDEX,
     701    PIECES_SORTED_BY_WEIGHT
    585702};
    586703
     704const tr_torrent * weightTorrent;
     705
     706/* we try to create a "weight" s.t. high-priority pieces come before others,
     707 * and that partially-complete pieces come before empty ones. */
    587708static int
    588 compareRefillPiece( const void * aIn, const void * bIn )
    589 {
    590     const struct tr_refill_piece * a = aIn;
    591     const struct tr_refill_piece * b = bIn;
    592 
    593     /* if one piece has a higher priority, it goes first */
    594     if( a->priority != b->priority )
    595         return a->priority > b->priority ? -1 : 1;
    596 
    597     /* have a per-priority endgame */
    598     if( a->pendingRequestCount != b->pendingRequestCount )
    599         return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
    600 
    601     /* fewer missing pieces goes first */
    602     if( a->missingBlockCount != b->missingBlockCount )
    603         return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
    604 
    605     /* otherwise if one has fewer peers, it goes first */
    606     if( a->peerCount != b->peerCount )
    607         return a->peerCount < b->peerCount ? -1 : 1;
    608 
    609     /* otherwise go with our random seed */
    610     if( a->random != b->random )
    611         return a->random < b->random ? -1 : 1;
    612 
     709comparePieceByWeight( const void * va, const void * vb )
     710{
     711    const struct weighted_piece * a = va;
     712    const struct weighted_piece * b = vb;
     713    int ia, ib, missing, pending;
     714    const tr_torrent * tor = weightTorrent;
     715
     716    /* primary key: weight */
     717    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
     718    pending = a->requestCount;
     719    ia = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
     720    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
     721    pending = b->requestCount;
     722    ib = missing > pending ? missing - pending : (int)(tor->blockCountInPiece + pending);
     723    if( ia < ib ) return -1;
     724    if( ia > ib ) return 1;
     725
     726    /* secondary key: higher priorities go first */
     727    ia = tor->info.pieces[a->index].priority;
     728    ib = tor->info.pieces[b->index].priority;
     729    if( ia > ib ) return -1;
     730    if( ia < ib ) return 1;
     731
     732    /* tertiary key: random */
     733    return a->salt - b->salt;
     734}
     735
     736static int
     737comparePieceByIndex( const void * va, const void * vb )
     738{
     739    const struct weighted_piece * a = va;
     740    const struct weighted_piece * b = vb;
     741    if( a->index < b->index ) return -1;
     742    if( a->index > b->index ) return 1;
    613743    return 0;
    614744}
    615745
    616 static tr_piece_index_t *
    617 getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
    618 {
    619     const tr_torrent  * tor = t->tor;
    620     const tr_info     * inf = &tor->info;
    621     tr_piece_index_t    i;
    622     tr_piece_index_t    poolSize = 0;
    623     tr_piece_index_t  * pool = tr_new( tr_piece_index_t , inf->pieceCount );
    624     int                 peerCount;
    625     const tr_peer    ** peers;
    626 
    627     assert( torrentIsLocked( t ) );
    628 
    629     peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    630     peerCount = tr_ptrArraySize( &t->peers );
    631 
    632     /* make a list of the pieces that we want but don't have */
    633     for( i = 0; i < inf->pieceCount; ++i )
    634         if( !tor->info.pieces[i].dnd
    635                 && !tr_cpPieceIsComplete( &tor->completion, i ) )
    636             pool[poolSize++] = i;
    637 
    638     /* sort the pool by which to request next */
    639     if( poolSize > 1 )
    640     {
    641         tr_piece_index_t j;
    642         struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
    643 
    644         for( j = 0; j < poolSize; ++j )
     746static void
     747pieceListSort( Torrent * t, int mode )
     748{
     749    int(*compar)(const void *, const void *);
     750
     751    assert( mode==PIECES_SORTED_BY_INDEX
     752         || mode==PIECES_SORTED_BY_WEIGHT );
     753
     754    if( t->piecesSort != mode )
     755    {
     756//fprintf( stderr, "sort mode was %d, is now %d\n", t->piecesSort, mode );
     757        t->piecesSort = mode;
     758
     759        switch( mode ) {
     760            case PIECES_SORTED_BY_WEIGHT: compar = comparePieceByWeight; break;
     761            case PIECES_SORTED_BY_INDEX: compar = comparePieceByIndex; break;
     762            default: assert( 0 && "unhandled" );  break;
     763        }
     764
     765//fprintf( stderr, "sorting pieces by %s...\n", (mode==PIECES_SORTED_BY_WEIGHT)?"weight":"index" );
     766        weightTorrent = t->tor;
     767        qsort( t->pieces, t->pieceCount,
     768               sizeof( struct weighted_piece ), compar );
     769    }
     770
     771    /* Also, as long as we've got the pieces sorted by weight,
     772     * let's also update t.isInEndgame */
     773    if( t->piecesSort == PIECES_SORTED_BY_WEIGHT )
     774    {
     775        tr_bool endgame = TRUE;
     776
     777        if( ( t->pieces != NULL ) && ( t->pieceCount > 0 ) )
    645778        {
    646             int k;
    647             const tr_piece_index_t piece = pool[j];
    648             struct tr_refill_piece * setme = p + j;
    649 
    650             setme->piece = piece;
    651             setme->priority = inf->pieces[piece].priority;
    652             setme->peerCount = 0;
    653             setme->random = tr_cryptoWeakRandInt( INT_MAX );
    654             setme->pendingRequestCount = getPieceRequests( t, piece );
    655             setme->missingBlockCount
    656                          = tr_cpMissingBlocksInPiece( &tor->completion, piece );
    657 
    658             for( k = 0; k < peerCount; ++k )
     779            const tr_completion * cp = &t->tor->completion;
     780            const struct weighted_piece * p = t->pieces;
     781            const int pending = p->requestCount;
     782            const int missing = tr_cpMissingBlocksInPiece( cp, p->index );
     783            endgame = pending >= missing;
     784        }
     785
     786        t->isInEndgame = endgame;
     787    }
     788}
     789
     790static struct weighted_piece *
     791pieceListLookup( Torrent * t, tr_piece_index_t index )
     792{
     793    struct weighted_piece key;
     794    key.index = index;
     795
     796    pieceListSort( t, PIECES_SORTED_BY_INDEX );
     797
     798    return bsearch( &key, t->pieces, t->pieceCount,
     799                    sizeof( struct weighted_piece ),
     800                    comparePieceByIndex );
     801}
     802
     803static void
     804pieceListRebuild( Torrent * t )
     805{
     806    if( !tr_torrentIsSeed( t->tor ) )
     807    {
     808        tr_piece_index_t i;
     809        tr_piece_index_t * pool;
     810        tr_piece_index_t poolCount = 0;
     811        const tr_torrent * tor = t->tor;
     812        const tr_info * inf = tr_torrentInfo( tor );
     813        struct weighted_piece * pieces;
     814        int pieceCount;
     815
     816        /* build the new list */
     817        pool = tr_new( tr_piece_index_t, inf->pieceCount );
     818        for( i=0; i<inf->pieceCount; ++i )
     819            if( !inf->pieces[i].dnd )
     820                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
     821                    pool[poolCount++] = i;
     822        pieceCount = poolCount;
     823        pieces = tr_new0( struct weighted_piece, pieceCount );
     824        for( i=0; i<poolCount; ++i ) {
     825            struct weighted_piece * piece = pieces + i;
     826            piece->index = pool[i];
     827            piece->requestCount = 0;
     828            piece->salt = tr_cryptoWeakRandInt( 255 );
     829        }
     830
     831        /* if we already had a list of pieces, merge it into
     832         * the new list so we don't lose its requestCounts */
     833        if( t->pieces != NULL )
     834        {
     835            struct weighted_piece * o = t->pieces;
     836            struct weighted_piece * oend = o + t->pieceCount;
     837            struct weighted_piece * n = pieces;
     838            struct weighted_piece * nend = n + pieceCount;
     839
     840            pieceListSort( t, PIECES_SORTED_BY_INDEX );
     841
     842            while( o!=oend && n!=nend ) {
     843                if( o->index < n->index )
     844                    ++o;
     845                else if( o->index > n->index )
     846                    ++n;
     847                else
     848                    *n++ = *o++;
     849            }
     850
     851            tr_free( t->pieces );
     852        }
     853
     854        t->pieces = pieces;
     855        t->pieceCount = pieceCount;
     856        t->piecesSort = PIECES_SORTED_BY_INDEX;
     857
     858        /* cleanup */
     859        tr_free( pool );
     860    }
     861}
     862
     863static void
     864pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
     865{
     866    struct weighted_piece * p = pieceListLookup( t, piece );
     867
     868    if( p != NULL )
     869    {
     870        const int pos = p - t->pieces;
     871
     872        memmove( t->pieces + pos,
     873                 t->pieces + pos + 1,
     874                 sizeof( struct weighted_piece ) * ( --t->pieceCount - pos ) );
     875
     876        if( t->pieceCount == 0 )
     877        {
     878            tr_free( t->pieces );
     879            t->pieces = NULL;
     880        }
     881    }
     882}
     883
     884static void
     885pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
     886{
     887    struct weighted_piece * p;
     888    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
     889
     890    if(( p = pieceListLookup( t, index )))
     891        if( p->requestCount > 0 )
     892            --p->requestCount;
     893
     894    /* note: this invalidates the weighted.piece.weight field,
     895     * but that's OK since the call to pieceListLookup ensured
     896     * that we were sorted by index anyway.. next time we resort
     897     * by weight, pieceListSort() will update the weights */
     898}
     899
     900/**
     901***
     902**/
     903
     904void
     905tr_peerMgrRebuildRequests( tr_torrent * tor )
     906{
     907    assert( tr_isTorrent( tor ) );
     908
     909    pieceListRebuild( tor->torrentPeers );
     910}
     911
     912void
     913tr_peerMgrGetNextRequests( tr_torrent           * tor,
     914                           tr_peer              * peer,
     915                           int                    numwant,
     916                           tr_block_index_t     * setme,
     917                           int                  * numgot )
     918{
     919    int i;
     920    int got;
     921    Torrent * t;
     922    struct weighted_piece * pieces;
     923    const tr_bitfield * have = peer->have;
     924
     925    /* sanity clause */
     926    assert( tr_isTorrent( tor ) );
     927    assert( numwant > 0 );
     928
     929    /* walk through the pieces and find blocks that should be requested */
     930    got = 0;
     931    t = tor->torrentPeers;
     932
     933    /* prep the pieces list */
     934    if( t->pieces == NULL )
     935        pieceListRebuild( t );
     936    pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
     937//if( t->isInEndgame ) fprintf( stderr, "endgame\n" );
     938
     939#if 0
     940{
     941int i=0, n=MIN(10,t->pieceCount);
     942fprintf( stderr, "the next pieces we want to request are " );
     943for( i=0; i<n; i++ ) fprintf( stderr, "%d(weight:%d) ", (int)t->pieces[i].index, (int)t->pieces[i].weight );
     944fprintf( stderr, "\n" );
     945}
     946#endif
     947
     948    pieces = t->pieces;
     949    for( i=0; i<t->pieceCount && got<numwant; ++i )
     950    {
     951        struct weighted_piece * p = pieces + i;
     952
     953        /* if the peer has this piece that we want... */
     954        if( tr_bitfieldHasFast( have, p->index ) )
     955        {
     956            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
     957            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
     958
     959            for( ; b!=e && got<numwant; ++b )
    659960            {
    660                 const tr_peer * peer = peers[k];
    661                 if( peer->peerIsInterested
    662                         && !peer->clientIsChoked
    663                         && tr_bitfieldHas( peer->have, piece ) )
    664                     ++setme->peerCount;
     961                struct block_request * breq;
     962
     963                /* don't request blocks we've already got */
     964                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
     965                    continue;
     966
     967                /* don't request blocks we've already requested (FIXME) */
     968                breq = requestListLookup( t, b );
     969                if( breq != NULL ) {
     970                    assert( breq->peer != NULL );
     971                    if( breq->peer == peer ) continue;
     972                    if( !t->isInEndgame ) continue;
     973                }
     974
     975                setme[got++] = b;
     976//fprintf( stderr, "peer %p is requesting block %"PRIu64"\n", peer, b );
     977
     978                /* update our own tables */
     979                if( breq == NULL )
     980                    requestListAdd( t, b, peer );
     981                ++p->requestCount;
    665982            }
    666983        }
    667 
    668         qsort( p, poolSize, sizeof( struct tr_refill_piece ),
    669                compareRefillPiece );
    670 
    671         for( j = 0; j < poolSize; ++j )
    672             pool[j] = p[j].piece;
    673 
    674         tr_free( p );
    675     }
    676 
    677     *pieceCount = poolSize;
    678     return pool;
    679 }
    680 
    681 static struct tr_blockIterator*
    682 blockIteratorNew( Torrent * t )
    683 {
    684     struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
    685     i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
    686     i->t = t;
    687     i->pieces = getPreferredPieces( t, &i->pieceCount );
    688     i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
    689     tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
    690     return i;
    691 }
    692 
    693 static tr_bool
    694 blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
    695 {
    696     tr_bool found;
    697     Torrent * t = i->t;
    698     tr_torrent * tor = t->tor;
    699 
    700     while( ( i->blockIndex == i->blockCount )
    701         && ( i->pieceIndex < i->pieceCount ) )
    702     {
    703         const tr_piece_index_t index = i->pieces[i->pieceIndex++];
    704         const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
    705         const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
    706         tr_block_index_t block;
    707 
    708         assert( index < tor->info.pieceCount );
    709 
    710         i->blockCount = 0;
    711         i->blockIndex = 0;
    712         for( block=b; block!=e; ++block )
    713             if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
    714                 i->blocks[i->blockCount++] = block;
    715     }
    716 
    717     assert( i->blockCount <= tor->blockCountInPiece );
    718 
    719     if(( found = ( i->blockIndex < i->blockCount )))
    720         *setme = i->blocks[i->blockIndex++];
    721 
    722     return found;
    723 }
    724 
    725 static void
    726 blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
    727 {
    728     i->blockIndex = i->blockCount;
    729 }
    730 
    731 static void
    732 blockIteratorFree( struct tr_blockIterator ** inout )
    733 {
    734     struct tr_blockIterator * it = *inout;
    735 
    736     if( it != NULL )
    737     {
    738         tr_free( it->blocks );
    739         tr_free( it->pieces );
    740         tr_free( it );
    741     }
    742 
    743     *inout = NULL;
    744 }
    745 
    746 static tr_peer**
    747 getPeersUploadingToClient( Torrent * t,
    748                            int *     setmeCount )
    749 {
    750     int j;
    751     int peerCount = 0;
    752     int retCount = 0;
    753     tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
    754     tr_peer ** ret = tr_new( tr_peer *, peerCount );
    755 
    756     j = 0; /* this is a temporary test to make sure we walk through all the peers */
    757     if( peerCount )
    758     {
    759         /* Get a list of peers we're downloading from.
    760            Pick a different starting point each time so all peers
    761            get a chance at being the first in line */
    762         const int fencepost = tr_cryptoWeakRandInt( peerCount );
    763         int i = fencepost;
    764         do {
    765             if( clientIsDownloadingFrom( peers[i] ) )
    766                 ret[retCount++] = peers[i];
    767             i = ( i + 1 ) % peerCount;
    768             ++j;
    769         } while( i != fencepost );
    770     }
    771     assert( j == peerCount );
    772     *setmeCount = retCount;
    773     return ret;
    774 }
    775 
    776 static uint32_t
    777 getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
    778 {
    779     const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
    780     const uint64_t blockPos = tor->blockSize * b;
    781     assert( blockPos >= piecePos );
    782     return (uint32_t)( blockPos - piecePos );
    783 }
    784 
     984    }
     985
     986    /* We almost always change only a handful of pieces in the array.
     987     * In these cases, it's cheaper to sort those changed pieces and merge,
     988     * than qsort()ing the whole array again */
     989    if( got > 0 )
     990    {
     991        struct weighted_piece * p;
     992        struct weighted_piece * pieces;
     993        struct weighted_piece * a = t->pieces;
     994        struct weighted_piece * a_end = t->pieces + i;
     995        struct weighted_piece * b = a_end;
     996        struct weighted_piece * b_end = t->pieces + t->pieceCount;
     997
     998        /* rescore the pieces that we changed */
     999        weightTorrent = t->tor;
     1000//fprintf( stderr, "sorting %d changed pieces...\n", (int)(a_end-a) );
     1001        qsort( a, a_end-a, sizeof( struct weighted_piece ), comparePieceByWeight );
     1002
     1003        /* allocate a new array */
     1004        p = pieces = tr_new( struct weighted_piece, t->pieceCount );
     1005
     1006        /* merge the two sorted arrays into this new array */
     1007        weightTorrent = t->tor;
     1008        while( a!=a_end && b!=b_end )
     1009            *p++ = comparePieceByWeight( a, b ) < 0 ? *a++ : *b++;
     1010        while( a!=a_end ) *p++ = *a++;
     1011        while( b!=b_end ) *p++ = *b++;
     1012
     1013#if 0
     1014        /* make sure we did it right */
     1015        assert( p - pieces == t->pieceCount );
     1016        for( it=pieces; it+1<p; ++it )
     1017            assert( it->weight <= it[1].weight );
     1018#endif
     1019
     1020        /* update */
     1021        tr_free( t->pieces );
     1022        t->pieces = pieces;
     1023    }
     1024
     1025    //fprintf( stderr, "peer %p wanted %d requests; got %d\n", peer, numwant, got );
     1026    *numgot = got;
     1027}
     1028
     1029tr_bool
     1030tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
     1031                          const tr_peer     * peer,
     1032                          tr_block_index_t    block )
     1033{
     1034    const Torrent * t = tor->torrentPeers;
     1035    const struct block_request * b = requestListLookup( (Torrent*)t, block );
     1036    if( b == NULL ) return FALSE;
     1037    if( b->peer == peer ) return TRUE;
     1038    if( t->isInEndgame ) return TRUE;
     1039    return FALSE;
     1040}
     1041
     1042/* cancel requests that are too old */
    7851043static int
    7861044refillUpkeep( void * vmgr )
    7871045{
    788     tr_torrent * tor = NULL;
     1046    time_t now;
     1047    time_t too_old;
     1048    tr_torrent * tor;
    7891049    tr_peerMgr * mgr = vmgr;
    790     time_t now;
    7911050    managerLock( mgr );
    7921051
    7931052    now = time( NULL );
    794     while(( tor = tr_torrentNext( mgr->session, tor ))) {
     1053    too_old = now - REQUEST_TTL_SECS;
     1054
     1055    tor = NULL;
     1056    while(( tor = tr_torrentNext( mgr->session, tor )))
     1057    {
    7951058        Torrent * t = tor->torrentPeers;
    796         if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
    797             tordbg( t, "refill queue is past its shelf date; discarding." );
    798             blockIteratorFree( &t->refillQueue );
     1059        const int n = t->requestCount;
     1060        if( n > 0 )
     1061        {
     1062            int keepCount = 0;
     1063            int cancelCount = 0;
     1064            struct block_request * keep = tr_new( struct block_request, n );
     1065            struct block_request * cancel = tr_new( struct block_request, n );
     1066            const struct block_request * it;
     1067            const struct block_request * end;
     1068
     1069            for( it=t->requests, end=it+n; it!=end; ++it )
     1070                if( it->sentAt <= too_old )
     1071                    cancel[cancelCount++] = *it;
     1072                else
     1073                    keep[keepCount++] = *it;
     1074
     1075            /* prune out the ones we aren't keeping */
     1076            tr_free( t->requests );
     1077            t->requests = keep;
     1078            t->requestCount = keepCount;
     1079            t->requestAlloc = n;
     1080
     1081            /* send cancel messages for all the "cancel" ones */
     1082            for( it=cancel, end=it+cancelCount; it!=end; ++it )
     1083                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) )
     1084                    tr_peerMsgsCancel( it->peer->msgs, it->block );
     1085
     1086            /* decrement the pending request counts for the timed-out blocks */
     1087            for( it=cancel, end=it+cancelCount; it!=end; ++it )
     1088                pieceListRemoveRequest( t, it->block );
     1089
     1090            /* cleanup loop */
     1091            tr_free( cancel );
    7991092        }
    8001093    }
     
    8021095    managerUnlock( mgr );
    8031096    return TRUE;
    804 }
    805 
    806 static void
    807 sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now );
    808 
    809 static void
    810 refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
    811 {
    812     tr_block_index_t block;
    813     int peerCount;
    814     int webseedCount;
    815     tr_peer ** peers;
    816     tr_webseed ** webseeds;
    817     Torrent * t = vtorrent;
    818     tr_torrent * tor = t->tor;
    819     tr_bool hasNext = TRUE;
    820 
    821     if( !t->isRunning )
    822         return;
    823     if( tr_torrentIsSeed( t->tor ) )
    824         return;
    825 
    826     torrentLock( t );
    827     tordbg( t, "Refilling Request Buffers..." );
    828 
    829     if( t->refillQueue == NULL )
    830         t->refillQueue = blockIteratorNew( t );
    831 
    832     peers = getPeersUploadingToClient( t, &peerCount );
    833     sortPeersByLiveliness( peers, NULL, peerCount, tr_date( ) );
    834     webseedCount = tr_ptrArraySize( &t->webseeds );
    835     webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
    836                           webseedCount * sizeof( tr_webseed* ) );
    837 
    838     while( ( webseedCount || peerCount )
    839         && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
    840     {
    841         int j;
    842         tr_bool handled = FALSE;
    843 
    844         const tr_piece_index_t index = tr_torBlockPiece( tor, block );
    845         const uint32_t offset = getBlockOffsetInPiece( tor, block );
    846         const uint32_t length = tr_torBlockCountBytes( tor, block );
    847 
    848         assert( block < tor->blockCount );
    849 
    850         /* find a peer who can ask for this block */
    851         for( j=0; !handled && j<peerCount; )
    852         {
    853             const tr_addreq_t val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length );
    854             switch( val )
    855             {
    856                 case TR_ADDREQ_FULL:
    857                 case TR_ADDREQ_CLIENT_CHOKED:
    858                     peers[j] = peers[--peerCount];
    859                     break;
    860 
    861                 case TR_ADDREQ_MISSING:
    862                 case TR_ADDREQ_DUPLICATE:
    863                     ++j;
    864                     break;
    865 
    866                 case TR_ADDREQ_OK:
    867                     incrementPieceRequests( t, index );
    868                     handled = TRUE;
    869                     break;
    870 
    871                 default:
    872                     assert( 0 && "unhandled value" );
    873                     break;
    874             }
    875         }
    876 
    877         /* maybe one of the webseeds can do it */
    878         for( j=0; !handled && j<webseedCount; )
    879         {
    880             const tr_addreq_t val = tr_webseedAddRequest( webseeds[j], index, offset, length );
    881             switch( val )
    882             {
    883                 case TR_ADDREQ_FULL:
    884                     webseeds[j] = webseeds[--webseedCount];
    885                     break;
    886 
    887                 case TR_ADDREQ_OK:
    888                     incrementPieceRequests( t, index );
    889                     handled = TRUE;
    890                     break;
    891 
    892                 default:
    893                     assert( 0 && "unhandled value" );
    894                     break;
    895             }
    896         }
    897 
    898         if( !handled )
    899             blockIteratorSkipCurrentPiece( t->refillQueue );
    900     }
    901 
    902     /* cleanup */
    903     tr_free( webseeds );
    904     tr_free( peers );
    905 
    906     if( !hasNext ) {
    907         tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
    908         blockIteratorFree( &t->refillQueue );
    909     }
    910 
    911     torrentUnlock( t );
    912 }
    913 
    914 static void
    915 broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
    916 {
    917     size_t i;
    918     size_t peerCount;
    919     tr_peer ** peers;
    920 
    921     assert( torrentIsLocked( t ) );
    922 
    923     tordbg( t, "got a block; cancelling any duplicate requests from peers %"PRIu32":%"PRIu32"->%"PRIu32, index, offset, length );
    924 
    925     peerCount = tr_ptrArraySize( &t->peers );
    926     peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
    927     for( i=0; i<peerCount; ++i )
    928         if( peers[i]->msgs )
    929             tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
    9301097}
    9311098
     
    9461113
    9471114static void
    948 gotBadPiece( Torrent *        t,
    949              tr_piece_index_t pieceIndex )
     1115gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
    9501116{
    9511117    tr_torrent *   tor = t->tor;
     
    9541120    tor->corruptCur += byteCount;
    9551121    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
    956 }
    957 
    958 static void
    959 refillSoon( Torrent * t )
    960 {
    961     if( !evtimer_pending( &t->refillTimer, NULL ) )
    962         tr_timerAdd( &t->refillTimer, 0, REFILL_PERIOD_MSEC );
    9631122}
    9641123
     
    10091168    }
    10101169#endif
     1170}
     1171
     1172static void
     1173decrementDownloadedCount( tr_torrent * tor, uint32_t byteCount )
     1174{
     1175    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
     1176}
     1177
     1178static void
     1179clientGotUnwantedBlock( tr_torrent * tor, tr_block_index_t block )
     1180{
     1181    decrementDownloadedCount( tor, tr_torBlockCountBytes( tor, block ) );
     1182}
     1183
     1184static void
     1185removeRequestFromTables( Torrent * t, tr_block_index_t block )
     1186{
     1187    requestListRemove( t, block );
     1188    pieceListRemoveRequest( t, block );
     1189}
     1190
     1191/* peer choked us, or maybe it disconnected.
     1192   either way we need to remove all its requests */
     1193static void
     1194peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
     1195{
     1196    int i, n;
     1197    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
     1198
     1199    for( i=n=0; i<t->requestCount; ++i )
     1200        if( peer == t->requests[i].peer )
     1201            blocks[n++] = t->requests[i].block;
     1202
     1203    for( i=0; i<n; ++i )
     1204        removeRequestFromTables( t, blocks[i] );
     1205
     1206    tr_free( blocks );
    10111207}
    10121208
     
    10351231            break;
    10361232
    1037         case TR_PEER_NEED_REQ:
    1038             refillSoon( t );
    1039             break;
    1040 
    1041         case TR_PEER_CANCEL:
    1042             decrementPieceRequests( t, e->pieceIndex );
    1043             break;
    1044 
    10451233        case TR_PEER_PEER_GOT_DATA:
    10461234        {
     
    10671255            break;
    10681256        }
     1257
     1258        case TR_PEER_CLIENT_GOT_REJ:
     1259            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ) );
     1260            break;
     1261
     1262        case TR_PEER_CLIENT_GOT_CHOKE:
     1263            peerDeclinedAllRequests( t, peer );
     1264            break;
    10691265
    10701266        case TR_PEER_CLIENT_GOT_PORT:
     
    11291325        {
    11301326            tr_torrent * tor = t->tor;
    1131 
    11321327            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
    1133 
    1134             tr_cpBlockAdd( &tor->completion, block );
    1135             tr_torrentSetDirty( tor );
    1136             decrementPieceRequests( t, e->pieceIndex );
    1137 
    1138             broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
    1139 
    1140             if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
     1328//static int numBlocks = 0;
     1329//fprintf( stderr, "got a total of %d blocks\n", ++numBlocks );
     1330
     1331            requestListRemove( t, block );
     1332            pieceListRemoveRequest( t, block );
     1333
     1334            if( tr_cpBlockIsComplete( &tor->completion, block ) )
    11411335            {
    1142                 const tr_piece_index_t p = e->pieceIndex;
    1143                 const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
    1144 
    1145                 if( !ok )
     1336                tordbg( t, "we have this block already..." );
     1337                clientGotUnwantedBlock( tor, block );
     1338            }
     1339            else
     1340            {
     1341                tr_cpBlockAdd( &tor->completion, block );
     1342                tr_torrentSetDirty( tor );
     1343
     1344                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
    11461345                {
    1147                     tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
    1148                                (unsigned long)p );
    1149                 }
    1150 
    1151                 tr_torrentSetHasPiece( tor, p, ok );
    1152                 tr_torrentSetPieceChecked( tor, p, TRUE );
    1153                 tr_peerMgrSetBlame( tor, p, ok );
    1154 
    1155                 if( !ok )
    1156                 {
    1157                     gotBadPiece( t, p );
    1158                 }
    1159                 else
    1160                 {
    1161                     int i;
    1162                     int peerCount;
    1163                     tr_peer ** peers;
    1164                     tr_file_index_t fileIndex;
    1165 
    1166                     peerCount = tr_ptrArraySize( &t->peers );
    1167                     peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
    1168                     for( i=0; i<peerCount; ++i )
    1169                         tr_peerMsgsHave( peers[i]->msgs, p );
    1170 
    1171                     for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
    1172                         const tr_file * file = &tor->info.files[fileIndex];
    1173                         if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
    1174                             if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
    1175                                 tr_torrentFileCompleted( tor, fileIndex );
     1346                    const tr_piece_index_t p = e->pieceIndex;
     1347                    const tr_bool ok = tr_ioTestPiece( tor, p, NULL, 0 );
     1348//fprintf( stderr, "we now have piece #%d\n", (int)p );
     1349
     1350                    if( !ok )
     1351                    {
     1352                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
     1353                                   (unsigned long)p );
     1354                    }
     1355
     1356                    tr_torrentSetHasPiece( tor, p, ok );
     1357                    tr_torrentSetPieceChecked( tor, p, TRUE );
     1358                    tr_peerMgrSetBlame( tor, p, ok );
     1359
     1360                    if( !ok )
     1361                    {
     1362                        gotBadPiece( t, p );
     1363                    }
     1364                    else
     1365                    {
     1366                        int i;
     1367                        int peerCount;
     1368                        tr_peer ** peers;
     1369                        tr_file_index_t fileIndex;
     1370
     1371                        peerCount = tr_ptrArraySize( &t->peers );
     1372                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
     1373                        for( i=0; i<peerCount; ++i )
     1374                            tr_peerMsgsHave( peers[i]->msgs, p );
     1375
     1376                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
     1377                            const tr_file * file = &tor->info.files[fileIndex];
     1378                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) )
     1379                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) )
     1380                                    tr_torrentFileCompleted( tor, fileIndex );
     1381                        }
     1382
     1383                        pieceListRemovePiece( t, p );
    11761384                    }
    11771385                }
     
    16351843    ensureMgrTimersExist( t->manager );
    16361844
    1637     if( !t->isRunning )
    1638     {
    1639         t->isRunning = TRUE;
    1640 
    1641         if( !tr_ptrArrayEmpty( &t->webseeds ) )
    1642             refillSoon( t );
    1643     }
     1845    t->isRunning = TRUE;
    16441846
    16451847    rechokePulse( t->manager );
     
    16501852stopTorrent( Torrent * t )
    16511853{
     1854    int i, n;
     1855
    16521856    assert( torrentIsLocked( t ) );
    16531857
     
    16551859
    16561860    /* disconnect the peers. */
    1657     tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
     1861    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
     1862        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
    16581863    tr_ptrArrayClear( &t->peers );
    16591864
Note: See TracChangeset for help on using the changeset viewer.