Changeset 7441
- Timestamp: Dec 20, 2008, 10:19:34 PM
- Location: trunk/libtransmission
- Files: 6 edited
trunk/libtransmission/bandwidth.c (r7433 → r7441)

@@ -295 +295 @@
                        int            period_msec )
 {
-    int n;
+    int i, n, peerCount;
     tr_ptrArray * tmp;
     struct tr_peerIo ** peers;
 
+    /* allocateBandwidth() is a helper function with two purposes:
+     * 1. allocate bandwidth to b and its subtree
+     * 2. accumulate an array of all the peerIos from b and its subtree. */
     tmp = tr_ptrArrayNew( );
     allocateBandwidth( b, dir, period_msec, tmp );
-    peers = (struct tr_peerIo**) tr_ptrArrayPeek( tmp, &n );
-
-    /* loop through all the peers, reading and writing in small chunks,
-     * until we run out of bandwidth or peers. we do it this way to
-     * prevent one peer from using up all the bandwidth */
-#if 0
-    fprintf( stderr, "%s - %d peers\n", (dir==TR_UP)?"up":"down", n );
-#endif
-    while( n > 0 )
-    {
-        int i;
-        for( i=0; i<n; )
-        {
-            const int increment = n==1 ? 4096 : 1024;
-            const int byteCount = tr_peerIoFlush( peers[i], dir, increment );
-
-#if 0
-            if( byteCount )
-                fprintf( stderr, "peer %p: %d bytes\n", peers[i], byteCount );
-#endif
-
-            if( byteCount == increment )
-                ++i;
-            else
-                peers[i] = peers[--n];
+    peers = (struct tr_peerIo**) tr_ptrArrayPeek( tmp, &peerCount );
+
+    /* Stop all peers from listening for the socket to be ready for IO.
+     * See "Second phase of IO" lower in this function for more info. */
+    for( i=0; i<peerCount; ++i )
+        tr_peerIoSetEnabled( peers[i], dir, FALSE );
+
+    /* First phase of IO.  Tries to distribute bandwidth in a fair/even manner
+     * to keep "greedy peers" from starving out the other peers: loop through
+     * the peers in a round-robin fashion, giving each of them small chunks
+     * of bandwidth to use.  (It's small to conserve some of the bandwidth
+     * until the end of the loop.)  Keep looping until we run out of bandwidth
+     * or peers that are ready to use it. */
+    n = peerCount;
+    i = n ? tr_cryptoWeakRandInt( n ) : 0; /* pick a random starting point */
+    for( ; n>0; )
+    {
+        const int increment = n==1 ? 4096 : 1024;
+        const int byteCount = tr_peerIoFlush( peers[i], dir, increment );
+
+        if( byteCount == increment )
+            ++i;
+        else {
+            /* peer is done writing for now; move it to the end of the list */
+            tr_peerIo * tmp = peers[i];
+            peers[i] = peers[n-1];
+            peers[n-1] = tmp;
+            --n;
         }
-    }
+
+        assert( i <= n );
+        if( i == n )
+            i = 0;
+    }
+
+    /* Second phase of IO.  To help us scale well in high-bandwidth situations
+     * such as LANs, enable on-demand IO for peers with bandwidth left to burn.
+     * This on-demand IO for a peer is enabled until either (1) the peer runs
+     * out of bandwidth, or (2) the next tr_bandwidthAllocate() call, when we
+     * start all over again. */
+    for( i=0; i<peerCount; ++i )
+        if( tr_peerIoHasBandwidthLeft( peers[i], dir ) )
+            tr_peerIoSetEnabled( peers[i], dir, TRUE );
 
     /* cleanup */
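The first-phase loop above is the heart of this change. To see its shape outside of libtransmission, here is a small, self-contained C sketch of the same round-robin idea; the toy_peer type, toy_flush(), and the byte budgets are illustrative stand-ins, not Transmission APIs:

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Toy stand-in for a peer: it can accept only `budget` more bytes this round. */
typedef struct { const char * name; int budget; } toy_peer;

/* Stand-in for a flush call: write up to `increment` bytes, return how many fit. */
static int toy_flush( toy_peer * p, int increment )
{
    const int n = p->budget < increment ? p->budget : increment;
    p->budget -= n;
    return n;
}

int main( void )
{
    toy_peer peers[] = { { "a", 2000 }, { "b", 4500 }, { "c", 500 } };
    int n = 3;
    int i = rand( ) % n;                /* random starting point, as in the changeset */

    while( n > 0 )
    {
        /* small chunks while several peers are active, a bigger one for the last peer */
        const int increment = n == 1 ? 4096 : 1024;
        const int byteCount = toy_flush( &peers[i], increment );

        printf( "%s got %d bytes\n", peers[i].name, byteCount );

        if( byteCount == increment )
            ++i;                        /* peer used its whole chunk; keep it in the rotation */
        else {
            /* peer is done for now; swap it out of the active range */
            toy_peer tmp = peers[i];
            peers[i] = peers[n-1];
            peers[n-1] = tmp;
            --n;
        }

        assert( i <= n );
        if( i == n )
            i = 0;                      /* wrap around */
    }
    return 0;
}

The swap-to-the-back bookkeeping is what lets the loop keep handing out small chunks to peers that still want bandwidth while dropping finished peers out of the rotation without reallocating the array.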
trunk/libtransmission/handshake.c (r7419 → r7441)

@@ -1091 +1091 @@
 }
 
+void
+tr_handshakeFree( tr_handshake * handshake )
+{
+    if( handshake->io )
+        tr_peerIoFree( handshake->io );
+
+    tr_free( handshake );
+}
+
 static int
 tr_handshakeDone( tr_handshake * handshake,
                   int            isOK )
 {
-    int success;
+    tr_bool success;
 
     dbgmsg( handshake, "handshakeDone: %s", isOK ? "connected" : "aborting" );

@@ -1101 +1110 @@
 
     success = fireDoneFunc( handshake, isOK );
-
-    tr_free( handshake );
 
     return success ? READ_LATER : READ_ERR;

@@ -1193 +1200 @@
 }
 
+struct tr_peerIo*
+tr_handshakeStealIO( tr_handshake * handshake )
+{
+    struct tr_peerIo * io;
+
+    assert( handshake );
+    assert( handshake->io );
+
+    io = handshake->io;
+    handshake->io = NULL;
+    return io;
+}
+
 const tr_address *
 tr_handshakeGetAddr( const struct tr_handshake * handshake,
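tr_handshakeStealIO() plus tr_handshakeFree() form a simple ownership hand-off: a caller that wants to keep the connection takes the peerIo out of the handshake, and freeing the handshake later only releases whatever it still owns. A minimal sketch of that pattern, using hypothetical toy types rather than the Transmission structs:

#include <assert.h>
#include <stdlib.h>

typedef struct { int fd; } toy_io;                 /* stand-in for tr_peerIo    */
typedef struct { toy_io * io; } toy_handshake;     /* stand-in for tr_handshake */

/* Transfer ownership of the io to the caller; the handshake no longer owns it. */
static toy_io * toy_steal_io( toy_handshake * h )
{
    toy_io * io;
    assert( h && h->io );
    io = h->io;
    h->io = NULL;
    return io;
}

/* Free the handshake and, only if it still owns one, its io. */
static void toy_handshake_free( toy_handshake * h )
{
    free( h->io );   /* free(NULL) is a no-op, so a stolen io is never double-freed */
    free( h );
}

int main( void )
{
    toy_handshake * h = calloc( 1, sizeof *h );
    h->io = calloc( 1, sizeof *h->io );

    toy_io * io = toy_steal_io( h );   /* caller keeps the connection */
    toy_handshake_free( h );           /* safe: h->io is already NULL */
    free( io );
    return 0;
}

Because the stolen pointer is cleared inside the handshake, the eventual free cannot touch the io again, which is what lets peer-mgr.c park failed handshakes on a list and reclaim them later.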
trunk/libtransmission/handshake.h (r7404 → r7441)

@@ -40 +40 @@
                            tr_port *            port );
 
+void tr_handshakeFree( tr_handshake * handshake );
+
 void tr_handshakeAbort( tr_handshake * handshake );
 
 struct tr_peerIo* tr_handshakeGetIO( tr_handshake * handshake );
 
+struct tr_peerIo* tr_handshakeStealIO( tr_handshake * handshake );
+
+
 #endif
trunk/libtransmission/peer-io.c (r7435 → r7441)

@@ -35 +35 @@
 
 #define MAGIC_NUMBER 206745
-#define IO_TIMEOUT_SECS 8
 
 static size_t

@@ -88 +87 @@
 
     uint8_t encryptionMode;
-    uint8_t timeout;
     tr_port port;
     int socket;

@@ -112 +110 @@
     struct evbuffer * inbuf;
     struct evbuffer * outbuf;
+
+    struct event event_read;
+    struct event event_write;
 };
 

@@ -119 +120 @@
 
 static void
-didWriteWrapper( void * unused UNUSED,
-                 size_t bytes_transferred,
-                 void * vio )
-{
-    tr_peerIo * io = vio;
-
+didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
+{
     while( bytes_transferred )
     {

@@ -147 +144 @@
 
 static void
-canReadWrapper( void * unused UNUSED,
-                size_t bytes_transferred UNUSED,
-                void * vio )
-{
-    int done = 0;
-    int err = 0;
-    tr_peerIo * io = vio;
+canReadWrapper( tr_peerIo * io )
+{
+    tr_bool done = 0;
+    tr_bool err = 0;
     tr_session * session = io->session;
 

@@ -169 +163 @@
             const int ret = io->canRead( io, io->userData, &piece );
 
-            if( ret != READ_ERR )
-            {
-                const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
-                if( piece )
-                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
-                if( used != piece )
-                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
-            }
+            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
+
+            if( piece )
+                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
+
+            if( used != piece )
+                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
 
             switch( ret )

@@ -200 +193 @@
 }
 
-#if 0
+#define _isBool(b) (((b)==0 || (b)==1))
+
+static int
+isPeerIo( const tr_peerIo * io )
+{
+    return ( io != NULL )
+        && ( io->magicNumber == MAGIC_NUMBER )
+        && ( tr_isAddress( &io->addr ) )
+        && ( _isBool( io->isEncrypted ) )
+        && ( _isBool( io->isIncoming ) )
+        && ( _isBool( io->peerIdIsSet ) )
+        && ( _isBool( io->extendedProtocolSupported ) )
+        && ( _isBool( io->fastExtensionSupported ) );
+}
+
 static void
-gotErrorWrapper( struct tr_iobuf * iobuf,
-                 short what,
-                 void * userData )
-{
-    tr_peerIo * c = userData;
-
-    if( c->gotError )
-        c->gotError( iobuf, what, c->userData );
-}
+event_read_cb( int fd, short event UNUSED, void * vio )
+{
+    int res;
+    short what = EVBUFFER_READ;
+    tr_peerIo * io = vio;
+    const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, io->session->so_rcvbuf );
+    const tr_direction dir = TR_DOWN;
+
+    assert( isPeerIo( io ) );
+
+    dbgmsg( io, "libevent says this peer is ready to read" );
+
+    /* if we don't have any bandwidth left, stop reading */
+    if( howmuch < 1 ) {
+        tr_peerIoSetEnabled( io, dir, FALSE );
+        return;
+    }
+
+    res = evbuffer_read( io->inbuf, fd, howmuch );
+    if( res == -1 ) {
+        if( errno == EAGAIN || errno == EINTR )
+            goto reschedule;
+        /* error case */
+        what |= EVBUFFER_ERROR;
+    } else if( res == 0 ) {
+        /* eof case */
+        what |= EVBUFFER_EOF;
+    }
+
+    if( res <= 0 )
+        goto error;
+
+    tr_peerIoSetEnabled( io, dir, TRUE );
+
+    /* Invoke the user callback - must always be called last */
+    canReadWrapper( io );
+
+    return;
+
+ reschedule:
+    tr_peerIoSetEnabled( io, dir, TRUE );
+    return;
+
+ error:
+    if( io->gotError != NULL )
+        io->gotError( io, what, io->userData );
+}
+
+static int
+tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
+{
+    struct evbuffer * buffer = io->outbuf;
+    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
+
+#ifdef WIN32
+    n = send( fd, buffer->buffer, n, 0 );
+#else
+    n = write( fd, buffer->buffer, n );
 #endif
+    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
+
+    if( n == -1 )
+        return -1;
+    if( n == 0 )
+        return 0;
+    evbuffer_drain( buffer, n );
+
+    return n;
+}
+
+static void
+event_write_cb( int fd, short event UNUSED, void * vio )
+{
+    int res = 0;
+    short what = EVBUFFER_WRITE;
+    tr_peerIo * io = vio;
+    size_t howmuch;
+    const tr_direction dir = TR_UP;
+
+    assert( isPeerIo( io ) );
+
+    dbgmsg( io, "libevent says this peer is ready to write" );
+
+    howmuch = MIN( (size_t)io->session->so_sndbuf, EVBUFFER_LENGTH( io->outbuf ) );
+    howmuch = tr_bandwidthClamp( io->bandwidth, dir, howmuch );
+
+    /* if we don't have any bandwidth left, stop writing */
+    if( howmuch < 1 ) {
+        tr_peerIoSetEnabled( io, dir, FALSE );
+        return;
+    }
+
+    res = tr_evbuffer_write( io, fd, howmuch );
+    if( res == -1 ) {
+#ifndef WIN32
+        /* todo: evbuffer uses WriteFile when WIN32 is set.  WIN32 system calls do not
+         * set errno, so this error checking is not portable. */
+        if( errno == EAGAIN || errno == EINTR || errno == EINPROGRESS )
+            goto reschedule;
+        /* error case */
+        what |= EVBUFFER_ERROR;
+
+#else
+        goto reschedule;
+#endif
+
+    } else if( res == 0 ) {
+        /* eof case */
+        what |= EVBUFFER_EOF;
+    }
+    if( res <= 0 )
+        goto error;
+
+    if( EVBUFFER_LENGTH( io->outbuf ) )
+        tr_peerIoSetEnabled( io, dir, TRUE );
+
+    didWriteWrapper( io, res );
+    return;
+
+ reschedule:
+    if( EVBUFFER_LENGTH( io->outbuf ) )
+        tr_peerIoSetEnabled( io, dir, TRUE );
+    return;
+
+ error:
+    io->gotError( io, what, io->userData );
+}
 
 /**

@@ -217 +341 @@
 **/
 
-#if 0
-static void
-bufevNew( tr_peerIo * io )
-{
-    io->iobuf = tr_iobuf_new( io->session,
-                              io->bandwidth,
-                              io->socket,
-                              EV_READ | EV_WRITE,
-                              canReadWrapper,
-                              didWriteWrapper,
-                              gotErrorWrapper,
-                              io );
-
-    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
-}
-#endif
-
-static int
-isPeerIo( const tr_peerIo * io )
-{
-    return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
-}
 
 static int

@@ -267 +369 @@
     io->socket = socket;
     io->isIncoming = isIncoming != 0;
-    io->timeout = IO_TIMEOUT_SECS;
    io->timeCreated = time( NULL );
     io->inbuf = evbuffer_new( );
     io->outbuf = evbuffer_new( );
+    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
+    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
 #if 0
     bufevNew( io );

@@ -315 +418 @@
     tr_peerIo * io = vio;
 
+    event_del( &io->event_read );
+    event_del( &io->event_write );
     tr_peerIoSetBandwidth( io, NULL );
     evbuffer_free( io->outbuf );

@@ -378 +483 @@
 }
 
-#if 0
-static void
-tr_peerIoTryRead( tr_peerIo * io )
-{
-    if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
-        (*canReadWrapper)( io->iobuf, ~0, io );
-}
-#endif
-
-void
-tr_peerIoSetIOFuncs( tr_peerIo * io,
-                     tr_can_read_cb readcb,
-                     tr_did_write_cb writecb,
-                     tr_net_error_cb errcb,
-                     void * userData )
+void
+tr_peerIoSetIOFuncs( tr_peerIo        * io,
+                     tr_can_read_cb     readcb,
+                     tr_did_write_cb    writecb,
+                     tr_net_error_cb    errcb,
+                     void             * userData )
 {
     io->canRead = readcb;

@@ -398 +494 @@
     io->gotError = errcb;
     io->userData = userData;
-
-#if 0
-    tr_peerIoTryRead( io );
-#endif
 }
 

@@ -436 +528 @@
     return -1;
 }
-
-#if 0
-void
-tr_peerIoSetTimeoutSecs( tr_peerIo * io,
-                         int secs )
-{
-    io->timeout = secs;
-    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
-    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
-}
-#endif
 

@@ -564 +645 @@
     const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
     const double period = 20; /* arbitrary */
-    return MAX( maxBlockSize*20.5, currentSpeed*1024*period );
+    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
+    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
 }
 

@@ -805 +887 @@
     dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
 
-    if( res > 0 )
-        canReadWrapper( io, res, io );
+    if( EVBUFFER_LENGTH( io->inbuf ) )
+        canReadWrapper( io );
 
     if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )

@@ -827 +909 @@
 
     howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
-    howmuch = MIN( howmuch, EVBUFFER_LENGTH( io->outbuf ) );
-    n = (int) howmuch;
-
-#ifdef WIN32
-    n = send( io->socket, EVBUFFER_DATA( io->outbuf ), n, 0 );
-#else
-    n = write( io->socket, EVBUFFER_DATA( io->outbuf ), n );
-#endif
-    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
+
+    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
 
     if( n > 0 )
-    {
-        evbuffer_drain( io->outbuf, n );
-
-        didWriteWrapper( NULL, n, io );
-    }
-
-    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
-    {
+        didWriteWrapper( io, n );
+
+    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
         short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
         io->gotError( io, what, io->userData );

@@ -876 +946 @@
     return io->inbuf;
 }
+
+tr_bool
+tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
+{
+    assert( isPeerIo( io ) );
+    assert( dir==TR_UP || dir==TR_DOWN );
+
+    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
+}
+
+/***
+****
+****/
+
+static void
+event_enable( tr_peerIo * io, short event )
+{
+    assert( isPeerIo( io ) );
+
+    if( event & EV_READ )
+        event_add( &io->event_read, NULL );
+
+    if( event & EV_WRITE )
+        event_add( &io->event_write, NULL );
+}
+
+static void
+event_disable( struct tr_peerIo * io, short event )
+{
+    assert( isPeerIo( io ) );
+
+    if( event & EV_READ )
+        event_del( &io->event_read );
+
+    if( event & EV_WRITE )
+        event_del( &io->event_write );
+}
+
+
+void
+tr_peerIoSetEnabled( tr_peerIo    * io,
+                     tr_direction   dir,
+                     tr_bool        isEnabled )
+{
+    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
+
+    if( isEnabled )
+        event_enable( io, event );
+    else
+        event_disable( io, event );
+}
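The net effect of this file's changes is that each tr_peerIo now owns one EV_READ and one EV_WRITE struct event, and tr_peerIoSetEnabled() arms or disarms them on demand in place of the old tr_iobuf timeout-driven approach. The following is a rough, self-contained sketch of that arm/disarm pattern using the libevent 1.x API Transmission used at the time; the conn struct, conn_set_enabled(), and the pipe-based demo are illustrative, not code from this changeset:

/* build (assuming libevent 1.x is installed): cc demo.c -levent */
#include <event.h>
#include <stdio.h>
#include <unistd.h>

struct conn
{
    struct event ev_read;
    struct event ev_write;
};

static void read_cb( int fd, short what, void * vconn )
{
    char buf[32];
    const int n = (int) read( fd, buf, sizeof buf );
    (void) what; (void) vconn;
    printf( "readable: %d bytes\n", n );
    /* a real peer would re-arm itself here only while it still has bandwidth */
}

static void write_cb( int fd, short what, void * vconn )
{
    (void) fd; (void) what; (void) vconn;
}

/* same shape as the changeset's event_enable()/event_disable() helpers */
static void conn_set_enabled( struct conn * c, short events, int enabled )
{
    if( events & EV_READ ) {
        if( enabled ) event_add( &c->ev_read, NULL );
        else          event_del( &c->ev_read );
    }
    if( events & EV_WRITE ) {
        if( enabled ) event_add( &c->ev_write, NULL );
        else          event_del( &c->ev_write );
    }
}

int main( void )
{
    int fds[2];
    struct conn c;

    event_init( );
    if( pipe( fds ) == -1 )
        return 1;
    (void) write( fds[1], "hello", 5 );

    /* register once; the events are non-persistent, so each callback
     * fires at most once per event_add() */
    event_set( &c.ev_read,  fds[0], EV_READ,  read_cb,  &c );
    event_set( &c.ev_write, fds[1], EV_WRITE, write_cb, &c );

    conn_set_enabled( &c, EV_READ, 1 );   /* arm reading; leave writing disarmed */
    event_dispatch( );                    /* returns once no armed events remain */

    return 0;
}

Because the events are added without EV_PERSIST, a callback runs once and then stays quiet until something re-arms it, which matches how the peer-io callbacks re-enable themselves only while bandwidth remains.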
trunk/libtransmission/peer-io.h (r7419 → r7441)

@@ -26 +26 @@
 struct tr_bandwidth;
 struct tr_crypto;
-struct tr_iobuf;
 typedef struct tr_peerIo tr_peerIo;
 

@@ -214 +213 @@
 **/
 
+tr_bool  tr_peerIoHasBandwidthLeft( const tr_peerIo  * io,
+                                    tr_direction       direction );
+
+void     tr_peerIoSetEnabled( tr_peerIo    * io,
+                              tr_direction   dir,
+                              tr_bool        isEnabled );
+
 int      tr_peerIoFlush( tr_peerIo     * io,
                          tr_direction    dir,
trunk/libtransmission/peer-mgr.c (r7434 → r7441)

@@ -58 +58 @@
 
     /* how frequently to reallocate bandwidth */
-    BANDWIDTH_PERIOD_MSEC = 100,
+    BANDWIDTH_PERIOD_MSEC = 500,
 
     /* max # of peers to ask fer per torrent per reconnect pulse */

@@ -144 +144 @@
     tr_ptrArray * torrents; /* Torrent */
     tr_ptrArray * incomingHandshakes; /* tr_handshake */
+    tr_ptrArray * finishedHandshakes; /* tr_handshake */
     tr_timer * bandwidthTimer;
 };

@@ -464 +465 @@
     m->torrents = tr_ptrArrayNew( );
     m->incomingHandshakes = tr_ptrArrayNew( );
+    m->finishedHandshakes = tr_ptrArrayNew( );
     m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
     return m;

@@ -471 +473 @@
 tr_peerMgrFree( tr_peerMgr * manager )
 {
+    tr_handshake * handshake;
+
     managerLock( manager );
 

@@ -481 +485 @@
 
     tr_ptrArrayFree( manager->incomingHandshakes, NULL );
+
+    while(( handshake = tr_ptrArrayPop( manager->finishedHandshakes )))
+        tr_handshakeFree( handshake );
+
+    tr_ptrArrayFree( manager->finishedHandshakes, NULL );
 
     /* free the torrents. */

@@ -1195 +1204 @@
 /* FIXME: this is kind of a mess. */
 static tr_bool
-myHandshakeDoneCB( tr_handshake * handshake,
-                   tr_peerIo *    io,
+myHandshakeDoneCB( tr_handshake  * handshake,
+                   tr_peerIo     * io,
                    int             isConnected,
                    const uint8_t * peer_id,
-                   void *          vmanager )
+                   void          * vmanager )
 {
     tr_bool ok = isConnected;

@@ -1241 +1250 @@
             ++atom->numFails;
         }
-
-        tr_peerIoFree( io );
     }
     else /* looking good */

@@ -1256 +1263 @@
             tordbg( t, "banned peer %s tried to reconnect",
                     tr_peerIoAddrStr( &atom->addr, atom->port ) );
-            tr_peerIoFree( io );
         }
         else if( tr_peerIoIsIncoming( io )

@@ -1262 +1268 @@
 
         {
-            tr_peerIoFree( io );
         }
         else

@@ -1270 +1275 @@
             if( peer ) /* we already have this peer */
             {
-                tr_peerIoFree( io );
             }
             else

@@ -1286 +1290 @@
 
                 peer->port = port;
-                peer->io = io;
+                peer->io = tr_handshakeStealIO( handshake );
                 tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
                 tr_peerIoSetBandwidth( io, peer->bandwidth );

@@ -1294 +1298 @@
         }
     }
+
+    if( !success )
+        tr_ptrArrayAppend( manager->finishedHandshakes, handshake );
 
     if( t )

@@ -1545 +1552 @@
     }
 
-#warning this for loop can be removed when we are sure the bug is fixed
-    for( i=0; i<peersReturning; ++i )
-        assert( tr_isAddress( &pex[i].addr ) );
-
     assert( ( walk - pex ) == peersReturning );
     qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
-
-#warning this for loop can be removed when we are sure the bug is fixed
-    for( i=0; i<peersReturning; ++i )
-        assert( tr_isAddress( &pex[i].addr ) );
 
     *setme_pex = pex;

@@ -2395 +2394 @@
 bandwidthPulse( void * vmgr )
 {
+    tr_handshake * handshake;
     tr_peerMgr * mgr = vmgr;
     managerLock( mgr );
 
+    /* FIXME: this next line probably isn't necessary... */
     pumpAllPeers( mgr );
+
+    /* allocate bandwidth to the peers */
     tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
     tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
-    pumpAllPeers( mgr );
+
+    /* free all the finished handshakes */
+    while(( handshake = tr_ptrArrayPop( mgr->finishedHandshakes )))
+        tr_handshakeFree( handshake );
 
     managerUnlock( mgr );
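The new finishedHandshakes list means myHandshakeDoneCB() no longer frees a handshake (or its peerIo) while the handshake code may still be live on the call stack; failed handshakes are parked and reclaimed later from bandwidthPulse() and tr_peerMgrFree(). A small generic sketch of that deferred-free pattern, with a toy linked list standing in for the tr_ptrArray:

#include <stdio.h>
#include <stdlib.h>

/* a toy object that a callback is not allowed to free immediately */
typedef struct node { struct node * next; int id; } node;

static node * finished = NULL;   /* stand-in for mgr->finishedHandshakes */

/* called from deep inside the object's own code: just park it */
static void handshake_done( node * h )
{
    h->next = finished;
    finished = h;
    printf( "handshake %d parked for later cleanup\n", h->id );
}

/* called from a periodic timer, when nothing above us still references the objects */
static void pulse( void )
{
    while( finished != NULL )
    {
        node * h = finished;
        finished = h->next;
        printf( "freeing handshake %d\n", h->id );
        free( h );
    }
}

int main( void )
{
    int i;
    for( i = 0; i < 3; ++i ) {
        node * h = malloc( sizeof *h );
        h->id = i;
        handshake_done( h );   /* e.g. the handshake failed */
    }
    pulse( );                  /* later: a known-safe point to reclaim them */
    return 0;
}

Draining the list from a timer callback gives a safe reclamation point, which is also why the peer-mgr destructor drains the same list before freeing the array itself.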