Changeset 3839


Ignore:
Timestamp:
Nov 16, 2007, 8:40:03 PM (15 years ago)
Author:
charles
Message:

progress on the "speed limits kill my transfer rate" bug.

Location:
trunk/libtransmission
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/handshake.c

    r3785 r3839  
    10311031    handshake->handle = tr_peerIoGetHandle( io );
    10321032   
    1033     tr_peerIoSetIOMode( handshake->io, EV_READ|EV_WRITE, 0 );
    10341033    tr_peerIoSetIOFuncs( handshake->io, canRead, NULL, gotError, handshake );
    10351034
  • trunk/libtransmission/peer-io.c

    r3830 r3839  
    9999                    continue;
    100100            case READ_MORE:
    101                 tr_peerIoSetIOMode( c, EV_READ, 0 );
    102101            case READ_DONE:
    103102                done = 1;
     
    119118***
    120119**/
     120
     121void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t);
    121122
    122123static tr_peerIo*
     
    143144    bufferevent_settimeout( c->bufev, IO_TIMEOUT_SECS, IO_TIMEOUT_SECS );
    144145    bufferevent_enable( c->bufev, EV_READ|EV_WRITE );
     146    bufferevent_setwatermark( c->bufev, EV_READ, 0, 1024 );
     147
    145148    return c;
    146149}
     
    234237}
    235238
     239void
     240tr_peerIoTryRead( tr_peerIo * io )
     241{
     242    if( EVBUFFER_LENGTH( io->bufev->input ) )
     243        canReadWrapper( io->bufev, io );
     244}
     245
    236246void
    237247tr_peerIoSetIOFuncs( tr_peerIo          * io,
     
    246256    io->userData = userData;
    247257
    248     if( EVBUFFER_LENGTH( io->bufev->input ) )
    249         canReadWrapper( io->bufev, io );
    250 }
    251 
    252 void
    253 tr_peerIoSetIOMode( tr_peerIo * io, short enable, short disable )
    254 {
    255     assert( tr_amInEventThread( io->handle ) );
    256     bufferevent_enable( io->bufev, enable );
    257     bufferevent_disable( io->bufev, disable );
     258    tr_peerIoTryRead( io );
    258259}
    259260
     
    285286        bufferevent_settimeout( io->bufev, IO_TIMEOUT_SECS, IO_TIMEOUT_SECS );
    286287        bufferevent_enable( io->bufev, EV_READ|EV_WRITE );
     288        bufferevent_setwatermark( io->bufev, EV_READ, 0, 1024 );
    287289
    288290        return 0;
  • trunk/libtransmission/peer-io.h

    r3501 r3839  
    109109                           void             * user_data );
    110110
    111 void  tr_peerIoSetIOMode ( tr_peerIo   * io,
    112                            short         enable_mode,
    113                            short         disable_mode );
     111size_t tr_peerIoWriteBytesWaiting( const tr_peerIo * io );
    114112
    115 size_t tr_peerIoWriteBytesWaiting( const tr_peerIo * io );
     113void tr_peerIoTryRead( tr_peerIo * io );
    116114
    117115void tr_peerIoWrite( tr_peerIo   * io,
  • trunk/libtransmission/peer-msgs.c

    r3836 r3839  
    8181{
    8282    AWAITING_BT_LENGTH,
    83     AWAITING_BT_MESSAGE
     83    AWAITING_BT_ID,
     84    AWAITING_BT_MESSAGE,
     85    AWAITING_BT_PIECE
    8486};
    8587
     
    103105    return 0;
    104106}
     107
     108/* this is raw, unchanged data from the peer regarding
     109 * the current message that it's sending us. */
     110struct tr_incoming
     111{
     112    uint32_t length; /* includes the +1 for id length */
     113    uint8_t id;
     114    struct peer_request blockReq; /* metadata for incoming blocks */
     115    struct evbuffer * block; /* piece data for incoming blocks */
     116};
    105117
    106118struct tr_peermsgs
     
    142154    uint8_t ut_pex_id;
    143155    uint16_t pexCount;
    144     uint32_t incomingMessageLength;
    145156    uint32_t maxActiveRequests;
    146157    uint32_t minActiveRequests;
     158
     159    struct tr_incoming incoming;
    147160
    148161    tr_pex * pex;
     
    865878{
    866879    uint32_t len;
    867     const size_t needlen = sizeof(uint32_t);
    868 
    869     if( EVBUFFER_LENGTH(inbuf) < needlen )
     880
     881    if( EVBUFFER_LENGTH(inbuf) < sizeof(len) )
    870882        return READ_MORE;
    871883
     
    875887        dbgmsg( msgs, "got KeepAlive" );
    876888    else {
    877         msgs->incomingMessageLength = len;
     889        msgs->incoming.length = len;
     890        msgs->state = AWAITING_BT_ID;
     891    }
     892dbgmsg( msgs, "readBtLength: got a length of %d, msgs->state is now %d", (int)len, (int)msgs->state );
     893
     894    return READ_AGAIN;
     895}
     896
     897static int
     898readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf );
     899
     900static int
     901readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf )
     902{
     903    uint8_t id;
     904
     905    if( EVBUFFER_LENGTH(inbuf) < sizeof(uint8_t) )
     906        return READ_MORE;
     907
     908    tr_peerIoReadUint8( msgs->io, inbuf, &id );
     909    msgs->incoming.id = id;
     910
     911    if( id==BT_PIECE )
     912    {
     913        msgs->state = AWAITING_BT_PIECE;
     914        return READ_AGAIN;
     915    }
     916    else if( msgs->incoming.length != 1 )
     917    {
    878918        msgs->state = AWAITING_BT_MESSAGE;
    879     }
    880 
    881     return READ_AGAIN;
     919        return READ_AGAIN;
     920    }
     921    else return readBtMessage( msgs, inbuf );
    882922}
    883923
     
    9901030
    9911031static int
     1032readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
     1033{
     1034    struct peer_request * req = &msgs->incoming.blockReq;
     1035    dbgmsg( msgs, "In readBtPiece" );
     1036
     1037    if( !req->length )
     1038    {
     1039        if( EVBUFFER_LENGTH( inbuf ) < 8 )
     1040            return READ_MORE;
     1041
     1042        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
     1043        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
     1044        req->length = msgs->incoming.length - 9;
     1045        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
     1046        return READ_AGAIN;
     1047    }
     1048    else
     1049    {
     1050        int err;
     1051
     1052        /* read in another chunk of data */
     1053        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
     1054        size_t n = MIN( nLeft, EVBUFFER_LENGTH(inbuf) );
     1055        uint8_t * buf = tr_new( uint8_t, n );
     1056        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
     1057        evbuffer_add( msgs->incoming.block, buf, n );
     1058        tr_free( buf );
     1059        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
     1060               (int)n, req->index, req->offset, req->length,
     1061               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
     1062        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
     1063            return READ_MORE;
     1064
     1065        /* we've got the whole block ... process it */
     1066        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
     1067
     1068        /* cleanup */
     1069        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
     1070        req->length = 0;
     1071        msgs->state = AWAITING_BT_LENGTH;
     1072        if( !err )
     1073            return READ_AGAIN;
     1074        else {
     1075            fireGotAssertError( msgs );
     1076            return READ_DONE;
     1077        }
     1078    }
     1079}
     1080
     1081static int
    9921082readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
    9931083{
    994     int ret;
    995     uint8_t id;
    9961084    uint32_t ui32;
    997     uint32_t msglen = msgs->incomingMessageLength;
     1085    uint32_t msglen = msgs->incoming.length;
     1086    const uint8_t id = msgs->incoming.id;
    9981087    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
    9991088
    1000     if( EVBUFFER_LENGTH(inbuf) < msglen )
     1089dbgmsg( msgs, "in readBtMessage" );
     1090
     1091    --msglen; // id length
     1092
     1093    if( EVBUFFER_LENGTH(inbuf) < msglen ) {
     1094        dbgmsg( msgs, " too short!!! " );
    10011095        return READ_MORE;
    1002 
    1003     tr_peerIoReadUint8( msgs->io, inbuf, &id );
     1096    }
     1097
    10041098    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)EVBUFFER_LENGTH(inbuf) );
    10051099
    1006     if( !messageLengthIsCorrect( msgs, id, msglen ) )
     1100    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
    10071101    {
    10081102        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
     
    10111105    }
    10121106
    1013     --msglen;
    1014     ret = 0;
    10151107    switch( id )
    10161108    {
     
    10811173        }
    10821174
    1083         case BT_PIECE: {
    1084             uint8_t * block;
    1085             struct peer_request req;
    1086             tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
    1087             tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
    1088             req.length = msglen - 8;
    1089             block = tr_new( uint8_t, req.length );
    1090             tr_peerIoReadBytes( msgs->io, inbuf, block, req.length );
    1091             dbgmsg( msgs, "got a Block %u:%u->%u", req.index, req.offset, req.length );
    1092             ret = clientGotBlock( msgs, block, &req );
    1093             tr_free( block );
     1175        case BT_PIECE:
     1176            assert( 0 ); /* should be handled elsewhere! */
    10941177            break;
    1095         }
    10961178       
    10971179        case BT_PORT:
     
    11471229            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
    11481230            tr_peerIoDrain( msgs->io, inbuf, msglen );
    1149 assert( 0 );
    11501231            break;
    11511232    }
     
    11531234    dbgmsg( msgs, "startBufLen was %d, msglen was %d, current inbuf len is %d", (int)startBufLen, (int)(msglen+1), (int)EVBUFFER_LENGTH(inbuf) );
    11541235
    1155     if( ret == (int)TR_ERROR_ASSERT )
    1156     {
    1157         fireGotAssertError( msgs );
    1158         return READ_DONE;
    1159     }
    1160     else if( ret == TR_OK )
    1161     {
    1162         assert( msglen + 1 == msgs->incomingMessageLength );
    1163         assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msgs->incomingMessageLength );
    1164 
    1165         msgs->incomingMessageLength = -1;
    1166         msgs->state = AWAITING_BT_LENGTH;
    1167         return READ_AGAIN;
    1168     }
    1169     else
    1170     {
    1171         fireGotError( msgs );
    1172         return READ_DONE;
    1173     }
     1236    assert( msglen + 1 == msgs->incoming.length );
     1237    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
     1238
     1239    msgs->state = AWAITING_BT_LENGTH;
     1240    return READ_AGAIN;
    11741241}
    11751242
     
    13411408    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
    13421409
     1410dbgmsg( msgs, "canRead, state is %d", msgs->state );
    13431411    if( !canDownload( msgs ) )
    13441412    {
     1413dbgmsg( msgs, "oh, but canDownload failed" );
    13451414        msgs->notListening = 1;
    1346         tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
    13471415        ret = READ_DONE;
    13481416    }
     
    13501418    {
    13511419        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
     1420        case AWAITING_BT_ID:      ret = readBtId      ( msgs, inbuf ); break;
    13521421        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
     1422        case AWAITING_BT_PIECE:   ret = readBtPiece   ( msgs, inbuf ); break;
    13531423        default: assert( 0 );
    13541424    }
     
    14311501    {
    14321502        msgs->notListening = 0;
    1433         tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
     1503        tr_peerIoTryRead( msgs->io );
    14341504    }
    14351505
     
    16901760    m->info->peerIsInterested = 0;
    16911761    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
     1762    m->state = AWAITING_BT_LENGTH;
    16921763    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
    16931764    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
    16941765    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
    16951766    m->outMessages = evbuffer_new( );
     1767    m->incoming.block = evbuffer_new( );
    16961768    m->outBlock = evbuffer_new( );
    16971769    m->peerAllowedPieces = NULL;
     
    17191791    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
    17201792    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
    1721     tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
    17221793    ratePulse( m );
    17231794
     
    17631834        tr_bitfieldFree( msgs->clientAllowedPieces );
    17641835        tr_bitfieldFree( msgs->clientSuggestedPieces );
     1836        evbuffer_free( msgs->incoming.block );
    17651837        evbuffer_free( msgs->outMessages );
    17661838        evbuffer_free( msgs->outBlock );
Note: See TracChangeset for help on using the changeset viewer.