Changeset 3295
- Timestamp:
- Oct 6, 2007, 6:20:52 PM (15 years ago)
- Location:
- trunk/libtransmission
- Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/handshake.c
r3291 r3295 158 158 { 159 159 va_list args; 160 const char * addr = tr_peerIoGetAddrStr( handshake->io );161 160 struct evbuffer * buf = evbuffer_new( ); 162 evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, handshake->io ); 161 char timestr[64]; 162 evbuffer_add_printf( buf, "[%s] %s: ", 163 tr_getLogTimeStr( timestr, sizeof(timestr) ), 164 tr_peerIoGetAddrStr( handshake->io ) ); 163 165 va_start( args, fmt ); 164 166 evbuffer_add_vprintf( buf, fmt, args ); 165 167 va_end( args ); 166 fprintf( stderr, "%s\n", EVBUFFER_DATA(buf) ); 168 evbuffer_add_printf( buf, " (%s:%d)\n", file, line ); 169 170 fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp ); 167 171 evbuffer_free( buf ); 168 172 } … … 581 585 /* FIXME: use readHandshake here */ 582 586 583 dbgmsg( handshake, "payload: need %d, got %d \n", (int)HANDSHAKE_SIZE, (int)EVBUFFER_LENGTH(inbuf) );587 dbgmsg( handshake, "payload: need %d, got %d", (int)HANDSHAKE_SIZE, (int)EVBUFFER_LENGTH(inbuf) ); 584 588 585 589 if( EVBUFFER_LENGTH(inbuf) < HANDSHAKE_SIZE ) … … 975 979 struct evbuffer * inbuf = EVBUFFER_INPUT ( evin ); 976 980 ReadState ret; 977 dbgmsg( handshake, "handling canRead; state is [%s] \n", getStateName(handshake->state) );981 dbgmsg( handshake, "handling canRead; state is [%s]", getStateName(handshake->state) ); 978 982 979 983 switch( handshake->state ) -
trunk/libtransmission/peer-mgr-private.h
r3273 r3295 57 57 tr_publisher_tag msgsTag; 58 58 59 struct tr_ratecontrol * rateToClient; 60 struct tr_ratecontrol * rateToPeer; 59 struct tr_ratecontrol * rcToClient; 60 struct tr_ratecontrol * rcToPeer; 61 62 double rateToClient; 63 double rateToPeer; 61 64 } 62 65 tr_peer; -
trunk/libtransmission/peer-mgr.c
r3290 r3295 40 40 { 41 41 /* how frequently to change which peers are choked */ 42 RECHOKE_PERIOD_MSEC = (1 5 * 1000),42 RECHOKE_PERIOD_MSEC = (1000), 43 43 44 44 /* how frequently to decide which peers live and die */ … … 46 46 47 47 /* how frequently to refill peers' request lists */ 48 REFILL_PERIOD_MSEC = 1500,48 REFILL_PERIOD_MSEC = 666, 49 49 50 50 /* don't change a peer's choke status more often than this */ … … 61 61 /* how many peers to unchoke per-torrent. */ 62 62 /* FIXME: make this user-configurable? */ 63 NUM_UNCHOKED_PEERS_PER_TORRENT = 16, /* arbitrary */ 64 65 /* another arbitrary number */ 66 MAX_RECONNECTIONS_PER_MINUTE = 60, 67 68 MAX_RECONNECTIONS_PER_PULSE = 69 ((MAX_RECONNECTIONS_PER_MINUTE * RECONNECT_PERIOD_MSEC) / (60*1000)), 63 NUM_UNCHOKED_PEERS_PER_TORRENT = 12, /* arbitrary */ 64 65 /* set this too high and there will be a lot of churn. 66 * set it too low and you'll get peers too slowly */ 67 MAX_RECONNECTIONS_PER_PULSE = 8, 70 68 71 69 /* corresponds to ut_pex's added.f flags */ … … 132 130 va_list args; 133 131 struct evbuffer * buf = evbuffer_new( ); 134 evbuffer_add_printf( buf, "[%s:%d] %s ", file, line, t->tor->info.name ); 132 char timestr[64]; 133 evbuffer_add_printf( buf, "[%s] %s: ", 134 tr_getLogTimeStr( timestr, sizeof(timestr) ), 135 t->tor->info.name ); 135 136 va_start( args, fmt ); 136 137 evbuffer_add_vprintf( buf, fmt, args ); 137 138 va_end( args ); 138 fprintf( fp, "%s\n", EVBUFFER_DATA(buf) ); 139 evbuffer_add_printf( buf, " (%s:%d)\n", file, line ); 140 141 fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp ); 139 142 evbuffer_free( buf ); 140 143 } … … 302 305 tr_peer * p; 303 306 p = tr_new0( tr_peer, 1 ); 304 p->r ateToClient = tr_rcInit( );305 p->r ateToPeer = tr_rcInit( );307 p->rcToClient = tr_rcInit( ); 308 p->rcToPeer = tr_rcInit( ); 306 309 memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) ); 307 310 return p; … … 339 342 tr_bitfieldFree( peer->blame ); 340 343 tr_bitfieldFree( peer->banned ); 341 tr_rcClose( peer->r ateToClient );342 tr_rcClose( peer->r ateToPeer );344 tr_rcClose( peer->rcToClient ); 345 tr_rcClose( peer->rcToPeer ); 343 346 tr_free( peer->client ); 344 347 tr_free( peer ); … … 698 701 699 702 torrentLock( t ); 703 tordbg( t, "Refilling Request Buffers..." ); 700 704 701 705 blocks = getPreferredBlocks( t, &blockCount ); … … 747 751 748 752 t->refillTimer = NULL; 749 750 753 torrentUnlock( t ); 751 754 return FALSE; … … 796 799 refillPulse, t, 797 800 REFILL_PERIOD_MSEC ); 801 break; 802 803 case TR_PEERMSG_CANCEL: 804 tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) ); 798 805 break; 799 806 … … 906 913 const struct in_addr * addr = tr_peerIoGetAddress( io, &port ); 907 914 ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING ); 915 tr_free( peer->client ); 916 peer->client = peer_id ? tr_clientForId( peer_id ) : NULL; 908 917 if( peer->msgs != NULL ) { /* we already have this peer */ 909 918 tr_peerIoFree( io ); … … 913 922 peer->io = io; 914 923 peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag ); 915 tr_free( peer->client );916 peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;917 924 } 918 925 } … … 1258 1265 ++setmePeersFrom[atom->from]; 1259 1266 1260 if( tr_rcRate( peer->rateToPeer )> 0.01 )1267 if( peer->rateToPeer > 0.01 ) 1261 1268 ++*setmePeersGettingFromUs; 1262 1269 1263 if( tr_rcRate( peer->rateToClient )> 0.01 )1270 if( peer->rateToClient > 0.01 ) 1264 1271 ++*setmePeersSendingToUs; 1265 1272 } … … 1298 1305 stat->progress = peer->progress; 1299 1306 stat->isEncrypted = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0; 1300 stat->uploadToRate = tr_rcRate( peer->rateToPeer );1301 stat->downloadFromRate = tr_rcRate( peer->rateToClient );1307 stat->uploadToRate = peer->rateToPeer; 1308 stat->downloadFromRate = peer->rateToClient; 1302 1309 stat->isDownloading = stat->uploadToRate > 0.01; 1303 1310 stat->isUploading = stat->downloadFromRate > 0.01; … … 1356 1363 **/ 1357 1364 1365 static double 1366 getWeightedThroughput( const tr_peer * peer ) 1367 { 1368 return ( 3 * peer->rateToPeer ) 1369 + ( 1 * peer->rateToClient ); 1370 } 1371 1358 1372 static void 1359 1373 rechoke( Torrent * t ) … … 1378 1392 node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer); 1379 1393 node->randomKey = tr_rand( INT_MAX ); 1380 node->rate = (3*tr_rcRate(peer->rateToPeer)) 1381 + (1*tr_rcRate(peer->rateToClient)); 1394 node->rate = getWeightedThroughput( peer ); 1382 1395 } 1383 1396 … … 1439 1452 const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr ); 1440 1453 const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG; 1441 const double throughput = (3*tr_rcRate(peer->rateToPeer)) 1442 + (1*tr_rcRate(peer->rateToClient)); 1454 const double throughput = getWeightedThroughput( peer ); 1443 1455 1444 1456 assert( atom != NULL ); … … 1546 1558 /* add some new ones */ 1547 1559 nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount; 1548 for( i=0; i<nAdd && i<nCandidates; ++i ) 1560 for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i ) 1561 //for( i=0; i<nCandidates; ++i ) 1549 1562 { 1550 1563 tr_peerMgr * mgr = t->manager; -
trunk/libtransmission/peer-msgs.c
r3290 r3295 69 69 KEEPALIVE_INTERVAL_SECS = 90, /* idle seconds before we send a keepalive */ 70 70 PEX_INTERVAL = (60 * 1000), /* msec between calls to sendPex() */ 71 PEER_PULSE_INTERVAL = (33), /* msec between calls to pulse() */ 71 PEER_PULSE_INTERVAL = (66), /* msec between calls to pulse() */ 72 RATE_PULSE_INTERVAL = (333), /* msec between calls to ratePulse() */ 72 73 }; 73 74 … … 88 89 89 90 static int 90 peer_request_compare( const void * va, const void * vb )91 compareRequest( const void * va, const void * vb ) 91 92 { 92 93 struct peer_request * a = (struct peer_request*) va; … … 112 113 tr_list * peerAskedFor; 113 114 tr_list * clientAskedFor; 114 115 tr_list * clientWillAskFor; 116 117 tr_timer * rateTimer; 115 118 tr_timer * pulseTimer; 116 119 tr_timer * pexTimer; … … 131 134 132 135 uint8_t state; 133 134 136 uint8_t ut_pex_id; 135 136 137 uint16_t pexCount; 137 138 138 uint32_t incomingMessageLength; 139 uint32_t maxActiveRequests; 140 uint32_t minActiveRequests; 139 141 140 142 tr_pex * pex; … … 154 156 { 155 157 va_list args; 156 const char * addr = tr_peerIoGetAddrStr( msgs->io );157 158 struct evbuffer * buf = evbuffer_new( ); 158 evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs->io ); 159 char timestr[64]; 160 evbuffer_add_printf( buf, "[%s] %s [%s]: ", 161 tr_getLogTimeStr( timestr, sizeof(timestr) ), 162 tr_peerIoGetAddrStr( msgs->io ), 163 msgs->info->client ); 159 164 va_start( args, fmt ); 160 165 evbuffer_add_vprintf( buf, fmt, args ); 161 166 va_end( args ); 162 fprintf( fp, "%s\n", EVBUFFER_DATA(buf) ); 167 evbuffer_add_printf( buf, " (%s:%d)\n", file, line ); 168 169 fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp ); 163 170 evbuffer_free( buf ); 164 171 } 165 172 } 166 173 167 #define dbgmsg(handshake, fmt...) myDebug(__FILE__, __LINE__, handshake, ##fmt ) 174 #define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt ) 175 176 /** 177 *** 178 **/ 179 180 static void 181 protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req ) 182 { 183 tr_peerIo * io = msgs->io; 184 struct evbuffer * out = msgs->outMessages; 185 186 dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length ); 187 tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) ); 188 tr_peerIoWriteUint8 ( io, out, BT_REQUEST ); 189 tr_peerIoWriteUint32( io, out, req->index ); 190 tr_peerIoWriteUint32( io, out, req->offset ); 191 tr_peerIoWriteUint32( io, out, req->length ); 192 } 193 194 static void 195 protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req ) 196 { 197 tr_peerIo * io = msgs->io; 198 struct evbuffer * out = msgs->outMessages; 199 200 dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length ); 201 tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) ); 202 tr_peerIoWriteUint8 ( io, out, BT_CANCEL ); 203 tr_peerIoWriteUint32( io, out, req->index ); 204 tr_peerIoWriteUint32( io, out, req->offset ); 205 tr_peerIoWriteUint32( io, out, req->length ); 206 } 207 208 static void 209 protocolSendHave( tr_peermsgs * msgs, uint32_t index ) 210 { 211 tr_peerIo * io = msgs->io; 212 struct evbuffer * out = msgs->outMessages; 213 214 dbgmsg( msgs, "sending Have %u", index ); 215 tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) ); 216 tr_peerIoWriteUint8 ( io, out, BT_HAVE ); 217 tr_peerIoWriteUint32( io, out, index ); 218 } 219 220 static void 221 protocolSendChoke( tr_peermsgs * msgs, int choke ) 222 { 223 tr_peerIo * io = msgs->io; 224 struct evbuffer * out = msgs->outMessages; 225 226 dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") ); 227 tr_peerIoWriteUint32( io, out, sizeof(uint8_t) ); 228 tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE ); 229 } 168 230 169 231 /** … … 192 254 tr_peermsgs_event e = blankEvent; 193 255 e.eventType = TR_PEERMSG_NEED_REQ; 256 dbgmsg( msgs, "firing NEED_REQ" ); 194 257 publish( msgs, &e ); 195 258 } … … 221 284 e.offset = offset; 222 285 e.length = length; 286 publish( msgs, &e ); 287 } 288 289 static void 290 fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req ) 291 { 292 tr_peermsgs_event e = blankEvent; 293 e.eventType = TR_PEERMSG_CANCEL; 294 e.pieceIndex = req->index; 295 e.offset = req->offset; 296 e.length = req->length; 223 297 publish( msgs, &e ); 224 298 } … … 278 352 279 353 msgs->info->clientIsInterested = weAreInterested; 280 dbgmsg( msgs, " : sending an %s message",281 weAreInterested ? "I NTERESTED" : "NOT_INTERESTED");354 dbgmsg( msgs, "Sending %s", 355 weAreInterested ? "Interested" : "Not Interested"); 282 356 283 357 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) ); … … 322 396 } 323 397 324 dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") ); 325 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) ); 326 tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, 327 choke ? BT_CHOKE : BT_UNCHOKE ); 398 protocolSendChoke( msgs, choke ); 328 399 msgs->info->chokeChangedAt = time( NULL ); 329 400 } … … 335 406 336 407 void 337 tr_peerMsgsCancel( tr_peermsgs * msgs,338 uint32_t pieceIndex,339 uint32_t offset,340 uint32_t length )341 {342 tr_list * node;343 struct peer_request tmp;344 345 assert( msgs != NULL );346 assert( length > 0 );347 348 /* have we asked the peer for this piece? */349 tmp.index = pieceIndex;350 tmp.offset = offset;351 tmp.length = length;352 node = tr_list_remove( &msgs->clientAskedFor, &tmp, peer_request_compare );353 354 /* if so, send a cancel message */355 if( node != NULL ) {356 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );357 tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_CANCEL );358 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );359 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );360 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );361 tr_free( node );362 }363 }364 365 /**366 ***367 **/368 369 void370 408 tr_peerMsgsHave( tr_peermsgs * msgs, 371 uint32_t pieceIndex ) 372 { 373 dbgmsg( msgs, "w00t telling them we HAVE piece #%d", pieceIndex ); 374 375 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 376 sizeof(uint8_t) + sizeof(uint32_t) ); 377 tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_HAVE ); 378 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex ); 379 409 uint32_t index ) 410 { 411 protocolSendHave( msgs, index ); 412 413 /* since we have more pieces now, we might not be interested in this peer */ 380 414 updateInterest( msgs ); 381 415 } … … 452 486 453 487 static int 454 pulse( void * vmsgs );455 456 static int457 488 reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length ) 458 489 { … … 477 508 } 478 509 510 static void 511 pumpRequestQueue( tr_peermsgs * msgs ) 512 { 513 const int max = msgs->maxActiveRequests; 514 const int min = msgs->minActiveRequests; 515 int count = tr_list_size( msgs->clientAskedFor ); 516 int sent = 0; 517 518 if( count > min ) 519 return; 520 if( msgs->info->clientIsChoked ) 521 return; 522 523 while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) ) 524 { 525 struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor ); 526 protocolSendRequest( msgs, req ); 527 req->time_requested = msgs->lastReqAddedAt = time( NULL ); 528 tr_list_append( &msgs->clientAskedFor, req ); 529 ++count; 530 ++sent; 531 } 532 533 if( sent ) 534 dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued", 535 sent, 536 tr_list_size(msgs->clientAskedFor), 537 tr_list_size(msgs->clientWillAskFor) ); 538 539 if( count < max ) 540 fireNeedReq( msgs ); 541 } 542 479 543 int 480 544 tr_peerMsgsAddRequest( tr_peermsgs * msgs, … … 483 547 uint32_t length ) 484 548 { 549 const int req_max = msgs->maxActiveRequests; 485 550 struct peer_request tmp, *req; 486 int maxSize;487 551 488 552 assert( msgs != NULL ); … … 490 554 assert( reqIsValid( msgs, index, offset, length ) ); 491 555 492 if( msgs->info->clientIsChoked ) 556 /** 557 *** Reasons to decline the request 558 **/ 559 560 /* don't send requests to choked clients */ 561 if( msgs->info->clientIsChoked ) { 562 dbgmsg( msgs, "declining request because they're choking us" ); 493 563 return TR_ADDREQ_CLIENT_CHOKED; 494 564 } 565 566 /* peer doesn't have this piece */ 495 567 if( !tr_bitfieldHas( msgs->info->have, index ) ) 496 568 return TR_ADDREQ_MISSING; 497 569 498 maxSize = MIN( 3 + (int)(tr_rcRate(msgs->info->rateToClient)/5), 100 ); 499 if( tr_list_size( msgs->clientAskedFor) >= maxSize ) 570 /* peer's queue is full */ 571 if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) { 572 dbgmsg( msgs, "declining request because we're full" ); 500 573 return TR_ADDREQ_FULL; 574 } 501 575 502 576 /* have we already asked for this piece? */ … … 504 578 tmp.offset = offset; 505 579 tmp.length = length; 506 if( tr_list_remove( &msgs->clientAskedFor, &tmp, peer_request_compare ) != NULL ) 580 if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) { 581 dbgmsg( msgs, "declining because it's a duplicate" ); 507 582 return TR_ADDREQ_DUPLICATE; 508 509 dbgmsg( msgs, "w00t peer has a max request queue size of %d... adding request for piece %d, offset %d", maxSize, (int)index, (int)offset );510 511 /* queue the request */512 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );513 tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_REQUEST ); 514 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );515 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );516 tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );517 518 /* add it to our `requests sent' list */519 req = tr_new ( struct peer_request, 1 );583 } 584 if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) { 585 dbgmsg( msgs, "declining because it's a duplicate" ); 586 return TR_ADDREQ_DUPLICATE; 587 } 588 589 /** 590 *** Accept this request 591 **/ 592 593 dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset ); 594 req = tr_new0( struct peer_request, 1 ); 520 595 *req = tmp; 521 req->time_requested = msgs->lastReqAddedAt = time( NULL ); 522 tr_list_append( &msgs->clientAskedFor, req ); 523 pulse( msgs ); 524 596 tr_list_append( &msgs->clientWillAskFor, req ); 525 597 return TR_ADDREQ_OK; 598 } 599 600 static void 601 tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs ) 602 { 603 struct peer_request * req; 604 605 while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) )) 606 { 607 fireCancelledReq( msgs, req ); 608 tr_free( req ); 609 } 610 611 while(( req = tr_list_pop_front( &msgs->clientAskedFor ) )) 612 { 613 fireCancelledReq( msgs, req ); 614 protocolSendCancel( msgs, req ); 615 tr_free( req ); 616 } 617 } 618 619 void 620 tr_peerMsgsCancel( tr_peermsgs * msgs, 621 uint32_t pieceIndex, 622 uint32_t offset, 623 uint32_t length ) 624 { 625 struct peer_request *req, tmp; 626 627 assert( msgs != NULL ); 628 assert( length > 0 ); 629 630 /* have we asked the peer for this piece? */ 631 tmp.index = pieceIndex; 632 tmp.offset = offset; 633 tmp.length = length; 634 635 /* if it's only in the queue and hasn't been sent yet, free it */ 636 if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) )) 637 { 638 fireCancelledReq( msgs, req ); 639 tr_free( req ); 640 } 641 642 /* if it's already been sent, send a cancel message too */ 643 if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) )) 644 { 645 protocolSendCancel( msgs, req ); 646 fireCancelledReq( msgs, req ); 647 tr_free( req ); 648 } 526 649 } 527 650 … … 712 835 713 836 if( len == 0 ) /* peer sent us a keepalive message */ 714 dbgmsg( msgs, " peer sent us a keepalive message..." );837 dbgmsg( msgs, "got KeepAlive" ); 715 838 else { 716 dbgmsg( msgs, "peer is sending us a message with %"PRIu64" bytes...", (uint64_t)len );717 839 msgs->incomingMessageLength = len; 718 840 msgs->state = AWAITING_BT_MESSAGE; … … 743 865 tr_peerIoReadUint8( msgs->io, inbuf, &id ); 744 866 msglen--; 745 dbgmsg( msgs, "peer sent us a message... " 746 "bt id number is %d, and remaining len is %d", (int)id, (int)msglen ); 867 dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen ); 747 868 748 869 switch( id ) 749 870 { 750 871 case BT_CHOKE: 751 dbgmsg( msgs, " w00t peer sent us a BT_CHOKE" );872 dbgmsg( msgs, "got Choke" ); 752 873 assert( msglen == 0 ); 753 874 msgs->info->clientIsChoked = 1; 754 875 #if 0 755 876 tr_list * walk; 756 877 for( walk = msgs->peerAskedFor; walk != NULL; ) … … 766 887 walk = next; 767 888 } 768 tr_list_free( &msgs->clientAskedFor, tr_free ); 889 #endif 890 tr_peerMsgsCancelAllRequests( msgs ); 769 891 break; 770 892 771 893 case BT_UNCHOKE: 772 dbgmsg( msgs, " w00t peer sent us a BT_UNCHOKE" );894 dbgmsg( msgs, "got Unchoke" ); 773 895 assert( msglen == 0 ); 774 896 msgs->info->clientIsChoked = 0; … … 777 899 778 900 case BT_INTERESTED: 779 dbgmsg( msgs, " w00t peer sent us a BT_INTERESTED" );901 dbgmsg( msgs, "got Interested" ); 780 902 assert( msglen == 0 ); 781 903 msgs->info->peerIsInterested = 1; … … 784 906 785 907 case BT_NOT_INTERESTED: 786 dbgmsg( msgs, " w00t peer sent us a BT_NOT_INTERESTED" );908 dbgmsg( msgs, "got Not Interested" ); 787 909 assert( msglen == 0 ); 788 910 msgs->info->peerIsInterested = 0; … … 790 912 791 913 case BT_HAVE: 792 dbgmsg( msgs, "w00t peer sent us a BT_HAVE" );793 914 assert( msglen == 4 ); 794 915 tr_peerIoReadUint32( msgs->io, inbuf, &ui32 ); 795 916 tr_bitfieldAdd( msgs->info->have, ui32 ); 796 917 updatePeerProgress( msgs ); 918 dbgmsg( msgs, "got Have: %u", ui32 ); 797 919 break; 798 920 799 921 case BT_BITFIELD: { 800 922 const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE; 801 dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );802 923 assert( msglen == msgs->info->have->len ); 803 924 tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen ); … … 810 931 case BT_REQUEST: { 811 932 struct peer_request * req; 812 dbgmsg( msgs, "peer sent us a BT_REQUEST" );813 933 assert( msglen == 12 ); 814 934 req = tr_new( struct peer_request, 1 ); … … 816 936 tr_peerIoReadUint32( msgs->io, inbuf, &req->offset ); 817 937 tr_peerIoReadUint32( msgs->io, inbuf, &req->length ); 938 dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length ); 818 939 819 940 if ( !requestIsValid( msgs, req ) ) … … 868 989 struct peer_request req; 869 990 void * data; 870 dbgmsg( msgs, "peer sent us a BT_CANCEL" );871 991 assert( msglen == 12 ); 872 992 tr_peerIoReadUint32( msgs->io, inbuf, &req.index ); 873 993 tr_peerIoReadUint32( msgs->io, inbuf, &req.offset ); 874 994 tr_peerIoReadUint32( msgs->io, inbuf, &req.length ); 875 data = tr_list_remove( &msgs->peerAskedFor, &req, peer_request_compare ); 995 dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length ); 996 data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ); 876 997 tr_free( data ); 877 998 break; … … 879 1000 880 1001 case BT_PIECE: { 881 dbgmsg( msgs, " peer sent us a BT_PIECE" );1002 dbgmsg( msgs, "got a Piece!" ); 882 1003 assert( msgs->blockToUs.length == 0 ); 883 1004 tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index ); … … 892 1013 893 1014 case BT_PORT: { 894 dbgmsg( msgs, " peer sent usa BT_PORT" );1015 dbgmsg( msgs, "Got a BT_PORT" ); 895 1016 assert( msglen == 2 ); 896 1017 tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port ); … … 905 1026 case BT_HAVE_ALL: { 906 1027 assert( msglen == 0 ); 907 dbgmsg( msgs, " peer sent usa BT_HAVE_ALL" );1028 dbgmsg( msgs, "Got a BT_HAVE_ALL" ); 908 1029 memset( msgs->info->have->bits, 1, msgs->info->have->len ); 909 1030 updatePeerProgress( msgs ); … … 913 1034 case BT_HAVE_NONE: { 914 1035 assert( msglen == 0 ); 915 dbgmsg( msgs, " peer sent usa BT_HAVE_NONE" );1036 dbgmsg( msgs, "Got a BT_HAVE_NONE" ); 916 1037 memset( msgs->info->have->bits, 1, msgs->info->have->len ); 917 1038 updatePeerProgress( msgs ); … … 923 1044 tr_list * node; 924 1045 assert( msglen == 12 ); 925 dbgmsg( msgs, " peer sent usa BT_REJECT" );1046 dbgmsg( msgs, "Got a BT_REJECT" ); 926 1047 tr_peerIoReadUint32( msgs->io, inbuf, &req.index ); 927 1048 tr_peerIoReadUint32( msgs->io, inbuf, &req.offset ); 928 1049 tr_peerIoReadUint32( msgs->io, inbuf, &req.length ); 929 node = tr_list_find( msgs->peerAskedFor, &req, peer_request_compare);1050 node = tr_list_find( msgs->peerAskedFor, &req, compareRequest ); 930 1051 if( node != NULL ) { 931 1052 void * data = node->data; … … 939 1060 case BT_ALLOWED_FAST: { 940 1061 assert( msglen == 4 ); 941 dbgmsg( msgs, " peer sent usa BT_ALLOWED_FAST" );1062 dbgmsg( msgs, "Got a BT_ALLOWED_FAST" ); 942 1063 tr_peerIoReadUint32( msgs->io, inbuf, &ui32 ); 943 1064 tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 ); … … 946 1067 947 1068 case BT_LTEP: 948 dbgmsg( msgs, " peer sent usa BT_LTEP" );1069 dbgmsg( msgs, "Got a BT_LTEP" ); 949 1070 parseLtep( msgs, msglen, inbuf ); 950 1071 break; … … 968 1089 tor->downloadedCur += byteCount; 969 1090 msgs->info->pieceDataActivityDate = time( NULL ); 970 tr_rcTransferred( msgs->info->r ateToClient, byteCount );1091 tr_rcTransferred( msgs->info->rcToClient, byteCount ); 971 1092 tr_rcTransferred( tor->download, byteCount ); 972 1093 tr_rcTransferred( tor->handle->download, byteCount ); … … 980 1101 tor->uploadedCur += byteCount; 981 1102 msgs->info->pieceDataActivityDate = time( NULL ); 982 tr_rcTransferred( msgs->info->r ateToPeer, byteCount );1103 tr_rcTransferred( msgs->info->rcToPeer, byteCount ); 983 1104 tr_rcTransferred( tor->upload, byteCount ); 984 1105 tr_rcTransferred( tor->handle->upload, byteCount ); … … 1055 1176 key.length = length; 1056 1177 req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key, 1057 peer_request_compare);1178 compareRequest ); 1058 1179 if( req == NULL ) { 1059 1180 gotUnwantedBlock( msgs, index, offset, length ); … … 1061 1182 return; 1062 1183 } 1063 dbgmsg( msgs, "w00t peer sent us a block. turnaround time was %d seconds", 1184 dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)", 1185 req->index, req->offset, req->length, 1064 1186 (int)(time(NULL) - req->time_requested) ); 1065 1187 tr_free( req ); … … 1097 1219 1098 1220 fireGotBlock( msgs, index, offset, length ); 1099 fireNeedReq( msgs );1100 1221 1101 1222 /** … … 1142 1263 if( !msgs->blockToUs.length ) 1143 1264 { 1144 dbgmsg( msgs, " w00t -- got block index %u, offset%u", msgs->blockToUs.index, msgs->blockToUs.offset );1265 dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset ); 1145 1266 assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) ); 1146 1267 gotBlock( msgs, msgs->inBlock, … … 1220 1341 1221 1342 static int 1343 ratePulse( void * vmsgs ) 1344 { 1345 tr_peermsgs * msgs = (tr_peermsgs *) vmsgs; 1346 msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient ); 1347 msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer ); 1348 msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 ); 1349 msgs->minActiveRequests = msgs->maxActiveRequests / 2; 1350 return TRUE; 1351 } 1352 1353 static int 1222 1354 pulse( void * vmsgs ) 1223 1355 { … … 1233 1365 tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 ); 1234 1366 } 1367 1368 pumpRequestQueue( msgs ); 1235 1369 1236 1370 if( !canWrite( msgs ) ) … … 1262 1396 peerGotBytes( msgs, r->length ); 1263 1397 1264 dbgmsg( msgs, " putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)r->index, (int)r->offset, (int)r->length, tr_list_size(msgs->peerAskedFor) );1398 dbgmsg( msgs, "Sending block %u:%u->%u (%d blocks left to send)", r->index, r->offset, r->length, tr_list_size(msgs->peerAskedFor) ); 1265 1399 1266 1400 tr_free( r ); … … 1408 1542 flags = tr_bencDictAdd( &val, "added.f" ); 1409 1543 tmp = walk = tr_new( uint8_t, diffs.addedCount ); 1410 for( i=0; i<diffs.addedCount; ++i ) { 1411 dbgmsg( msgs, "PEX -->> -->> flag is %d", (int)diffs.added[i].flags ); 1544 for( i=0; i<diffs.addedCount; ++i ) 1412 1545 *walk++ = diffs.added[i].flags; 1413 }1414 1546 assert( ( walk - tmp ) == diffs.addedCount ); 1415 1547 tr_bencInitStr( flags, tmp, walk-tmp, FALSE ); … … 1478 1610 m->info->have = tr_bitfieldNew( torrent->info.pieceCount ); 1479 1611 m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL ); 1612 m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL ); 1480 1613 m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL ); 1481 1614 m->outMessages = evbuffer_new( ); … … 1502 1635 tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m ); 1503 1636 tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 ); 1637 ratePulse( m ); 1504 1638 1505 1639 /** … … 1548 1682 { 1549 1683 tr_timerFree( &msgs->pulseTimer ); 1684 tr_timerFree( &msgs->rateTimer ); 1550 1685 tr_timerFree( &msgs->pexTimer ); 1551 1686 tr_publisherFree( &msgs->publisher ); 1687 tr_list_free( &msgs->clientWillAskFor, tr_free ); 1552 1688 tr_list_free( &msgs->clientAskedFor, tr_free ); 1553 1689 tr_list_free( &msgs->peerAskedFor, tr_free ); -
trunk/libtransmission/peer-msgs.h
r3290 r3295 66 66 TR_PEERMSG_PEER_PROGRESS, 67 67 TR_PEERMSG_GOT_ERROR, 68 TR_PEERMSG_CANCEL, 68 69 TR_PEERMSG_NEED_REQ 69 70 } -
trunk/libtransmission/tracker.c
r3285 r3295 52 52 53 53 /* the value of the 'numwant' argument passed in tracker requests */ 54 #define NUMWANT 7554 #define NUMWANT 128 55 55 56 56 /* the length of the 'key' argument passed in tracker requests */ -
trunk/libtransmission/transmission.c
r3271 r3295 317 317 tr_runInEventThread( h, tr_closeImpl, h ); 318 318 while( !h->isClosed ) 319 tr_wait( 200 );319 tr_wait( 100 ); 320 320 321 321 tr_eventClose( h ); 322 while( h->events != NULL ) { 323 fprintf( stderr, "waiting for libevent thread to close...\n" ); 324 tr_wait( 200 ); 325 } 322 while( h->events != NULL ) 323 tr_wait( 100 ); 326 324 327 325 tr_lockFree( h->lock ); 328 326 free( h->tag ); 329 327 free( h ); 330 fprintf( stderr, " tr_close() completed.\n" );328 fprintf( stderr, "libtransmission closed cleanly.\n" ); 331 329 } 332 330 -
trunk/libtransmission/utils.c
r3285 r3295 85 85 } 86 86 87 char* 88 tr_getLogTimeStr( char * buf, int buflen ) 89 { 90 char tmp[64]; 91 time_t now; 92 struct tm now_tm; 93 struct timeval tv; 94 int milliseconds; 95 96 now = time( NULL ); 97 gettimeofday( &tv, NULL ); 98 99 localtime_r( &now, &now_tm ); 100 strftime( tmp, sizeof(tmp), "%H:%M:%S", &now_tm ); 101 milliseconds = (int)(tv.tv_usec / 1000); 102 snprintf( buf, buflen, "%s.%03d", tmp, milliseconds ); 103 104 return buf; 105 } 106 87 107 void 88 108 tr_setMessageLevel( int level ) -
trunk/libtransmission/utils.h
r3164 r3295 38 38 void tr_msg ( int level, char * msg, ... ); 39 39 FILE* tr_getLog( void ); 40 41 char* tr_getLogTimeStr( char * buf, int buflen ); 40 42 41 43 int tr_rand ( int );
Note: See TracChangeset
for help on using the changeset viewer.