Changeset 3762


Ignore:
Timestamp:
Nov 9, 2007, 1:22:15 AM (14 years ago)
Author:
charles
Message:
  • try to get peers to scale up to speed faster.
  • remove unnecessary steps when receiving block data from peers.
Location:
trunk/libtransmission
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-msgs.c

    r3760 r3762  
    3939***
    4040**/
    41 
    42 #define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
    43                                       threshold for fast-allowing others */
    4441
    4542enum
     
    6865    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
    6966
    70     KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
     67    /* idle seconds before we send a keepalive */
     68    KEEPALIVE_INTERVAL_SECS = 90,
     69
    7170    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
    7271    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
    7372    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
     73
     74    /* number of pieces generated for allow-fast,
     75      threshold for fast-allowing others */
     76    MAX_ALLOWED_SET_COUNT   = 10
    7477};
    7578
     
    7780{
    7881    AWAITING_BT_LENGTH,
    79     AWAITING_BT_MESSAGE,
    80     READING_BT_PIECE
     82    AWAITING_BT_MESSAGE
    8183};
    8284
     
    9294compareRequest( const void * va, const void * vb )
    9395{
    94     struct peer_request * a = (struct peer_request*) va;
    95     struct peer_request * b = (struct peer_request*) vb;
    96     if( a->index != b->index ) return a->index - b->index;
    97     if( a->offset != b->offset ) return a->offset - b->offset;
    98     if( a->length != b->length ) return a->length - b->length;
     96    int i;
     97    const struct peer_request * a = va;
     98    const struct peer_request * b = vb;
     99    if(( i = tr_compareUint32( a->index, b->index ))) return i;
     100    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
     101    if(( i = tr_compareUint32( a->length, b->length ))) return i;
    99102    return 0;
    100103}
     
    112115    struct evbuffer * outBlock;    /* buffer of all the current piece message */
    113116    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
    114     struct evbuffer * inBlock;     /* the block we're currently receiving */
    115117    tr_list * peerAskedFor;
    116118    tr_list * peerAskedForFast;
     
    121123    tr_timer * pulseTimer;
    122124    tr_timer * pexTimer;
    123 
    124     struct peer_request blockToUs; /* the block currntly being sent to us */
    125125
    126126    time_t lastReqAddedAt;
     
    283283
    284284static void
    285 fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
     285fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
    286286{
    287287    tr_peermsgs_event e = blankEvent;
    288288    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
    289     e.pieceIndex = pieceIndex;
    290     e.offset = offset;
    291     e.length = length;
     289    e.pieceIndex = req->index;
     290    e.offset = req->offset;
     291    e.length = req->length;
    292292    publish( msgs, &e );
    293293}
     
    433433#endif
    434434static void
    435 sendFastHave( tr_peermsgs * msgs,
    436               int           all)
    437 {
    438     dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
     435sendFastHave( tr_peermsgs * msgs, int all )
     436{
     437    dbgmsg( msgs, "w00t telling them we have %s pieces", (all ? "ALL" : "NONE" ) );
    439438    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
    440     tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
    441    
     439    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
     440                                                            : BT_HAVE_NONE ) );
    442441    updateInterest( msgs );
    443442}
     
    492491
    493492static int
    494 reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
     493reqIsValid( const tr_peermsgs   * msgs,
     494            uint32_t              index,
     495            uint32_t              offset,
     496            uint32_t              length )
    495497{
    496498    const tr_torrent * tor = msgs->torrent;
     
    529531    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
    530532    {
    531         struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
    532         protocolSendRequest( msgs, req );
    533         req->time_requested = msgs->lastReqAddedAt = time( NULL );
    534         tr_list_append( &msgs->clientAskedFor, req );
     533        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
     534        protocolSendRequest( msgs, r );
     535        r->time_requested = msgs->lastReqAddedAt = time( NULL );
     536        tr_list_append( &msgs->clientAskedFor, r );
    535537        ++count;
    536538        ++sent;
     
    890892        sendFastReject( msgs, req->index, req->offset, req->length );
    891893    }
    892     else if( peerIsChoked && !peerIsFast ) /* maybe he doesn't know he's choked? */
     894    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
    893895    {
    894896        tr_peerMsgsSetChoke( msgs, 1 );
     
    950952}
    951953
     954static void
     955clientGotBlock( tr_peermsgs * msgs, struct evbuffer * inbuf, const struct peer_request * req );
    952956
    953957static int
     
    10381042        }
    10391043
    1040         case BT_PIECE:
    1041             dbgmsg( msgs, "got a Piece!" );
    1042             assert( msgs->blockToUs.length == 0 );
    1043             tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
    1044             tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
    1045             msgs->blockToUs.length = msglen - 8;
    1046             assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
    1047             msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
    1048             return READ_AGAIN;
     1044        case BT_PIECE: {
     1045            struct peer_request req;
     1046            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
     1047            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
     1048            req.length = msglen - 8;
     1049            dbgmsg( msgs, "got a Block %u:%u->%u", req.index, req.offset, req.length );
     1050            clientGotBlock( msgs, inbuf, &req );
    10491051            break;
     1052        }
    10501053
    10511054        case BT_PORT:
     
    11701173
    11711174static void
    1172 gotUnwantedBlock( tr_peermsgs * msgs,
    1173                   uint32_t      index UNUSED,
    1174                   uint32_t      offset UNUSED,
    1175                   uint32_t      length )
    1176 {
    1177     reassignBytesToCorrupt( msgs, length );
    1178 }
    1179 
    1180 static void
    1181 addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
     1175clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
     1176{
     1177    reassignBytesToCorrupt( msgs, req->length );
     1178}
     1179
     1180static void
     1181addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
    11821182{
    11831183    if( !msgs->info->blame )
     
    11871187
    11881188static void
    1189 gotBlock( tr_peermsgs      * msgs,
    1190           struct evbuffer  * inbuf,
    1191           uint32_t           index,
    1192           uint32_t           offset,
    1193           uint32_t           length )
    1194 {
     1189clientGotBlock( tr_peermsgs * msgs, struct evbuffer * inbuf, const struct peer_request * req )
     1190{
     1191    int i;
     1192    uint8_t * data;
    11951193    tr_torrent * tor = msgs->torrent;
    1196     const int block = _tr_block( tor, index, offset );
    1197     struct peer_request key, *req;
     1194    const int block = _tr_block( tor, req->index, req->offset );
     1195    struct peer_request *myreq;
     1196
     1197    assert( msgs != NULL );
     1198    assert( inbuf != NULL );
     1199    assert( req != NULL );
     1200    assert( req->length > 0 );
     1201    assert( EVBUFFER_LENGTH( inbuf ) >= req->length );
     1202    assert( req->length == (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) );
     1203
     1204    /* save the block */
     1205    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
    11981206
    11991207    /**
     
    12011209    **/
    12021210
    1203     key.index = index;
    1204     key.offset = offset;
    1205     key.length = length;
    1206     req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
    1207                                                  compareRequest );
    1208     if( req == NULL ) {
    1209         gotUnwantedBlock( msgs, index, offset, length );
     1211    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
     1212    if( myreq == NULL ) {
     1213        clientGotUnwantedBlock( msgs, req );
    12101214        dbgmsg( msgs, "we didn't ask for this message..." );
    12111215        return;
    12121216    }
     1217
    12131218    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)",
    1214                      req->index, req->offset, req->length,
    1215                      (int)(time(NULL) - req->time_requested) );
    1216     tr_free( req );
     1219                  myreq->index, myreq->offset, myreq->length,
     1220                  (int)(time(NULL) - myreq->time_requested) );
    12171221    dbgmsg( msgs, "peer has %d more blocks we've asked for",
    12181222                  tr_list_size(msgs->clientAskedFor));
     
    12231227
    12241228    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
    1225         dbgmsg( msgs, "have this block already..." );
    1226         tr_dbg( "have this block already..." );
    1227         gotUnwantedBlock( msgs, index, offset, length );
     1229        dbgmsg( msgs, "we have this block already..." );
     1230        clientGotUnwantedBlock( msgs, req );
    12281231        return;
    12291232    }
    12301233
    1231     if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
    1232         dbgmsg( msgs, "block is the wrong length..." );
    1233         tr_dbg( "block is the wrong length..." );
    1234         gotUnwantedBlock( msgs, index, offset, length );
     1234    /**
     1235    ***  Save the block
     1236    **/
     1237
     1238    data = tr_new( uint8_t, req->length );
     1239    tr_peerIoReadBytes( msgs->io, inbuf, data, req->length );
     1240    msgs->info->peerSentPieceDataAt = time( NULL );
     1241    clientGotBytes( msgs, req->length );
     1242    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
     1243    tr_free( data );
     1244    if( i )
    12351245        return;
    1236     }
    1237 
    1238     /**
    1239     ***  Write the block
    1240     **/
    1241 
    1242     if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
    1243         return;
    12441246
    12451247#warning this sanity check is here to help track down the excess corrupt data bug, but is expensive and should be removed before the next release
    12461248{
    1247     uint8_t * tmp = tr_new( uint8_t, length );
    1248     const int val = tr_ioRead( tor, index, offset, length, tmp );
     1249    uint8_t * check = tr_new( uint8_t, req->length );
     1250    const int val = tr_ioRead( tor, req->index, req->offset, req->length, check );
    12491251    assert( !val );
    1250     assert( !memcmp( tmp, EVBUFFER_DATA(inbuf), length ) );
    1251     tr_free( tmp );
     1252    assert( !memcmp( check, data, req->length ) );
     1253    tr_free( check );
    12521254}
    12531255
    12541256    tr_cpBlockAdd( tor->completion, block );
    12551257
    1256     addUsToBlamefield( msgs, index );
    1257 
    1258     fireGotBlock( msgs, index, offset, length );
     1258    addPeerToBlamefield( msgs, req->index );
     1259
     1260    fireGotBlock( msgs, req );
    12591261
    12601262    /**
     
    12621264    **/
    12631265
    1264     if( tr_cpPieceIsComplete( tor->completion, index ) )
    1265     {
    1266         if( tr_ioHash( tor, index ) )
     1266    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
     1267    {
     1268        if( tr_ioHash( tor, req->index ) )
    12671269        {
    1268             gotBadPiece( msgs, index );
     1270            gotBadPiece( msgs, req->index );
    12691271            return;
    12701272        }
    12711273
    1272         fireClientHave( msgs, index );
    1273     }
    1274 }
    1275 
    1276 
    1277 static ReadState
    1278 readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
    1279 {
    1280     uint32_t inlen;
    1281     uint8_t * tmp;
    1282 
    1283     assert( msgs != NULL );
    1284     assert( msgs->blockToUs.length > 0 );
    1285     assert( inbuf != NULL );
    1286     assert( EVBUFFER_LENGTH( inbuf ) > 0 );
    1287 
    1288     /* read from the inbuf into our block buffer */
    1289     inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
    1290     tmp = tr_new( uint8_t, inlen );
    1291     tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
    1292     evbuffer_add( msgs->inBlock, tmp, inlen );
    1293 
    1294     /* update our tables accordingly */
    1295     assert( inlen >= msgs->blockToUs.length );
    1296     msgs->blockToUs.length -= inlen;
    1297     msgs->info->peerSentPieceDataAt = time( NULL );
    1298     clientGotBytes( msgs, inlen );
    1299 
    1300     /* if this was the entire block, save it */
    1301     if( !msgs->blockToUs.length )
    1302     {
    1303         dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
    1304         assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
    1305         gotBlock( msgs, msgs->inBlock,
    1306                         msgs->blockToUs.index,
    1307                         msgs->blockToUs.offset,
    1308                         EVBUFFER_LENGTH( msgs->inBlock ) );
    1309         evbuffer_drain( msgs->inBlock, EVBUFFER_LENGTH( msgs->inBlock ) );
    1310         msgs->state = AWAITING_BT_LENGTH;
    1311     }
    1312 
    1313     /* cleanup */
    1314     tr_free( tmp );
    1315     return READ_AGAIN;
     1274        fireClientHave( msgs, req->index );
     1275    }
    13161276}
    13171277
     
    13331293        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
    13341294        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
    1335         case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
    13361295        default: assert( 0 );
    13371296    }
     
    13711330
    13721331    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
    1373         return tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
     1332        return tor->handle->useUploadLimit
     1333            ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
    13741334
    13751335    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
     
    13851345    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
    13861346    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
    1387     msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
    1388     msgs->minActiveRequests = msgs->maxActiveRequests / 2;
     1347    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
     1348    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
    13891349    return TRUE;
    13901350}
     
    16771637    m->outMessages = evbuffer_new( );
    16781638    m->outBlock = evbuffer_new( );
    1679     m->inBlock = evbuffer_new( );
    16801639    m->peerAllowedPieces = NULL;
    16811640    m->clientAllowedPieces = NULL;
     
    16971656    }
    16981657   
    1699     tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
     1658    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
    17001659    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
    17011660    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
     
    17421701        evbuffer_free( msgs->outMessages );
    17431702        evbuffer_free( msgs->outBlock );
    1744         evbuffer_free( msgs->inBlock );
    17451703        tr_free( msgs->pex );
    17461704        msgs->pexCount = 0;
  • trunk/libtransmission/trevent.c

    r3457 r3762  
    229229    eh->lock = tr_lockNew( );
    230230    eh->h = handle;
    231     eh->pulseInterval = timevalMsec( 50 );
     231    eh->pulseInterval = timevalMsec( 20 );
    232232    eh->thread = tr_threadNew( libeventThreadFunc, eh, "libeventThreadFunc" );
    233233}
Note: See TracChangeset for help on using the changeset viewer.