Changeset 7125


Ignore:
Timestamp:
Nov 17, 2008, 4:00:57 AM (12 years ago)
Author:
charles
Message:

(libT) better possible fix for #1468: Speed display is very jumpy

Location:
trunk/libtransmission
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/handshake.c

    r6988 r7125  
    334334    /* send it */
    335335    setReadState( handshake, AWAITING_YB );
    336     tr_peerIoWriteBuf( handshake->io, outbuf );
     336    tr_peerIoWriteBuf( handshake->io, outbuf, FALSE );
    337337
    338338    /* cleanup */
     
    486486    tr_cryptoDecryptInit( handshake->crypto );
    487487    setReadState( handshake, AWAITING_VC );
    488     tr_peerIoWriteBuf( handshake->io, outbuf );
     488    tr_peerIoWriteBuf( handshake->io, outbuf, FALSE );
    489489
    490490    /* cleanup */
     
    712712        int       msgSize;
    713713        uint8_t * msg = buildHandshakeMessage( handshake, &msgSize );
    714         tr_peerIoWrite( handshake->io, msg, msgSize );
     714        tr_peerIoWrite( handshake->io, msg, msgSize, FALSE );
    715715        tr_free( msg );
    716716        handshake->haveSentBitTorrentHandshake = 1;
     
    780780
    781781    setReadState( handshake, AWAITING_PAD_A );
    782     tr_peerIoWrite( handshake->io, outbuf, walk - outbuf );
     782    tr_peerIoWrite( handshake->io, outbuf, walk - outbuf, FALSE );
    783783    return READ_NOW;
    784784}
     
    992992
    993993    /* send it out */
    994     tr_peerIoWriteBuf( handshake->io, outbuf );
     994    tr_peerIoWriteBuf( handshake->io, outbuf, FALSE );
    995995    evbuffer_free( outbuf );
    996996
     
    11571157        handshake->haveSentBitTorrentHandshake = 1;
    11581158        setReadState( handshake, AWAITING_HANDSHAKE );
    1159         tr_peerIoWrite( handshake->io, msg, msgSize );
     1159        tr_peerIoWrite( handshake->io, msg, msgSize, FALSE );
    11601160        tr_free( msg );
    11611161    }
     
    12041204        handshake->haveSentBitTorrentHandshake = 1;
    12051205        setReadState( handshake, AWAITING_HANDSHAKE );
    1206         tr_peerIoWrite( handshake->io, msg, msgSize );
     1206        tr_peerIoWrite( handshake->io, msg, msgSize, FALSE );
    12071207        tr_free( msg );
    12081208    }
  • trunk/libtransmission/peer-common.h

    r6876 r7125  
    5050{
    5151    PeerEventType    eventType;
    52     uint32_t         pieceIndex; /* for GOT_BLOCK, CANCEL */
    53     uint32_t         offset; /* for GOT_BLOCK */
    54     uint32_t         length; /* for GOT_BLOCK + GOT_DATA */
    55     float            progress; /* for TR_PEER_PEER_PROGRESS */
    56     int              err; /* errno for TR_PEER_GOT_ERROR */
     52    uint32_t         pieceIndex;   /* for GOT_BLOCK, CANCEL */
     53    uint32_t         offset;       /* for GOT_BLOCK */
     54    uint32_t         length;       /* for GOT_BLOCK + GOT_DATA */
     55    float            progress;     /* for PEER_PROGRESS */
     56    int              err;          /* errno for GOT_ERROR */
     57    int              wasPieceData; /* for GOT_DATA */
    5758}
    5859tr_peer_event;
  • trunk/libtransmission/peer-io.c

    r7069 r7125  
    2828#include "transmission.h"
    2929#include "crypto.h"
     30#include "list.h"
    3031#include "net.h"
    3132#include "peer-io.h"
     
    7778};
    7879
     80struct tr_datatype
     81{
     82    unsigned int    isPieceData : 1;
     83    size_t          length;
     84};
     85
    7986struct tr_peerIo
    8087{
     
    93100    time_t                 timeCreated;
    94101
    95     tr_session *          session;
     102    tr_session           * session;
    96103
    97104    struct in_addr         in_addr;
    98     struct bufferevent *   bufev;
    99     struct evbuffer *      output;
     105    struct bufferevent   * bufev;
     106    struct evbuffer      * output;
     107    tr_list              * output_datatypes; /* struct tr_datatype */
    100108
    101109    tr_can_read_cb         canRead;
     
    200208    if( len < io->bufferSize[TR_UP] )
    201209    {
    202         const size_t payload = io->bufferSize[TR_UP] - len;
    203         const size_t n = addPacketOverhead( payload );
    204         struct tr_bandwidth * b = &io->bandwidth[TR_UP];
    205         b->bytesLeft -= MIN( b->bytesLeft, (size_t)n );
    206         b->bytesUsed += n;
    207         tr_rcTransferred( io->session->rawSpeed[TR_UP], n );
    208         dbgmsg( io,
    209                 "wrote %zu bytes to peer... upload bytesLeft is now %zu",
    210                 n,
    211                 b->bytesLeft );
     210        size_t payload = io->bufferSize[TR_UP] - len;
     211
     212        while( payload )
     213        {
     214            struct tr_datatype * next = io->output_datatypes->data;
     215            const size_t chunk_length = MIN( next->length, payload );
     216            const size_t n = addPacketOverhead( chunk_length );
     217
     218            if( next->isPieceData )
     219            {
     220                struct tr_bandwidth * b = &io->bandwidth[TR_UP];
     221                b->bytesLeft -= MIN( b->bytesLeft, n );
     222                b->bytesUsed += n;
     223            }
     224
     225            if( io->didWrite )
     226                io->didWrite( io, n, next->isPieceData, io->userData );
     227
     228            payload -= chunk_length;
     229            next->length -= chunk_length;
     230            if( !next->length )
     231                tr_free( tr_list_pop_front( &io->output_datatypes ) );
     232        }
    212233    }
    213234
    214235    adjustOutputBuffer( io );
    215236
    216     if( io->didWrite )
    217         io->didWrite( e, io->userData );
    218237}
    219238
     
    238257        b->bytesLeft -= MIN( b->bytesLeft, (size_t)n );
    239258        b->bytesUsed += n;
    240         tr_rcTransferred( io->session->rawSpeed[TR_DOWN], n );
    241         dbgmsg( io,
    242                 "%zu new input bytes. bytesUsed is %zu, bytesLeft is %zu",
    243                 n, b->bytesUsed,
    244                 b->bytesLeft );
     259        dbgmsg( io, "%zu new input bytes. bytesUsed is %zu, bytesLeft is %zu", n, b->bytesUsed, b->bytesLeft );
    245260
    246261        adjustInputBuffer( io );
     
    387402    tr_netClose( io->socket );
    388403    tr_cryptoFree( io->crypto );
     404    tr_list_free( &io->output_datatypes, tr_free );
    389405    tr_free( io );
    390406}
     
    718734
    719735void
    720 tr_peerIoWrite( tr_peerIo *  io,
    721                 const void * writeme,
    722                 size_t       writemeLen )
    723 {
     736tr_peerIoWrite( tr_peerIo   * io,
     737                const void  * writeme,
     738                size_t        writemeLen,
     739                int           isPieceData )
     740{
     741    struct tr_datatype * datatype;
    724742    assert( tr_amInEventThread( io->session ) );
    725743    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
     
    730748        evbuffer_add( io->output, writeme, writemeLen );
    731749
     750    datatype = tr_new( struct tr_datatype, 1 );
     751    datatype->isPieceData = isPieceData != 0;
     752    datatype->length = writemeLen;
     753    tr_list_append( &io->output_datatypes, datatype );
     754
    732755    adjustOutputBuffer( io );
    733756}
    734757
    735758void
    736 tr_peerIoWriteBuf( tr_peerIo *       io,
    737                    struct evbuffer * buf )
     759tr_peerIoWriteBuf( tr_peerIo         * io,
     760                   struct evbuffer   * buf,
     761                   int                 isPieceData )
    738762{
    739763    const size_t n = EVBUFFER_LENGTH( buf );
    740764
    741     tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n );
     765    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
    742766    evbuffer_drain( buf, n );
    743767}
  • trunk/libtransmission/peer-io.h

    r7055 r7125  
    2929**/
    3030
    31 tr_peerIo*           tr_peerIoNewOutgoing(
    32     struct tr_handle *     session,
    33     const struct in_addr * addr,
    34     int                    port,
    35     const  uint8_t *
    36                            torrentHash );
    37 
    38 tr_peerIo*           tr_peerIoNewIncoming( struct tr_handle *     session,
     31tr_peerIo*           tr_peerIoNewOutgoing( struct tr_handle     * session,
     32                                           const struct in_addr * addr,
     33                                           int                    port,
     34                                           const  uint8_t       * torrentHash );
     35
     36tr_peerIo*           tr_peerIoNewIncoming( struct tr_handle     * session,
    3937                                           const struct in_addr * addr,
    4038                                           uint16_t               port,
     
    109107ReadState;
    110108
    111 typedef ReadState ( *tr_can_read_cb )( struct bufferevent*, void* user_data );
    112 typedef void ( *tr_did_write_cb )( struct bufferevent *, void * );
    113 typedef void ( *tr_net_error_cb )( struct bufferevent *, short what, void * );
    114 
    115 void              tr_peerIoSetIOFuncs( tr_peerIo *     io,
    116                                        tr_can_read_cb  readcb,
    117                                        tr_did_write_cb writecb,
    118                                        tr_net_error_cb errcb,
    119                                        void *          user_data );
    120 
    121 int               tr_peerIoWantsBandwidth( const tr_peerIo * io,
    122                                                              tr_direction );
    123 
    124 #if 0
    125 void              tr_peerIoTryRead( tr_peerIo * io );
    126 
    127 #endif
    128 
    129 void              tr_peerIoWrite( tr_peerIo *  io,
    130                                   const void * writeme,
    131                                   size_t       writemeLen );
    132 
    133 void              tr_peerIoWriteBuf( tr_peerIo *       io,
    134                                      struct evbuffer * buf );
    135 
     109typedef ReadState ( *tr_can_read_cb  )( struct bufferevent   * ev,
     110                                        void                 * user_data );
     111
     112typedef void      ( *tr_did_write_cb )( tr_peerIo            * io,
     113                                        size_t                 bytesWritten,
     114                                        int                    wasPieceData,
     115                                        void                 * userData );
     116
     117typedef void      ( *tr_net_error_cb )( struct bufferevent  * ev,
     118                                        short                 what,
     119                                        void                * userData );
     120
     121void    tr_peerIoSetIOFuncs      ( tr_peerIo        * io,
     122                                   tr_can_read_cb     readcb,
     123                                   tr_did_write_cb    writecb,
     124                                   tr_net_error_cb    errcb,
     125                                   void             * user_data );
     126
     127/**
     128***
     129**/
     130
     131int     tr_peerIoWantsBandwidth ( const tr_peerIo   * io,
     132                                  tr_direction        direction );
     133
     134void    tr_peerIoWrite          ( tr_peerIo         * io,
     135                                  const void        * writeme,
     136                                  size_t              writemeLen,
     137                                  int                 isPieceData );
     138
     139void    tr_peerIoWriteBuf       ( tr_peerIo         * io,
     140                                  struct evbuffer   * buf,
     141                                  int                 isPieceData );
    136142
    137143/**
  • trunk/libtransmission/peer-mgr-private.h

    r7055 r7125  
    7171     * protocol overhead is NOT included; this is only the piece data */
    7272    struct tr_ratecontrol  * pieceSpeed[2];
     73
     74    /* the rate at which all data is being transferred between client and peer. */
     75    struct tr_ratecontrol  * rawSpeed[2];
    7376}
    7477tr_peer;
  • trunk/libtransmission/peer-mgr.c

    r7121 r7125  
    331331    p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
    332332    p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
     333    p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
     334    p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
    333335    return p;
    334336}
     
    368370    tr_free( peer->client );
    369371
     372    tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] );
     373    tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] );
    370374    tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
    371375    tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
     
    10171021            const time_t now = time( NULL );
    10181022            tr_torrent * tor = t->tor;
     1023            const tr_direction dir = TR_CLIENT_TO_PEER;
     1024
    10191025            tor->activityDate = now;
    1020             tor->uploadedCur += e->length;
     1026
     1027            if( e->wasPieceData )
     1028                tor->uploadedCur += e->length;
     1029
     1030            /* add it to the raw upload speed */
    10211031            if( peer )
    1022                 tr_rcTransferred ( peer->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
    1023             tr_rcTransferred ( tor->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
    1024             tr_rcTransferred ( tor->session->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
    1025             tr_statsAddUploaded( tor->session, e->length );
    1026             if( peer )
    1027             {
     1032                tr_rcTransferred ( peer->rawSpeed[dir], e->length );
     1033            tr_rcTransferred ( tor->rawSpeed[dir], e->length );
     1034            tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
     1035
     1036            /* maybe add it to the piece upload speed */
     1037            if( e->wasPieceData ) {
     1038                if( peer )
     1039                    tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
     1040                tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
     1041                tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
     1042            }
     1043
     1044            /* update the stats */
     1045            if( e->wasPieceData )
     1046                tr_statsAddUploaded( tor->session, e->length );
     1047
     1048            /* update our atom */
     1049            if( peer ) {
    10281050                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
    10291051                a->piece_data_time = now;
    10301052            }
     1053
    10311054            break;
    10321055        }
     
    10361059            const time_t now = time( NULL );
    10371060            tr_torrent * tor = t->tor;
     1061            const tr_direction dir = TR_PEER_TO_CLIENT;
     1062
    10381063            tor->activityDate = now;
    1039             tr_statsAddDownloaded( tor->session, e->length );
    1040             if( peer )
    1041                 tr_rcTransferred ( peer->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
    1042             tr_rcTransferred ( tor->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
    1043             tr_rcTransferred ( tor->session->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
     1064
    10441065            /* only add this to downloadedCur if we got it from a peer --
    10451066             * webseeds shouldn't count against our ratio.  As one tracker
     
    10481069             * to manage the swarms, not the web server and does not fit
    10491070             * into the jurisdiction of the tracker." */
     1071            if( peer && e->wasPieceData )
     1072                tor->downloadedCur += e->length;
     1073
     1074            /* add it to our raw download speed */
    10501075            if( peer )
    1051                 tor->downloadedCur += e->length;
     1076                tr_rcTransferred ( peer->rawSpeed[dir], e->length );
     1077            tr_rcTransferred ( tor->rawSpeed[dir], e->length );
     1078            tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
     1079
     1080            /* maybe add it to the piece upload speed */
     1081            if( e->wasPieceData ) {
     1082                if( peer )
     1083                    tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
     1084                tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
     1085                tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
     1086            }
     1087     
     1088            /* update the stats */
     1089            if( e->wasPieceData )
     1090                tr_statsAddDownloaded( tor->session, e->length );
     1091
     1092            /* update our atom */
    10521093            if( peer ) {
    10531094                struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
    10541095                a->piece_data_time = now;
    10551096            }
     1097
    10561098            break;
    10571099        }
  • trunk/libtransmission/peer-msgs.c

    r7119 r7125  
    430430**/
    431431
    432 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
     432static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
    433433
    434434static void
     
    484484static void
    485485fireClientGotData( tr_peermsgs * msgs,
    486                    uint32_t      length )
     486                   uint32_t      length,
     487                   int           wasPieceData )
    487488{
    488489    tr_peer_event e = blankEvent;
     
    490491    e.length = length;
    491492    e.eventType = TR_PEER_CLIENT_GOT_DATA;
     493    e.wasPieceData = wasPieceData;
    492494    publish( msgs, &e );
    493495}
    494496
    495497static void
    496 firePeerGotData( tr_peermsgs * msgs,
    497                  uint32_t      length )
     498firePeerGotData( tr_peermsgs  * msgs,
     499                 uint32_t       length,
     500                 int            wasPieceData )
    498501{
    499502    tr_peer_event e = blankEvent;
     
    501504    e.length = length;
    502505    e.eventType = TR_PEER_PEER_GOT_DATA;
     506    e.wasPieceData = wasPieceData;
     507
    503508    publish( msgs, &e );
    504509}
     
    13001305                           const struct peer_request * req );
    13011306
    1302 static void
    1303 clientGotBytes( tr_peermsgs * msgs,
    1304                 uint32_t      byteCount )
    1305 {
    1306     msgs->info->pieceDataActivityDate = time( NULL );
    1307     fireClientGotData( msgs, byteCount );
    1308 }
    1309 
    13101307static int
    13111308readBtPiece( tr_peermsgs *     msgs,
     
    13431340        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
    13441341        evbuffer_add( msgs->incoming.block, buf, n );
    1345         clientGotBytes( msgs, n );
     1342        fireClientGotData( msgs, n, TRUE );
    13461343        tr_free( buf );
    13471344        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
     
    15401537
    15411538static void
    1542 peerGotBytes( tr_peermsgs * msgs,
    1543               uint32_t      byteCount,
    1544               const time_t  now )
    1545 {
    1546     msgs->info->pieceDataActivityDate = now;
    1547     firePeerGotData( msgs, byteCount );
    1548 }
    1549 
    1550 static void
    15511539decrementDownloadedCount( tr_peermsgs * msgs,
    15521540                          uint32_t      byteCount )
     
    16351623}
    16361624
     1625static void
     1626didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
     1627{
     1628    tr_peermsgs * msgs = vmsgs;
     1629    firePeerGotData( msgs, bytesWritten, wasPieceData );
     1630}
     1631
    16371632static ReadState
    16381633canRead( struct bufferevent * evin,
     
    16531648    }
    16541649    else switch( msgs->state )
    1655         {
    1656             case AWAITING_BT_LENGTH:
    1657                 ret = readBtLength ( msgs, in, inlen ); break;
    1658 
    1659             case AWAITING_BT_ID:
    1660                 ret = readBtId     ( msgs, in, inlen ); break;
    1661 
    1662             case AWAITING_BT_MESSAGE:
    1663                 ret = readBtMessage( msgs, in, inlen ); break;
    1664 
    1665             default:
    1666                 assert( 0 );
    1667         }
     1650    {
     1651        case AWAITING_BT_LENGTH:
     1652            ret = readBtLength ( msgs, in, inlen ); break;
     1653
     1654        case AWAITING_BT_ID:
     1655            ret = readBtId     ( msgs, in, inlen ); break;
     1656
     1657        case AWAITING_BT_MESSAGE:
     1658            ret = readBtMessage( msgs, in, inlen ); break;
     1659
     1660        default:
     1661            assert( 0 );
     1662    }
     1663
     1664    /* log the raw data that was read */
     1665    if( EVBUFFER_LENGTH( in ) != inlen )
     1666        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
    16681667
    16691668    return ret;
     
    17271726        if( outlen )
    17281727        {
    1729             tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
     1728            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen, TRUE );
    17301729            evbuffer_drain( msgs->outBlock, outlen );
    1731             peerGotBytes( msgs, outlen, now );
    17321730
    17331731            len -= outlen;
     
    17351733            msgs->sendingBlock = len != 0;
    17361734
    1737             dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen,
    1738                     (int)len );
     1735            dbgmsg( msgs, "wrote %zu bytes; %zu left in block", outlen, len );
    17391736        }
    1740         else dbgmsg( msgs,
    1741                      "stalled writing block... uploadMax %lu, outlen %lu",
    1742                      uploadMax, outlen );
     1737        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
    17431738    }
    17441739
     
    17501745        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
    17511746        {
    1752             dbgmsg( msgs, "started an outMessages batch (length is %d)",
    1753                    (int)EVBUFFER_LENGTH( msgs->outMessages ) );
     1747            dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
    17541748            msgs->outMessagesBatchedAt = now;
    17551749        }
     
    17581752                   msgs->outMessagesBatchPeriod ) )
    17591753        {
    1760             dbgmsg( msgs, "flushing outMessages... (length is %d)",
    1761                    (int)EVBUFFER_LENGTH(
    1762                        msgs->outMessages ) );
    1763             tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
     1754            dbgmsg( msgs, "flushing outMessages... (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
     1755            tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
    17641756            msgs->clientSentAnythingAt = now;
    17651757            msgs->outMessagesBatchedAt = 0;
     
    20902082    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
    20912083                                             inactivity */
    2092     tr_peerIoSetIOFuncs( m->io, canRead, NULL, gotError, m );
     2084    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
    20932085    ratePulse( m );
    20942086
  • trunk/libtransmission/webseed.c

    r7078 r7125  
    5252***/
    5353
    54 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
     54static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
    5555
    5656static void
     
    9494    e.eventType = TR_PEER_CLIENT_GOT_DATA;
    9595    e.length = length;
     96    e.wasPieceData = TRUE;
     97
    9698    publish( w, &e );
    9799}
Note: See TracChangeset for help on using the changeset viewer.