Changeset 6101


Ignore:
Timestamp:
Jun 9, 2008, 10:53:45 PM (13 years ago)
Author:
charles
Message:

libT: first cut at implementing the internal peers' request queues as pieces rather than blocks, as discussed with erdgeist and denis, to avoid a couple of nasty CPU bottlenecks.

Location:
trunk/libtransmission
Files:
10 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-common.h

    r6073 r6101  
    4949{
    5050    PeerEventType eventType;
    51     uint32_t pieceIndex; /* for GOT_BLOCK */
     51    uint32_t pieceIndex; /* for GOT_BLOCK, CANCEL */
    5252    uint32_t offset;     /* for GOT_BLOCK */
    5353    uint32_t length;     /* for GOT_BLOCK + GOT_DATA */
  • trunk/libtransmission/peer-mgr.c

    r6074 r6101  
    113113    tr_torrent * tor;
    114114    tr_peer * optimistic; /* the optimistic peer, or NULL if none */
    115     tr_bitfield * requested;
     115    tr_bitfield * requestedPieces;
    116116
    117117    unsigned int isRunning : 1;
     
    367367    tr_timerFree( &t->refillTimer );
    368368
    369     tr_bitfieldFree( t->requested );
     369    tr_bitfieldFree( t->requestedPieces );
    370370    tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
    371371    tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
     
    391391    t->webseeds = tr_ptrArrayNew( );
    392392    t->outgoingHandshakes = tr_ptrArrayNew( );
    393     t->requested = tr_bitfieldNew( tor->blockCount );
     393    t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
    394394    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
    395395
     
    544544    const struct tr_refill_piece * b = bIn;
    545545
    546     /* fewer missing pieces goes first */
    547     if( a->missingBlockCount != b->missingBlockCount )
    548         return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
    549    
    550546    /* if one piece has a higher priority, it goes first */
    551547    if( a->priority != b->priority )
     
    556552        return a->peerCount < b->peerCount ? -1 : 1;
    557553
    558 #if 0
    559     /* otherwise download them in order */
    560     return tr_compareUint16( a->piece, b->piece );
    561 #else
    562554    /* otherwise go with our random seed */
    563555    return tr_compareUint16( a->random, b->random );
    564 #endif
    565556}
    566557
     
    635626    *pieceCount = poolSize;
    636627    return pool;
    637 }
    638 
    639 static uint64_t*
    640 getPreferredBlocks( Torrent * t, tr_block_index_t * setmeCount )
    641 {
    642     int s;
    643     uint32_t i;
    644     uint32_t pieceCount;
    645     uint32_t blockCount;
    646     uint32_t unreqCount[3], reqCount[3];
    647     uint32_t * pieces;
    648     uint64_t * ret, * walk;
    649     uint64_t * unreq[3], *req[3];
    650     const tr_torrent * tor = t->tor;
    651 
    652     assert( torrentIsLocked( t ) );
    653 
    654     pieces = getPreferredPieces( t, &pieceCount );
    655 
    656     /**
    657      * Now we walk through those preferred pieces to find all the blocks
    658      * are still missing from them.  We put unrequested blocks first,
    659      * of course, but by including requested blocks afterwards, endgame
    660      * handling happens naturally.
    661      *
    662      * By doing this once per priority we also effectively get an endgame
    663      * mode for each priority level.  The helps keep high priority files
    664      * from getting stuck at 99% due of unresponsive peers.
    665      */
    666 
    667     /* make temporary bins for the four tiers of blocks */
    668     for( i=0; i<3; ++i ) {
    669         req[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
    670         reqCount[i] = 0;
    671         unreq[i] = tr_new( uint64_t, pieceCount *  tor->blockCountInPiece );
    672         unreqCount[i] = 0;
    673     }
    674 
    675     /* sort the blocks into our temp bins */
    676     for( i=blockCount=0; i<pieceCount; ++i )
    677     {
    678         const tr_piece_index_t index = pieces[i];
    679         const int priorityIndex = tor->info.pieces[index].priority + 1;
    680         const tr_block_index_t begin = tr_torPieceFirstBlock( tor, index );
    681         const tr_block_index_t end = begin + tr_torPieceCountBlocks( tor, index );
    682         tr_block_index_t block;
    683 
    684         assert( tr_bitfieldTestFast( t->requested, end-1 ) );
    685 
    686         for( block=begin; block<end; ++block )
    687         {
    688             if( tr_cpBlockIsComplete( tor->completion, block ) )
    689                 continue;
    690 
    691             ++blockCount;
    692 
    693             if( tr_bitfieldHasFast( t->requested, block ) )
    694             {
    695                 const uint32_t n = reqCount[priorityIndex]++;
    696                 req[priorityIndex][n] = block;
    697             }
    698             else
    699             {
    700                 const uint32_t n = unreqCount[priorityIndex]++;
    701                 unreq[priorityIndex][n] = block;
    702             }
    703         }
    704     }
    705 
    706     /* join the bins together, going from highest priority to lowest so
    707      * the the blocks we want to request first will be first in the list */
    708     ret = walk = tr_new( uint64_t, blockCount );
    709     for( s=2; s>=0; --s ) {
    710         memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
    711         walk += unreqCount[s];
    712         memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
    713         walk += reqCount[s];
    714     }
    715     assert( ( walk - ret ) == ( int )blockCount );
    716     *setmeCount = blockCount;
    717 
    718     /* cleanup */
    719     tr_free( pieces );
    720     for( i=0; i<3; ++i ) {
    721         tr_free( unreq[i] );
    722         tr_free( req[i] );
    723     }
    724     return ret;
    725628}
    726629
     
    759662    Torrent * t = vtorrent;
    760663    tr_torrent * tor = t->tor;
    761     tr_block_index_t i;
    762664    int peerCount;
    763665    int webseedCount;
    764666    tr_peer ** peers;
    765667    tr_webseed ** webseeds;
    766     tr_block_index_t blockCount;
    767     uint64_t * blocks;
     668    uint32_t pieceCount;
     669    uint32_t * pieces;
     670    tr_piece_index_t i;
    768671
    769672    if( !t->isRunning )
     
    775678    tordbg( t, "Refilling Request Buffers..." );
    776679
    777     blocks = getPreferredBlocks( t, &blockCount );
     680    pieces = getPreferredPieces( t, &pieceCount );
    778681    peers = getPeersUploadingToClient( t, &peerCount );
    779682    webseedCount = tr_ptrArraySize( t->webseeds );
    780683    webseeds = tr_memdup( tr_ptrArrayBase(t->webseeds), webseedCount*sizeof(tr_webseed*) );
    781684
    782     for( i=0; (webseedCount || peerCount) && i<blockCount; ++i )
     685    for( i=0; (webseedCount || peerCount) && i<pieceCount; ++i )
    783686    {
    784687        int j;
    785688        int handled = FALSE;
    786 
    787         const tr_block_index_t block = blocks[i];
    788         const tr_piece_index_t index = tr_torBlockPiece( tor, block );
    789         const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
    790         const uint32_t length = tr_torBlockCountBytes( tor, block );
    791 
    792         assert( tr_torrentReqIsValid( tor, index, begin, length ) );
    793         assert( _tr_block( tor, index, begin ) == block );
    794         assert( begin < tr_torPieceCountBytes( tor, index ) );
    795         assert( (begin + length) <= tr_torPieceCountBytes( tor, index ) );
    796 
    797         /* find a peer who can ask for this block */
     689        const tr_piece_index_t piece = pieces[i];
     690
     691        assert( piece < tor->info.pieceSize );
     692
     693        /* find a peer who can ask for this piece */
    798694        for( j=0; !handled && j<peerCount; )
    799695        {
    800             const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
     696            const int val = tr_peerMsgsAddRequest( peers[j]->msgs, piece );
    801697            switch( val )
    802698            {
     
    810706                    break;
    811707                case TR_ADDREQ_OK:
    812                     tr_bitfieldAdd( t->requested, block );
     708                    tr_bitfieldAdd( t->requestedPieces, piece );
    813709                    handled = TRUE;
    814710                    break;
     
    822718        for( j=0; !handled && j<webseedCount; )
    823719        {
    824             const int val = tr_webseedAddRequest( webseeds[j], index, begin, length );
     720            const int val = tr_webseedAddRequest( webseeds[j], piece );
    825721            switch( val )
    826722            {
     
    829725                    break;
    830726                case TR_ADDREQ_OK:
    831                     tr_bitfieldAdd( t->requested, block );
     727                    tr_bitfieldAdd( t->requestedPieces, piece );
    832728                    handled = TRUE;
    833729                    break;
     
    842738    tr_free( webseeds );
    843739    tr_free( peers );
    844     tr_free( blocks );
     740    tr_free( pieces );
    845741
    846742    t->refillTimer = NULL;
    847743    torrentUnlock( t );
    848744    return FALSE;
    849 }
    850 
    851 static void
    852 broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
    853 {
    854     int i, size;
    855     tr_peer ** peers;
    856 
    857     assert( torrentIsLocked( t ) );
    858 
    859     peers = getConnectedPeers( t, &size );
    860     for( i=0; i<size; ++i )
    861         tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
    862     tr_free( peers );
    863745}
    864746
     
    905787
    906788        case TR_PEER_CANCEL:
    907             tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
     789            tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
    908790            break;
    909791
     
    927809            tr_torrent * tor = t->tor;
    928810            tor->activityDate = now;
    929             tor->downloadedCur += e->length;
     811            /* only add this to downloadedCur if we got it from a peer --
     812             * webseeds shouldn't count against our ratio.  As one tracker
     813             * admin put it, "Those pieces are downloaded directly from the
     814             * content distributor, not the peers, it is the tracker's job
     815             * to manage the swarms, not the web server and does not fit
     816             * into the jurisdiction of the tracker." */
     817            if( peer )
     818                tor->downloadedCur += e->length;
    930819            tr_rcTransferred( tor->download, e->length );
    931820            tr_rcTransferred( tor->handle->download, e->length );
     
    960849
    961850            tr_cpBlockAdd( tor->completion, block );
    962 
    963             broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
    964851
    965852            if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
  • trunk/libtransmission/peer-msgs.c

    r6073 r6101  
    179179}
    180180
     181static void
     182reqListAppendPiece( const tr_torrent     * tor,
     183                    struct request_list  * list,
     184                    tr_piece_index_t       piece )
     185{
     186    const time_t now = time( NULL );
     187    const size_t n = tr_torPieceCountBlocks( tor, piece );
     188    const tr_block_index_t begin = tr_torPieceFirstBlock( tor, piece );
     189    const tr_block_index_t end = begin + n;
     190    tr_block_index_t i;
     191
     192    if( list->count + n >= list->max )
     193        reqListReserve( list, list->max + n );
     194
     195    for( i=begin; i<end; ++i ) {
     196        if( !tr_cpBlockIsComplete( tor->completion, i ) ) {
     197            struct peer_request * req = list->requests + list->count++;
     198            req->index = piece;
     199            req->offset = (i * tor->blockSize) - (piece * tor->info.pieceSize);
     200            req->length = tr_torBlockCountBytes( tor, i );
     201            req->time_requested = now;
     202            assert( tr_torrentReqIsValid( tor, req->index, req->offset, req->length ) );
     203        }
     204    }
     205}
     206
    181207static tr_errno
    182208reqListPop( struct request_list * list, struct peer_request * setme )
     
    196222
    197223static int
     224reqListHasPiece( struct request_list * list, const tr_piece_index_t piece )
     225{
     226    uint16_t i;
     227
     228    for( i=0; i<list->count; ++i )
     229        if( list->requests[i].index == piece )
     230            return 1;
     231
     232    return 0;
     233}
     234
     235static int
    198236reqListFind( struct request_list * list, const struct peer_request * key )
    199237{
    200238    uint16_t i;
     239
    201240    for( i=0; i<list->count; ++i )
    202241        if( !compareRequest( key, list->requests+i ) )
    203242            return i;
     243
    204244    return -1;
    205245}
     
    246286    uint8_t ut_pex_id;
    247287    uint16_t pexCount;
     288    uint16_t minActiveRequests;
    248289    uint16_t maxActiveRequests;
    249     uint16_t minActiveRequests;
    250290
    251291    tr_peer * info;
     
    459499
    460500static void
    461 fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
     501fireCancelledReq( tr_peermsgs * msgs, const tr_piece_index_t pieceIndex )
    462502{
    463503    tr_peer_event e = blankEvent;
    464504    e.eventType = TR_PEER_CANCEL;
    465     e.pieceIndex = req->index;
    466     e.offset = req->offset;
    467     e.length = req->length;
     505    e.pieceIndex = pieceIndex;
    468506    publish( msgs, &e );
    469507}
     
    684722
    685723static int
    686 reqIsValid( const tr_peermsgs   * msgs,
     724reqIsValid( const tr_peermsgs   * peer,
    687725            uint32_t              index,
    688726            uint32_t              offset,
    689727            uint32_t              length )
    690728{
    691     return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
    692 }
    693 
    694 static int
    695 requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
    696 {
    697     return reqIsValid( msgs, req->index, req->offset, req->length );
     729    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
     730}
     731
     732static int
     733requestIsValid( const tr_peermsgs * peer, const struct peer_request * req )
     734{
     735    return reqIsValid( peer, req->index, req->offset, req->length );
     736}
     737
     738static void
     739tr_peerMsgsCancel( tr_peermsgs * msgs,
     740                   uint32_t      pieceIndex )
     741{
     742    uint16_t i;
     743    struct request_list tmp = REQUEST_LIST_INIT;
     744    struct request_list * src;
     745
     746    src = &msgs->clientWillAskFor;
     747    for( i=0; i<src->count; ++i )
     748        if( src->requests[i].index != pieceIndex )
     749            reqListAppend( &tmp, src->requests + i );
     750
     751    /* swap */
     752    reqListClear( &msgs->clientWillAskFor );
     753    msgs->clientWillAskFor = tmp;
     754    tmp = REQUEST_LIST_INIT;
     755
     756    src = &msgs->clientAskedFor;
     757    for( i=0; i<src->count; ++i )
     758        if( src->requests[i].index == pieceIndex )
     759            protocolSendCancel( msgs, src->requests + i );
     760        else
     761            reqListAppend( &tmp, src->requests + i );
     762
     763    /* swap */
     764    reqListClear( &msgs->clientAskedFor );
     765    msgs->clientAskedFor = tmp;
     766    tmp = REQUEST_LIST_INIT;
     767
     768    fireCancelledReq( msgs, pieceIndex );
    698769}
    699770
     
    711782        const struct peer_request * req = &tmp.requests[i];
    712783        if( req->time_requested < oldestAllowed )
    713             tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
     784            tr_peerMsgsCancel( msgs, req->index );
    714785    }
    715786    reqListClear( &tmp );
     
    721792        const struct peer_request * req = &tmp.requests[i];
    722793        if( req->time_requested < oldestAllowed )
    723             tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
     794            tr_peerMsgsCancel( msgs, req->index );
    724795    }
    725796    reqListClear( &tmp );
     
    767838pulse( void * vmsgs );
    768839
     840static int
     841requestQueueIsFull( const tr_peermsgs * msgs )
     842{
     843    const int req_max = msgs->maxActiveRequests;
     844    return msgs->clientWillAskFor.count >= req_max;
     845}
     846
    769847int
    770 tr_peerMsgsAddRequest( tr_peermsgs * msgs,
    771                        uint32_t      index,
    772                        uint32_t      offset,
    773                        uint32_t      length )
    774 {
    775     const int req_max = msgs->maxActiveRequests;
     848tr_peerMsgsAddRequest( tr_peermsgs       * msgs,
     849                       tr_piece_index_t    piece )
     850{
    776851    struct peer_request req;
    777852
    778853    assert( msgs != NULL );
    779854    assert( msgs->torrent != NULL );
    780     assert( reqIsValid( msgs, index, offset, length ) );
     855    assert( piece < msgs->torrent->info.pieceCount );
    781856
    782857    /**
     
    791866
    792867    /* peer doesn't have this piece */
    793     if( !tr_bitfieldHas( msgs->info->have, index ) )
     868    if( !tr_bitfieldHas( msgs->info->have, piece ) )
    794869        return TR_ADDREQ_MISSING;
    795870
    796871    /* peer's queue is full */
    797     if( msgs->clientWillAskFor.count >= req_max ) {
     872    if( requestQueueIsFull( msgs ) ) {
    798873        dbgmsg( msgs, "declining request because we're full" );
    799874        return TR_ADDREQ_FULL;
     
    801876
    802877    /* have we already asked for this piece? */
    803     req.index = index;
    804     req.offset = offset;
    805     req.length = length;
    806     if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
    807         dbgmsg( msgs, "declining because it's a duplicate" );
    808         return TR_ADDREQ_DUPLICATE;
    809     }
    810     if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
     878    if( reqListHasPiece( &msgs->clientAskedFor, piece ) ||
     879        reqListHasPiece( &msgs->clientWillAskFor, piece ) ) {
    811880        dbgmsg( msgs, "declining because it's a duplicate" );
    812881        return TR_ADDREQ_DUPLICATE;
     
    817886    **/
    818887
    819     dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
     888    dbgmsg( msgs, "added req for piece %lu", (unsigned long)piece );
    820889    req.time_requested = time( NULL );
    821     reqListAppend( &msgs->clientWillAskFor, &req );
     890    reqListAppendPiece( msgs->torrent, &msgs->clientWillAskFor, piece );
    822891    return TR_ADDREQ_OK;
    823892}
     
    834903
    835904    for( i=0; i<a.count; ++i )
    836         fireCancelledReq( msgs, &a.requests[i] );
     905        fireCancelledReq( msgs, a.requests[i].index );
    837906
    838907    for( i=0; i<b.count; ++i ) {
    839         fireCancelledReq( msgs, &b.requests[i] );
     908        fireCancelledReq( msgs, b.requests[i].index );
    840909        protocolSendCancel( msgs, &b.requests[i] );
    841910    }
     
    843912    reqListClear( &a );
    844913    reqListClear( &b );
    845 }
    846 
    847 void
    848 tr_peerMsgsCancel( tr_peermsgs * msgs,
    849                    uint32_t      pieceIndex,
    850                    uint32_t      offset,
    851                    uint32_t      length )
    852 {
    853     struct peer_request req;
    854 
    855     assert( msgs != NULL );
    856     assert( length > 0 );
    857 
    858     /* have we asked the peer for this piece? */
    859     req.index = pieceIndex;
    860     req.offset = offset;
    861     req.length = length;
    862 
    863     /* if it's only in the queue and hasn't been sent yet, free it */
    864     if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
    865         fireCancelledReq( msgs, &req );
    866 
    867     /* if it's already been sent, send a cancel message too */
    868     if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
    869         protocolSendCancel( msgs, &req );
    870         fireCancelledReq( msgs, &req );
    871     }
    872914}
    873915
     
    15721614
    15731615static int
    1574 ratePulse( void * vmsgs )
    1575 {
    1576     tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
    1577     msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
    1578     msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
    1579     msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
    1580     msgs->minActiveRequests = msgs->maxActiveRequests / 3;
     1616getMaxBlocksFromPeerSoon( const tr_peermsgs * peer )
     1617{
     1618    const double seconds = 30;
     1619    const double bytesPerSecond = peer->info->rateToClient * 1024;
     1620    const double totalBytes = bytesPerSecond * seconds;
     1621    const int blockCount = totalBytes / peer->torrent->blockSize;
     1622    /*fprintf( stderr, "rate %f -- blockCount %d\n", peer->info->rateToClient, blockCount );*/
     1623    return blockCount;
     1624}
     1625
     1626static int
     1627ratePulse( void * vpeer )
     1628{
     1629    tr_peermsgs * peer = vpeer;
     1630    peer->info->rateToClient = tr_rcRate( peer->info->rcToClient );
     1631    peer->info->rateToPeer = tr_rcRate( peer->info->rcToPeer );
     1632    peer->minActiveRequests = 4;
     1633    peer->maxActiveRequests = peer->minActiveRequests + getMaxBlocksFromPeerSoon( peer );
    15811634    return TRUE;
    15821635}
  • trunk/libtransmission/peer-msgs.h

    r6073 r6101  
    3636                              uint32_t      pieceIndex );
    3737
     38#if 0
    3839void         tr_peerMsgsCancel( tr_peermsgs * msgs,
    3940                                uint32_t      pieceIndex,
    4041                                uint32_t      offset,
    4142                                uint32_t      length );
     43#endif
    4244
    4345void         tr_peerMsgsFree( tr_peermsgs* );
    4446
     47int          tr_peerMsgsAddRequest( tr_peermsgs      * peer,
     48                                    tr_piece_index_t   piece );
    4549
    46 #if 0
    47 enum {
    48     TR_ADDREQ_OK=0,
    49     TR_ADDREQ_FULL,
    50     TR_ADDREQ_DUPLICATE,
    51     TR_ADDREQ_MISSING,
    52     TR_ADDREQ_CLIENT_CHOKED
    53 };
    54 #endif
    55 
    56 int          tr_peerMsgsAddRequest( tr_peermsgs * peer,
    57                                     uint32_t      index,
    58                                     uint32_t      begin,
    59                                     uint32_t      length );
    60 
    61 /**
    62 ***  PeerMsgs Publish / Subscribe
    63 **/
    64 
    65 #if 0
    66 typedef enum
    67 {
    68     TR_PEERMSG_CLIENT_HAVE,
    69     TR_PEERMSG_CLIENT_BLOCK,
    70     TR_PEERMSG_PIECE_DATA,
    71     TR_PEERMSG_PEER_PROGRESS,
    72     TR_PEERMSG_ERROR,
    73     TR_PEERMSG_CANCEL,
    74     TR_PEERMSG_NEED_REQ
    75 }
    76 PeerMsgsEventType;
    77 
    78 typedef struct
    79 {
    80     PeerMsgsEventType eventType;
    81     uint32_t pieceIndex; /* for TR_PEERMSG_GOT_BLOCK, TR_PEERMSG_GOT_HAVE */
    82     uint32_t offset;     /* for TR_PEERMSG_GOT_BLOCK */
    83     uint32_t length;     /* for TR_PEERMSG_GOT_BLOCK */
    84     float progress;      /* for TR_PEERMSG_PEER_PROGRESS */
    85     tr_errno err;        /* for TR_PEERMSG_GOT_ERROR */
    86 }
    87 tr_peermsgs_event;
    88 #endif
    89 
    90 void  tr_peerMsgsUnsubscribe ( tr_peermsgs       * peer,
    91                                tr_publisher_tag    tag );
     50void         tr_peerMsgsUnsubscribe ( tr_peermsgs       * peer,
     51                                      tr_publisher_tag    tag );
    9252
    9353#endif
  • trunk/libtransmission/torrent.c

    r6093 r6101  
    14281428        err = 4;
    14291429
     1430if( err ) fprintf( stderr, "index %lu offset %lu length %lu err %d\n", (unsigned long)index, (unsigned long)offset, (unsigned long)length, err );
     1431
    14301432    return !err;
    14311433}
  • trunk/libtransmission/utils-test.c

    r6072 r6101  
    3434    int i;
    3535    int bitcount = 5000000;
    36     size_t pos;
    3736    tr_bitfield * field = tr_bitfieldNew( bitcount );
    3837
     
    4645        check( tr_bitfieldHas( field, i ) == (!(i%7)) );
    4746
     47#if 0
    4848    /* testing the "find next" function */
    4949    check( tr_bitfieldFindTrue( field, 0, &pos ) );
     
    6565    check( tr_bitfieldFindTrue( field, 16, &pos ) );
    6666    check( pos == 21 );
    67 
     67#endif
    6868
    6969    tr_bitfieldFree( field );
  • trunk/libtransmission/utils.c

    r6073 r6101  
    733733}
    734734
     735#if 0
    735736static int
    736737find_top_bit( uint8_t val )
     
    777778    return 0;
    778779}
     780#endif
    779781
    780782int
  • trunk/libtransmission/utils.h

    r6073 r6101  
    264264tr_bitfield* tr_bitfieldOr( tr_bitfield*, const tr_bitfield* );
    265265
     266#if 0
    266267/** @brief finds the first true bit in the bitfield, starting at `startPos'
    267268    @param setmePos the position of the true bit, if found, is set here.
     
    270271                         size_t               startPos,
    271272                         size_t             * setmePos );
     273#endif
    272274
    273275
  • trunk/libtransmission/webseed.c

    r6073 r6101  
    2424#include "webseed.h"
    2525
     26#define MAX_QUEUE_SIZE 4
     27
    2628struct tr_webseed
    2729{
    28     unsigned int        busy : 1;
    2930    unsigned int        dead : 1;
    3031
     
    3536    void              * callback_userdata;
    3637
    37     tr_piece_index_t    pieceIndex;
    38     uint32_t            pieceOffset;
    39     uint32_t            byteCount;
     38    uint64_t            bytesSaved;
     39
     40    tr_piece_index_t    queue[MAX_QUEUE_SIZE];
     41    int                 queueSize;
    4042
    4143    struct evbuffer   * content;
     
    146148    const int success = ( response_code == 206 );
    147149
    148 fprintf( stderr, "server responded with code %ld and %lu bytes\n", response_code, (unsigned long)response_byte_count );
     150/*fprintf( stderr, "server responded with code %ld and %lu bytes\n", response_code, (unsigned long)response_byte_count );*/
    149151    if( !success )
    150152    {
    151153        /* FIXME */
    152154    }
     155    else if( w->dead )
     156    {
     157        tr_webseedFree( w );
     158    }
    153159    else
    154160    {
     161        const tr_piece_index_t piece = w->queue[0];
     162        tr_block_index_t block;
     163        size_t len;
     164
    155165        evbuffer_add( w->content, response, response_byte_count );
    156         if( !w->dead )
    157             fireClientGotData( w, response_byte_count );
    158 
    159         if( EVBUFFER_LENGTH( w->content ) < w->byteCount )
     166
     167        fireClientGotData( w, response_byte_count );
     168
     169        block = _tr_block( w->torrent, piece, w->bytesSaved );
     170        len = tr_torBlockCountBytes( w->torrent, block );
     171
     172        while( EVBUFFER_LENGTH( w->content ) >= len )
     173        {
     174/*fprintf( stderr, "saving piece index %lu, offset %lu, len %lu\n", (unsigned long)piece, (unsigned long)w->bytesSaved, (unsigned long)len );*/
     175            /* save one block */
     176            tr_ioWrite( w->torrent, piece, w->bytesSaved, len,
     177                        EVBUFFER_DATA(w->content) );
     178            evbuffer_drain( w->content, len );
     179            fireClientGotBlock( w, piece, w->bytesSaved, len );
     180            w->bytesSaved += len;
     181
     182            /* march to the next one */
     183            ++block;
     184            len = tr_torBlockCountBytes( w->torrent, block );
     185        }
     186
     187        if( w->bytesSaved < tr_torPieceCountBytes( w->torrent, piece ) )
    160188            requestNextChunk( w );
    161189        else {
    162             tr_ioWrite( w->torrent, w->pieceIndex, w->pieceOffset, w->byteCount, EVBUFFER_DATA(w->content) );
     190            w->bytesSaved = 0;
    163191            evbuffer_drain( w->content, EVBUFFER_LENGTH( w->content ) );
    164             w->busy = 0;
    165             if( w->dead )
    166                 tr_webseedFree( w );
    167             else  {
    168                 fireClientGotBlock( w, w->pieceIndex, w->pieceOffset, w->byteCount );
     192/*fprintf( stderr, "w->callback_userdata is %p\n", w->callback_userdata );*/
     193            memmove( w->queue, w->queue+1, sizeof(tr_piece_index_t)*(MAX_QUEUE_SIZE-1) );
     194/*fprintf( stderr, "w->callback_userdata is %p\n", w->callback_userdata );*/
     195            if( --w->queueSize )
     196                requestNextChunk( w );
     197            if( w->queueSize < ( MAX_QUEUE_SIZE / 2 ) )
    169198                fireNeedReq( w );
    170             }
    171199        }
    172200    }
     
    177205{
    178206    const tr_info * inf = tr_torrentInfo( w->torrent );
    179     const uint32_t have = EVBUFFER_LENGTH( w->content );
    180     const uint32_t left = w->byteCount - have;
    181     const uint32_t pieceOffset = w->pieceOffset + have;
     207    const uint32_t have = w->bytesSaved + EVBUFFER_LENGTH( w->content );
     208    const tr_piece_index_t piece = w->queue[0];
     209    const uint32_t left = tr_torPieceCountBytes( w->torrent, piece ) - have;
     210    const uint32_t pieceOffset = have;
    182211    tr_file_index_t fileIndex;
    183212    uint64_t fileOffset;
     
    186215    char * range;
    187216
    188     tr_ioFindFileLocation( w->torrent, w->pieceIndex, pieceOffset,
     217    tr_ioFindFileLocation( w->torrent, piece, pieceOffset,
    189218                           &fileIndex, &fileOffset );
    190219    thisPass = MIN( left, inf->files[fileIndex].length - fileOffset );
    191220
    192221    url = makeURL( w, &inf->files[fileIndex] );
    193 //fprintf( stderr, "url is [%s]\n", url );
    194222    range = tr_strdup_printf( "%"PRIu64"-%"PRIu64, fileOffset, fileOffset + thisPass - 1 );
    195 fprintf( stderr, "range is [%s] ... we want %lu total, we have %lu, so %lu are left, and we're asking for %lu this time\n", range, (unsigned long)w->byteCount, (unsigned long)have, (unsigned long)left, (unsigned long)thisPass );
     223/*fprintf( stderr, "range is [%s] ... we want %lu total, we have %lu, so %lu are left, and we're asking for %lu this time\n", range, (unsigned long)tr_torPieceCountBytes(w->torrent,piece), (unsigned long)have, (unsigned long)left, (unsigned long)thisPass );*/
    196224    tr_webRun( w->torrent->handle, url, range, webResponseFunc, w );
    197225    tr_free( range );
     
    200228
    201229int
    202 tr_webseedAddRequest( tr_webseed  * w,
    203                       uint32_t      pieceIndex,
    204                       uint32_t      pieceOffset,
    205                       uint32_t      byteCount )
     230tr_webseedAddRequest( tr_webseed        * w,
     231                      tr_piece_index_t    piece )
    206232{
    207233    int ret;
    208234
    209     if( w->busy || w->dead )
     235    if( w->dead || w->queueSize >= MAX_QUEUE_SIZE )
    210236    {
    211237        ret = TR_ADDREQ_FULL;
     
    213239    else
    214240    {
    215 fprintf( stderr, "webseed is starting a new piece here -- piece %lu, offset %lu!!!\n", (unsigned long)pieceIndex, (unsigned long)pieceOffset );
    216         w->busy = 1;
    217         w->pieceIndex = pieceIndex;
    218         w->pieceOffset = pieceOffset;
    219         w->byteCount = byteCount;
    220         evbuffer_drain( w->content, EVBUFFER_LENGTH( w->content ) );
    221         requestNextChunk( w );
     241        int wasEmpty = w->queueSize == 0;
     242        w->queue[ w->queueSize++ ] = piece;
     243        if( wasEmpty )
     244            requestNextChunk( w );
    222245        ret = TR_ADDREQ_OK;
    223246    }
     
    242265    w->callback = callback;
    243266    w->callback_userdata = callback_userdata;
     267/*fprintf( stderr, "w->callback_userdata is %p\n", w->callback_userdata );*/
    244268    return w;
    245269}
     
    250274    if( w )
    251275    {
    252         if( w->busy )
     276        if( w->queueSize > 0 )
    253277        {
    254278            w->dead = 1;
  • trunk/libtransmission/webseed.h

    r6073 r6101  
    2525void tr_webseedFree( tr_webseed * );
    2626
    27 int tr_webseedAddRequest( tr_webseed  * w,
    28                           uint32_t      index,
    29                           uint32_t      begin,
    30                           uint32_t      length );
     27int tr_webseedAddRequest( tr_webseed          * w,
     28                          tr_piece_index_t      piece );
    3129
    3230#endif
Note: See TracChangeset for help on using the changeset viewer.