Changeset 3295


Ignore:
Timestamp:
Oct 6, 2007, 6:20:52 PM (15 years ago)
Author:
charles
Message:
  • add a per-peer request queue to hold the next 10-15 seconds' worth of requests so that we always have more requests at hand when the current requests start to run low.
  • increase the tracker `numwant' variable to grow our peer pool
  • bugfixes in cancelling requests.
  • make the debug log sexy and readable like uTorrent's ;)
Location:
trunk/libtransmission
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/handshake.c

    r3291 r3295  
    158158    {
    159159        va_list args;
    160         const char * addr = tr_peerIoGetAddrStr( handshake->io );
    161160        struct evbuffer * buf = evbuffer_new( );
    162         evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, handshake->io );
     161        char timestr[64];
     162        evbuffer_add_printf( buf, "[%s] %s: ",
     163                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
     164                             tr_peerIoGetAddrStr( handshake->io ) );
    163165        va_start( args, fmt );
    164166        evbuffer_add_vprintf( buf, fmt, args );
    165167        va_end( args );
    166         fprintf( stderr, "%s\n", EVBUFFER_DATA(buf) );
     168        evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
     169
     170        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
    167171        evbuffer_free( buf );
    168172    }
     
    581585/* FIXME: use  readHandshake here */
    582586
    583     dbgmsg( handshake, "payload: need %d, got %d\n", (int)HANDSHAKE_SIZE, (int)EVBUFFER_LENGTH(inbuf) );
     587    dbgmsg( handshake, "payload: need %d, got %d", (int)HANDSHAKE_SIZE, (int)EVBUFFER_LENGTH(inbuf) );
    584588
    585589    if( EVBUFFER_LENGTH(inbuf) < HANDSHAKE_SIZE )
     
    975979    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
    976980    ReadState ret;
    977     dbgmsg( handshake, "handling canRead; state is [%s]\n", getStateName(handshake->state) );
     981    dbgmsg( handshake, "handling canRead; state is [%s]", getStateName(handshake->state) );
    978982
    979983    switch( handshake->state )
  • trunk/libtransmission/peer-mgr-private.h

    r3273 r3295  
    5757    tr_publisher_tag msgsTag;
    5858
    59     struct tr_ratecontrol * rateToClient;
    60     struct tr_ratecontrol * rateToPeer;
     59    struct tr_ratecontrol * rcToClient;
     60    struct tr_ratecontrol * rcToPeer;
     61
     62    double rateToClient;
     63    double rateToPeer;
    6164}
    6265tr_peer;
  • trunk/libtransmission/peer-mgr.c

    r3290 r3295  
    4040{
    4141    /* how frequently to change which peers are choked */
    42     RECHOKE_PERIOD_MSEC = (15 * 1000),
     42    RECHOKE_PERIOD_MSEC = (1000),
    4343
    4444    /* how frequently to decide which peers live and die */
     
    4646
    4747    /* how frequently to refill peers' request lists */
    48     REFILL_PERIOD_MSEC = 1500,
     48    REFILL_PERIOD_MSEC = 666,
    4949
    5050    /* don't change a peer's choke status more often than this */
     
    6161    /* how many peers to unchoke per-torrent. */
    6262    /* FIXME: make this user-configurable? */
    63     NUM_UNCHOKED_PEERS_PER_TORRENT = 16, /* arbitrary */
    64 
    65     /* another arbitrary number */
    66     MAX_RECONNECTIONS_PER_MINUTE = 60,
    67 
    68     MAX_RECONNECTIONS_PER_PULSE =
    69         ((MAX_RECONNECTIONS_PER_MINUTE * RECONNECT_PERIOD_MSEC) / (60*1000)),
     63    NUM_UNCHOKED_PEERS_PER_TORRENT = 12, /* arbitrary */
     64
     65    /* set this too high and there will be a lot of churn.
     66     * set it too low and you'll get peers too slowly */
     67    MAX_RECONNECTIONS_PER_PULSE = 8,
    7068
    7169    /* corresponds to ut_pex's added.f flags */
     
    132130        va_list args;
    133131        struct evbuffer * buf = evbuffer_new( );
    134         evbuffer_add_printf( buf, "[%s:%d] %s ", file, line, t->tor->info.name  );
     132        char timestr[64];
     133        evbuffer_add_printf( buf, "[%s] %s: ",
     134                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
     135                             t->tor->info.name );
    135136        va_start( args, fmt );
    136137        evbuffer_add_vprintf( buf, fmt, args );
    137138        va_end( args );
    138         fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
     139        evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
     140
     141        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
    139142        evbuffer_free( buf );
    140143    }
     
    302305    tr_peer * p;
    303306    p = tr_new0( tr_peer, 1 );
    304     p->rateToClient = tr_rcInit( );
    305     p->rateToPeer = tr_rcInit( );
     307    p->rcToClient = tr_rcInit( );
     308    p->rcToPeer = tr_rcInit( );
    306309    memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
    307310    return p;
     
    339342    tr_bitfieldFree( peer->blame );
    340343    tr_bitfieldFree( peer->banned );
    341     tr_rcClose( peer->rateToClient );
    342     tr_rcClose( peer->rateToPeer );
     344    tr_rcClose( peer->rcToClient );
     345    tr_rcClose( peer->rcToPeer );
    343346    tr_free( peer->client );
    344347    tr_free( peer );
     
    698701
    699702    torrentLock( t );
     703    tordbg( t, "Refilling Request Buffers..." );
    700704
    701705    blocks = getPreferredBlocks( t, &blockCount );
     
    747751
    748752    t->refillTimer = NULL;
    749 
    750753    torrentUnlock( t );
    751754    return FALSE;
     
    796799                                              refillPulse, t,
    797800                                              REFILL_PERIOD_MSEC );
     801            break;
     802
     803        case TR_PEERMSG_CANCEL:
     804            tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
    798805            break;
    799806
     
    906913        const struct in_addr * addr = tr_peerIoGetAddress( io,  &port );
    907914        ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
     915        tr_free( peer->client );
     916        peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
    908917        if( peer->msgs != NULL ) { /* we already have this peer */
    909918            tr_peerIoFree( io );
     
    913922            peer->io = io;
    914923            peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
    915             tr_free( peer->client );
    916             peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
    917924        }
    918925    }
     
    12581265        ++setmePeersFrom[atom->from];
    12591266
    1260         if( tr_rcRate( peer->rateToPeer ) > 0.01 )
     1267        if( peer->rateToPeer > 0.01 )
    12611268            ++*setmePeersGettingFromUs;
    12621269
    1263         if( tr_rcRate( peer->rateToClient ) > 0.01 )
     1270        if( peer->rateToClient > 0.01 )
    12641271            ++*setmePeersSendingToUs;
    12651272    }
     
    12981305        stat->progress         = peer->progress;
    12991306        stat->isEncrypted      = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
    1300         stat->uploadToRate     = tr_rcRate( peer->rateToPeer );
    1301         stat->downloadFromRate = tr_rcRate( peer->rateToClient );
     1307        stat->uploadToRate     = peer->rateToPeer;
     1308        stat->downloadFromRate = peer->rateToClient;
    13021309        stat->isDownloading    = stat->uploadToRate > 0.01;
    13031310        stat->isUploading      = stat->downloadFromRate > 0.01;
     
    13561363**/
    13571364
     1365static double
     1366getWeightedThroughput( const tr_peer * peer )
     1367{
     1368    return ( 3 * peer->rateToPeer )
     1369         + ( 1 * peer->rateToClient );
     1370}
     1371
    13581372static void
    13591373rechoke( Torrent * t )
     
    13781392        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
    13791393        node->randomKey = tr_rand( INT_MAX );
    1380         node->rate = (3*tr_rcRate(peer->rateToPeer))
    1381                    + (1*tr_rcRate(peer->rateToClient));
     1394        node->rate = getWeightedThroughput( peer );
    13821395    }
    13831396
     
    14391452        const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
    14401453        const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
    1441         const double throughput = (3*tr_rcRate(peer->rateToPeer))
    1442                                 + (1*tr_rcRate(peer->rateToClient));
     1454        const double throughput = getWeightedThroughput( peer );
    14431455
    14441456        assert( atom != NULL );
     
    15461558        /* add some new ones */
    15471559        nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
    1548         for( i=0; i<nAdd && i<nCandidates; ++i )
     1560        for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
     1561        //for( i=0; i<nCandidates; ++i )
    15491562        {
    15501563            tr_peerMgr * mgr = t->manager;
  • trunk/libtransmission/peer-msgs.c

    r3290 r3295  
    6969    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
    7070    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
    71     PEER_PULSE_INTERVAL     = (33),        /* msec between calls to pulse() */
     71    PEER_PULSE_INTERVAL     = (66),        /* msec between calls to pulse() */
     72    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
    7273};
    7374
     
    8889
    8990static int
    90 peer_request_compare( const void * va, const void * vb )
     91compareRequest( const void * va, const void * vb )
    9192{
    9293    struct peer_request * a = (struct peer_request*) va;
     
    112113    tr_list * peerAskedFor;
    113114    tr_list * clientAskedFor;
    114 
     115    tr_list * clientWillAskFor;
     116
     117    tr_timer * rateTimer;
    115118    tr_timer * pulseTimer;
    116119    tr_timer * pexTimer;
     
    131134   
    132135    uint8_t state;
    133 
    134136    uint8_t ut_pex_id;
    135 
    136137    uint16_t pexCount;
    137 
    138138    uint32_t incomingMessageLength;
     139    uint32_t maxActiveRequests;
     140    uint32_t minActiveRequests;
    139141
    140142    tr_pex * pex;
     
    154156    {
    155157        va_list args;
    156         const char * addr = tr_peerIoGetAddrStr( msgs->io );
    157158        struct evbuffer * buf = evbuffer_new( );
    158         evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs->io );
     159        char timestr[64];
     160        evbuffer_add_printf( buf, "[%s] %s [%s]: ",
     161                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
     162                             tr_peerIoGetAddrStr( msgs->io ),
     163                             msgs->info->client );
    159164        va_start( args, fmt );
    160165        evbuffer_add_vprintf( buf, fmt, args );
    161166        va_end( args );
    162         fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
     167        evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
     168
     169        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
    163170        evbuffer_free( buf );
    164171    }
    165172}
    166173
    167 #define dbgmsg(handshake, fmt...) myDebug(__FILE__, __LINE__, handshake, ##fmt )
     174#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
     175
     176/**
     177***
     178**/
     179
     180static void
     181protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
     182{
     183    tr_peerIo * io = msgs->io;
     184    struct evbuffer * out = msgs->outMessages;
     185
     186    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
     187    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
     188    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
     189    tr_peerIoWriteUint32( io, out, req->index );
     190    tr_peerIoWriteUint32( io, out, req->offset );
     191    tr_peerIoWriteUint32( io, out, req->length );
     192}
     193
     194static void
     195protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
     196{
     197    tr_peerIo * io = msgs->io;
     198    struct evbuffer * out = msgs->outMessages;
     199
     200    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
     201    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
     202    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
     203    tr_peerIoWriteUint32( io, out, req->index );
     204    tr_peerIoWriteUint32( io, out, req->offset );
     205    tr_peerIoWriteUint32( io, out, req->length );
     206}
     207
     208static void
     209protocolSendHave( tr_peermsgs * msgs, uint32_t index )
     210{
     211    tr_peerIo * io = msgs->io;
     212    struct evbuffer * out = msgs->outMessages;
     213
     214    dbgmsg( msgs, "sending Have %u", index );
     215    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
     216    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
     217    tr_peerIoWriteUint32( io, out, index );
     218}
     219
     220static void
     221protocolSendChoke( tr_peermsgs * msgs, int choke )
     222{
     223    tr_peerIo * io = msgs->io;
     224    struct evbuffer * out = msgs->outMessages;
     225
     226    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
     227    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
     228    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
     229}
    168230
    169231/**
     
    192254    tr_peermsgs_event e = blankEvent;
    193255    e.eventType = TR_PEERMSG_NEED_REQ;
     256    dbgmsg( msgs, "firing NEED_REQ" );
    194257    publish( msgs, &e );
    195258}
     
    221284    e.offset = offset;
    222285    e.length = length;
     286    publish( msgs, &e );
     287}
     288
     289static void
     290fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
     291{
     292    tr_peermsgs_event e = blankEvent;
     293    e.eventType = TR_PEERMSG_CANCEL;
     294    e.pieceIndex = req->index;
     295    e.offset = req->offset;
     296    e.length = req->length;
    223297    publish( msgs, &e );
    224298}
     
    278352
    279353    msgs->info->clientIsInterested = weAreInterested;
    280     dbgmsg( msgs, ": sending an %s message",
    281             weAreInterested ? "INTERESTED" : "NOT_INTERESTED");
     354    dbgmsg( msgs, "Sending %s",
     355            weAreInterested ? "Interested" : "Not Interested");
    282356
    283357    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
     
    322396            }
    323397
    324         dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") );
    325         tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
    326         tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
    327                                              choke ? BT_CHOKE : BT_UNCHOKE );
     398        protocolSendChoke( msgs, choke );
    328399        msgs->info->chokeChangedAt = time( NULL );
    329400    }
     
    335406
    336407void
    337 tr_peerMsgsCancel( tr_peermsgs * msgs,
    338                    uint32_t      pieceIndex,
    339                    uint32_t      offset,
    340                    uint32_t      length )
    341 {
    342     tr_list * node;
    343     struct peer_request tmp;
    344 
    345     assert( msgs != NULL );
    346     assert( length > 0 );
    347 
    348     /* have we asked the peer for this piece? */
    349     tmp.index = pieceIndex;
    350     tmp.offset = offset;
    351     tmp.length = length;
    352     node = tr_list_remove( &msgs->clientAskedFor, &tmp, peer_request_compare );
    353 
    354     /* if so, send a cancel message */
    355     if( node != NULL ) {
    356         tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
    357         tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_CANCEL );
    358         tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
    359         tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
    360         tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
    361         tr_free( node );
    362     }
    363 }
    364 
    365 /**
    366 ***
    367 **/
    368 
    369 void
    370408tr_peerMsgsHave( tr_peermsgs * msgs,
    371                  uint32_t      pieceIndex )
    372 {
    373     dbgmsg( msgs, "w00t telling them we HAVE piece #%d", pieceIndex );
    374 
    375     tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
    376                                sizeof(uint8_t) + sizeof(uint32_t) );
    377     tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_HAVE );
    378     tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
    379 
     409                 uint32_t      index )
     410{
     411    protocolSendHave( msgs, index );
     412
     413    /* since we have more pieces now, we might not be interested in this peer */
    380414    updateInterest( msgs );
    381415}
     
    452486
    453487static int
    454 pulse( void * vmsgs );
    455 
    456 static int
    457488reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
    458489{
     
    477508}
    478509
     510static void
     511pumpRequestQueue( tr_peermsgs * msgs )
     512{
     513    const int max = msgs->maxActiveRequests;
     514    const int min = msgs->minActiveRequests;
     515    int count = tr_list_size( msgs->clientAskedFor );
     516    int sent = 0;
     517
     518    if( count > min )
     519        return;
     520    if( msgs->info->clientIsChoked )
     521        return;
     522
     523    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
     524    {
     525        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
     526        protocolSendRequest( msgs, req );
     527        req->time_requested = msgs->lastReqAddedAt = time( NULL );
     528        tr_list_append( &msgs->clientAskedFor, req );
     529        ++count;
     530        ++sent;
     531    }
     532
     533    if( sent )
     534        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
     535                sent,
     536                tr_list_size(msgs->clientAskedFor),
     537                tr_list_size(msgs->clientWillAskFor) );
     538
     539    if( count < max )
     540        fireNeedReq( msgs );
     541}
     542
    479543int
    480544tr_peerMsgsAddRequest( tr_peermsgs * msgs,
     
    483547                       uint32_t      length )
    484548{
     549    const int req_max = msgs->maxActiveRequests;
    485550    struct peer_request tmp, *req;
    486     int maxSize;
    487551
    488552    assert( msgs != NULL );
     
    490554    assert( reqIsValid( msgs, index, offset, length ) );
    491555
    492     if( msgs->info->clientIsChoked )
     556    /**
     557    ***  Reasons to decline the request
     558    **/
     559
     560    /* don't send requests to choked clients */
     561    if( msgs->info->clientIsChoked ) {
     562        dbgmsg( msgs, "declining request because they're choking us" );
    493563        return TR_ADDREQ_CLIENT_CHOKED;
    494 
     564    }
     565
     566    /* peer doesn't have this piece */
    495567    if( !tr_bitfieldHas( msgs->info->have, index ) )
    496568        return TR_ADDREQ_MISSING;
    497569
    498     maxSize = MIN( 3 + (int)(tr_rcRate(msgs->info->rateToClient)/5), 100 );
    499     if( tr_list_size( msgs->clientAskedFor) >= maxSize )
     570    /* peer's queue is full */
     571    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
     572        dbgmsg( msgs, "declining request because we're full" );
    500573        return TR_ADDREQ_FULL;
     574    }
    501575
    502576    /* have we already asked for this piece? */
     
    504578    tmp.offset = offset;
    505579    tmp.length = length;
    506     if( tr_list_remove( &msgs->clientAskedFor, &tmp, peer_request_compare ) != NULL )
     580    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
     581        dbgmsg( msgs, "declining because it's a duplicate" );
    507582        return TR_ADDREQ_DUPLICATE;
    508 
    509     dbgmsg( msgs, "w00t peer has a max request queue size of %d... adding request for piece %d, offset %d", maxSize, (int)index, (int)offset );
    510 
    511     /* queue the request */
    512     tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
    513     tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_REQUEST );
    514     tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
    515     tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
    516     tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
    517 
    518     /* add it to our `requests sent' list */
    519     req = tr_new( struct peer_request, 1 );
     583    }
     584    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
     585        dbgmsg( msgs, "declining because it's a duplicate" );
     586        return TR_ADDREQ_DUPLICATE;
     587    }
     588
     589    /**
     590    ***  Accept this request
     591    **/
     592
     593    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
     594    req = tr_new0( struct peer_request, 1 );
    520595    *req = tmp;
    521     req->time_requested = msgs->lastReqAddedAt = time( NULL );
    522     tr_list_append( &msgs->clientAskedFor, req );
    523     pulse( msgs );
    524 
     596    tr_list_append( &msgs->clientWillAskFor, req );
    525597    return TR_ADDREQ_OK;
     598}
     599
     600static void
     601tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
     602{
     603    struct peer_request * req;
     604
     605    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
     606    {
     607        fireCancelledReq( msgs, req );
     608        tr_free( req );
     609    }
     610
     611    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
     612    {
     613        fireCancelledReq( msgs, req );
     614        protocolSendCancel( msgs, req );
     615        tr_free( req );
     616    }
     617}
     618
     619void
     620tr_peerMsgsCancel( tr_peermsgs * msgs,
     621                   uint32_t      pieceIndex,
     622                   uint32_t      offset,
     623                   uint32_t      length )
     624{
     625    struct peer_request *req, tmp;
     626
     627    assert( msgs != NULL );
     628    assert( length > 0 );
     629
     630    /* have we asked the peer for this piece? */
     631    tmp.index = pieceIndex;
     632    tmp.offset = offset;
     633    tmp.length = length;
     634
     635    /* if it's only in the queue and hasn't been sent yet, free it */
     636    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
     637    {
     638        fireCancelledReq( msgs, req );
     639        tr_free( req );
     640    }
     641
     642    /* if it's already been sent, send a cancel message too */
     643    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
     644    {
     645        protocolSendCancel( msgs, req );
     646        fireCancelledReq( msgs, req );
     647        tr_free( req );
     648    }
    526649}
    527650
     
    712835
    713836    if( len == 0 ) /* peer sent us a keepalive message */
    714         dbgmsg( msgs, "peer sent us a keepalive message..." );
     837        dbgmsg( msgs, "got KeepAlive" );
    715838    else {
    716         dbgmsg( msgs, "peer is sending us a message with %"PRIu64" bytes...", (uint64_t)len );
    717839        msgs->incomingMessageLength = len;
    718840        msgs->state = AWAITING_BT_MESSAGE;
     
    743865    tr_peerIoReadUint8( msgs->io, inbuf, &id );
    744866    msglen--;
    745     dbgmsg( msgs, "peer sent us a message... "
    746                   "bt id number is %d, and remaining len is %d", (int)id, (int)msglen );
     867    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
    747868
    748869    switch( id )
    749870    {
    750871        case BT_CHOKE:
    751             dbgmsg( msgs, "w00t peer sent us a BT_CHOKE" );
     872            dbgmsg( msgs, "got Choke" );
    752873            assert( msglen == 0 );
    753874            msgs->info->clientIsChoked = 1;
    754            
     875#if 0
    755876            tr_list * walk;
    756877            for( walk = msgs->peerAskedFor; walk != NULL; )
     
    766887                walk = next;
    767888            }
    768             tr_list_free( &msgs->clientAskedFor, tr_free );
     889#endif
     890            tr_peerMsgsCancelAllRequests( msgs );
    769891            break;
    770892
    771893        case BT_UNCHOKE:
    772             dbgmsg( msgs, "w00t peer sent us a BT_UNCHOKE" );
     894            dbgmsg( msgs, "got Unchoke" );
    773895            assert( msglen == 0 );
    774896            msgs->info->clientIsChoked = 0;
     
    777899
    778900        case BT_INTERESTED:
    779             dbgmsg( msgs, "w00t peer sent us a BT_INTERESTED" );
     901            dbgmsg( msgs, "got Interested" );
    780902            assert( msglen == 0 );
    781903            msgs->info->peerIsInterested = 1;
     
    784906
    785907        case BT_NOT_INTERESTED:
    786             dbgmsg( msgs, "w00t peer sent us a BT_NOT_INTERESTED" );
     908            dbgmsg( msgs, "got Not Interested" );
    787909            assert( msglen == 0 );
    788910            msgs->info->peerIsInterested = 0;
     
    790912
    791913        case BT_HAVE:
    792             dbgmsg( msgs, "w00t peer sent us a BT_HAVE" );
    793914            assert( msglen == 4 );
    794915            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
    795916            tr_bitfieldAdd( msgs->info->have, ui32 );
    796917            updatePeerProgress( msgs );
     918            dbgmsg( msgs, "got Have: %u", ui32 );
    797919            break;
    798920
    799921        case BT_BITFIELD: {
    800922            const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
    801             dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );
    802923            assert( msglen == msgs->info->have->len );
    803924            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
     
    810931        case BT_REQUEST: {
    811932            struct peer_request * req;
    812             dbgmsg( msgs, "peer sent us a BT_REQUEST" );
    813933            assert( msglen == 12 );
    814934            req = tr_new( struct peer_request, 1 );
     
    816936            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
    817937            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
     938            dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
    818939           
    819940            if ( !requestIsValid( msgs, req ) )
     
    868989            struct peer_request req;
    869990            void * data;
    870             dbgmsg( msgs, "peer sent us a BT_CANCEL" );
    871991            assert( msglen == 12 );
    872992            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
    873993            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
    874994            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
    875             data = tr_list_remove( &msgs->peerAskedFor, &req, peer_request_compare );
     995            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
     996            data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
    876997            tr_free( data );
    877998            break;
     
    8791000
    8801001        case BT_PIECE: {
    881             dbgmsg( msgs, "peer sent us a BT_PIECE" );
     1002            dbgmsg( msgs, "got a Piece!" );
    8821003            assert( msgs->blockToUs.length == 0 );
    8831004            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
     
    8921013
    8931014        case BT_PORT: {
    894             dbgmsg( msgs, "peer sent us a BT_PORT" );
     1015            dbgmsg( msgs, "Got a BT_PORT" );
    8951016            assert( msglen == 2 );
    8961017            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
     
    9051026        case BT_HAVE_ALL: {
    9061027            assert( msglen == 0 );
    907             dbgmsg( msgs, "peer sent us a BT_HAVE_ALL" );
     1028            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
    9081029            memset( msgs->info->have->bits, 1, msgs->info->have->len );
    9091030            updatePeerProgress( msgs );
     
    9131034        case BT_HAVE_NONE: {
    9141035            assert( msglen == 0 );
    915             dbgmsg( msgs, "peer sent us a BT_HAVE_NONE" );
     1036            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
    9161037            memset( msgs->info->have->bits, 1, msgs->info->have->len );
    9171038            updatePeerProgress( msgs );
     
    9231044            tr_list * node;
    9241045            assert( msglen == 12 );
    925             dbgmsg( msgs, "peer sent us a BT_REJECT" );
     1046            dbgmsg( msgs, "Got a BT_REJECT" );
    9261047            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
    9271048            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
    9281049            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
    929             node = tr_list_find( msgs->peerAskedFor, &req, peer_request_compare );
     1050            node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
    9301051            if( node != NULL ) {
    9311052                void * data = node->data;
     
    9391060        case BT_ALLOWED_FAST: {
    9401061            assert( msglen == 4 );
    941             dbgmsg( msgs, "peer sent us a BT_ALLOWED_FAST" );
     1062            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
    9421063            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
    9431064            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
     
    9461067           
    9471068        case BT_LTEP:
    948             dbgmsg( msgs, "peer sent us a BT_LTEP" );
     1069            dbgmsg( msgs, "Got a BT_LTEP" );
    9491070            parseLtep( msgs, msglen, inbuf );
    9501071            break;
     
    9681089    tor->downloadedCur += byteCount;
    9691090    msgs->info->pieceDataActivityDate = time( NULL );
    970     tr_rcTransferred( msgs->info->rateToClient, byteCount );
     1091    tr_rcTransferred( msgs->info->rcToClient, byteCount );
    9711092    tr_rcTransferred( tor->download, byteCount );
    9721093    tr_rcTransferred( tor->handle->download, byteCount );
     
    9801101    tor->uploadedCur += byteCount;
    9811102    msgs->info->pieceDataActivityDate = time( NULL );
    982     tr_rcTransferred( msgs->info->rateToPeer, byteCount );
     1103    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
    9831104    tr_rcTransferred( tor->upload, byteCount );
    9841105    tr_rcTransferred( tor->handle->upload, byteCount );
     
    10551176    key.length = length;
    10561177    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
    1057                                                  peer_request_compare );
     1178                                                 compareRequest );
    10581179    if( req == NULL ) {
    10591180        gotUnwantedBlock( msgs, index, offset, length );
     
    10611182        return;
    10621183    }
    1063     dbgmsg( msgs, "w00t peer sent us a block.  turnaround time was %d seconds",
     1184    dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)",
     1185                     req->index, req->offset, req->length,
    10641186                     (int)(time(NULL) - req->time_requested) );
    10651187    tr_free( req );
     
    10971219
    10981220    fireGotBlock( msgs, index, offset, length );
    1099     fireNeedReq( msgs );
    11001221
    11011222    /**
     
    11421263    if( !msgs->blockToUs.length )
    11431264    {
    1144         dbgmsg( msgs, "w00t -- got block index %u, offset %u", msgs->blockToUs.index, msgs->blockToUs.offset );
     1265        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
    11451266        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
    11461267        gotBlock( msgs, msgs->inBlock,
     
    12201341
    12211342static int
     1343ratePulse( void * vmsgs )
     1344{
     1345    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
     1346    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
     1347    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
     1348    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
     1349    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
     1350    return TRUE;
     1351}
     1352
     1353static int
    12221354pulse( void * vmsgs )
    12231355{
     
    12331365        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
    12341366    }
     1367
     1368    pumpRequestQueue( msgs );
    12351369
    12361370    if( !canWrite( msgs ) )
     
    12621396            peerGotBytes( msgs, r->length );
    12631397
    1264             dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)r->index, (int)r->offset, (int)r->length, tr_list_size(msgs->peerAskedFor) );
     1398            dbgmsg( msgs, "Sending block %u:%u->%u (%d blocks left to send)", r->index, r->offset, r->length, tr_list_size(msgs->peerAskedFor) );
    12651399
    12661400            tr_free( r );
     
    14081542        flags = tr_bencDictAdd( &val, "added.f" );
    14091543        tmp = walk = tr_new( uint8_t, diffs.addedCount );
    1410         for( i=0; i<diffs.addedCount; ++i ) {
    1411             dbgmsg( msgs, "PEX -->> -->> flag is %d", (int)diffs.added[i].flags );
     1544        for( i=0; i<diffs.addedCount; ++i )
    14121545            *walk++ = diffs.added[i].flags;
    1413         }
    14141546        assert( ( walk - tmp ) == diffs.addedCount );
    14151547        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
     
    14781610    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
    14791611    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
     1612    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
    14801613    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
    14811614    m->outMessages = evbuffer_new( );
     
    15021635    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
    15031636    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
     1637    ratePulse( m );
    15041638
    15051639    /**
     
    15481682    {
    15491683        tr_timerFree( &msgs->pulseTimer );
     1684        tr_timerFree( &msgs->rateTimer );
    15501685        tr_timerFree( &msgs->pexTimer );
    15511686        tr_publisherFree( &msgs->publisher );
     1687        tr_list_free( &msgs->clientWillAskFor, tr_free );
    15521688        tr_list_free( &msgs->clientAskedFor, tr_free );
    15531689        tr_list_free( &msgs->peerAskedFor, tr_free );
  • trunk/libtransmission/peer-msgs.h

    r3290 r3295  
    6666    TR_PEERMSG_PEER_PROGRESS,
    6767    TR_PEERMSG_GOT_ERROR,
     68    TR_PEERMSG_CANCEL,
    6869    TR_PEERMSG_NEED_REQ
    6970}
  • trunk/libtransmission/tracker.c

    r3285 r3295  
    5252
    5353/* the value of the 'numwant' argument passed in tracker requests */
    54 #define NUMWANT 75
     54#define NUMWANT 128
    5555
    5656/* the length of the 'key' argument passed in tracker requests */
  • trunk/libtransmission/transmission.c

    r3271 r3295  
    317317    tr_runInEventThread( h, tr_closeImpl, h );
    318318    while( !h->isClosed )
    319         tr_wait( 200 );
     319        tr_wait( 100 );
    320320
    321321    tr_eventClose( h );
    322     while( h->events != NULL ) {
    323         fprintf( stderr, "waiting for libevent thread to close...\n" );
    324         tr_wait( 200 );
    325     }
     322    while( h->events != NULL )
     323        tr_wait( 100 );
    326324
    327325    tr_lockFree( h->lock );
    328326    free( h->tag );
    329327    free( h );
    330     fprintf( stderr, "tr_close() completed.\n" );
     328    fprintf( stderr, "libtransmission closed cleanly.\n" );
    331329}
    332330
  • trunk/libtransmission/utils.c

    r3285 r3295  
    8585}
    8686
     87char*
     88tr_getLogTimeStr( char * buf, int buflen )
     89{
     90    char tmp[64];
     91    time_t now;
     92    struct tm now_tm;
     93    struct timeval tv;
     94    int milliseconds;
     95
     96    now = time( NULL );
     97    gettimeofday( &tv, NULL );
     98
     99    localtime_r( &now, &now_tm );
     100    strftime( tmp, sizeof(tmp), "%H:%M:%S", &now_tm );
     101    milliseconds = (int)(tv.tv_usec / 1000);
     102    snprintf( buf, buflen, "%s.%03d", tmp, milliseconds );
     103
     104    return buf;
     105}
     106
    87107void
    88108tr_setMessageLevel( int level )
  • trunk/libtransmission/utils.h

    r3164 r3295  
    3838void tr_msg  ( int level, char * msg, ... );
    3939FILE* tr_getLog( void );
     40
     41char* tr_getLogTimeStr( char * buf, int buflen );
    4042
    4143int  tr_rand ( int );
Note: See TracChangeset for help on using the changeset viewer.