Changeset 3065


Ignore:
Timestamp:
Sep 15, 2007, 12:39:55 AM (16 years ago)
Author:
charles
Message:

handle peers sending us corrupted blocks. better handling and broadcasting of `have' messages.

Location:
branches/encryption/libtransmission
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • branches/encryption/libtransmission/handshake.c

    r3061 r3065  
    625625    return READ_AGAIN;
    626626}
     627
     628static void
     629gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * arg );
    627630
    628631/*ccc*/
     
    676679    fprintf( stderr, "pstrlen is [%s]\n", pstr );
    677680    bytesRead += pstrlen;
    678     assert( !strcmp( (char*)pstr, "BitTorrent protocol" ) );
     681    fprintf( stderr, "%*.*s", pstrlen, pstrlen, (char*)pstr );
     682    if( strcmp( (char*)pstr, "BitTorrent protocol" ) ) {
     683        tr_free( pstr );
     684        gotError( NULL, 0, handshake );
     685        return READ_DONE;
     686    }
    679687    tr_free( pstr );
    680688
     
    796804
    797805static void
    798 gotError( struct bufferevent * evbuf UNUSED, short what, void * arg )
     806gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * arg )
    799807{
    800808    tr_handshake * handshake = (tr_handshake *) arg;
    801 fprintf( stderr, "handshake %p: got error [%s]; what==%hd... state was [%s]\n", handshake, strerror(errno), what, getStateName(handshake->state) );
    802809
    803810    /* if the error happened while we were sending a public key, we might
  • branches/encryption/libtransmission/inout.c

    r3025 r3065  
    252252
    253253int
    254 tr_ioHash( tr_io * io, int pieceIndex )
     254tr_ioHash( tr_torrent * tor, int pieceIndex )
    255255{
    256256    int ret;
    257     tr_torrent * tor = io->tor;
    258257    const int success = !checkPiece( tor, pieceIndex );
    259258
  • branches/encryption/libtransmission/inout.h

    r3017 r3065  
    4141int tr_ioWrite ( struct tr_torrent *, int index, int begin, int len, uint8_t * );
    4242
    43 /***********************************************************************
    44  * tr_ioHash
    45  ***********************************************************************
    46  * Hashes the specified piece and updates the completion accordingly.
    47  **********************************************************************/
    48 int tr_ioHash ( tr_io *, int piece );
     43/* hashes the specified piece and updates the completion accordingly. */
     44int tr_ioHash ( tr_torrent*, int piece );
    4945
    5046/* close all the files associated with this torrent*/
  • branches/encryption/libtransmission/peer-mgr.c

    r3062 r3065  
    3232#define RECHOKE_PERIOD_SECONDS (15 * 1000)
    3333
    34 /* how many downloaders to unchoke per-torrent.
    35  * http://wiki.theory.org/BitTorrentSpecification#Choking_and_Optimistic_Unchoking */
    36 #define NUM_DOWNLOADERS_TO_UNCHOKE 6
     34/* how many peers to unchoke per-torrent. */
     35/* FIXME: make this user-configurable */
     36#define NUM_UNCHOKED_PEERS_PER_TORRENT 8
    3737
    3838/**
     
    400400                const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
    401401                if( val==TR_ADDREQ_FULL || val==TR_ADDREQ_CLIENT_CHOKED ) {
    402                     fprintf( stderr, "peer %p (of %d) is full\n", peers[j]->msgs, size );
     402                    fprintf( stderr, "peer %p (of %d) is full or is choking us\n", peers[j]->msgs, size );
    403403                    peers[j] = peers[--size];
    404404                }
     
    409409                    fprintf( stderr, "peer %p took the request for block %d\n", peers[j]->msgs, b );
    410410                    incrementReqCount( &t->blocks[i] );
    411                     j = size;
     411                    break;
    412412                }
    413413            }
     
    430430{
    431431    if( t->refillTimer == NULL ) {
    432         t->refillTimer = tr_timerNew( t->manager->handle, refillPulse, t, 750 );
     432        t->refillTimer = tr_timerNew( t->manager->handle, refillPulse, t, 250 );
    433433        fprintf( stderr, "timer REFILL %p is new\n", t->refillTimer );
    434434    }
     
    436436
    437437static void
    438 broadcastHave( Torrent * t, uint32_t index )
     438broadcastClientHave( Torrent * t, uint32_t index )
    439439{
    440440    int i, size;
     
    463463    switch( e->eventType )
    464464    {
    465         case TR_PEERMSG_GOT_BITFIELD: {
     465        case TR_PEERMSG_PEER_BITFIELD: {
    466466            if( t->blocks!=NULL ) {
    467467                int i;
     
    479479        }
    480480
    481         case TR_PEERMSG_GOT_HAVE: {
    482             const uint32_t begin = tr_torPieceFirstBlock( t->tor, e->pieceIndex );
    483             const uint32_t end = begin + tr_torPieceCountBlocks( t->tor, (int)e->pieceIndex );
    484             uint32_t i;
    485             for( i=begin; t->blocks!=NULL && i<end; ++i ) {
    486                 assert( t->blocks[i].block == i );
    487                 incrementScarcity( &t->blocks[i] );
     481        case TR_PEERMSG_CLIENT_HAVE:
     482            broadcastClientHave( t, e->pieceIndex );
     483            break;
     484
     485        case TR_PEERMSG_PEER_HAVE: {
     486            if( t->blocks != NULL ) {
     487                uint32_t i;
     488                const uint32_t begin = tr_torPieceFirstBlock( t->tor, e->pieceIndex );
     489                const uint32_t end = begin + tr_torPieceCountBlocks( t->tor, (int)e->pieceIndex );
     490                for( i=begin; i<end; ++i ) {
     491                    assert( t->blocks[i].block == i );
     492                    incrementScarcity( &t->blocks[i] );
     493                }
    488494            }
    489             broadcastHave( t, e->pieceIndex );
    490495            break;
    491496        }
    492497
    493         case TR_PEERMSG_GOT_BLOCK: {
     498        case TR_PEERMSG_CLIENT_BLOCK: {
    494499            if( t->blocks != NULL ) {
    495500                const uint32_t i = _tr_block( t->tor, e->pieceIndex, e->offset );
     
    697702                    int              success UNUSED )
    698703{
    699     assert( 0 );
     704    fprintf( stderr, "FIXME: tr_peerMgrSetBlame\n" );
    700705}
    701706
     
    941946    tr_peer * peer;
    942947    float rate;
    943     int isInterested;
     948    int randomKey;
    944949}
    945950ChokeData;
    946951
    947952static int
    948 compareChokeByRate( const void * va, const void * vb )
     953compareChoke( const void * va, const void * vb )
    949954{
    950955    const ChokeData * a = ( const ChokeData * ) va;
     
    952957    if( a->rate > b->rate ) return -1;
    953958    if( a->rate < b->rate ) return 1;
    954     return 0;
    955 }
    956 
    957 static int
    958 compareChokeByDownloader( const void * va, const void * vb )
    959 {
    960     const ChokeData * a = ( const ChokeData * ) va;
    961     const ChokeData * b = ( const ChokeData * ) vb;
    962 
    963     /* primary key: interest */
    964     if(  a->isInterested && !b->isInterested ) return -1;
    965     if( !a->isInterested &&  b->isInterested ) return 1;
    966 
    967     /* second key: rate */
    968     return compareChokeByRate( va, vb );
     959    return a->randomKey - b->randomKey;
    969960}
    970961
     
    975966    int i, size, unchoked;
    976967    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
    977     float bestDownloaderRate;
    978968    ChokeData * data;
    979969    tr_peer ** peers = getConnectedPeers( t, &size );
    980970    const time_t now = time( NULL );
     971    int optimistic = FALSE;
    981972
    982973fprintf( stderr, "[%s] rechoking torrent %p, with %d peers\n", ctime(&now), t, size );
     
    990981    for( i=0; i<size; ++i ) {
    991982        data[i].peer = peers[i];
    992         data[i].isInterested = peers[i]->peerIsInterested;
     983        data[i].randomKey = tr_rand( INT_MAX );
    993984        data[i].rate = done ? tr_peerIoGetRateToPeer( peers[i]->io )
    994985                            : tr_peerIoGetRateToClient( peers[i]->io );
    995986    }
    996987
    997     /* find the best downloaders and unchoke them */
    998     qsort( data, size, sizeof(ChokeData), compareChokeByDownloader );
    999     bestDownloaderRate = data[0].rate;
    1000     for( i=unchoked=0; i<size && unchoked<NUM_DOWNLOADERS_TO_UNCHOKE; ++i ) {
     988    /* find the best peers and unchoke them */
     989    qsort( data, size, sizeof(ChokeData), compareChoke );
     990    for( i=unchoked=0; i<size && unchoked<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
    1001991        tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
    1002992        ++unchoked;
     
    1005995    size -= i;
    1006996
    1007     /* of those remaining, unchoke those that are faster than the downloaders */
    1008     qsort( data, size, sizeof(ChokeData), compareChokeByRate );
    1009     for( i=0; i<size && data[i].rate >= bestDownloaderRate; ++i )
    1010         tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
    1011     memmove( data, data+i, sizeof(ChokeData)*(size-i) );
    1012     size -= i;
    1013 
    1014     /* of those remaining, optimistically unchoke one; choke the rest */
    1015     if( size > 0 ) {
    1016         const int optimistic = tr_rand( size );
    1017         for( i=0; i<size; ++i )
    1018             tr_peerMsgsSetChoke( data[i].peer->msgs, i!=optimistic );
     997    /* of those remaining, optimistically the first interested one
     998       (the were randomized in qsort already) and choke the rest */
     999    for( i=0; i<size; ++i ) {
     1000        if( optimistic || !data[i].peer->peerIsInterested )
     1001            tr_peerMsgsSetChoke( data[i].peer->msgs, TRUE );
     1002        else {
     1003            optimistic = TRUE;
     1004            tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
     1005        }
    10191006    }
    10201007
  • branches/encryption/libtransmission/peer-msgs.c

    r3061 r3065  
    4646#define PEER_PULSE_INTERVAL_MSEC 50
    4747
    48 /* the most requests we'll batch up for this peer */
    49 #define OUT_REQUESTS_MAX 5
    50 
    51 /* when we get down to this many requests, we ask the manager for more */
    52 #define OUT_REQUESTS_LOW 3
    53 
    5448enum
    5549{
     
    8175    uint32_t offset;
    8276    uint32_t length;
     77    time_t time_requested;
    8378};
    8479
     
    154149{
    155150    tr_peermsgs_event e = blankEvent;
    156     e.eventType = TR_PEERMSG_GOT_BITFIELD;
     151    e.eventType = TR_PEERMSG_PEER_BITFIELD;
    157152    e.bitfield = bitfield;
    158153    tr_publisherPublish( peer->publisher, peer, &e );
     
    160155
    161156static void
    162 fireGotHave( tr_peermsgs * peer, uint32_t pieceIndex )
     157fireHave( tr_peermsgs * msgs, int isClient, uint32_t pieceIndex )
    163158{
    164159    tr_peermsgs_event e = blankEvent;
    165     e.eventType = TR_PEERMSG_GOT_HAVE;
     160    e.eventType = isClient ? TR_PEERMSG_CLIENT_HAVE : TR_PEERMSG_PEER_HAVE;
    166161    e.pieceIndex = pieceIndex;
    167     tr_publisherPublish( peer->publisher, peer, &e );
     162    tr_publisherPublish( msgs->publisher, msgs, &e );
     163}
     164
     165static void
     166fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
     167{
     168    fireHave( msgs, TRUE, pieceIndex );
     169}
     170
     171static void
     172firePeerHave( tr_peermsgs * msgs, uint32_t pieceIndex )
     173{
     174    fireHave( msgs, FALSE, pieceIndex );
    168175}
    169176
     
    172179{
    173180    tr_peermsgs_event e = blankEvent;
    174     e.eventType = TR_PEERMSG_GOT_BLOCK;
     181    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
    175182    e.pieceIndex = pieceIndex;
    176183    e.offset = offset;
     
    236243    const uint8_t bt_msgid = weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED;
    237244
    238     fprintf( stderr, "peer %p: enqueueing an %s message\n", peer, (weAreInterested ? "interested" : "not interested") );
     245    fprintf( stderr, "peer %p: sending an %s message\n", peer, (weAreInterested ? "INTERESTED" : "NOT_INTERESTED") );
    239246    tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
    240247    tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
     
    332339
    333340int
    334 tr_peerMsgsAddRequest( tr_peermsgs * peer,
     341tr_peerMsgsAddRequest( tr_peermsgs * msgs,
    335342                       uint32_t      index,
    336343                       uint32_t      offset,
     
    340347    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
    341348    struct peer_request * req;
    342 
    343     if( peer->info->clientIsChoked )
     349    int maxSize;
     350
     351    if( msgs->info->clientIsChoked )
    344352        return TR_ADDREQ_CLIENT_CHOKED;
    345353
    346     if( !tr_bitfieldHas( peer->info->have, index ) )
     354    if( !tr_bitfieldHas( msgs->info->have, index ) )
    347355        return TR_ADDREQ_MISSING;
    348356
    349     if( tr_list_size( peer->clientAskedFor) >= OUT_REQUESTS_MAX )
     357    maxSize = 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10);
     358    if( maxSize > 100 )
     359        maxSize = 100;
     360    fprintf( stderr, "peer %p has a max request queue size of %d\n", msgs, maxSize );
     361    if( tr_list_size( msgs->clientAskedFor) >= maxSize )
    350362        return TR_ADDREQ_FULL;
    351363
    352364    /* queue the request */
    353     tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
    354     tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
    355     tr_peerIoWriteUint32( peer->io, peer->outMessages, index );
    356     tr_peerIoWriteUint32( peer->io, peer->outMessages, offset );
    357     tr_peerIoWriteUint32( peer->io, peer->outMessages, length );
     365    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
     366    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
     367    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
     368    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
     369    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
    358370    fprintf( stderr, "w00t peer %p: requesting a block from piece %u, offset %u, length %u\n",
    359              peer, (unsigned int)index, (unsigned int)offset, (unsigned int)length );
     371             msgs, (unsigned int)index, (unsigned int)offset, (unsigned int)length );
    360372
    361373    /* add it to our `requests sent' list */
     
    364376    req->offset = offset;
    365377    req->length = length;
    366     tr_list_append( &peer->clientAskedFor, req );
    367     fprintf( stderr, "w00t added a request; peer %p's clientAskedFor.size() is now %d\n", peer, tr_list_size(peer->clientAskedFor));
     378    req->time_requested = time( NULL );
     379    tr_list_append( &msgs->clientAskedFor, req );
     380    fprintf( stderr, "w00t added a request; peer %p's clientAskedFor.size() is now %d\n", msgs, tr_list_size(msgs->clientAskedFor));
    368381
    369382    return TR_ADDREQ_OK;
     
    504517
    505518static int
    506 readBtMessage( tr_peermsgs * peer, struct evbuffer * inbuf )
     519readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
    507520{
    508521    uint8_t id;
    509522    uint32_t ui32;
    510     uint32_t msglen = peer->incomingMessageLength;
     523    uint32_t msglen = msgs->incomingMessageLength;
    511524
    512525    if( EVBUFFER_LENGTH(inbuf) < msglen )
    513526        return READ_MORE;
    514527
    515     tr_peerIoReadBytes( peer->io, inbuf, &id, 1 );
     528    tr_peerIoReadBytes( msgs->io, inbuf, &id, 1 );
    516529    msglen--;
    517530    fprintf( stderr, "got a message from the peer... "
     
    522535        case BT_CHOKE:
    523536            assert( msglen == 0 );
    524             fprintf( stderr, "got a BT_CHOKE\n" );
    525             peer->info->clientIsChoked = 1;
    526             tr_list_foreach( peer->peerAskedFor, tr_free );
    527             tr_list_free( &peer->peerAskedFor );
     537            fprintf( stderr, "peer-msgs %p sent us a BT_CHOKE\n", msgs );
     538            msgs->info->clientIsChoked = 1;
     539            tr_list_foreach( msgs->peerAskedFor, tr_free );
     540            tr_list_free( &msgs->peerAskedFor );
    528541            /* FIXME: maybe choke them */
    529542            /* FIXME: unmark anything we'd requested from them... */
     
    532545        case BT_UNCHOKE:
    533546            assert( msglen == 0 );
    534             fprintf( stderr, "got a BT_UNCHOKE\n" );
    535             peer->info->clientIsChoked = 0;
     547            fprintf( stderr, "peer-msgs %p sent us a BT_UNCHOKE\n", msgs );
     548            msgs->info->clientIsChoked = 0;
    536549            /* FIXME: maybe unchoke them */
    537550            /* FIXME: maybe send them requests */
     
    540553        case BT_INTERESTED:
    541554            assert( msglen == 0 );
    542             fprintf( stderr, "got a BT_INTERESTED\n" );
    543             peer->info->peerIsInterested = 1;
     555            fprintf( stderr, "peer-msgs %p sent us a BT_INTERESTED\n", msgs );
     556            msgs->info->peerIsInterested = 1;
    544557            /* FIXME: maybe unchoke them */
    545558            break;
     
    547560        case BT_NOT_INTERESTED:
    548561            assert( msglen == 0 );
    549             fprintf( stderr, "got a BT_NOT_INTERESTED\n" );
    550             peer->info->peerIsInterested = 0;
     562            fprintf( stderr, "peer-msgs %p sent us a BT_NOT_INTERESTED\n", msgs );
     563            msgs->info->peerIsInterested = 0;
    551564            /* FIXME: maybe choke them */
    552565            break;
     
    554567        case BT_HAVE:
    555568            assert( msglen == 4 );
    556             fprintf( stderr, "got a BT_HAVE\n" );
    557             tr_peerIoReadUint32( peer->io, inbuf, &ui32 );
    558             tr_bitfieldAdd( peer->info->have, ui32 );
    559             peer->info->progress = tr_bitfieldCountTrueBits( peer->info->have ) / (float)peer->torrent->info.pieceCount;
    560             fprintf( stderr, "after the HAVE message, peer progress is %f\n", peer->info->progress );
    561             updateInterest( peer );
    562             fireGotHave( peer, ui32 );
     569            fprintf( stderr, "peer-msgs %p sent us a BT_HAVE\n", msgs );
     570            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
     571            tr_bitfieldAdd( msgs->info->have, ui32 );
     572            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
     573            fprintf( stderr, "after the HAVE message, peer progress is %f\n", msgs->info->progress );
     574            updateInterest( msgs );
     575            firePeerHave( msgs, ui32 );
    563576            break;
    564577
    565578        case BT_BITFIELD:
    566             assert( msglen == peer->info->have->len );
    567             fprintf( stderr, "got a BT_BITFIELD\n" );
    568             tr_peerIoReadBytes( peer->io, inbuf, peer->info->have->bits, msglen );
    569             peer->info->progress = tr_bitfieldCountTrueBits( peer->info->have ) / (float)peer->torrent->info.pieceCount;
    570             fprintf( stderr, "after the BITFIELD peer progress is %f\n", peer->info->progress );
    571             fireGotBitfield( peer, peer->info->have );
    572             updateInterest( peer );
     579            assert( msglen == msgs->info->have->len );
     580            fprintf( stderr, "peer-msgs %p sent us a BT_BITFIELD\n", msgs );
     581            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
     582            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
     583            fprintf( stderr, "after the BITFIELD peer progress is %f\n", msgs->info->progress );
     584            fireGotBitfield( msgs, msgs->info->have );
     585            updateInterest( msgs );
    573586            /* FIXME: maybe unchoke */
    574587            break;
     
    579592            fprintf( stderr, "got a BT_REQUEST\n" );
    580593            req = tr_new( struct peer_request, 1 );
    581             tr_peerIoReadUint32( peer->io, inbuf, &req->index );
    582             tr_peerIoReadUint32( peer->io, inbuf, &req->offset );
    583             tr_peerIoReadUint32( peer->io, inbuf, &req->length );
    584             if( !peer->info->peerIsChoked )
    585                 tr_list_prepend( &peer->peerAskedFor, req );
     594            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
     595            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
     596            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
     597            if( !msgs->info->peerIsChoked )
     598                tr_list_prepend( &msgs->peerAskedFor, req );
    586599            break;
    587600        }
     
    592605            assert( msglen == 12 );
    593606            fprintf( stderr, "got a BT_CANCEL\n" );
    594             tr_peerIoReadUint32( peer->io, inbuf, &req.index );
    595             tr_peerIoReadUint32( peer->io, inbuf, &req.offset );
    596             tr_peerIoReadUint32( peer->io, inbuf, &req.length );
    597             node = tr_list_find( peer->peerAskedFor, &req, peer_request_compare );
     607            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
     608            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
     609            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
     610            node = tr_list_find( msgs->peerAskedFor, &req, peer_request_compare );
    598611            if( node != NULL ) {
    599612                fprintf( stderr, "found the req that peer is cancelling... cancelled.\n" );
    600                 tr_list_remove_data( &peer->peerAskedFor, node->data );
     613                tr_list_remove_data( &msgs->peerAskedFor, node->data );
    601614            }
    602615            break;
     
    604617
    605618        case BT_PIECE: {
    606             fprintf( stderr, "got a BT_PIECE\n" );
    607             assert( peer->blockToUs.length == 0 );
    608             peer->state = READING_BT_PIECE;
    609             tr_peerIoReadUint32( peer->io, inbuf, &peer->blockToUs.index );
    610             tr_peerIoReadUint32( peer->io, inbuf, &peer->blockToUs.offset );
    611             peer->blockToUs.length = msglen - 8;
    612 fprintf( stderr, "left to read is [%"PRIu64"]\n", (uint64_t)peer->blockToUs.length );
    613             assert( peer->blockToUs.length > 0 );
    614             evbuffer_drain( peer->inBlock, ~0 );
     619            fprintf( stderr, "peer-msgs %p sent us a BT_PIECE\n", msgs );
     620            assert( msgs->blockToUs.length == 0 );
     621            msgs->state = READING_BT_PIECE;
     622            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
     623            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
     624            msgs->blockToUs.length = msglen - 8;
     625            assert( msgs->blockToUs.length > 0 );
     626            evbuffer_drain( msgs->inBlock, ~0 );
    615627            return READ_AGAIN;
    616628            break;
     
    619631        case BT_PORT: {
    620632            assert( msglen == 2 );
    621             fprintf( stderr, "got a BT_PORT\n" );
    622             tr_peerIoReadUint16( peer->io, inbuf, &peer->listeningPort );
     633            fprintf( stderr, "peer-msgs %p sent us a BT_PORT\n", msgs );
     634            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->listeningPort );
    623635            break;
    624636        }
    625637
    626638        case BT_LTEP:
    627             fprintf( stderr, "got a BT_LTEP\n" );
    628             parseLtep( peer, msglen, inbuf );
     639            fprintf( stderr, "peer-msgs %p sent us a BT_LTEP\n", msgs );
     640            parseLtep( msgs, msglen, inbuf );
    629641            break;
    630642
    631643        default:
    632             fprintf( stderr, "got an unknown BT message type: %d\n", (int)id );
    633             tr_peerIoDrain( peer->io, inbuf, msglen );
     644            fprintf( stderr, "peer-msgs %p sent us an UNKNOWN: %d\n", msgs, (int)id );
     645            tr_peerIoDrain( msgs->io, inbuf, msglen );
    634646            assert( 0 );
    635647    }
    636648
    637     peer->incomingMessageLength = -1;
    638     peer->state = AWAITING_BT_LENGTH;
     649    msgs->incomingMessageLength = -1;
     650    msgs->state = AWAITING_BT_LENGTH;
    639651    return READ_AGAIN;
    640652}
    641653
     654static void
     655clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
     656{
     657    tr_torrent * tor = msgs->torrent;
     658    tor->downloadedCur += byteCount;
     659    tr_rcTransferred( tor->download, byteCount );
     660    tr_rcTransferred( tor->handle->download, byteCount );
     661}
     662
     663static void
     664peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
     665{
     666    tr_torrent * tor = msgs->torrent;
     667    tor->uploadedCur += byteCount;
     668    tr_rcTransferred( tor->upload, byteCount );
     669    tr_rcTransferred( tor->upload, byteCount );
     670}
     671
    642672static int
    643 canDownload( const tr_peermsgs * peer UNUSED )
     673canDownload( const tr_peermsgs * msgs UNUSED )
    644674{
    645675#if 0
    646     tr_torrent * tor = peer->torrent;
     676    tr_torrent * tor = msgs->torrent;
    647677
    648678    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
     
    657687
    658688static void
    659 gotBlock( tr_peermsgs * peer, int index, int offset, struct evbuffer * inbuf )
    660 {
    661     tr_torrent * tor = peer->torrent;
    662     const size_t length = EVBUFFER_LENGTH( inbuf );
     689reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
     690{
     691    tr_torrent * tor = msgs->torrent;
     692
     693    /* increment the `corrupt' field */
     694    tor->corruptCur += byteCount;
     695
     696    /* decrement the `downloaded' field */
     697    if( tor->downloadedCur >= byteCount )
     698        tor->downloadedCur -= byteCount;
     699    else
     700        tor->downloadedCur = 0;
     701}
     702
     703
     704static void
     705gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
     706{
     707    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
     708    reassignBytesToCorrupt( msgs, byteCount );
     709}
     710
     711static void
     712gotUnwantedBlock( tr_peermsgs * msgs, uint32_t index UNUSED, uint32_t offset UNUSED, uint32_t length )
     713{
     714    reassignBytesToCorrupt( msgs, length );
     715}
     716
     717static void
     718addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
     719{
     720    if( !msgs->info->blame )
     721         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
     722    tr_bitfieldAdd( msgs->info->blame, index );
     723}
     724
     725static void
     726gotBlock( tr_peermsgs      * msgs,
     727          struct evbuffer  * inbuf,
     728          uint32_t           index,
     729          uint32_t           offset,
     730          uint32_t           length )
     731{
     732    tr_torrent * tor = msgs->torrent;
    663733    const int block = _tr_block( tor, index, offset );
    664734    struct peer_request key, *req;
    665735
    666     /* remove it from our `we asked for this' list */
     736    /**
     737    *** Remove the block from our `we asked for this' list
     738    **/
     739
    667740    key.index = index;
    668741    key.offset = offset;
    669742    key.length = length;
    670     req = (struct peer_request*) tr_list_remove( &peer->clientAskedFor, &key,
     743    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
    671744                                                 peer_request_compare );
     745    fprintf( stderr, "w00t got a block from %p. turnaround time for this block was %d seconds\n",
     746                     msgs, (int)(time(NULL) - req->time_requested) );
    672747    if( req == NULL ) {
    673 fprintf( stderr, "we didn't ask for this message...\n" );
     748        gotUnwantedBlock( msgs, index, offset, length );
     749        fprintf( stderr, "we didn't ask for this message...\n" );
    674750        tr_dbg( "we didn't ask the peer for this message..." );
    675751        return;
    676752    }
    677753    tr_free( req );
    678     fprintf( stderr, "peer %p now has %d block requests in its outbox\n", peer, tr_list_size(peer->clientAskedFor));
    679 
    680     /* sanity clause */
     754    fprintf( stderr, "peer %p now has %d block requests in its outbox\n",
     755             msgs, tr_list_size(msgs->clientAskedFor));
     756
     757    /**
     758    *** Error checks
     759    **/
     760
    681761    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
    682 fprintf( stderr, "have this block already...\n" );
     762        fprintf( stderr, "have this block already...\n" );
    683763        tr_dbg( "have this block already..." );
     764        gotUnwantedBlock( msgs, index, offset, length );
    684765        return;
    685766    }
     767
    686768    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
    687 fprintf( stderr, "block is the wrong length... expected %d and got %d\n", (int)length, (int)tr_torBlockCountBytes(tor,block) );
     769        fprintf( stderr, "block is the wrong length..." );
    688770        tr_dbg( "block is the wrong length..." );
     771        gotUnwantedBlock( msgs, index, offset, length );
    689772        return;
    690773    }
    691774
    692     fireGotBlock( peer, index, offset, length );
    693 
    694     /* write to disk */
    695     if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
     775    /**
     776    ***  Write the block
     777    **/
     778
     779    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf ))) {
    696780        return;
    697 
    698     /* make a note that this peer helped us with this piece */
    699     if( !peer->info->blame )
    700          peer->info->blame = tr_bitfieldNew( tor->info.pieceCount );
    701     tr_bitfieldAdd( peer->info->blame, index );
     781    }
    702782
    703783    tr_cpBlockAdd( tor->completion, block );
    704784
    705     tor->downloadedCur += length;
    706     tr_rcTransferred( tor->download, length );
    707     tr_rcTransferred( tor->handle->download, length );
     785    addUsToBlamefield( msgs, index );
     786
     787    fireGotBlock( msgs, index, offset, length );
     788
     789    fireBlocksRunningLow( msgs );
     790
     791    /**
     792    ***  Handle if this was the last block in the piece
     793    **/
     794
     795    if( tr_cpPieceIsComplete( tor->completion, index ) )
     796    {
     797        if( !tr_ioHash( tor, index ) )
     798        {
     799            gotBadPiece( msgs, index );
     800            return;
     801        }
     802
     803        fireClientHave( msgs, index );
     804    }
    708805}
    709806
    710807
    711808static ReadState
    712 readBtPiece( tr_peermsgs * peer, struct evbuffer * inbuf )
    713 {
    714     assert( peer->blockToUs.length > 0 );
    715 
    716     if( !canDownload( peer ) )
    717     {
    718         peer->notListening = 1;
    719         tr_peerIoSetIOMode ( peer->io, 0, EV_READ );
     809readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
     810{
     811    assert( msgs->blockToUs.length > 0 );
     812
     813    if( !canDownload( msgs ) )
     814    {
     815        msgs->notListening = 1;
     816        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
    720817        return READ_DONE;
    721818    }
     
    723820    {
    724821        /* inbuf ->  inBlock */
    725         const uint32_t len = MIN( EVBUFFER_LENGTH(inbuf), peer->blockToUs.length );
     822        const uint32_t len = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
    726823        uint8_t * tmp = tr_new( uint8_t, len );
    727         tr_peerIoReadBytes( peer->io, inbuf, tmp, len );
    728         evbuffer_add( peer->inBlock, tmp, len );
     824        tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
     825        evbuffer_add( msgs->inBlock, tmp, len );
    729826        tr_free( tmp );
    730         peer->blockToUs.length -= len;
    731 fprintf( stderr, "got %"PRIu64"; left to read is [%"PRIu64"]\n", (uint64_t)len, (uint64_t)peer->blockToUs.length );
    732 
    733 
    734         if( !peer->blockToUs.length )
     827        msgs->blockToUs.length -= len;
     828        clientGotBytes( msgs, len );
     829fprintf( stderr, "got %"PRIu64"; left to read is [%"PRIu64"]\n", (uint64_t)len, (uint64_t)msgs->blockToUs.length );
     830
     831        if( !msgs->blockToUs.length )
    735832        {
    736 fprintf( stderr, "w00t -- index %u, offset %u\n", peer->blockToUs.index, peer->blockToUs.offset );
    737             gotBlock( peer, peer->blockToUs.index,
    738                             peer->blockToUs.offset,
    739                             peer->inBlock );
    740             evbuffer_drain( peer->outBlock, ~0 );
    741             peer->state = AWAITING_BT_LENGTH;
     833fprintf( stderr, "w00t -- index %u, offset %u\n", msgs->blockToUs.index, msgs->blockToUs.offset );
     834            gotBlock( msgs, msgs->inBlock,
     835                            msgs->blockToUs.index,
     836                            msgs->blockToUs.offset,
     837                            EVBUFFER_LENGTH( msgs->inBlock ) );
     838            evbuffer_drain( msgs->inBlock, ~0 );
     839            msgs->state = AWAITING_BT_LENGTH;
    742840        }
    743841
     
    795893        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
    796894    }
    797 
    798     /* if we're running low on requests, ask for ones */
    799     if( tr_list_size(msgs->clientAskedFor) <= OUT_REQUESTS_LOW )
    800         fireBlocksRunningLow( msgs );
    801895
    802896    if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
     
    807901            tr_peerIoWrite( msgs->io, EVBUFFER_DATA(msgs->outBlock), outlen );
    808902            evbuffer_drain( msgs->outBlock, outlen );
    809 
    810             msgs->torrent->uploadedCur += outlen;
    811             tr_rcTransferred( msgs->torrent->upload, outlen );
    812             tr_rcTransferred( msgs->handle->upload, outlen );
    813 
     903            peerGotBytes( msgs, outlen );
    814904            len -= outlen;
    815905        }
     
    834924        tr_free( tmp );
    835925    }
     926
     927    fireBlocksRunningLow( msgs );
    836928
    837929    return TRUE; /* loop forever */
     
    9861078        /* write the pex message */
    9871079        benc = tr_bencSaveMalloc( &val, &bencLen );
    988         tr_peerIoWriteUint32( peer->io, peer->outBlock, 1 + 1 + bencLen );
    989         tr_peerIoWriteBytes ( peer->io, peer->outBlock, &bt_msgid, 1 );
    990         tr_peerIoWriteBytes ( peer->io, peer->outBlock, &ltep_msgid, 1 );
    991         tr_peerIoWriteBytes ( peer->io, peer->outBlock, benc, bencLen );
     1080        tr_peerIoWriteUint32( peer->io, peer->outMessages, 1 + 1 + bencLen );
     1081        tr_peerIoWriteBytes ( peer->io, peer->outMessages, &bt_msgid, 1 );
     1082        tr_peerIoWriteBytes ( peer->io, peer->outMessages, &ltep_msgid, 1 );
     1083        tr_peerIoWriteBytes ( peer->io, peer->outMessages, benc, bencLen );
    9921084
    9931085        /* cleanup */
  • branches/encryption/libtransmission/peer-msgs.h

    r3061 r3065  
    5757typedef enum
    5858{
    59     TR_PEERMSG_GOT_BITFIELD,
    60     TR_PEERMSG_GOT_HAVE,
    61     TR_PEERMSG_GOT_BLOCK,
     59    TR_PEERMSG_PEER_BITFIELD,
     60    TR_PEERMSG_PEER_HAVE,
     61    TR_PEERMSG_CLIENT_HAVE,
     62    TR_PEERMSG_CLIENT_BLOCK,
    6263    TR_PEERMSG_GOT_PEX,
    6364    TR_PEERMSG_GOT_ERROR,
  • branches/encryption/libtransmission/shared.c

    r3061 r3065  
    8282    s->natpmp     = tr_natpmpInit();
    8383    s->upnp       = tr_upnpInit();
    84     s->pulseTimer   = tr_timerNew( h, SharedLoop, s, 200 );
     84    s->pulseTimer   = tr_timerNew( h, SharedLoop, s, 250 );
    8585
    8686    return s;
Note: See TracChangeset for help on using the changeset viewer.