Changeset 7154


Ignore:
Timestamp:
Nov 25, 2008, 9:35:17 PM (12 years ago)
Author:
charles
Message:

(libT) yet another stab at getting bandwidth management under control. this version may suck less than previous attempts. It also breaks the mac build until someone adds iobuf.[ch] to xcode...

Location:
trunk/libtransmission
Files:
2 added
14 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/Makefile.am

    r7147 r7154  
    2525    handshake.c \
    2626    inout.c \
     27    iobuf.c \
    2728    json.c \
    2829    JSON_parser.c \
     
    7071    handshake.h \
    7172    inout.h \
     73    iobuf.h \
    7274    json.h \
    7375    JSON_parser.h \
  • trunk/libtransmission/bandwidth.c

    r7148 r7154  
    1111 */
    1212
     13#include <assert.h>
    1314#include <limits.h>
     15
     16#include "event.h"
     17
    1418#include "transmission.h"
    1519#include "bandwidth.h"
     20#include "iobuf.h"
     21#include "ptrarray.h"
    1622#include "utils.h"
    1723
     
    2531    INTERVAL_MSEC = HISTORY_MSEC,
    2632    GRANULARITY_MSEC = 250,
    27     HISTORY_SIZE = ( INTERVAL_MSEC / GRANULARITY_MSEC )
     33    HISTORY_SIZE = ( INTERVAL_MSEC / GRANULARITY_MSEC ),
     34    MAGIC_NUMBER = 43143
    2835};
    2936
     
    7683******/
    7784
    78 struct tr_bandwidth
     85struct tr_band
    7986{
    8087    unsigned int isLimited : 1;
    81 
     88    unsigned int honorParentLimits : 1;
    8289    size_t bytesLeft;
    83 
     90    double desiredSpeed;
    8491    struct bratecontrol raw;
    8592    struct bratecontrol piece;
    86 
     93};
     94
     95struct tr_bandwidth
     96{
     97    struct tr_band band[2];
     98    struct tr_bandwidth * parent;
     99    int magicNumber;
    87100    tr_session * session;
     101    tr_ptrArray * children; /* struct tr_bandwidth */
     102    tr_ptrArray * iobufs; /* struct tr_iobuf */
    88103};
    89104
     
    92107***/
    93108
     109static int
     110comparePointers( const void * a, const void * b )
     111{
     112    return a - b;
     113}
     114
     115static int
     116isBandwidth( const tr_bandwidth * b )
     117{
     118    return ( b != NULL ) && ( b->magicNumber == MAGIC_NUMBER );
     119}
     120
     121static int
     122isDirection( const tr_direction dir )
     123{
     124    return ( dir == TR_UP ) || ( dir == TR_DOWN );
     125}
     126
     127/***
     128****
     129***/
     130
    94131tr_bandwidth*
    95 tr_bandwidthNew( tr_session * session )
     132tr_bandwidthNew( tr_session * session, tr_bandwidth * parent )
    96133{
    97134    tr_bandwidth * b = tr_new0( tr_bandwidth, 1 );
    98135    b->session = session;
     136    b->children = tr_ptrArrayNew( );
     137    b->iobufs = tr_ptrArrayNew( );
     138    b->magicNumber = MAGIC_NUMBER;
     139    b->band[TR_UP].honorParentLimits = 1;
     140    b->band[TR_DOWN].honorParentLimits = 1;
     141    tr_bandwidthSetParent( b, parent );
    99142    return b;
    100143}
     
    103146tr_bandwidthFree( tr_bandwidth * b )
    104147{
     148    assert( isBandwidth( b ) );
     149
     150    tr_bandwidthSetParent( b, NULL );
     151    tr_ptrArrayFree( b->iobufs, NULL );
     152    tr_ptrArrayFree( b->children, NULL );
     153    b->magicNumber = 0xDEAD;
    105154    tr_free( b );
    106155}
     
    111160
    112161void
     162tr_bandwidthSetParent( tr_bandwidth  * b,
     163                       tr_bandwidth  * parent )
     164{
     165    assert( isBandwidth( b ) );
     166    assert( b != parent );
     167
     168    if( b->parent )
     169    {
     170        assert( isBandwidth( b->parent ) );
     171
     172        tr_ptrArrayRemoveSorted( b->parent->children, b, comparePointers );
     173        b->parent= NULL;
     174    }
     175
     176    if( parent )
     177    {
     178        assert( isBandwidth( parent ) );
     179        assert( parent->parent != b );
     180
     181        tr_ptrArrayInsertSorted( parent->children, b, comparePointers );
     182        b->parent = parent;
     183    }
     184}
     185
     186void
     187tr_bandwidthHonorParentLimits( tr_bandwidth  * b,
     188                               tr_direction    dir,
     189                               int             honorParentLimits )
     190{
     191    assert( isBandwidth( b ) );
     192    assert( isDirection( dir ) );
     193
     194    b->band[dir].honorParentLimits = honorParentLimits != 0;
     195}
     196
     197/***
     198****
     199***/
     200
     201void
     202tr_bandwidthSetDesiredSpeed( tr_bandwidth  * b,
     203                             tr_direction    dir,
     204                             double          desiredSpeed )
     205{
     206    assert( isBandwidth( b ) );
     207    assert( isDirection( dir ) );
     208
     209    b->band[dir].desiredSpeed = desiredSpeed;
     210}
     211
     212double
     213tr_bandwidthGetDesiredSpeed( const tr_bandwidth  * b,
     214                             tr_direction          dir )
     215{
     216    assert( isBandwidth( b ) );
     217    assert( isDirection( dir ) );
     218
     219    return b->band[dir].desiredSpeed;
     220}
     221
     222void
    113223tr_bandwidthSetLimited( tr_bandwidth  * b,
    114                         size_t          bytesLeft )
    115 {
    116     b->isLimited = 1;
    117     b->bytesLeft = bytesLeft;
    118 }
    119 
    120 void
    121 tr_bandwidthSetUnlimited( tr_bandwidth * b )
    122 {
    123     b->isLimited = 0;
    124 }
     224                        tr_direction    dir,
     225                        int             isLimited )
     226{
     227    assert( isBandwidth( b ) );
     228    assert( isDirection( dir ) );
     229
     230    b->band[dir].isLimited = isLimited != 0;
     231}
     232
     233int
     234tr_bandwidthIsLimited( const tr_bandwidth  * b,
     235                       tr_direction          dir )
     236{
     237    assert( isBandwidth( b ) );
     238    assert( isDirection( dir ) );
     239
     240    return b->band[dir].isLimited != 0;
     241}
     242
     243#if 0
     244#define DEBUG_DIRECTION TR_DOWN
     245#endif
     246
     247void
     248tr_bandwidthAllocate( tr_bandwidth  * b,
     249                      tr_direction    dir,
     250                      int             period_msec )
     251{
     252    const double currentSpeed = tr_bandwidthGetPieceSpeed( b, dir ); /* KiB/s */
     253    const double desiredSpeed = b->band[dir].desiredSpeed;           /* KiB/s */
     254    const double seconds_per_pulse = period_msec / 1000.0;
     255    const double current_bytes_per_pulse = currentSpeed * 1024.0 * seconds_per_pulse;
     256    const double desired_bytes_per_pulse = desiredSpeed * 1024.0 * seconds_per_pulse;
     257    const double pulses_per_history = (double)HISTORY_MSEC / period_msec;
     258    const double min = desired_bytes_per_pulse * 0.90;
     259    const double max = desired_bytes_per_pulse * 1.50;
     260    const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 )
     261                                  - ( current_bytes_per_pulse * pulses_per_history );
     262    double clamped;
     263
     264    /* clamp the return value to lessen oscillation */
     265    clamped = next_pulse_bytes;
     266    clamped = MAX( clamped, min );
     267    clamped = MIN( clamped, max );
     268    b->band[dir].bytesLeft = clamped;
     269
     270#ifdef DEBUG_DIRECTION
     271if( dir == DEBUG_DIRECTION )
     272fprintf( stderr, "bandwidth %p currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n",
     273         b, currentSpeed, desiredSpeed,
     274         clamped/1024.0, next_pulse_bytes/1024.0 );
     275#endif
     276
     277    /* notify the io buffers that there's more bandwidth available */
     278    if( !b->band[dir].isLimited || ( clamped > 0 ) ) {
     279        int i, n=0;
     280        short what = dir==TR_UP ? EV_WRITE : EV_READ;
     281        struct tr_iobuf ** iobufs = (struct tr_iobuf**) tr_ptrArrayPeek( b->iobufs, &n );
     282#ifdef DEBUG_DIRECTION
     283if( dir == DEBUG_DIRECTION )
     284fprintf( stderr, "bandwidth %p has %d iobufs\n", b, n );
     285#endif
     286        for( i=0; i<n; ++i )
     287            tr_iobuf_enable( iobufs[i], what );
     288    }
     289
     290    /* all children should reallocate too */
     291    if( 1 ) {
     292        int i, n=0;
     293        struct tr_bandwidth ** children = (struct tr_bandwidth**) tr_ptrArrayPeek( b->children, &n );
     294        for( i=0; i<n; ++i )
     295            tr_bandwidthAllocate( children[i], dir, period_msec );
     296    }
     297}
     298
     299/***
     300****
     301***/
     302
     303void
     304tr_bandwidthAddBuffer( tr_bandwidth        * b,
     305                       struct tr_iobuf     * iobuf )
     306{
     307    assert( isBandwidth( b ) );
     308    assert( iobuf );
     309
     310    tr_ptrArrayInsertSorted( b->iobufs, iobuf, comparePointers );
     311}
     312
     313void
     314tr_bandwidthRemoveBuffer( tr_bandwidth        * b,
     315                          struct tr_iobuf     * iobuf )
     316{
     317    assert( isBandwidth( b ) );
     318    assert( iobuf );
     319
     320    tr_ptrArrayRemoveSorted( b->iobufs, iobuf, comparePointers );
     321}
     322
     323/***
     324****
     325***/
    125326
    126327size_t
    127328tr_bandwidthClamp( const tr_bandwidth  * b,
     329                   tr_direction          dir,
    128330                   size_t                byteCount )
    129331{
    130 /* const size_t n = byteCount; */
    131 
    132     if( b && b->isLimited )
    133         byteCount = MIN( byteCount, b->bytesLeft );
    134 
    135 /* if( n != byteCount ) fprintf( stderr, "%p: %zu clamped to %zu\n", b, n, byteCount ); */
     332    assert( isBandwidth( b ) );
     333    assert( isDirection( dir ) );
     334
     335    if( b )
     336    {
     337        if( b->band[dir].isLimited )
     338            byteCount = MIN( byteCount, b->band[dir].bytesLeft );
     339
     340        if( b->parent && b->band[dir].honorParentLimits )
     341            byteCount = tr_bandwidthClamp( b->parent, dir, byteCount );
     342    }
     343
    136344    return byteCount;
    137345}
    138346
    139 /***
    140 ****
    141 ***/
    142 
    143347double
    144 tr_bandwidthGetRawSpeed( const tr_bandwidth * b )
    145 {
    146     return getSpeed( &b->raw );
     348tr_bandwidthGetRawSpeed( const tr_bandwidth * b, tr_direction dir )
     349{
     350    assert( isBandwidth( b ) );
     351    assert( isDirection( dir ) );
     352
     353    return getSpeed( &b->band[dir].raw );
    147354}
    148355
    149356double
    150 tr_bandwidthGetPieceSpeed( const tr_bandwidth * b UNUSED )
    151 {
    152     return getSpeed( &b->piece );
     357tr_bandwidthGetPieceSpeed( const tr_bandwidth * b, tr_direction dir )
     358{
     359    assert( isBandwidth( b ) );
     360    assert( isDirection( dir ) );
     361
     362    return getSpeed( &b->band[dir].piece );
    153363}
    154364
    155365void
    156366tr_bandwidthUsed( tr_bandwidth  * b,
     367                  tr_direction    dir,
    157368                  size_t          byteCount,
    158369                  int             isPieceData )
    159370{
    160     if( b->isLimited && isPieceData )
    161     {
    162         b->bytesLeft -= MIN( b->bytesLeft, byteCount );
    163         /* fprintf( stderr, "%p used %zu bytes ... %zu left\n", b, byteCount, b->bytesLeft ); */
    164     }
    165 
    166     bytesUsed( &b->raw, byteCount );
     371    struct tr_band * band;
     372
     373    assert( isBandwidth( b ) );
     374    assert( isDirection( dir ) );
     375
     376    band = &b->band[dir];
     377
     378    if( band->isLimited && isPieceData )
     379        band->bytesLeft -= MIN( band->bytesLeft, byteCount );
     380
     381#ifdef DEBUG_DIRECTION
     382if( ( dir == DEBUG_DIRECTION ) && band->isLimited && isPieceData )
     383fprintf( stderr, "%p consumed %zu bytes of piece data... %zu left\n", b, byteCount, band->bytesLeft );
     384#endif
     385
     386    bytesUsed( &band->raw, byteCount );
    167387
    168388    if( isPieceData )
    169         bytesUsed( &b->piece, byteCount );
    170 }
     389        bytesUsed( &band->piece, byteCount );
     390
     391    if( b->parent != NULL )
     392        tr_bandwidthUsed( b->parent, dir, byteCount, isPieceData );
     393}
  • trunk/libtransmission/bandwidth.h

    r7151 r7154  
    1818#define TR_BANDWIDTH_H
    1919
     20struct tr_iobuf;
     21
     22/**
     23 * Bandwidth is an object for measuring and constraining bandwidth speeds.
     24 *
     25 * Bandwidth objects can be "stacked" so that a peer can be made to obey
     26 * multiple constraints (for example, obeying the global speed limit and a
     27 * per-torrent speed limit).
     28 *
     29 * HIERARCHY
     30 *
     31 *   Transmission's bandwidth hierarchy is a tree.
     32 *   At the top is the global bandwidth object owned by tr_session.
     33 *   Its children are per-torrent bandwidth objects owned by tr_torrent.
     34 *   Underneath those are per-peer bandwidth objects owned by tr_peer.
     35 *
     36 *   tr_session also owns a tr_handshake's bandwidths, so that the handshake
     37 *   I/O can be counted in the global raw totals.  When the handshake is done,
     38 *   the bandwidth's ownership passes to a tr_peer.
     39 *
     40 * MEASURING
     41 *
     42 *   When you ask a bandwidth object for its speed, it gives the speed of the
     43 *   subtree underneath it as well.  So you can get Transmission's overall
     44 *   speed by quering tr_session's bandwidth, per-torrent speeds by asking
     45 *   tr_torrent's bandwidth, and per-peer speeds by asking tr_peer's bandwidth.
     46 *
     47 * CONSTRAINING
     48 *
     49 *   Call tr_bandwidthAllocate() periodically.  tr_bandwidth knows its current
     50 *   speed and will decide how many bytes to make available over the
     51 *   user-specified period to reach the user-specified desired speed.
     52 *   If appropriate, it notifies its iobufs that new bandwidth is available.
     53 *
     54 *   tr_bandwidthAllocate() operates on the tr_bandwidth subtree, so usually
     55 *   you'll only need to invoke it for the top-level tr_session bandwidth.
     56 *
     57 *   The iobufs all have a pointer to their associated tr_bandwidth object,
     58 *   and call tr_bandwidthClamp() before performing I/O to see how much
     59 *   bandwidth they can safely use.
     60 */
    2061typedef struct tr_bandwidth tr_bandwidth;
    2162
     
    2465**/
    2566
    26 tr_bandwidth* tr_bandwidthNew           ( tr_session          * session );
     67/** @brief create a new tr_bandwidth object */
     68tr_bandwidth*
     69         tr_bandwidthNew              ( tr_session          * session,
     70                                        tr_bandwidth        * parent );
    2771
    28 void          tr_bandwidthFree          ( tr_bandwidth        * bandwidth );
     72/** @brief destroy a tr_bandwidth object */
     73void     tr_bandwidthFree             ( tr_bandwidth        * bandwidth );
     74
     75/******
     76*******
     77******/
    2978
    3079/**
    31 ***
    32 **/
    33 
    34 void          tr_bandwidthSetLimited    ( tr_bandwidth        * bandwidth,
    35                                           size_t                byteCount );
    36 
    37 void          tr_bandwidthSetUnlimited  ( tr_bandwidth        * bandwidth );
    38 
    39 size_t        tr_bandwidthClamp         ( const tr_bandwidth  * bandwidth,
    40                                           size_t                byteCount );
     80 * @brief Set the desired speed (in KiB/s) for this bandwidth subtree.
     81 * @see tr_bandwidthAllocate
     82 * @see tr_bandwidthGetDesiredSpeed
     83 */
     84void    tr_bandwidthSetDesiredSpeed   ( tr_bandwidth        * bandwidth,
     85                                        tr_direction          direction,
     86                                        double                desiredSpeed );
    4187
    4288/**
    43 ***
    44 **/
     89 * @brief Get the desired speed (in KiB/s) for ths bandwidth subtree.
     90 * @see tr_bandwidthSetDesiredSpeed
     91 */
     92double  tr_bandwidthGetDesiredSpeed   ( const tr_bandwidth  * bandwidth,
     93                                        tr_direction          direction );
    4594
    46 double        tr_bandwidthGetRawSpeed   ( const tr_bandwidth  * bandwidth );
     95/**
     96 * @brief Set whether or not this bandwidth should throttle its iobufs' speeds
     97 */
     98void    tr_bandwidthSetLimited        ( tr_bandwidth        * bandwidth,
     99                                        tr_direction          direction,
     100                                        int                   isLimited );
    47101
    48 double        tr_bandwidthGetPieceSpeed ( const tr_bandwidth  * bandwidth );
     102/**
     103 * @return nonzero if this bandwidth throttles its iobufs' speeds
     104 */
     105int     tr_bandwidthIsLimited         ( const tr_bandwidth  * bandwidth,
     106                                        tr_direction          direction );
    49107
    50 void          tr_bandwidthUsed          ( tr_bandwidth        * bandwidth,
    51                                           size_t                byteCount,
    52                                           int                   isPieceData );
     108/**
     109 * @brief allocate the next period_msec's worth of bandwidth for the iobufs to consume
     110 */
     111void    tr_bandwidthAllocate          ( tr_bandwidth        * bandwidth,
     112                                        tr_direction          direction,
     113                                        int                   period_msec );
     114
     115/**
     116 * @brief clamps byteCount down to a number that this bandwidth will allow to be consumed
     117 */
     118size_t  tr_bandwidthClamp             ( const tr_bandwidth  * bandwidth,
     119                                        tr_direction          direction,
     120                                        size_t                byteCount );
     121
     122/******
     123*******
     124******/
     125
     126/**
     127 * @brief Get the raw total of bytes read or sent by this bandwidth subtree.
     128 */
     129double  tr_bandwidthGetRawSpeed       ( const tr_bandwidth  * bandwidth,
     130                                        tr_direction          direction );
     131
     132/**
     133 * @brief Get the number of piece data bytes read or sent by this bandwidth subtree.
     134 */
     135double  tr_bandwidthGetPieceSpeed     ( const tr_bandwidth  * bandwidth,
     136                                        tr_direction          direction );
     137
     138/**
     139 * @brief Notify the bandwidth object that some of its allocated bandwidth has been consumed.
     140 * This is is usually invoked by the iobuf after a read or write.
     141 */
     142void    tr_bandwidthUsed              ( tr_bandwidth        * bandwidth,
     143                                        tr_direction          direction,
     144                                        size_t                byteCount,
     145                                        int                   isPieceData );
     146
     147/******
     148*******
     149******/
     150
     151void    tr_bandwidthSetParent         ( tr_bandwidth        * bandwidth,
     152                                        tr_bandwidth        * parent );
     153
     154/**
     155 * Almost all the time we do want to honor a parents' bandwidth cap, so that
     156 * (for example) a peer is constrained by a per-torrent cap and the global cap.
     157 * But when we set a torrent's speed mode to TR_SPEEDLIMIT_UNLIMITED, then
     158 * in that particular case we want to ignore the global speed limit...
     159 */
     160void    tr_bandwidthHonorParentLimits ( tr_bandwidth        * bandwidth,
     161                                        tr_direction          direction,
     162                                        int                   isEnabled );
     163
     164/******
     165*******
     166******/
     167
     168/**
     169 * @brief add an iobuf to this bandwidth's list of iobufs.
     170 * They will be notified when more bandwidth is made available for them to consume.
     171 */
     172void    tr_bandwidthAddBuffer         ( tr_bandwidth        * bandwidth,
     173                                        struct tr_iobuf     * iobuf );
     174
     175/**
     176 * @brief remove an iobuf from this bandwidth's list of iobufs.
     177 */
     178void    tr_bandwidthRemoveBuffer      ( tr_bandwidth        * bandwidth,
     179                                        struct tr_iobuf     * iobuf );
    53180
    54181#endif
  • trunk/libtransmission/handshake.c

    r7152 r7154  
    2525#include "crypto.h"
    2626#include "handshake.h"
     27#include "iobuf.h"
    2728#include "peer-io.h"
    2829#include "peer-mgr.h"
     
    10291030
    10301031static ReadState
    1031 canRead( struct bufferevent * evin,
    1032          void *               arg )
    1033 {
    1034     tr_handshake *    handshake = (tr_handshake *) arg;
    1035     struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
     1032canRead( struct tr_iobuf * iobuf, void * arg, size_t * piece )
     1033{
     1034    tr_handshake *    handshake = arg;
     1035    struct evbuffer * inbuf = tr_iobuf_input( iobuf );
    10361036    ReadState         ret;
    10371037    int               readyForMore = TRUE;
     1038
     1039    /* no piece data in handshake */
     1040    *piece = 0;
    10381041
    10391042    dbgmsg( handshake, "handling canRead; state is [%s]",
     
    11371140
    11381141static void
    1139 gotError( struct bufferevent * evbuf UNUSED,
    1140           short                      what,
    1141           void *                    arg )
     1142gotError( struct tr_iobuf  * iobuf UNUSED,
     1143          short              what,
     1144          void             * arg )
    11421145{
    11431146    tr_handshake * handshake = (tr_handshake *) arg;
     
    11801183    tr_handshake * handshake;
    11811184
    1182     tr_peerIoSetBandwidth( io, TR_UP, NULL );
    1183     tr_peerIoSetBandwidth( io, TR_DOWN, NULL );
    1184 
    11851185    handshake = tr_new0( tr_handshake, 1 );
    11861186    handshake->io = io;
  • trunk/libtransmission/net.c

    r7152 r7154  
    126126setSndBuf( tr_session * session, int fd )
    127127{
    128 #if 0
    129128    if( fd >= 0 )
    130129    {
     
    134133        setsockopt( fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof( rcvbuf ) );
    135134    }
    136 #endif
    137135}
    138136
  • trunk/libtransmission/peer-io.c

    r7147 r7154  
    2929#include "bandwidth.h"
    3030#include "crypto.h"
     31#include "iobuf.h"
    3132#include "list.h"
    3233#include "net.h"
     
    3536#include "utils.h"
    3637
     38#define MAGIC_NUMBER 206745
    3739#define IO_TIMEOUT_SECS 8
    3840
     
    7981struct tr_peerIo
    8082{
    81     unsigned int           isEncrypted               : 1;
    82     unsigned int           isIncoming                : 1;
    83     unsigned int           peerIdIsSet               : 1;
    84     unsigned int           extendedProtocolSupported : 1;
    85     unsigned int           fastPeersSupported        : 1;
    86 
    87     uint8_t                encryptionMode;
    88     uint8_t                timeout;
    89     uint16_t               port;
    90     int                    socket;
    91 
    92     uint8_t                peerId[20];
    93     time_t                 timeCreated;
    94 
    95     tr_session           * session;
    96 
    97     struct in_addr         in_addr;
    98     struct bufferevent   * bufev;
    99     struct evbuffer      * output;
    100     tr_list              * output_datatypes; /* struct tr_datatype */
    101 
    102     tr_can_read_cb         canRead;
    103     tr_did_write_cb        didWrite;
    104     tr_net_error_cb        gotError;
    105     void *                 userData;
    106 
    107     size_t                 bufferSize[2];
    108 
    109     tr_bandwidth         * bandwidth[2];
    110     tr_crypto            * crypto;
     83    unsigned int             isEncrypted               : 1;
     84    unsigned int             isIncoming                : 1;
     85    unsigned int             peerIdIsSet               : 1;
     86    unsigned int             extendedProtocolSupported : 1;
     87    unsigned int             fastPeersSupported        : 1;
     88
     89    int                      magicNumber;
     90
     91    uint8_t                  encryptionMode;
     92    uint8_t                  timeout;
     93    uint16_t                 port;
     94    int                      socket;
     95
     96    uint8_t                  peerId[20];
     97    time_t                   timeCreated;
     98
     99    tr_session             * session;
     100
     101    struct in_addr           in_addr;
     102    struct tr_iobuf        * iobuf;
     103    tr_list                * output_datatypes; /* struct tr_datatype */
     104
     105    tr_can_read_cb           canRead;
     106    tr_did_write_cb          didWrite;
     107    tr_net_error_cb          gotError;
     108    void *                   userData;
     109
     110    size_t                   bufferSize[2];
     111
     112    tr_bandwidth           * bandwidth;
     113    tr_crypto              * crypto;
    111114};
    112 
    113 /**
    114 ***
    115 **/
    116 
    117 static void
    118 adjustOutputBuffer( tr_peerIo * io )
    119 {
    120     struct evbuffer * live = EVBUFFER_OUTPUT( io->bufev );
    121     size_t curLive = EVBUFFER_LENGTH( live );
    122     size_t maxLive = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_sndbuf );
    123 
    124     if( ( curLive < maxLive ) && EVBUFFER_LENGTH( io->output ) )
    125     {
    126         size_t freeSpace = maxLive - curLive;
    127         size_t n = MIN( freeSpace, EVBUFFER_LENGTH( io->output ) );
    128         bufferevent_write( io->bufev, EVBUFFER_DATA( io->output ), n );
    129         evbuffer_drain( io->output, n );
    130         curLive += n;
    131     }
    132 
    133     io->bufferSize[TR_UP] = curLive;
    134 
    135     if( curLive )
    136         bufferevent_enable( io->bufev, EV_WRITE );
    137 
    138     dbgmsg( io, "after adjusting the output buffer, its size is now %zu", curLive );
    139 }
    140 
    141 static void
    142 adjustInputBuffer( tr_peerIo * io )
    143 {
    144     /* FIXME: the max read size probably needs to vary depending on the
    145      * number of peers that we have connected...  1024 is going to force
    146      * us way over the limit when there are lots of peers */
    147     static const int maxBufSize = 1024;
    148     const size_t n = tr_bandwidthClamp( io->bandwidth[TR_DOWN], maxBufSize );
    149 
    150     if( !n )
    151     {
    152         dbgmsg( io, "disabling reads because we've hit our limit" );
    153         bufferevent_disable( io->bufev, EV_READ );
    154     }
    155     else
    156     {
    157         dbgmsg( io, "enabling reading of %zu more bytes", n );
    158         bufferevent_setwatermark( io->bufev, EV_READ, 0, n );
    159         bufferevent_enable( io->bufev, EV_READ );
    160     }
    161 
    162     io->bufferSize[TR_DOWN] = EVBUFFER_LENGTH( EVBUFFER_INPUT( io->bufev ) );
    163 }
    164115
    165116/***
     
    168119
    169120static void
    170 didWriteWrapper( struct bufferevent * e,
    171                  void *               vio )
     121didWriteWrapper( struct tr_iobuf  * iobuf,
     122                 size_t             bytes_transferred,
     123                 void             * vio )
    172124{
    173125    tr_peerIo *  io = vio;
    174     const size_t len = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( e ) );
    175 
    176     dbgmsg( io, "didWrite... io->outputBufferSize was %zu, is now %zu",
    177             io->bufferSize[TR_UP], len );
    178 
    179     if( len < io->bufferSize[TR_UP] )
     126
     127    while( bytes_transferred )
    180128    {
    181         size_t payload = io->bufferSize[TR_UP] - len;
    182 
    183         while( payload )
    184         {
    185             struct tr_datatype * next = io->output_datatypes->data;
    186             const size_t chunk_length = MIN( next->length, payload );
    187             const size_t n = addPacketOverhead( chunk_length );
    188 
    189             if( io->didWrite )
    190                 io->didWrite( io, n, next->isPieceData, io->userData );
    191 
    192             payload -= chunk_length;
    193             next->length -= chunk_length;
    194             if( !next->length )
    195                 tr_free( tr_list_pop_front( &io->output_datatypes ) );
    196         }
     129        struct tr_datatype * next = io->output_datatypes->data;
     130        const size_t chunk_length = MIN( next->length, bytes_transferred );
     131        const size_t n = addPacketOverhead( chunk_length );
     132
     133        tr_bandwidthUsed( io->bandwidth, TR_UP, n, next->isPieceData );
     134
     135        if( io->didWrite )
     136            io->didWrite( io, n, next->isPieceData, io->userData );
     137
     138        bytes_transferred -= chunk_length;
     139        next->length -= chunk_length;
     140        if( !next->length )
     141            tr_free( tr_list_pop_front( &io->output_datatypes ) );
    197142    }
    198143
    199     adjustOutputBuffer( io );
    200 
     144    if( EVBUFFER_LENGTH( tr_iobuf_output( iobuf ) ) )
     145        tr_iobuf_enable( io->iobuf, EV_WRITE );
    201146}
    202147
    203148static void
    204 canReadWrapper( struct bufferevent * e,
    205                 void *               vio )
     149canReadWrapper( struct tr_iobuf  * iobuf,
     150                size_t             bytes_transferred UNUSED,
     151                void              * vio )
    206152{
    207153    int          done = 0;
     
    219165        while( !done && !err )
    220166        {
    221             const int ret = io->canRead( e, io->userData );
     167            size_t piece = 0;
     168            const size_t oldLen = EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
     169            const int ret = io->canRead( iobuf, io->userData, &piece );
     170
     171            if( ret != err )
     172            {
     173                const size_t used = oldLen - EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
     174                if( piece )
     175                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
     176                if( used != piece )
     177                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
     178            }
    222179
    223180            switch( ret )
    224181            {
    225182                case READ_NOW:
    226                     if( EVBUFFER_LENGTH( e->input ) )
     183                    if( EVBUFFER_LENGTH( tr_iobuf_input( iobuf )))
    227184                        continue;
    228185                    done = 1;
     
    241198        tr_globalUnlock( session );
    242199    }
    243 
    244     if( !err )
    245         adjustInputBuffer( io );
    246200}
    247201
    248202static void
    249 gotErrorWrapper( struct bufferevent * e,
    250                  short                what,
    251                  void *              userData )
     203gotErrorWrapper( struct tr_iobuf  * iobuf,
     204                 short              what,
     205                 void             * userData )
    252206{
    253207    tr_peerIo * c = userData;
    254208
    255209    if( c->gotError )
    256         c->gotError( e, what, c->userData );
     210        c->gotError( iobuf, what, c->userData );
    257211}
    258212
     
    264218bufevNew( tr_peerIo * io )
    265219{
    266     io->bufev = bufferevent_new( io->socket,
    267                                  canReadWrapper,
    268                                  didWriteWrapper,
    269                                  gotErrorWrapper,
    270                                  io );
    271 
    272     /* tell libevent to call didWriteWrapper after every write,
    273      * not just when the write buffer is empty */
    274     bufferevent_setwatermark( io->bufev, EV_WRITE, INT_MAX, 0 );
    275 
    276     bufferevent_settimeout( io->bufev, io->timeout, io->timeout );
    277 
    278     bufferevent_enable( io->bufev, EV_READ | EV_WRITE );
     220    io->iobuf = tr_iobuf_new( io->session,
     221                              io->bandwidth,
     222                              io->socket,
     223                              EV_READ | EV_WRITE,
     224                              canReadWrapper,
     225                              didWriteWrapper,
     226                              gotErrorWrapper,
     227                              io );
     228
     229    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
     230}
     231
     232static int
     233isPeerIo( const tr_peerIo * io )
     234{
     235    return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
    279236}
    280237
     
    293250
    294251    io = tr_new0( tr_peerIo, 1 );
     252    io->magicNumber = MAGIC_NUMBER;
    295253    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
    296254    io->session = session;
     
    301259    io->timeout = IO_TIMEOUT_SECS;
    302260    io->timeCreated = time( NULL );
    303     io->output = evbuffer_new( );
    304261    bufevNew( io );
     262    tr_peerIoSetBandwidth( io, session->bandwidth );
    305263    return io;
    306264}
     
    346304    tr_peerIo * io = vio;
    347305
    348     evbuffer_free( io->output );
    349     bufferevent_free( io->bufev );
     306    tr_peerIoSetBandwidth( io, NULL );
     307    tr_iobuf_free( io->iobuf );
    350308    tr_netClose( io->socket );
    351309    tr_cryptoFree( io->crypto );
    352310    tr_list_free( &io->output_datatypes, tr_free );
     311
     312    io->magicNumber = 0xDEAD;
    353313    tr_free( io );
    354314}
     
    369329tr_peerIoGetSession( tr_peerIo * io )
    370330{
    371     assert( io );
     331    assert( isPeerIo( io ) );
    372332    assert( io->session );
    373333
     
    379339                           uint16_t * port )
    380340{
    381     assert( io );
     341    assert( isPeerIo( io ) );
    382342
    383343    if( port )
     
    407367tr_peerIoTryRead( tr_peerIo * io )
    408368{
    409     if( EVBUFFER_LENGTH( io->bufev->input ) )
    410         canReadWrapper( io->bufev, io );
     369    if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
     370        (*canReadWrapper)( io->iobuf, ~0, io );
    411371}
    412372
     
    444404    if( io->socket >= 0 )
    445405    {
     406        tr_bandwidth * bandwidth = io->bandwidth;
     407        tr_peerIoSetBandwidth( io, NULL );
     408
    446409        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
    447 
    448         bufferevent_free( io->bufev );
     410        tr_iobuf_free( io->iobuf );
    449411        bufevNew( io );
     412
     413        tr_peerIoSetBandwidth( io, bandwidth );
    450414        return 0;
    451415    }
     
    459423{
    460424    io->timeout = secs;
    461     bufferevent_settimeout( io->bufev, io->timeout, io->timeout );
    462     bufferevent_enable( io->bufev, EV_READ | EV_WRITE );
     425    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
     426    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
    463427}
    464428
     
    471435                         const uint8_t * hash )
    472436{
    473     assert( io );
     437    assert( isPeerIo( io ) );
    474438
    475439    tr_cryptoSetTorrentHash( io->crypto, hash );
     
    479443tr_peerIoGetTorrentHash( tr_peerIo * io )
    480444{
    481     assert( io );
     445    assert( isPeerIo( io ) );
    482446    assert( io->crypto );
    483447
     
    488452tr_peerIoHasTorrentHash( const tr_peerIo * io )
    489453{
    490     assert( io );
     454    assert( isPeerIo( io ) );
    491455    assert( io->crypto );
    492456
     
    502466                     const uint8_t * peer_id )
    503467{
    504     assert( io );
     468    assert( isPeerIo( io ) );
    505469
    506470    if( ( io->peerIdIsSet = peer_id != NULL ) )
     
    513477tr_peerIoGetPeersId( const tr_peerIo * io )
    514478{
    515     assert( io );
     479    assert( isPeerIo( io ) );
    516480    assert( io->peerIdIsSet );
    517481
     
    527491                     int         flag )
    528492{
    529     assert( io );
     493    assert( isPeerIo( io ) );
    530494    assert( flag == 0 || flag == 1 );
    531495
     
    537501                     int         flag )
    538502{
    539     assert( io );
     503    assert( isPeerIo( io ) );
    540504    assert( flag == 0 || flag == 1 );
    541505
     
    546510tr_peerIoSupportsLTEP( const tr_peerIo * io )
    547511{
    548     assert( io );
     512    assert( isPeerIo( io ) );
    549513
    550514    return io->extendedProtocolSupported;
     
    554518tr_peerIoSupportsFEXT( const tr_peerIo * io )
    555519{
    556     assert( io );
     520    assert( isPeerIo( io ) );
    557521
    558522    return io->fastPeersSupported;
     
    566530tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
    567531{
    568     const size_t desiredLiveLen = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_rcvbuf );
    569     const size_t currentLiveLen = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( io->bufev ) );
    570     const size_t desiredQueueLen = io->session->so_sndbuf;
    571     const size_t currentQueueLen = EVBUFFER_LENGTH( io->output );
    572     const size_t desiredLen = desiredLiveLen + desiredQueueLen;
    573     const size_t currentLen = currentLiveLen + currentQueueLen;
     532    const size_t desiredLen = io->session->so_sndbuf * 2; /* FIXME: bigger? */
     533    const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) );
    574534    size_t freeSpace = 0;
    575535
     
    582542void
    583543tr_peerIoSetBandwidth( tr_peerIo     * io,
    584                        tr_direction    direction,
    585544                       tr_bandwidth  * bandwidth )
    586545{
    587     assert( io );
    588     assert( direction == TR_UP || direction == TR_DOWN );
    589 
    590     io->bandwidth[direction] = bandwidth;
    591 
    592     if( direction ==  TR_UP )
    593         adjustOutputBuffer( io );
    594     else
    595         adjustInputBuffer( io );
     546    assert( isPeerIo( io ) );
     547
     548    if( io->bandwidth )
     549        tr_bandwidthRemoveBuffer( io->bandwidth, io->iobuf );
     550
     551    io->bandwidth = bandwidth;
     552    tr_iobuf_set_bandwidth( io->iobuf, bandwidth );
     553
     554    if( io->bandwidth )
     555        tr_bandwidthAddBuffer( io->bandwidth, io->iobuf );
     556
     557    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
    596558}
    597559
     
    610572                        int         encryptionMode )
    611573{
    612     assert( io );
     574    assert( isPeerIo( io ) );
    613575    assert( encryptionMode == PEER_ENCRYPTION_NONE
    614576          || encryptionMode == PEER_ENCRYPTION_RC4 );
     
    637599    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
    638600
    639     evbuffer_add( io->output, writeme, writemeLen );
    640 
    641601    datatype = tr_new( struct tr_datatype, 1 );
    642602    datatype->isPieceData = isPieceData != 0;
     
    644604    tr_list_append( &io->output_datatypes, datatype );
    645605
    646     adjustOutputBuffer( io );
     606    evbuffer_add( tr_iobuf_output( io->iobuf ), writeme, writemeLen );
     607    tr_iobuf_enable( io->iobuf, EV_WRITE );
    647608}
    648609
  • trunk/libtransmission/peer-io.h

    r7151 r7154  
    2424struct in_addr;
    2525struct evbuffer;
    26 struct bufferevent;
    2726struct tr_bandwidth;
    2827struct tr_crypto;
     28struct tr_iobuf;
    2929typedef struct tr_peerIo tr_peerIo;
    3030
     
    111111ReadState;
    112112
    113 typedef ReadState ( *tr_can_read_cb  )( struct bufferevent   * ev,
    114                                         void                 * user_data );
    115 
    116 typedef void      ( *tr_did_write_cb )( tr_peerIo            * io,
    117                                         size_t                 bytesWritten,
    118                                         int                    wasPieceData,
    119                                         void                 * userData );
    120 
    121 typedef void      ( *tr_net_error_cb )( struct bufferevent  * ev,
    122                                         short                 what,
    123                                         void                * userData );
     113typedef ReadState ( *tr_can_read_cb  )( struct tr_iobuf  * iobuf,
     114                                        void             * user_data,
     115                                        size_t           * setme_piece_byte_count );
     116
     117typedef void      ( *tr_did_write_cb )( tr_peerIo        * io,
     118                                        size_t             bytesWritten,
     119                                        int                wasPieceData,
     120                                        void             * userData );
     121
     122typedef void      ( *tr_net_error_cb )( struct tr_iobuf  * ev,
     123                                        short              what,
     124                                        void             * userData );
    124125
    125126void    tr_peerIoSetIOFuncs      ( tr_peerIo        * io,
     
    206207
    207208void              tr_peerIoSetBandwidth( tr_peerIo            * io,
    208                                          tr_direction           direction,
    209209                                         struct tr_bandwidth  * bandwidth );
    210210
     211void              tr_peerIoBandwidthUsed( tr_peerIo           * io,
     212                                          tr_direction          direction,
     213                                          size_t                byteCount,
     214                                          int                   isPieceData );
     215
     216
    211217
    212218#endif
  • trunk/libtransmission/peer-mgr-private.h

    r7151 r7154  
    2828#include "publish.h" /* tr_publisher_tag */
    2929
     30struct tr_bandwidth;
    3031struct tr_bitfield;
    3132struct tr_peerIo;
    3233struct tr_peermsgs;
    33 struct tr_ratecontrol;
    3434
    3535enum
     
    7272    tr_publisher_tag         msgsTag;
    7373
    74     /* the rate at which pieces are being transferred between client and peer.
    75      * protocol overhead is NOT included; this is only the piece data */
    76     struct tr_ratecontrol  * pieceSpeed[2];
    77 
    78     /* the rate at which all data is being transferred between client and peer. */
    79     struct tr_ratecontrol  * rawSpeed[2];
     74    struct tr_bandwidth    * bandwidth;
    8075}
    8176tr_peer;
  • trunk/libtransmission/peer-mgr.c

    r7148 r7154  
    3434#include "peer-msgs.h"
    3535#include "ptrarray.h"
    36 #include "ratecontrol.h"
    3736#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
    3837#include "torrent.h"
     
    5958   
    6059    /* how frequently to reallocate bandwidth */
    61     BANDWIDTH_PERIOD_MSEC = 333,
     60    BANDWIDTH_PERIOD_MSEC = 200,
    6261
    6362    /* max # of peers to ask fer per torrent per reconnect pulse */
     
    324323
    325324static tr_peer*
    326 peerConstructor( const struct in_addr * in_addr )
     325peerConstructor( tr_torrent * tor, const struct in_addr * in_addr )
    327326{
    328327    tr_peer * p;
     
    330329    p = tr_new0( tr_peer, 1 );
    331330    memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
    332     p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
    333     p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
    334     p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
    335     p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
     331    p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
    336332    return p;
    337333}
     
    349345    if( peer == NULL )
    350346    {
    351         peer = peerConstructor( in_addr );
     347        peer = peerConstructor( torrent->tor, in_addr );
    352348        tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
    353349    }
     
    371367    tr_free( peer->client );
    372368
    373     tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] );
    374     tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] );
    375     tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
    376     tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
     369    tr_bandwidthFree( peer->bandwidth );
     370
    377371    tr_free( peer );
    378372}
     
    10181012            const time_t now = time( NULL );
    10191013            tr_torrent * tor = t->tor;
    1020             const tr_direction dir = TR_CLIENT_TO_PEER;
    10211014
    10221015            tor->activityDate = now;
     
    10241017            if( e->wasPieceData )
    10251018                tor->uploadedCur += e->length;
    1026 
    1027             /* add it to the raw upload speed */
    1028             if( peer )
    1029                 tr_rcTransferred ( peer->rawSpeed[dir], e->length );
    1030 
    1031             /* maybe add it to the piece upload speed */
    1032             if( e->wasPieceData ) {
    1033                 if( peer )
    1034                     tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
    1035             }
    1036 
    1037             tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );
    1038             tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );
    10391019
    10401020            /* update the stats */
     
    10551035            const time_t now = time( NULL );
    10561036            tr_torrent * tor = t->tor;
    1057             const tr_direction dir = TR_PEER_TO_CLIENT;
    10581037
    10591038            tor->activityDate = now;
     
    10681047                tor->downloadedCur += e->length;
    10691048
    1070             /* add it to our raw download speed */
    1071             if( peer )
    1072                 tr_rcTransferred ( peer->rawSpeed[dir], e->length );
    1073 
    1074             /* maybe add it to the piece upload speed */
    1075             if( e->wasPieceData ) {
    1076                 if( peer )
    1077                     tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
    1078             }
    1079 
    1080             tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );
    1081             tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );
    1082      
    10831049            /* update the stats */
    10841050            if( e->wasPieceData )
     
    13131279                if( !peer_id )
    13141280                    peer->client = NULL;
    1315                 else
    1316                 {
     1281                else {
    13171282                    char client[128];
    13181283                    tr_clientForId( client, sizeof( client ), peer_id );
    13191284                    peer->client = tr_strdup( client );
    13201285                }
     1286
    13211287                peer->port = port;
    13221288                peer->io = io;
    1323                 peer->msgs =
    1324                     tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
    1325                                     &peer->msgsTag );
     1289                peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
     1290                tr_peerIoSetBandwidth( io, peer->bandwidth );
    13261291
    13271292                success = TRUE;
     
    18051770    assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
    18061771
    1807     return tr_rcRate( peer->pieceSpeed[direction] );
     1772    return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
    18081773}
    18091774
     
    23632328    int       i, j;
    23642329
    2365     for( i = 0; i < torrentCount; ++i )
     2330    for( i=0; i<torrentCount; ++i )
    23662331    {
    23672332        Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
    2368         for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
     2333        for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
    23692334        {
    23702335            tr_peer * peer = tr_ptrArrayNth( t->peers, j );
     
    23742339}
    23752340
    2376 static void
    2377 setTorrentBandwidth( Torrent * t,
    2378                      tr_direction dir,
    2379                      tr_bandwidth * bandwidth )
    2380 {
    2381     int i;
    2382     int peerCount = 0;
    2383     tr_peer ** peers = getConnectedPeers( t, &peerCount );
    2384 
    2385     for( i=0; i<peerCount; ++i )
    2386         tr_peerIoSetBandwidth( peers[i]->io, dir, bandwidth );
    2387 
    2388     tr_free( peers );
    2389 }
    2390 
    2391 #if 0
    2392 #define DEBUG_DIRECTION TR_DOWN
    2393 #endif
    2394 
    2395 static double
    2396 bytesPerPulse( double KiB_per_second )
    2397 {
    2398     return KiB_per_second * ( 1024.0 * BANDWIDTH_PERIOD_MSEC / 1000.0 );
    2399 }
    2400 
    2401 /*
    2402  * @param currentSpeed current speed in KiB/s
    2403  * @param desiredSpeed desired speed in KiB/s
    2404  */
    2405 static double
    2406 allocateHowMuch( tr_direction dir UNUSED, double currentSpeed, double desiredSpeed )
    2407 {
    2408     const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC;
    2409     const double current_bytes_per_pulse = bytesPerPulse( currentSpeed );
    2410     const double desired_bytes_per_pulse = bytesPerPulse( desiredSpeed );
    2411     const double min = desired_bytes_per_pulse * 0.90;
    2412     const double max = desired_bytes_per_pulse * 1.50;
    2413     const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 )
    2414                                   - ( current_bytes_per_pulse * pulses_per_history );
    2415     double clamped;
    2416 
    2417     /* clamp the return value to lessen oscillation */
    2418     clamped = next_pulse_bytes;
    2419     clamped = MAX( clamped, min );
    2420     clamped = MIN( clamped, max );
    2421 
    2422 #ifdef DEBUG_DIRECTION
    2423 if( dir == DEBUG_DIRECTION )
    2424 fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n",
    2425          currentSpeed,
    2426          desiredSpeed,
    2427          clamped/1024.0,
    2428          next_pulse_bytes/1024.0 );
    2429 #endif
    2430 
    2431     return clamped;
    2432 }
    2433 
    2434 /**
    2435  * Allocate bandwidth for each peer connection.
    2436  *
    2437  * @param mgr the peer manager
    2438  * @param direction whether to allocate upload or download bandwidth
    2439  */
    2440 static void
    2441 allocateBandwidth( tr_peerMgr * mgr,
    2442                    tr_direction direction )
    2443 {
    2444     int              i;
    2445     tr_session     * session = mgr->session;
    2446     const int        torrentCount = tr_ptrArraySize( mgr->torrents );
    2447     Torrent       ** torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
    2448     tr_bandwidth   * global_pool = session->bandwidth[direction];
    2449 
    2450     assert( mgr );
    2451     assert( direction == TR_UP || direction == TR_DOWN );
    2452 
    2453     /* before allocating bandwidth, pump the connected peers */
     2341static int
     2342bandwidthPulse( void * vmgr )
     2343{
     2344    tr_peerMgr * mgr = vmgr;
     2345    managerLock( mgr );
     2346
    24542347    pumpAllPeers( mgr );
    2455 
    2456     for( i=0; i<torrentCount; ++i )
    2457     {
    2458         Torrent * t = torrents[i];
    2459         tr_speedlimit speedMode;
    2460 
    2461         /* no point in allocating bandwidth for stopped torrents */
    2462         if( tr_torrentGetActivity( t->tor ) == TR_STATUS_STOPPED )
    2463             continue;
    2464 
    2465         /* if piece data is disallowed, don't bother limiting bandwidth --
    2466          * we won't be asking for, or sending out, any pieces */
    2467         if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) )
    2468             speedMode = TR_SPEEDLIMIT_UNLIMITED;
    2469         else
    2470             speedMode = tr_torrentGetSpeedMode( t->tor, direction );
    2471            
    2472         /* process the torrent's peers based on its speed mode */
    2473         switch( speedMode )
    2474         {
    2475             case TR_SPEEDLIMIT_UNLIMITED:
    2476             {
    2477                 tr_bandwidth * b = t->tor->bandwidth[direction];
    2478                 tr_bandwidthSetUnlimited( b );
    2479                 setTorrentBandwidth( t, direction, b );
    2480                 break;
    2481             }
    2482 
    2483             case TR_SPEEDLIMIT_SINGLE:
    2484             {
    2485                 tr_bandwidth * b = t->tor->bandwidth[direction];
    2486                 const double currentSpeed = tr_bandwidthGetPieceSpeed( b );
    2487                 const double desiredSpeed = tr_torrentGetSpeedLimit( t->tor, direction );
    2488                 const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed );
    2489 #ifdef DEBUG_DIRECTION
    2490 if( direction == DEBUG_DIRECTION )
    2491 fprintf( stderr, "single: currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", currentSpeed, desiredSpeed, bytesPerPulse );
    2492 #endif
    2493                 tr_bandwidthSetLimited( b, bytesPerPulse );
    2494                 setTorrentBandwidth( t, direction, b );
    2495                 break;
    2496             }
    2497 
    2498             case TR_SPEEDLIMIT_GLOBAL:
    2499             {
    2500                 setTorrentBandwidth( t, direction, global_pool );
    2501                 break;
    2502             }
    2503         }
    2504     }
    2505 
    2506     /* handle the global pool's connections */
    2507     if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
    2508         tr_bandwidthSetUnlimited( global_pool );
    2509     else {
    2510         const double currentSpeed = tr_bandwidthGetPieceSpeed( global_pool );
    2511         const double desiredSpeed = tr_sessionGetSpeedLimit( session, direction );
    2512         const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed );
    2513 #ifdef DEBUG_DIRECTION
    2514 if( direction == DEBUG_DIRECTION )
    2515 fprintf( stderr, "global(%p): currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", global_pool, currentSpeed, desiredSpeed, bytesPerPulse );
    2516 #endif
    2517         tr_bandwidthSetLimited( session->bandwidth[direction], bytesPerPulse );
    2518     }
    2519 
    2520     /* now that we've allocated bandwidth, pump all the connected peers */
     2348    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
     2349    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
    25212350    pumpAllPeers( mgr );
    2522 }
    2523 
    2524 static int
    2525 bandwidthPulse( void * vmgr )
    2526 {
    2527     tr_peerMgr * mgr = vmgr;
    2528     int          i;
    2529 
    2530     managerLock( mgr );
    2531 
    2532     /* allocate the upload and download bandwidth */
    2533     for( i = 0; i < 2; ++i )
    2534         allocateBandwidth( mgr, i );
    25352351
    25362352    managerUnlock( mgr );
  • trunk/libtransmission/peer-msgs.c

    r7147 r7154  
    2525#include "crypto.h"
    2626#include "inout.h"
     27#include "iobuf.h"
    2728#ifdef WIN32
    2829#include "net.h" /* for ECONN */
     
    13151316
    13161317static int
    1317 readBtPiece( tr_peermsgs *     msgs,
    1318              struct evbuffer * inbuf,
    1319              size_t            inlen )
     1318readBtPiece( tr_peermsgs      * msgs,
     1319             struct evbuffer  * inbuf,
     1320             size_t             inlen,
     1321             size_t           * setme_piece_bytes_read )
    13201322{
    13211323    struct peer_request * req = &msgs->incoming.blockReq;
     
    13501352        evbuffer_add( msgs->incoming.block, buf, n );
    13511353        fireClientGotData( msgs, n, TRUE );
     1354        *setme_piece_bytes_read += n;
    13521355        tr_free( buf );
    13531356        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
     
    16431646
    16441647static ReadState
    1645 canRead( struct bufferevent * evin,
    1646          void *               vmsgs )
     1648canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
    16471649{
    16481650    ReadState         ret;
    16491651    tr_peermsgs *     msgs = vmsgs;
    1650     struct evbuffer * in = EVBUFFER_INPUT ( evin );
     1652    struct evbuffer * in = tr_iobuf_input( iobuf );
    16511653    const size_t      inlen = EVBUFFER_LENGTH( in );
    16521654
     
    16571659    else if( msgs->state == AWAITING_BT_PIECE )
    16581660    {
    1659         ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER;
     1661        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
    16601662    }
    16611663    else switch( msgs->state )
     
    18211823
    18221824static void
    1823 gotError( struct bufferevent * evbuf UNUSED,
    1824           short                      what,
    1825           void *                    vmsgs )
     1825gotError( struct tr_iobuf  * iobuf UNUSED,
     1826          short              what,
     1827          void             * vmsgs )
    18261828{
    18271829    if( what & EVBUFFER_TIMEOUT )
    1828         dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what,
    1829                 evbuf->timeout_read );
     1830        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
    18301831    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
    18311832        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
  • trunk/libtransmission/session.c

    r7147 r7154  
    275275    /* Initialize rate and file descripts controls */
    276276
    277     h->uploadLimit = uploadLimit;
    278     h->useUploadLimit = useUploadLimit;
    279     h->downloadLimit = downloadLimit;
    280     h->useDownloadLimit = useDownloadLimit;
    281 
    282277    tr_fdInit( globalPeerLimit );
    283278    h->shared = tr_sharedInit( h, isPortForwardingEnabled, publicPort );
    284279    h->isPortSet = publicPort >= 0;
    285280
    286     h->bandwidth[TR_UP] = tr_bandwidthNew( h );
    287     h->bandwidth[TR_DOWN] = tr_bandwidthNew( h );
     281    h->bandwidth = tr_bandwidthNew( h, NULL );
     282    tr_bandwidthSetDesiredSpeed( h->bandwidth, TR_UP, uploadLimit );
     283    tr_bandwidthSetDesiredSpeed( h->bandwidth, TR_DOWN, downloadLimit );
     284    tr_bandwidthSetLimited( h->bandwidth, TR_UP, useUploadLimit );
     285    tr_bandwidthSetLimited( h->bandwidth, TR_DOWN, useDownloadLimit );
    288286
    289287    /* first %s is the application name
     
    444442
    445443void
    446 tr_sessionSetSpeedLimitEnabled( tr_handle *  h,
    447                                 tr_direction direction,
    448                                 int          use_flag )
    449 {
    450     assert( h );
    451     assert( direction == TR_UP || direction == TR_DOWN );
    452 
    453     if( direction == TR_UP )
    454         h->useUploadLimit = use_flag ? 1 : 0;
    455     else
    456         h->useDownloadLimit = use_flag ? 1 : 0;
    457 }
    458 
    459 int
    460 tr_sessionIsSpeedLimitEnabled( const tr_handle * h,
    461                                tr_direction      direction )
    462 {
    463     return direction == TR_UP ? h->useUploadLimit : h->useDownloadLimit;
    464 }
    465 
    466 void
    467 tr_sessionSetSpeedLimit( tr_handle *  h,
    468                          tr_direction direction,
    469                          int          KiB_sec )
    470 {
    471     if( direction == TR_DOWN )
    472         h->downloadLimit = KiB_sec;
    473     else
    474         h->uploadLimit = KiB_sec;
    475 }
    476 
    477 int
    478 tr_sessionGetSpeedLimit( const tr_handle * h,
    479                          tr_direction      direction )
    480 {
    481     return direction == TR_UP ? h->uploadLimit : h->downloadLimit;
     444tr_sessionSetSpeedLimitEnabled( tr_session      * session,
     445                                tr_direction      dir,
     446                                int               isLimited )
     447{
     448    tr_bandwidthSetLimited( session->bandwidth, dir, isLimited );
     449}
     450
     451int
     452tr_sessionIsSpeedLimitEnabled( const tr_session  * session,
     453                               tr_direction        dir )
     454{
     455    return !tr_bandwidthIsLimited( session->bandwidth, dir );
     456}
     457
     458void
     459tr_sessionSetSpeedLimit( tr_session    * session,
     460                         tr_direction    dir,
     461                         int             desiredSpeed )
     462{
     463    tr_bandwidthSetDesiredSpeed( session->bandwidth, dir, desiredSpeed );
     464}
     465
     466int
     467tr_sessionGetSpeedLimit( const tr_session  * session,
     468                         tr_direction        dir )
     469{
     470    return tr_bandwidthGetDesiredSpeed( session->bandwidth, dir );
    482471}
    483472
     
    506495tr_sessionGetPieceSpeed( const tr_session * session, tr_direction dir )
    507496{
    508     assert( dir==TR_UP || dir==TR_DOWN );
    509 
    510     return session ? tr_bandwidthGetPieceSpeed( session->bandwidth[dir] ) : 0.0;
     497    return session ? tr_bandwidthGetPieceSpeed( session->bandwidth, dir ) : 0.0;
    511498}
    512499
     
    514501tr_sessionGetRawSpeed( const tr_session * session, tr_direction dir )
    515502{
    516     assert( dir==TR_UP || dir==TR_DOWN );
    517 
    518     return session ? tr_bandwidthGetPieceSpeed( session->bandwidth[dir] ) : 0.0;
     503    return session ? tr_bandwidthGetPieceSpeed( session->bandwidth, dir ) : 0.0;
    519504}
    520505
     
    630615
    631616    /* free the session memory */
    632     tr_bandwidthFree( session->bandwidth[TR_UP] );
    633     tr_bandwidthFree( session->bandwidth[TR_DOWN] );
     617    tr_bandwidthFree( session->bandwidth );
    634618    tr_lockFree( session->lock );
    635619    for( i = 0; i < session->metainfoLookupCount; ++i )
  • trunk/libtransmission/session.h

    r7151 r7154  
    6363    unsigned int                 isProxyAuthEnabled : 1;
    6464    unsigned int                 isClosed           : 1;
    65     unsigned int                 useUploadLimit     : 1;
    66     unsigned int                 useDownloadLimit   : 1;
    6765    unsigned int                 useLazyBitfield    : 1;
    6866
     
    8886    char *                       proxyUsername;
    8987    char *                       proxyPassword;
    90 
    91     int                          uploadLimit;
    92     int                          downloadLimit;
    9388
    9489    struct tr_list *             blocklists;
     
    117112
    118113    /* monitors the "global pool" speeds */
    119     struct tr_bandwidth       * bandwidth[2];
     114    struct tr_bandwidth       * bandwidth;
    120115};
    121116
  • trunk/libtransmission/torrent.c

    r7147 r7154  
    144144void
    145145tr_torrentSetSpeedMode( tr_torrent *  tor,
    146                         tr_direction  direction,
     146                        tr_direction  dir,
    147147                        tr_speedlimit mode )
    148148{
    149     tr_speedlimit * limit = direction == TR_UP ? &tor->uploadLimitMode
    150                                                : &tor->downloadLimitMode;
    151 
    152     *limit = mode;
     149    assert( tor != NULL );
     150    assert( dir==TR_UP || dir==TR_DOWN );
     151    assert( mode==TR_SPEEDLIMIT_GLOBAL || mode==TR_SPEEDLIMIT_SINGLE || mode==TR_SPEEDLIMIT_UNLIMITED  );
     152
     153    tor->speedLimitMode[dir] = mode;
     154
     155    tr_bandwidthSetLimited( tor->bandwidth, dir, mode==TR_SPEEDLIMIT_SINGLE );
     156    tr_bandwidthHonorParentLimits( tor->bandwidth, dir, mode!=TR_SPEEDLIMIT_UNLIMITED );
    153157}
    154158
    155159tr_speedlimit
    156160tr_torrentGetSpeedMode( const tr_torrent * tor,
    157                         tr_direction       direction )
    158 {
    159     return direction == TR_UP ? tor->uploadLimitMode
    160                               : tor->downloadLimitMode;
     161                        tr_direction       dir )
     162{
     163    assert( tor != NULL );
     164    assert( dir==TR_UP || dir==TR_DOWN );
     165
     166    return tor->speedLimitMode[dir];
    161167}
    162168
    163169void
    164170tr_torrentSetSpeedLimit( tr_torrent * tor,
    165                          tr_direction direction,
    166                          int          single_KiB_sec )
    167 {
    168     switch( direction )
    169     {
    170         case TR_UP:
    171             tor->uploadLimit = single_KiB_sec; break;
    172 
    173         case TR_DOWN:
    174             tor->downloadLimit = single_KiB_sec; break;
    175 
    176         default:
    177             assert( 0 );
    178     }
     171                         tr_direction dir,
     172                         int          desiredSpeed )
     173{
     174    tr_bandwidthSetDesiredSpeed( tor->bandwidth, dir, desiredSpeed );
    179175}
    180176
    181177int
    182178tr_torrentGetSpeedLimit( const tr_torrent * tor,
    183                          tr_direction       direction )
    184 {
    185     switch( direction )
    186     {
    187         case TR_UP:
    188             return tor->uploadLimit;
    189 
    190         case TR_DOWN:
    191             return tor->downloadLimit;
    192 
    193         default:
    194             assert( 0 );
    195     }
     179                         tr_direction       dir )
     180{
     181    return tr_bandwidthGetDesiredSpeed( tor->bandwidth, dir );
    196182}
    197183
     
    498484    randomizeTiers( info );
    499485
    500     tor->bandwidth[TR_UP] = tr_bandwidthNew( h );
    501     tor->bandwidth[TR_DOWN] = tr_bandwidthNew( h );
     486    tor->bandwidth = tr_bandwidthNew( h, h->bandwidth );
     487
     488fprintf( stderr, "torrent [%s] bandwidth is %p\n", info->name, tor->bandwidth );
    502489
    503490    tor->blockSize = getBlockSize( info->pieceSize );
     
    541528    tr_torrentInitFilePieces( tor );
    542529
    543     tor->uploadLimit = 0;
    544     tor->downloadLimit = 0;
    545530    tor->swarmSpeed = tr_rcInit( );
    546531
     
    814799                            s->peersFrom );
    815800
    816     s->rawUploadSpeed     = tr_bandwidthGetRawSpeed  ( tor->bandwidth[TR_UP] );
    817     s->rawDownloadSpeed   = tr_bandwidthGetRawSpeed  ( tor->bandwidth[TR_DOWN] );
    818     s->pieceUploadSpeed   = tr_bandwidthGetPieceSpeed( tor->bandwidth[TR_UP] );
    819     s->pieceDownloadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth[TR_DOWN] );
     801    s->rawUploadSpeed     = tr_bandwidthGetRawSpeed  ( tor->bandwidth, TR_UP );
     802    s->rawDownloadSpeed   = tr_bandwidthGetRawSpeed  ( tor->bandwidth, TR_DOWN );
     803    s->pieceUploadSpeed   = tr_bandwidthGetPieceSpeed( tor->bandwidth, TR_UP );
     804    s->pieceDownloadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth, TR_DOWN );
    820805
    821806    usableSeeds += tor->info.webseedCount;
     
    11001085    h->torrentCount--;
    11011086
    1102     tr_bandwidthFree( tor->bandwidth[TR_DOWN] );
    1103     tr_bandwidthFree( tor->bandwidth[TR_UP] );
     1087    tr_bandwidthFree( tor->bandwidth );
    11041088
    11051089    tr_metainfoFree( inf );
  • trunk/libtransmission/torrent.h

    r7151 r7154  
    173173    tr_info                  info;
    174174
    175     int                      uploadLimit;
    176     tr_speedlimit            uploadLimitMode;
    177     int                      downloadLimit;
    178     tr_speedlimit            downloadLimitMode;
     175    tr_speedlimit            speedLimitMode[2];
    179176
    180177    struct tr_ratecontrol *  swarmSpeed;
     
    235232    int                        uniqueId;
    236233
    237     struct tr_bandwidth      * bandwidth[2];
     234    struct tr_bandwidth      * bandwidth;
    238235};
    239236
Note: See TracChangeset for help on using the changeset viewer.