Changeset 7154
- Timestamp:
- Nov 25, 2008, 9:35:17 PM (12 years ago)
- Location:
- trunk/libtransmission
- Files:
-
- 2 added
- 14 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/Makefile.am
r7147 r7154 25 25 handshake.c \ 26 26 inout.c \ 27 iobuf.c \ 27 28 json.c \ 28 29 JSON_parser.c \ … … 70 71 handshake.h \ 71 72 inout.h \ 73 iobuf.h \ 72 74 json.h \ 73 75 JSON_parser.h \ -
trunk/libtransmission/bandwidth.c
r7148 r7154 11 11 */ 12 12 13 #include <assert.h> 13 14 #include <limits.h> 15 16 #include "event.h" 17 14 18 #include "transmission.h" 15 19 #include "bandwidth.h" 20 #include "iobuf.h" 21 #include "ptrarray.h" 16 22 #include "utils.h" 17 23 … … 25 31 INTERVAL_MSEC = HISTORY_MSEC, 26 32 GRANULARITY_MSEC = 250, 27 HISTORY_SIZE = ( INTERVAL_MSEC / GRANULARITY_MSEC ) 33 HISTORY_SIZE = ( INTERVAL_MSEC / GRANULARITY_MSEC ), 34 MAGIC_NUMBER = 43143 28 35 }; 29 36 … … 76 83 ******/ 77 84 78 struct tr_band width85 struct tr_band 79 86 { 80 87 unsigned int isLimited : 1; 81 88 unsigned int honorParentLimits : 1; 82 89 size_t bytesLeft; 83 90 double desiredSpeed; 84 91 struct bratecontrol raw; 85 92 struct bratecontrol piece; 86 93 }; 94 95 struct tr_bandwidth 96 { 97 struct tr_band band[2]; 98 struct tr_bandwidth * parent; 99 int magicNumber; 87 100 tr_session * session; 101 tr_ptrArray * children; /* struct tr_bandwidth */ 102 tr_ptrArray * iobufs; /* struct tr_iobuf */ 88 103 }; 89 104 … … 92 107 ***/ 93 108 109 static int 110 comparePointers( const void * a, const void * b ) 111 { 112 return a - b; 113 } 114 115 static int 116 isBandwidth( const tr_bandwidth * b ) 117 { 118 return ( b != NULL ) && ( b->magicNumber == MAGIC_NUMBER ); 119 } 120 121 static int 122 isDirection( const tr_direction dir ) 123 { 124 return ( dir == TR_UP ) || ( dir == TR_DOWN ); 125 } 126 127 /*** 128 **** 129 ***/ 130 94 131 tr_bandwidth* 95 tr_bandwidthNew( tr_session * session )132 tr_bandwidthNew( tr_session * session, tr_bandwidth * parent ) 96 133 { 97 134 tr_bandwidth * b = tr_new0( tr_bandwidth, 1 ); 98 135 b->session = session; 136 b->children = tr_ptrArrayNew( ); 137 b->iobufs = tr_ptrArrayNew( ); 138 b->magicNumber = MAGIC_NUMBER; 139 b->band[TR_UP].honorParentLimits = 1; 140 b->band[TR_DOWN].honorParentLimits = 1; 141 tr_bandwidthSetParent( b, parent ); 99 142 return b; 100 143 } … … 103 146 tr_bandwidthFree( tr_bandwidth * b ) 104 147 { 148 assert( isBandwidth( b ) ); 149 150 tr_bandwidthSetParent( b, NULL ); 151 tr_ptrArrayFree( b->iobufs, NULL ); 152 tr_ptrArrayFree( b->children, NULL ); 153 b->magicNumber = 0xDEAD; 105 154 tr_free( b ); 106 155 } … … 111 160 112 161 void 162 tr_bandwidthSetParent( tr_bandwidth * b, 163 tr_bandwidth * parent ) 164 { 165 assert( isBandwidth( b ) ); 166 assert( b != parent ); 167 168 if( b->parent ) 169 { 170 assert( isBandwidth( b->parent ) ); 171 172 tr_ptrArrayRemoveSorted( b->parent->children, b, comparePointers ); 173 b->parent= NULL; 174 } 175 176 if( parent ) 177 { 178 assert( isBandwidth( parent ) ); 179 assert( parent->parent != b ); 180 181 tr_ptrArrayInsertSorted( parent->children, b, comparePointers ); 182 b->parent = parent; 183 } 184 } 185 186 void 187 tr_bandwidthHonorParentLimits( tr_bandwidth * b, 188 tr_direction dir, 189 int honorParentLimits ) 190 { 191 assert( isBandwidth( b ) ); 192 assert( isDirection( dir ) ); 193 194 b->band[dir].honorParentLimits = honorParentLimits != 0; 195 } 196 197 /*** 198 **** 199 ***/ 200 201 void 202 tr_bandwidthSetDesiredSpeed( tr_bandwidth * b, 203 tr_direction dir, 204 double desiredSpeed ) 205 { 206 assert( isBandwidth( b ) ); 207 assert( isDirection( dir ) ); 208 209 b->band[dir].desiredSpeed = desiredSpeed; 210 } 211 212 double 213 tr_bandwidthGetDesiredSpeed( const tr_bandwidth * b, 214 tr_direction dir ) 215 { 216 assert( isBandwidth( b ) ); 217 assert( isDirection( dir ) ); 218 219 return b->band[dir].desiredSpeed; 220 } 221 222 void 113 223 tr_bandwidthSetLimited( tr_bandwidth * b, 114 size_t bytesLeft ) 115 { 116 b->isLimited = 1; 117 b->bytesLeft = bytesLeft; 118 } 119 120 void 121 tr_bandwidthSetUnlimited( tr_bandwidth * b ) 122 { 123 b->isLimited = 0; 124 } 224 tr_direction dir, 225 int isLimited ) 226 { 227 assert( isBandwidth( b ) ); 228 assert( isDirection( dir ) ); 229 230 b->band[dir].isLimited = isLimited != 0; 231 } 232 233 int 234 tr_bandwidthIsLimited( const tr_bandwidth * b, 235 tr_direction dir ) 236 { 237 assert( isBandwidth( b ) ); 238 assert( isDirection( dir ) ); 239 240 return b->band[dir].isLimited != 0; 241 } 242 243 #if 0 244 #define DEBUG_DIRECTION TR_DOWN 245 #endif 246 247 void 248 tr_bandwidthAllocate( tr_bandwidth * b, 249 tr_direction dir, 250 int period_msec ) 251 { 252 const double currentSpeed = tr_bandwidthGetPieceSpeed( b, dir ); /* KiB/s */ 253 const double desiredSpeed = b->band[dir].desiredSpeed; /* KiB/s */ 254 const double seconds_per_pulse = period_msec / 1000.0; 255 const double current_bytes_per_pulse = currentSpeed * 1024.0 * seconds_per_pulse; 256 const double desired_bytes_per_pulse = desiredSpeed * 1024.0 * seconds_per_pulse; 257 const double pulses_per_history = (double)HISTORY_MSEC / period_msec; 258 const double min = desired_bytes_per_pulse * 0.90; 259 const double max = desired_bytes_per_pulse * 1.50; 260 const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 ) 261 - ( current_bytes_per_pulse * pulses_per_history ); 262 double clamped; 263 264 /* clamp the return value to lessen oscillation */ 265 clamped = next_pulse_bytes; 266 clamped = MAX( clamped, min ); 267 clamped = MIN( clamped, max ); 268 b->band[dir].bytesLeft = clamped; 269 270 #ifdef DEBUG_DIRECTION 271 if( dir == DEBUG_DIRECTION ) 272 fprintf( stderr, "bandwidth %p currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n", 273 b, currentSpeed, desiredSpeed, 274 clamped/1024.0, next_pulse_bytes/1024.0 ); 275 #endif 276 277 /* notify the io buffers that there's more bandwidth available */ 278 if( !b->band[dir].isLimited || ( clamped > 0 ) ) { 279 int i, n=0; 280 short what = dir==TR_UP ? EV_WRITE : EV_READ; 281 struct tr_iobuf ** iobufs = (struct tr_iobuf**) tr_ptrArrayPeek( b->iobufs, &n ); 282 #ifdef DEBUG_DIRECTION 283 if( dir == DEBUG_DIRECTION ) 284 fprintf( stderr, "bandwidth %p has %d iobufs\n", b, n ); 285 #endif 286 for( i=0; i<n; ++i ) 287 tr_iobuf_enable( iobufs[i], what ); 288 } 289 290 /* all children should reallocate too */ 291 if( 1 ) { 292 int i, n=0; 293 struct tr_bandwidth ** children = (struct tr_bandwidth**) tr_ptrArrayPeek( b->children, &n ); 294 for( i=0; i<n; ++i ) 295 tr_bandwidthAllocate( children[i], dir, period_msec ); 296 } 297 } 298 299 /*** 300 **** 301 ***/ 302 303 void 304 tr_bandwidthAddBuffer( tr_bandwidth * b, 305 struct tr_iobuf * iobuf ) 306 { 307 assert( isBandwidth( b ) ); 308 assert( iobuf ); 309 310 tr_ptrArrayInsertSorted( b->iobufs, iobuf, comparePointers ); 311 } 312 313 void 314 tr_bandwidthRemoveBuffer( tr_bandwidth * b, 315 struct tr_iobuf * iobuf ) 316 { 317 assert( isBandwidth( b ) ); 318 assert( iobuf ); 319 320 tr_ptrArrayRemoveSorted( b->iobufs, iobuf, comparePointers ); 321 } 322 323 /*** 324 **** 325 ***/ 125 326 126 327 size_t 127 328 tr_bandwidthClamp( const tr_bandwidth * b, 329 tr_direction dir, 128 330 size_t byteCount ) 129 331 { 130 /* const size_t n = byteCount; */ 131 132 if( b && b->isLimited ) 133 byteCount = MIN( byteCount, b->bytesLeft ); 134 135 /* if( n != byteCount ) fprintf( stderr, "%p: %zu clamped to %zu\n", b, n, byteCount ); */ 332 assert( isBandwidth( b ) ); 333 assert( isDirection( dir ) ); 334 335 if( b ) 336 { 337 if( b->band[dir].isLimited ) 338 byteCount = MIN( byteCount, b->band[dir].bytesLeft ); 339 340 if( b->parent && b->band[dir].honorParentLimits ) 341 byteCount = tr_bandwidthClamp( b->parent, dir, byteCount ); 342 } 343 136 344 return byteCount; 137 345 } 138 346 139 /***140 ****141 ***/142 143 347 double 144 tr_bandwidthGetRawSpeed( const tr_bandwidth * b ) 145 { 146 return getSpeed( &b->raw ); 348 tr_bandwidthGetRawSpeed( const tr_bandwidth * b, tr_direction dir ) 349 { 350 assert( isBandwidth( b ) ); 351 assert( isDirection( dir ) ); 352 353 return getSpeed( &b->band[dir].raw ); 147 354 } 148 355 149 356 double 150 tr_bandwidthGetPieceSpeed( const tr_bandwidth * b UNUSED ) 151 { 152 return getSpeed( &b->piece ); 357 tr_bandwidthGetPieceSpeed( const tr_bandwidth * b, tr_direction dir ) 358 { 359 assert( isBandwidth( b ) ); 360 assert( isDirection( dir ) ); 361 362 return getSpeed( &b->band[dir].piece ); 153 363 } 154 364 155 365 void 156 366 tr_bandwidthUsed( tr_bandwidth * b, 367 tr_direction dir, 157 368 size_t byteCount, 158 369 int isPieceData ) 159 370 { 160 if( b->isLimited && isPieceData ) 161 { 162 b->bytesLeft -= MIN( b->bytesLeft, byteCount ); 163 /* fprintf( stderr, "%p used %zu bytes ... %zu left\n", b, byteCount, b->bytesLeft ); */ 164 } 165 166 bytesUsed( &b->raw, byteCount ); 371 struct tr_band * band; 372 373 assert( isBandwidth( b ) ); 374 assert( isDirection( dir ) ); 375 376 band = &b->band[dir]; 377 378 if( band->isLimited && isPieceData ) 379 band->bytesLeft -= MIN( band->bytesLeft, byteCount ); 380 381 #ifdef DEBUG_DIRECTION 382 if( ( dir == DEBUG_DIRECTION ) && band->isLimited && isPieceData ) 383 fprintf( stderr, "%p consumed %zu bytes of piece data... %zu left\n", b, byteCount, band->bytesLeft ); 384 #endif 385 386 bytesUsed( &band->raw, byteCount ); 167 387 168 388 if( isPieceData ) 169 bytesUsed( &b->piece, byteCount ); 170 } 389 bytesUsed( &band->piece, byteCount ); 390 391 if( b->parent != NULL ) 392 tr_bandwidthUsed( b->parent, dir, byteCount, isPieceData ); 393 } -
trunk/libtransmission/bandwidth.h
r7151 r7154 18 18 #define TR_BANDWIDTH_H 19 19 20 struct tr_iobuf; 21 22 /** 23 * Bandwidth is an object for measuring and constraining bandwidth speeds. 24 * 25 * Bandwidth objects can be "stacked" so that a peer can be made to obey 26 * multiple constraints (for example, obeying the global speed limit and a 27 * per-torrent speed limit). 28 * 29 * HIERARCHY 30 * 31 * Transmission's bandwidth hierarchy is a tree. 32 * At the top is the global bandwidth object owned by tr_session. 33 * Its children are per-torrent bandwidth objects owned by tr_torrent. 34 * Underneath those are per-peer bandwidth objects owned by tr_peer. 35 * 36 * tr_session also owns a tr_handshake's bandwidths, so that the handshake 37 * I/O can be counted in the global raw totals. When the handshake is done, 38 * the bandwidth's ownership passes to a tr_peer. 39 * 40 * MEASURING 41 * 42 * When you ask a bandwidth object for its speed, it gives the speed of the 43 * subtree underneath it as well. So you can get Transmission's overall 44 * speed by quering tr_session's bandwidth, per-torrent speeds by asking 45 * tr_torrent's bandwidth, and per-peer speeds by asking tr_peer's bandwidth. 46 * 47 * CONSTRAINING 48 * 49 * Call tr_bandwidthAllocate() periodically. tr_bandwidth knows its current 50 * speed and will decide how many bytes to make available over the 51 * user-specified period to reach the user-specified desired speed. 52 * If appropriate, it notifies its iobufs that new bandwidth is available. 53 * 54 * tr_bandwidthAllocate() operates on the tr_bandwidth subtree, so usually 55 * you'll only need to invoke it for the top-level tr_session bandwidth. 56 * 57 * The iobufs all have a pointer to their associated tr_bandwidth object, 58 * and call tr_bandwidthClamp() before performing I/O to see how much 59 * bandwidth they can safely use. 60 */ 20 61 typedef struct tr_bandwidth tr_bandwidth; 21 62 … … 24 65 **/ 25 66 26 tr_bandwidth* tr_bandwidthNew ( tr_session * session ); 67 /** @brief create a new tr_bandwidth object */ 68 tr_bandwidth* 69 tr_bandwidthNew ( tr_session * session, 70 tr_bandwidth * parent ); 27 71 28 void tr_bandwidthFree ( tr_bandwidth * bandwidth ); 72 /** @brief destroy a tr_bandwidth object */ 73 void tr_bandwidthFree ( tr_bandwidth * bandwidth ); 74 75 /****** 76 ******* 77 ******/ 29 78 30 79 /** 31 *** 32 **/ 33 34 void tr_bandwidthSetLimited ( tr_bandwidth * bandwidth, 35 size_t byteCount ); 36 37 void tr_bandwidthSetUnlimited ( tr_bandwidth * bandwidth ); 38 39 size_t tr_bandwidthClamp ( const tr_bandwidth * bandwidth, 40 size_t byteCount ); 80 * @brief Set the desired speed (in KiB/s) for this bandwidth subtree. 81 * @see tr_bandwidthAllocate 82 * @see tr_bandwidthGetDesiredSpeed 83 */ 84 void tr_bandwidthSetDesiredSpeed ( tr_bandwidth * bandwidth, 85 tr_direction direction, 86 double desiredSpeed ); 41 87 42 88 /** 43 *** 44 **/ 89 * @brief Get the desired speed (in KiB/s) for ths bandwidth subtree. 90 * @see tr_bandwidthSetDesiredSpeed 91 */ 92 double tr_bandwidthGetDesiredSpeed ( const tr_bandwidth * bandwidth, 93 tr_direction direction ); 45 94 46 double tr_bandwidthGetRawSpeed ( const tr_bandwidth * bandwidth ); 95 /** 96 * @brief Set whether or not this bandwidth should throttle its iobufs' speeds 97 */ 98 void tr_bandwidthSetLimited ( tr_bandwidth * bandwidth, 99 tr_direction direction, 100 int isLimited ); 47 101 48 double tr_bandwidthGetPieceSpeed ( const tr_bandwidth * bandwidth ); 102 /** 103 * @return nonzero if this bandwidth throttles its iobufs' speeds 104 */ 105 int tr_bandwidthIsLimited ( const tr_bandwidth * bandwidth, 106 tr_direction direction ); 49 107 50 void tr_bandwidthUsed ( tr_bandwidth * bandwidth, 51 size_t byteCount, 52 int isPieceData ); 108 /** 109 * @brief allocate the next period_msec's worth of bandwidth for the iobufs to consume 110 */ 111 void tr_bandwidthAllocate ( tr_bandwidth * bandwidth, 112 tr_direction direction, 113 int period_msec ); 114 115 /** 116 * @brief clamps byteCount down to a number that this bandwidth will allow to be consumed 117 */ 118 size_t tr_bandwidthClamp ( const tr_bandwidth * bandwidth, 119 tr_direction direction, 120 size_t byteCount ); 121 122 /****** 123 ******* 124 ******/ 125 126 /** 127 * @brief Get the raw total of bytes read or sent by this bandwidth subtree. 128 */ 129 double tr_bandwidthGetRawSpeed ( const tr_bandwidth * bandwidth, 130 tr_direction direction ); 131 132 /** 133 * @brief Get the number of piece data bytes read or sent by this bandwidth subtree. 134 */ 135 double tr_bandwidthGetPieceSpeed ( const tr_bandwidth * bandwidth, 136 tr_direction direction ); 137 138 /** 139 * @brief Notify the bandwidth object that some of its allocated bandwidth has been consumed. 140 * This is is usually invoked by the iobuf after a read or write. 141 */ 142 void tr_bandwidthUsed ( tr_bandwidth * bandwidth, 143 tr_direction direction, 144 size_t byteCount, 145 int isPieceData ); 146 147 /****** 148 ******* 149 ******/ 150 151 void tr_bandwidthSetParent ( tr_bandwidth * bandwidth, 152 tr_bandwidth * parent ); 153 154 /** 155 * Almost all the time we do want to honor a parents' bandwidth cap, so that 156 * (for example) a peer is constrained by a per-torrent cap and the global cap. 157 * But when we set a torrent's speed mode to TR_SPEEDLIMIT_UNLIMITED, then 158 * in that particular case we want to ignore the global speed limit... 159 */ 160 void tr_bandwidthHonorParentLimits ( tr_bandwidth * bandwidth, 161 tr_direction direction, 162 int isEnabled ); 163 164 /****** 165 ******* 166 ******/ 167 168 /** 169 * @brief add an iobuf to this bandwidth's list of iobufs. 170 * They will be notified when more bandwidth is made available for them to consume. 171 */ 172 void tr_bandwidthAddBuffer ( tr_bandwidth * bandwidth, 173 struct tr_iobuf * iobuf ); 174 175 /** 176 * @brief remove an iobuf from this bandwidth's list of iobufs. 177 */ 178 void tr_bandwidthRemoveBuffer ( tr_bandwidth * bandwidth, 179 struct tr_iobuf * iobuf ); 53 180 54 181 #endif -
trunk/libtransmission/handshake.c
r7152 r7154 25 25 #include "crypto.h" 26 26 #include "handshake.h" 27 #include "iobuf.h" 27 28 #include "peer-io.h" 28 29 #include "peer-mgr.h" … … 1029 1030 1030 1031 static ReadState 1031 canRead( struct bufferevent * evin, 1032 void * arg ) 1033 { 1034 tr_handshake * handshake = (tr_handshake *) arg; 1035 struct evbuffer * inbuf = EVBUFFER_INPUT ( evin ); 1032 canRead( struct tr_iobuf * iobuf, void * arg, size_t * piece ) 1033 { 1034 tr_handshake * handshake = arg; 1035 struct evbuffer * inbuf = tr_iobuf_input( iobuf ); 1036 1036 ReadState ret; 1037 1037 int readyForMore = TRUE; 1038 1039 /* no piece data in handshake */ 1040 *piece = 0; 1038 1041 1039 1042 dbgmsg( handshake, "handling canRead; state is [%s]", … … 1137 1140 1138 1141 static void 1139 gotError( struct bufferevent * evbuf UNUSED,1140 short 1141 void *arg )1142 gotError( struct tr_iobuf * iobuf UNUSED, 1143 short what, 1144 void * arg ) 1142 1145 { 1143 1146 tr_handshake * handshake = (tr_handshake *) arg; … … 1180 1183 tr_handshake * handshake; 1181 1184 1182 tr_peerIoSetBandwidth( io, TR_UP, NULL );1183 tr_peerIoSetBandwidth( io, TR_DOWN, NULL );1184 1185 1185 handshake = tr_new0( tr_handshake, 1 ); 1186 1186 handshake->io = io; -
trunk/libtransmission/net.c
r7152 r7154 126 126 setSndBuf( tr_session * session, int fd ) 127 127 { 128 #if 0129 128 if( fd >= 0 ) 130 129 { … … 134 133 setsockopt( fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof( rcvbuf ) ); 135 134 } 136 #endif137 135 } 138 136 -
trunk/libtransmission/peer-io.c
r7147 r7154 29 29 #include "bandwidth.h" 30 30 #include "crypto.h" 31 #include "iobuf.h" 31 32 #include "list.h" 32 33 #include "net.h" … … 35 36 #include "utils.h" 36 37 38 #define MAGIC_NUMBER 206745 37 39 #define IO_TIMEOUT_SECS 8 38 40 … … 79 81 struct tr_peerIo 80 82 { 81 unsigned int isEncrypted : 1; 82 unsigned int isIncoming : 1; 83 unsigned int peerIdIsSet : 1; 84 unsigned int extendedProtocolSupported : 1; 85 unsigned int fastPeersSupported : 1; 86 87 uint8_t encryptionMode; 88 uint8_t timeout; 89 uint16_t port; 90 int socket; 91 92 uint8_t peerId[20]; 93 time_t timeCreated; 94 95 tr_session * session; 96 97 struct in_addr in_addr; 98 struct bufferevent * bufev; 99 struct evbuffer * output; 100 tr_list * output_datatypes; /* struct tr_datatype */ 101 102 tr_can_read_cb canRead; 103 tr_did_write_cb didWrite; 104 tr_net_error_cb gotError; 105 void * userData; 106 107 size_t bufferSize[2]; 108 109 tr_bandwidth * bandwidth[2]; 110 tr_crypto * crypto; 83 unsigned int isEncrypted : 1; 84 unsigned int isIncoming : 1; 85 unsigned int peerIdIsSet : 1; 86 unsigned int extendedProtocolSupported : 1; 87 unsigned int fastPeersSupported : 1; 88 89 int magicNumber; 90 91 uint8_t encryptionMode; 92 uint8_t timeout; 93 uint16_t port; 94 int socket; 95 96 uint8_t peerId[20]; 97 time_t timeCreated; 98 99 tr_session * session; 100 101 struct in_addr in_addr; 102 struct tr_iobuf * iobuf; 103 tr_list * output_datatypes; /* struct tr_datatype */ 104 105 tr_can_read_cb canRead; 106 tr_did_write_cb didWrite; 107 tr_net_error_cb gotError; 108 void * userData; 109 110 size_t bufferSize[2]; 111 112 tr_bandwidth * bandwidth; 113 tr_crypto * crypto; 111 114 }; 112 113 /**114 ***115 **/116 117 static void118 adjustOutputBuffer( tr_peerIo * io )119 {120 struct evbuffer * live = EVBUFFER_OUTPUT( io->bufev );121 size_t curLive = EVBUFFER_LENGTH( live );122 size_t maxLive = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_sndbuf );123 124 if( ( curLive < maxLive ) && EVBUFFER_LENGTH( io->output ) )125 {126 size_t freeSpace = maxLive - curLive;127 size_t n = MIN( freeSpace, EVBUFFER_LENGTH( io->output ) );128 bufferevent_write( io->bufev, EVBUFFER_DATA( io->output ), n );129 evbuffer_drain( io->output, n );130 curLive += n;131 }132 133 io->bufferSize[TR_UP] = curLive;134 135 if( curLive )136 bufferevent_enable( io->bufev, EV_WRITE );137 138 dbgmsg( io, "after adjusting the output buffer, its size is now %zu", curLive );139 }140 141 static void142 adjustInputBuffer( tr_peerIo * io )143 {144 /* FIXME: the max read size probably needs to vary depending on the145 * number of peers that we have connected... 1024 is going to force146 * us way over the limit when there are lots of peers */147 static const int maxBufSize = 1024;148 const size_t n = tr_bandwidthClamp( io->bandwidth[TR_DOWN], maxBufSize );149 150 if( !n )151 {152 dbgmsg( io, "disabling reads because we've hit our limit" );153 bufferevent_disable( io->bufev, EV_READ );154 }155 else156 {157 dbgmsg( io, "enabling reading of %zu more bytes", n );158 bufferevent_setwatermark( io->bufev, EV_READ, 0, n );159 bufferevent_enable( io->bufev, EV_READ );160 }161 162 io->bufferSize[TR_DOWN] = EVBUFFER_LENGTH( EVBUFFER_INPUT( io->bufev ) );163 }164 115 165 116 /*** … … 168 119 169 120 static void 170 didWriteWrapper( struct bufferevent * e, 171 void * vio ) 121 didWriteWrapper( struct tr_iobuf * iobuf, 122 size_t bytes_transferred, 123 void * vio ) 172 124 { 173 125 tr_peerIo * io = vio; 174 const size_t len = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( e ) ); 175 176 dbgmsg( io, "didWrite... io->outputBufferSize was %zu, is now %zu", 177 io->bufferSize[TR_UP], len ); 178 179 if( len < io->bufferSize[TR_UP] ) 126 127 while( bytes_transferred ) 180 128 { 181 size_t payload = io->bufferSize[TR_UP] - len; 182 183 while( payload ) 184 { 185 struct tr_datatype * next = io->output_datatypes->data; 186 const size_t chunk_length = MIN( next->length, payload ); 187 const size_t n = addPacketOverhead( chunk_length ); 188 189 if( io->didWrite ) 190 io->didWrite( io, n, next->isPieceData, io->userData ); 191 192 payload -= chunk_length; 193 next->length -= chunk_length; 194 if( !next->length ) 195 tr_free( tr_list_pop_front( &io->output_datatypes ) ); 196 } 129 struct tr_datatype * next = io->output_datatypes->data; 130 const size_t chunk_length = MIN( next->length, bytes_transferred ); 131 const size_t n = addPacketOverhead( chunk_length ); 132 133 tr_bandwidthUsed( io->bandwidth, TR_UP, n, next->isPieceData ); 134 135 if( io->didWrite ) 136 io->didWrite( io, n, next->isPieceData, io->userData ); 137 138 bytes_transferred -= chunk_length; 139 next->length -= chunk_length; 140 if( !next->length ) 141 tr_free( tr_list_pop_front( &io->output_datatypes ) ); 197 142 } 198 143 199 adjustOutputBuffer( io );200 144 if( EVBUFFER_LENGTH( tr_iobuf_output( iobuf ) ) ) 145 tr_iobuf_enable( io->iobuf, EV_WRITE ); 201 146 } 202 147 203 148 static void 204 canReadWrapper( struct bufferevent * e, 205 void * vio ) 149 canReadWrapper( struct tr_iobuf * iobuf, 150 size_t bytes_transferred UNUSED, 151 void * vio ) 206 152 { 207 153 int done = 0; … … 219 165 while( !done && !err ) 220 166 { 221 const int ret = io->canRead( e, io->userData ); 167 size_t piece = 0; 168 const size_t oldLen = EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) ); 169 const int ret = io->canRead( iobuf, io->userData, &piece ); 170 171 if( ret != err ) 172 { 173 const size_t used = oldLen - EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) ); 174 if( piece ) 175 tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE ); 176 if( used != piece ) 177 tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE ); 178 } 222 179 223 180 switch( ret ) 224 181 { 225 182 case READ_NOW: 226 if( EVBUFFER_LENGTH( e->input ))183 if( EVBUFFER_LENGTH( tr_iobuf_input( iobuf ))) 227 184 continue; 228 185 done = 1; … … 241 198 tr_globalUnlock( session ); 242 199 } 243 244 if( !err )245 adjustInputBuffer( io );246 200 } 247 201 248 202 static void 249 gotErrorWrapper( struct bufferevent * e,250 short 251 void *userData )203 gotErrorWrapper( struct tr_iobuf * iobuf, 204 short what, 205 void * userData ) 252 206 { 253 207 tr_peerIo * c = userData; 254 208 255 209 if( c->gotError ) 256 c->gotError( e, what, c->userData );210 c->gotError( iobuf, what, c->userData ); 257 211 } 258 212 … … 264 218 bufevNew( tr_peerIo * io ) 265 219 { 266 io->bufev = bufferevent_new( io->socket, 267 canReadWrapper, 268 didWriteWrapper, 269 gotErrorWrapper, 270 io ); 271 272 /* tell libevent to call didWriteWrapper after every write, 273 * not just when the write buffer is empty */ 274 bufferevent_setwatermark( io->bufev, EV_WRITE, INT_MAX, 0 ); 275 276 bufferevent_settimeout( io->bufev, io->timeout, io->timeout ); 277 278 bufferevent_enable( io->bufev, EV_READ | EV_WRITE ); 220 io->iobuf = tr_iobuf_new( io->session, 221 io->bandwidth, 222 io->socket, 223 EV_READ | EV_WRITE, 224 canReadWrapper, 225 didWriteWrapper, 226 gotErrorWrapper, 227 io ); 228 229 tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout ); 230 } 231 232 static int 233 isPeerIo( const tr_peerIo * io ) 234 { 235 return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER ); 279 236 } 280 237 … … 293 250 294 251 io = tr_new0( tr_peerIo, 1 ); 252 io->magicNumber = MAGIC_NUMBER; 295 253 io->crypto = tr_cryptoNew( torrentHash, isIncoming ); 296 254 io->session = session; … … 301 259 io->timeout = IO_TIMEOUT_SECS; 302 260 io->timeCreated = time( NULL ); 303 io->output = evbuffer_new( );304 261 bufevNew( io ); 262 tr_peerIoSetBandwidth( io, session->bandwidth ); 305 263 return io; 306 264 } … … 346 304 tr_peerIo * io = vio; 347 305 348 evbuffer_free( io->output);349 bufferevent_free( io->bufev);306 tr_peerIoSetBandwidth( io, NULL ); 307 tr_iobuf_free( io->iobuf ); 350 308 tr_netClose( io->socket ); 351 309 tr_cryptoFree( io->crypto ); 352 310 tr_list_free( &io->output_datatypes, tr_free ); 311 312 io->magicNumber = 0xDEAD; 353 313 tr_free( io ); 354 314 } … … 369 329 tr_peerIoGetSession( tr_peerIo * io ) 370 330 { 371 assert( i o);331 assert( isPeerIo( io ) ); 372 332 assert( io->session ); 373 333 … … 379 339 uint16_t * port ) 380 340 { 381 assert( i o);341 assert( isPeerIo( io ) ); 382 342 383 343 if( port ) … … 407 367 tr_peerIoTryRead( tr_peerIo * io ) 408 368 { 409 if( EVBUFFER_LENGTH( io->bufev->input ))410 canReadWrapper( io->bufev, io );369 if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf ))) 370 (*canReadWrapper)( io->iobuf, ~0, io ); 411 371 } 412 372 … … 444 404 if( io->socket >= 0 ) 445 405 { 406 tr_bandwidth * bandwidth = io->bandwidth; 407 tr_peerIoSetBandwidth( io, NULL ); 408 446 409 tr_netSetTOS( io->socket, io->session->peerSocketTOS ); 447 448 bufferevent_free( io->bufev ); 410 tr_iobuf_free( io->iobuf ); 449 411 bufevNew( io ); 412 413 tr_peerIoSetBandwidth( io, bandwidth ); 450 414 return 0; 451 415 } … … 459 423 { 460 424 io->timeout = secs; 461 bufferevent_settimeout( io->bufev, io->timeout, io->timeout );462 bufferevent_enable( io->bufev, EV_READ | EV_WRITE );425 tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout ); 426 tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE ); 463 427 } 464 428 … … 471 435 const uint8_t * hash ) 472 436 { 473 assert( i o);437 assert( isPeerIo( io ) ); 474 438 475 439 tr_cryptoSetTorrentHash( io->crypto, hash ); … … 479 443 tr_peerIoGetTorrentHash( tr_peerIo * io ) 480 444 { 481 assert( i o);445 assert( isPeerIo( io ) ); 482 446 assert( io->crypto ); 483 447 … … 488 452 tr_peerIoHasTorrentHash( const tr_peerIo * io ) 489 453 { 490 assert( i o);454 assert( isPeerIo( io ) ); 491 455 assert( io->crypto ); 492 456 … … 502 466 const uint8_t * peer_id ) 503 467 { 504 assert( i o);468 assert( isPeerIo( io ) ); 505 469 506 470 if( ( io->peerIdIsSet = peer_id != NULL ) ) … … 513 477 tr_peerIoGetPeersId( const tr_peerIo * io ) 514 478 { 515 assert( i o);479 assert( isPeerIo( io ) ); 516 480 assert( io->peerIdIsSet ); 517 481 … … 527 491 int flag ) 528 492 { 529 assert( i o);493 assert( isPeerIo( io ) ); 530 494 assert( flag == 0 || flag == 1 ); 531 495 … … 537 501 int flag ) 538 502 { 539 assert( i o);503 assert( isPeerIo( io ) ); 540 504 assert( flag == 0 || flag == 1 ); 541 505 … … 546 510 tr_peerIoSupportsLTEP( const tr_peerIo * io ) 547 511 { 548 assert( i o);512 assert( isPeerIo( io ) ); 549 513 550 514 return io->extendedProtocolSupported; … … 554 518 tr_peerIoSupportsFEXT( const tr_peerIo * io ) 555 519 { 556 assert( i o);520 assert( isPeerIo( io ) ); 557 521 558 522 return io->fastPeersSupported; … … 566 530 tr_peerIoGetWriteBufferSpace( const tr_peerIo * io ) 567 531 { 568 const size_t desiredLiveLen = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_rcvbuf ); 569 const size_t currentLiveLen = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( io->bufev ) ); 570 const size_t desiredQueueLen = io->session->so_sndbuf; 571 const size_t currentQueueLen = EVBUFFER_LENGTH( io->output ); 572 const size_t desiredLen = desiredLiveLen + desiredQueueLen; 573 const size_t currentLen = currentLiveLen + currentQueueLen; 532 const size_t desiredLen = io->session->so_sndbuf * 2; /* FIXME: bigger? */ 533 const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) ); 574 534 size_t freeSpace = 0; 575 535 … … 582 542 void 583 543 tr_peerIoSetBandwidth( tr_peerIo * io, 584 tr_direction direction,585 544 tr_bandwidth * bandwidth ) 586 545 { 587 assert( io ); 588 assert( direction == TR_UP || direction == TR_DOWN ); 589 590 io->bandwidth[direction] = bandwidth; 591 592 if( direction == TR_UP ) 593 adjustOutputBuffer( io ); 594 else 595 adjustInputBuffer( io ); 546 assert( isPeerIo( io ) ); 547 548 if( io->bandwidth ) 549 tr_bandwidthRemoveBuffer( io->bandwidth, io->iobuf ); 550 551 io->bandwidth = bandwidth; 552 tr_iobuf_set_bandwidth( io->iobuf, bandwidth ); 553 554 if( io->bandwidth ) 555 tr_bandwidthAddBuffer( io->bandwidth, io->iobuf ); 556 557 tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE ); 596 558 } 597 559 … … 610 572 int encryptionMode ) 611 573 { 612 assert( i o);574 assert( isPeerIo( io ) ); 613 575 assert( encryptionMode == PEER_ENCRYPTION_NONE 614 576 || encryptionMode == PEER_ENCRYPTION_RC4 ); … … 637 599 dbgmsg( io, "adding %zu bytes into io->output", writemeLen ); 638 600 639 evbuffer_add( io->output, writeme, writemeLen );640 641 601 datatype = tr_new( struct tr_datatype, 1 ); 642 602 datatype->isPieceData = isPieceData != 0; … … 644 604 tr_list_append( &io->output_datatypes, datatype ); 645 605 646 adjustOutputBuffer( io ); 606 evbuffer_add( tr_iobuf_output( io->iobuf ), writeme, writemeLen ); 607 tr_iobuf_enable( io->iobuf, EV_WRITE ); 647 608 } 648 609 -
trunk/libtransmission/peer-io.h
r7151 r7154 24 24 struct in_addr; 25 25 struct evbuffer; 26 struct bufferevent;27 26 struct tr_bandwidth; 28 27 struct tr_crypto; 28 struct tr_iobuf; 29 29 typedef struct tr_peerIo tr_peerIo; 30 30 … … 111 111 ReadState; 112 112 113 typedef ReadState ( *tr_can_read_cb )( struct bufferevent * ev, 114 void * user_data ); 115 116 typedef void ( *tr_did_write_cb )( tr_peerIo * io, 117 size_t bytesWritten, 118 int wasPieceData, 119 void * userData ); 120 121 typedef void ( *tr_net_error_cb )( struct bufferevent * ev, 122 short what, 123 void * userData ); 113 typedef ReadState ( *tr_can_read_cb )( struct tr_iobuf * iobuf, 114 void * user_data, 115 size_t * setme_piece_byte_count ); 116 117 typedef void ( *tr_did_write_cb )( tr_peerIo * io, 118 size_t bytesWritten, 119 int wasPieceData, 120 void * userData ); 121 122 typedef void ( *tr_net_error_cb )( struct tr_iobuf * ev, 123 short what, 124 void * userData ); 124 125 125 126 void tr_peerIoSetIOFuncs ( tr_peerIo * io, … … 206 207 207 208 void tr_peerIoSetBandwidth( tr_peerIo * io, 208 tr_direction direction,209 209 struct tr_bandwidth * bandwidth ); 210 210 211 void tr_peerIoBandwidthUsed( tr_peerIo * io, 212 tr_direction direction, 213 size_t byteCount, 214 int isPieceData ); 215 216 211 217 212 218 #endif -
trunk/libtransmission/peer-mgr-private.h
r7151 r7154 28 28 #include "publish.h" /* tr_publisher_tag */ 29 29 30 struct tr_bandwidth; 30 31 struct tr_bitfield; 31 32 struct tr_peerIo; 32 33 struct tr_peermsgs; 33 struct tr_ratecontrol;34 34 35 35 enum … … 72 72 tr_publisher_tag msgsTag; 73 73 74 /* the rate at which pieces are being transferred between client and peer. 75 * protocol overhead is NOT included; this is only the piece data */ 76 struct tr_ratecontrol * pieceSpeed[2]; 77 78 /* the rate at which all data is being transferred between client and peer. */ 79 struct tr_ratecontrol * rawSpeed[2]; 74 struct tr_bandwidth * bandwidth; 80 75 } 81 76 tr_peer; -
trunk/libtransmission/peer-mgr.c
r7148 r7154 34 34 #include "peer-msgs.h" 35 35 #include "ptrarray.h" 36 #include "ratecontrol.h"37 36 #include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */ 38 37 #include "torrent.h" … … 59 58 60 59 /* how frequently to reallocate bandwidth */ 61 BANDWIDTH_PERIOD_MSEC = 333,60 BANDWIDTH_PERIOD_MSEC = 200, 62 61 63 62 /* max # of peers to ask fer per torrent per reconnect pulse */ … … 324 323 325 324 static tr_peer* 326 peerConstructor( const struct in_addr * in_addr )325 peerConstructor( tr_torrent * tor, const struct in_addr * in_addr ) 327 326 { 328 327 tr_peer * p; … … 330 329 p = tr_new0( tr_peer, 1 ); 331 330 memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) ); 332 p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( ); 333 p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( ); 334 p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( ); 335 p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( ); 331 p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth ); 336 332 return p; 337 333 } … … 349 345 if( peer == NULL ) 350 346 { 351 peer = peerConstructor( in_addr );347 peer = peerConstructor( torrent->tor, in_addr ); 352 348 tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare ); 353 349 } … … 371 367 tr_free( peer->client ); 372 368 373 tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] ); 374 tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] ); 375 tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] ); 376 tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] ); 369 tr_bandwidthFree( peer->bandwidth ); 370 377 371 tr_free( peer ); 378 372 } … … 1018 1012 const time_t now = time( NULL ); 1019 1013 tr_torrent * tor = t->tor; 1020 const tr_direction dir = TR_CLIENT_TO_PEER;1021 1014 1022 1015 tor->activityDate = now; … … 1024 1017 if( e->wasPieceData ) 1025 1018 tor->uploadedCur += e->length; 1026 1027 /* add it to the raw upload speed */1028 if( peer )1029 tr_rcTransferred ( peer->rawSpeed[dir], e->length );1030 1031 /* maybe add it to the piece upload speed */1032 if( e->wasPieceData ) {1033 if( peer )1034 tr_rcTransferred ( peer->pieceSpeed[dir], e->length );1035 }1036 1037 tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );1038 tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );1039 1019 1040 1020 /* update the stats */ … … 1055 1035 const time_t now = time( NULL ); 1056 1036 tr_torrent * tor = t->tor; 1057 const tr_direction dir = TR_PEER_TO_CLIENT;1058 1037 1059 1038 tor->activityDate = now; … … 1068 1047 tor->downloadedCur += e->length; 1069 1048 1070 /* add it to our raw download speed */1071 if( peer )1072 tr_rcTransferred ( peer->rawSpeed[dir], e->length );1073 1074 /* maybe add it to the piece upload speed */1075 if( e->wasPieceData ) {1076 if( peer )1077 tr_rcTransferred ( peer->pieceSpeed[dir], e->length );1078 }1079 1080 tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );1081 tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );1082 1083 1049 /* update the stats */ 1084 1050 if( e->wasPieceData ) … … 1313 1279 if( !peer_id ) 1314 1280 peer->client = NULL; 1315 else 1316 { 1281 else { 1317 1282 char client[128]; 1318 1283 tr_clientForId( client, sizeof( client ), peer_id ); 1319 1284 peer->client = tr_strdup( client ); 1320 1285 } 1286 1321 1287 peer->port = port; 1322 1288 peer->io = io; 1323 peer->msgs = 1324 tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, 1325 &peer->msgsTag ); 1289 peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag ); 1290 tr_peerIoSetBandwidth( io, peer->bandwidth ); 1326 1291 1327 1292 success = TRUE; … … 1805 1770 assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT ); 1806 1771 1807 return tr_ rcRate( peer->pieceSpeed[direction]);1772 return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction ); 1808 1773 } 1809 1774 … … 2363 2328 int i, j; 2364 2329 2365 for( i = 0; i <torrentCount; ++i )2330 for( i=0; i<torrentCount; ++i ) 2366 2331 { 2367 2332 Torrent * t = tr_ptrArrayNth( mgr->torrents, i ); 2368 for( j = 0; j <tr_ptrArraySize( t->peers ); ++j )2333 for( j=0; j<tr_ptrArraySize( t->peers ); ++j ) 2369 2334 { 2370 2335 tr_peer * peer = tr_ptrArrayNth( t->peers, j ); … … 2374 2339 } 2375 2340 2376 static void 2377 setTorrentBandwidth( Torrent * t, 2378 tr_direction dir, 2379 tr_bandwidth * bandwidth ) 2380 { 2381 int i; 2382 int peerCount = 0; 2383 tr_peer ** peers = getConnectedPeers( t, &peerCount ); 2384 2385 for( i=0; i<peerCount; ++i ) 2386 tr_peerIoSetBandwidth( peers[i]->io, dir, bandwidth ); 2387 2388 tr_free( peers ); 2389 } 2390 2391 #if 0 2392 #define DEBUG_DIRECTION TR_DOWN 2393 #endif 2394 2395 static double 2396 bytesPerPulse( double KiB_per_second ) 2397 { 2398 return KiB_per_second * ( 1024.0 * BANDWIDTH_PERIOD_MSEC / 1000.0 ); 2399 } 2400 2401 /* 2402 * @param currentSpeed current speed in KiB/s 2403 * @param desiredSpeed desired speed in KiB/s 2404 */ 2405 static double 2406 allocateHowMuch( tr_direction dir UNUSED, double currentSpeed, double desiredSpeed ) 2407 { 2408 const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC; 2409 const double current_bytes_per_pulse = bytesPerPulse( currentSpeed ); 2410 const double desired_bytes_per_pulse = bytesPerPulse( desiredSpeed ); 2411 const double min = desired_bytes_per_pulse * 0.90; 2412 const double max = desired_bytes_per_pulse * 1.50; 2413 const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 ) 2414 - ( current_bytes_per_pulse * pulses_per_history ); 2415 double clamped; 2416 2417 /* clamp the return value to lessen oscillation */ 2418 clamped = next_pulse_bytes; 2419 clamped = MAX( clamped, min ); 2420 clamped = MIN( clamped, max ); 2421 2422 #ifdef DEBUG_DIRECTION 2423 if( dir == DEBUG_DIRECTION ) 2424 fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n", 2425 currentSpeed, 2426 desiredSpeed, 2427 clamped/1024.0, 2428 next_pulse_bytes/1024.0 ); 2429 #endif 2430 2431 return clamped; 2432 } 2433 2434 /** 2435 * Allocate bandwidth for each peer connection. 2436 * 2437 * @param mgr the peer manager 2438 * @param direction whether to allocate upload or download bandwidth 2439 */ 2440 static void 2441 allocateBandwidth( tr_peerMgr * mgr, 2442 tr_direction direction ) 2443 { 2444 int i; 2445 tr_session * session = mgr->session; 2446 const int torrentCount = tr_ptrArraySize( mgr->torrents ); 2447 Torrent ** torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents ); 2448 tr_bandwidth * global_pool = session->bandwidth[direction]; 2449 2450 assert( mgr ); 2451 assert( direction == TR_UP || direction == TR_DOWN ); 2452 2453 /* before allocating bandwidth, pump the connected peers */ 2341 static int 2342 bandwidthPulse( void * vmgr ) 2343 { 2344 tr_peerMgr * mgr = vmgr; 2345 managerLock( mgr ); 2346 2454 2347 pumpAllPeers( mgr ); 2455 2456 for( i=0; i<torrentCount; ++i ) 2457 { 2458 Torrent * t = torrents[i]; 2459 tr_speedlimit speedMode; 2460 2461 /* no point in allocating bandwidth for stopped torrents */ 2462 if( tr_torrentGetActivity( t->tor ) == TR_STATUS_STOPPED ) 2463 continue; 2464 2465 /* if piece data is disallowed, don't bother limiting bandwidth -- 2466 * we won't be asking for, or sending out, any pieces */ 2467 if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) ) 2468 speedMode = TR_SPEEDLIMIT_UNLIMITED; 2469 else 2470 speedMode = tr_torrentGetSpeedMode( t->tor, direction ); 2471 2472 /* process the torrent's peers based on its speed mode */ 2473 switch( speedMode ) 2474 { 2475 case TR_SPEEDLIMIT_UNLIMITED: 2476 { 2477 tr_bandwidth * b = t->tor->bandwidth[direction]; 2478 tr_bandwidthSetUnlimited( b ); 2479 setTorrentBandwidth( t, direction, b ); 2480 break; 2481 } 2482 2483 case TR_SPEEDLIMIT_SINGLE: 2484 { 2485 tr_bandwidth * b = t->tor->bandwidth[direction]; 2486 const double currentSpeed = tr_bandwidthGetPieceSpeed( b ); 2487 const double desiredSpeed = tr_torrentGetSpeedLimit( t->tor, direction ); 2488 const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed ); 2489 #ifdef DEBUG_DIRECTION 2490 if( direction == DEBUG_DIRECTION ) 2491 fprintf( stderr, "single: currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", currentSpeed, desiredSpeed, bytesPerPulse ); 2492 #endif 2493 tr_bandwidthSetLimited( b, bytesPerPulse ); 2494 setTorrentBandwidth( t, direction, b ); 2495 break; 2496 } 2497 2498 case TR_SPEEDLIMIT_GLOBAL: 2499 { 2500 setTorrentBandwidth( t, direction, global_pool ); 2501 break; 2502 } 2503 } 2504 } 2505 2506 /* handle the global pool's connections */ 2507 if( !tr_sessionIsSpeedLimitEnabled( session, direction ) ) 2508 tr_bandwidthSetUnlimited( global_pool ); 2509 else { 2510 const double currentSpeed = tr_bandwidthGetPieceSpeed( global_pool ); 2511 const double desiredSpeed = tr_sessionGetSpeedLimit( session, direction ); 2512 const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed ); 2513 #ifdef DEBUG_DIRECTION 2514 if( direction == DEBUG_DIRECTION ) 2515 fprintf( stderr, "global(%p): currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", global_pool, currentSpeed, desiredSpeed, bytesPerPulse ); 2516 #endif 2517 tr_bandwidthSetLimited( session->bandwidth[direction], bytesPerPulse ); 2518 } 2519 2520 /* now that we've allocated bandwidth, pump all the connected peers */ 2348 tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC ); 2349 tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC ); 2521 2350 pumpAllPeers( mgr ); 2522 }2523 2524 static int2525 bandwidthPulse( void * vmgr )2526 {2527 tr_peerMgr * mgr = vmgr;2528 int i;2529 2530 managerLock( mgr );2531 2532 /* allocate the upload and download bandwidth */2533 for( i = 0; i < 2; ++i )2534 allocateBandwidth( mgr, i );2535 2351 2536 2352 managerUnlock( mgr ); -
trunk/libtransmission/peer-msgs.c
r7147 r7154 25 25 #include "crypto.h" 26 26 #include "inout.h" 27 #include "iobuf.h" 27 28 #ifdef WIN32 28 29 #include "net.h" /* for ECONN */ … … 1315 1316 1316 1317 static int 1317 readBtPiece( tr_peermsgs * msgs, 1318 struct evbuffer * inbuf, 1319 size_t inlen ) 1318 readBtPiece( tr_peermsgs * msgs, 1319 struct evbuffer * inbuf, 1320 size_t inlen, 1321 size_t * setme_piece_bytes_read ) 1320 1322 { 1321 1323 struct peer_request * req = &msgs->incoming.blockReq; … … 1350 1352 evbuffer_add( msgs->incoming.block, buf, n ); 1351 1353 fireClientGotData( msgs, n, TRUE ); 1354 *setme_piece_bytes_read += n; 1352 1355 tr_free( buf ); 1353 1356 dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain", … … 1643 1646 1644 1647 static ReadState 1645 canRead( struct bufferevent * evin, 1646 void * vmsgs ) 1648 canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece ) 1647 1649 { 1648 1650 ReadState ret; 1649 1651 tr_peermsgs * msgs = vmsgs; 1650 struct evbuffer * in = EVBUFFER_INPUT ( evin);1652 struct evbuffer * in = tr_iobuf_input( iobuf ); 1651 1653 const size_t inlen = EVBUFFER_LENGTH( in ); 1652 1654 … … 1657 1659 else if( msgs->state == AWAITING_BT_PIECE ) 1658 1660 { 1659 ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER;1661 ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER; 1660 1662 } 1661 1663 else switch( msgs->state ) … … 1821 1823 1822 1824 static void 1823 gotError( struct bufferevent * evbuf UNUSED,1824 short 1825 void *vmsgs )1825 gotError( struct tr_iobuf * iobuf UNUSED, 1826 short what, 1827 void * vmsgs ) 1826 1828 { 1827 1829 if( what & EVBUFFER_TIMEOUT ) 1828 dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what, 1829 evbuf->timeout_read ); 1830 dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what ); 1830 1831 if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) 1831 1832 dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)", -
trunk/libtransmission/session.c
r7147 r7154 275 275 /* Initialize rate and file descripts controls */ 276 276 277 h->uploadLimit = uploadLimit;278 h->useUploadLimit = useUploadLimit;279 h->downloadLimit = downloadLimit;280 h->useDownloadLimit = useDownloadLimit;281 282 277 tr_fdInit( globalPeerLimit ); 283 278 h->shared = tr_sharedInit( h, isPortForwardingEnabled, publicPort ); 284 279 h->isPortSet = publicPort >= 0; 285 280 286 h->bandwidth[TR_UP] = tr_bandwidthNew( h ); 287 h->bandwidth[TR_DOWN] = tr_bandwidthNew( h ); 281 h->bandwidth = tr_bandwidthNew( h, NULL ); 282 tr_bandwidthSetDesiredSpeed( h->bandwidth, TR_UP, uploadLimit ); 283 tr_bandwidthSetDesiredSpeed( h->bandwidth, TR_DOWN, downloadLimit ); 284 tr_bandwidthSetLimited( h->bandwidth, TR_UP, useUploadLimit ); 285 tr_bandwidthSetLimited( h->bandwidth, TR_DOWN, useDownloadLimit ); 288 286 289 287 /* first %s is the application name … … 444 442 445 443 void 446 tr_sessionSetSpeedLimitEnabled( tr_handle * h, 447 tr_direction direction, 448 int use_flag ) 449 { 450 assert( h ); 451 assert( direction == TR_UP || direction == TR_DOWN ); 452 453 if( direction == TR_UP ) 454 h->useUploadLimit = use_flag ? 1 : 0; 455 else 456 h->useDownloadLimit = use_flag ? 1 : 0; 457 } 458 459 int 460 tr_sessionIsSpeedLimitEnabled( const tr_handle * h, 461 tr_direction direction ) 462 { 463 return direction == TR_UP ? h->useUploadLimit : h->useDownloadLimit; 464 } 465 466 void 467 tr_sessionSetSpeedLimit( tr_handle * h, 468 tr_direction direction, 469 int KiB_sec ) 470 { 471 if( direction == TR_DOWN ) 472 h->downloadLimit = KiB_sec; 473 else 474 h->uploadLimit = KiB_sec; 475 } 476 477 int 478 tr_sessionGetSpeedLimit( const tr_handle * h, 479 tr_direction direction ) 480 { 481 return direction == TR_UP ? h->uploadLimit : h->downloadLimit; 444 tr_sessionSetSpeedLimitEnabled( tr_session * session, 445 tr_direction dir, 446 int isLimited ) 447 { 448 tr_bandwidthSetLimited( session->bandwidth, dir, isLimited ); 449 } 450 451 int 452 tr_sessionIsSpeedLimitEnabled( const tr_session * session, 453 tr_direction dir ) 454 { 455 return !tr_bandwidthIsLimited( session->bandwidth, dir ); 456 } 457 458 void 459 tr_sessionSetSpeedLimit( tr_session * session, 460 tr_direction dir, 461 int desiredSpeed ) 462 { 463 tr_bandwidthSetDesiredSpeed( session->bandwidth, dir, desiredSpeed ); 464 } 465 466 int 467 tr_sessionGetSpeedLimit( const tr_session * session, 468 tr_direction dir ) 469 { 470 return tr_bandwidthGetDesiredSpeed( session->bandwidth, dir ); 482 471 } 483 472 … … 506 495 tr_sessionGetPieceSpeed( const tr_session * session, tr_direction dir ) 507 496 { 508 assert( dir==TR_UP || dir==TR_DOWN ); 509 510 return session ? tr_bandwidthGetPieceSpeed( session->bandwidth[dir] ) : 0.0; 497 return session ? tr_bandwidthGetPieceSpeed( session->bandwidth, dir ) : 0.0; 511 498 } 512 499 … … 514 501 tr_sessionGetRawSpeed( const tr_session * session, tr_direction dir ) 515 502 { 516 assert( dir==TR_UP || dir==TR_DOWN ); 517 518 return session ? tr_bandwidthGetPieceSpeed( session->bandwidth[dir] ) : 0.0; 503 return session ? tr_bandwidthGetPieceSpeed( session->bandwidth, dir ) : 0.0; 519 504 } 520 505 … … 630 615 631 616 /* free the session memory */ 632 tr_bandwidthFree( session->bandwidth[TR_UP] ); 633 tr_bandwidthFree( session->bandwidth[TR_DOWN] ); 617 tr_bandwidthFree( session->bandwidth ); 634 618 tr_lockFree( session->lock ); 635 619 for( i = 0; i < session->metainfoLookupCount; ++i ) -
trunk/libtransmission/session.h
r7151 r7154 63 63 unsigned int isProxyAuthEnabled : 1; 64 64 unsigned int isClosed : 1; 65 unsigned int useUploadLimit : 1;66 unsigned int useDownloadLimit : 1;67 65 unsigned int useLazyBitfield : 1; 68 66 … … 88 86 char * proxyUsername; 89 87 char * proxyPassword; 90 91 int uploadLimit;92 int downloadLimit;93 88 94 89 struct tr_list * blocklists; … … 117 112 118 113 /* monitors the "global pool" speeds */ 119 struct tr_bandwidth * bandwidth [2];114 struct tr_bandwidth * bandwidth; 120 115 }; 121 116 -
trunk/libtransmission/torrent.c
r7147 r7154 144 144 void 145 145 tr_torrentSetSpeedMode( tr_torrent * tor, 146 tr_direction dir ection,146 tr_direction dir, 147 147 tr_speedlimit mode ) 148 148 { 149 tr_speedlimit * limit = direction == TR_UP ? &tor->uploadLimitMode 150 : &tor->downloadLimitMode; 151 152 *limit = mode; 149 assert( tor != NULL ); 150 assert( dir==TR_UP || dir==TR_DOWN ); 151 assert( mode==TR_SPEEDLIMIT_GLOBAL || mode==TR_SPEEDLIMIT_SINGLE || mode==TR_SPEEDLIMIT_UNLIMITED ); 152 153 tor->speedLimitMode[dir] = mode; 154 155 tr_bandwidthSetLimited( tor->bandwidth, dir, mode==TR_SPEEDLIMIT_SINGLE ); 156 tr_bandwidthHonorParentLimits( tor->bandwidth, dir, mode!=TR_SPEEDLIMIT_UNLIMITED ); 153 157 } 154 158 155 159 tr_speedlimit 156 160 tr_torrentGetSpeedMode( const tr_torrent * tor, 157 tr_direction direction ) 158 { 159 return direction == TR_UP ? tor->uploadLimitMode 160 : tor->downloadLimitMode; 161 tr_direction dir ) 162 { 163 assert( tor != NULL ); 164 assert( dir==TR_UP || dir==TR_DOWN ); 165 166 return tor->speedLimitMode[dir]; 161 167 } 162 168 163 169 void 164 170 tr_torrentSetSpeedLimit( tr_torrent * tor, 165 tr_direction direction, 166 int single_KiB_sec ) 167 { 168 switch( direction ) 169 { 170 case TR_UP: 171 tor->uploadLimit = single_KiB_sec; break; 172 173 case TR_DOWN: 174 tor->downloadLimit = single_KiB_sec; break; 175 176 default: 177 assert( 0 ); 178 } 171 tr_direction dir, 172 int desiredSpeed ) 173 { 174 tr_bandwidthSetDesiredSpeed( tor->bandwidth, dir, desiredSpeed ); 179 175 } 180 176 181 177 int 182 178 tr_torrentGetSpeedLimit( const tr_torrent * tor, 183 tr_direction direction ) 184 { 185 switch( direction ) 186 { 187 case TR_UP: 188 return tor->uploadLimit; 189 190 case TR_DOWN: 191 return tor->downloadLimit; 192 193 default: 194 assert( 0 ); 195 } 179 tr_direction dir ) 180 { 181 return tr_bandwidthGetDesiredSpeed( tor->bandwidth, dir ); 196 182 } 197 183 … … 498 484 randomizeTiers( info ); 499 485 500 tor->bandwidth[TR_UP] = tr_bandwidthNew( h ); 501 tor->bandwidth[TR_DOWN] = tr_bandwidthNew( h ); 486 tor->bandwidth = tr_bandwidthNew( h, h->bandwidth ); 487 488 fprintf( stderr, "torrent [%s] bandwidth is %p\n", info->name, tor->bandwidth ); 502 489 503 490 tor->blockSize = getBlockSize( info->pieceSize ); … … 541 528 tr_torrentInitFilePieces( tor ); 542 529 543 tor->uploadLimit = 0;544 tor->downloadLimit = 0;545 530 tor->swarmSpeed = tr_rcInit( ); 546 531 … … 814 799 s->peersFrom ); 815 800 816 s->rawUploadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth [TR_UP]);817 s->rawDownloadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth [TR_DOWN]);818 s->pieceUploadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth [TR_UP]);819 s->pieceDownloadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth [TR_DOWN]);801 s->rawUploadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth, TR_UP ); 802 s->rawDownloadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth, TR_DOWN ); 803 s->pieceUploadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth, TR_UP ); 804 s->pieceDownloadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth, TR_DOWN ); 820 805 821 806 usableSeeds += tor->info.webseedCount; … … 1100 1085 h->torrentCount--; 1101 1086 1102 tr_bandwidthFree( tor->bandwidth[TR_DOWN] ); 1103 tr_bandwidthFree( tor->bandwidth[TR_UP] ); 1087 tr_bandwidthFree( tor->bandwidth ); 1104 1088 1105 1089 tr_metainfoFree( inf ); -
trunk/libtransmission/torrent.h
r7151 r7154 173 173 tr_info info; 174 174 175 int uploadLimit; 176 tr_speedlimit uploadLimitMode; 177 int downloadLimit; 178 tr_speedlimit downloadLimitMode; 175 tr_speedlimit speedLimitMode[2]; 179 176 180 177 struct tr_ratecontrol * swarmSpeed; … … 235 232 int uniqueId; 236 233 237 struct tr_bandwidth * bandwidth [2];234 struct tr_bandwidth * bandwidth; 238 235 }; 239 236
Note: See TracChangeset
for help on using the changeset viewer.