Changeset 3070


Ignore:
Timestamp:
Sep 15, 2007, 2:09:05 PM (14 years ago)
Author:
charles
Message:
  • save a little bit of memory -- we're still using too much.
  • implement mainline's real choking algorithm
  • if peer sends us a pex and our pex wasn't turned on, then turn it on.
Location:
branches/encryption/libtransmission
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • branches/encryption/libtransmission/peer-io.c

    r3061 r3070  
    349349
    350350void
    351 tr_peerIoWriteBuf( tr_peerIo             * io,
    352                    const struct evbuffer * buf )
     351tr_peerIoWriteBuf( tr_peerIo       * io,
     352                   struct evbuffer * buf )
    353353{
    354354    tr_peerIoWrite( io, EVBUFFER_DATA(buf), EVBUFFER_LENGTH(buf) );
     355    evbuffer_drain( buf, ~0 );
    355356}
    356357
  • branches/encryption/libtransmission/peer-io.h

    r2995 r3070  
    113113                     int           writeme_len );
    114114
    115 void tr_peerIoWriteBuf( tr_peerIo             * io,
    116                         const struct evbuffer * buf );
     115void tr_peerIoWriteBuf( tr_peerIo       * io,
     116                        struct evbuffer * buf );
    117117
    118118
  • branches/encryption/libtransmission/peer-mgr-private.h

    r2984 r3070  
    2424typedef struct tr_peer
    2525{
     26    unsigned int  pexEnabled : 1;
     27    unsigned int  peerIsChoked : 1;
     28    unsigned int  peerIsInterested : 1;
     29    unsigned int  clientIsChoked : 1;
     30    unsigned int  clientIsInterested : 1;
     31
    2632    struct in_addr in_addr;
    2733    uint16_t port;
     
    3743    char * client;
    3844
    39     uint64_t lastPexTime;
    40 
    41     unsigned int  pexEnabled : 1;
    42     unsigned int  peerIsChoked : 1;
    43     unsigned int  peerIsInterested : 1;
    44     unsigned int  clientIsChoked : 1;
    45     unsigned int  clientIsInterested : 1;
     45    time_t peerSentDataAt;
    4646
    4747    struct tr_peermsgs * msgs;
  • branches/encryption/libtransmission/peer-mgr.c

    r3065 r3070  
    3232#define RECHOKE_PERIOD_SECONDS (15 * 1000)
    3333
     34#define REFILL_PERIOD_MSEC 1000
     35
    3436/* how many peers to unchoke per-torrent. */
    35 /* FIXME: make this user-configurable */
     37/* FIXME: make this user-configurable? */
    3638#define NUM_UNCHOKED_PEERS_PER_TORRENT 8
    3739
     
    399401            {
    400402                const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
    401                 if( val==TR_ADDREQ_FULL || val==TR_ADDREQ_CLIENT_CHOKED ) {
    402                     fprintf( stderr, "peer %p (of %d) is full or is choking us\n", peers[j]->msgs, size );
    403                     peers[j] = peers[--size];
    404                 }
    405                 else if( val == TR_ADDREQ_MISSING ) {
    406                     ++j;
    407                 }
    408                 else if( val == TR_ADDREQ_OK ) {
    409                     fprintf( stderr, "peer %p took the request for block %d\n", peers[j]->msgs, b );
    410                     incrementReqCount( &t->blocks[i] );
    411                     break;
     403                switch( val )
     404                {
     405                    case TR_ADDREQ_FULL:
     406                    case TR_ADDREQ_CLIENT_CHOKED:
     407                        peers[j] = peers[--size];
     408                        break;
     409
     410                    case TR_ADDREQ_MISSING:
     411                        ++j;
     412                        break;
     413
     414                    case TR_ADDREQ_OK:
     415                        fprintf( stderr, "peer %p took the request for block %d\n", peers[j]->msgs, b );
     416                        incrementReqCount( &t->blocks[i] );
     417                        j = size;
     418                        break;
    412419                }
    413420            }
     
    421428    tr_free( peers );
    422429
    423     /* let the timer expire */
    424     t->refillTimer = NULL;
    425     return FALSE;
    426 }
    427 
    428 static void
    429 ensureRefillTag( Torrent * t )
    430 {
    431     if( t->refillTimer == NULL ) {
    432         t->refillTimer = tr_timerNew( t->manager->handle, refillPulse, t, 250 );
    433         fprintf( stderr, "timer REFILL %p is new\n", t->refillTimer );
    434     }
     430    return TRUE;
    435431}
    436432
     
    514510            break;
    515511
    516         case TR_PEERMSG_BLOCKS_RUNNING_LOW:
    517             ensureRefillTag( t );
    518             break;
    519 
    520512        default:
    521513            assert(0);
     
    634626    }
    635627
    636 fprintf( stderr, "OUTGOING OUTGOING OUTGOING OUTGOING connection trying to connect to the peer...\n" );
    637 
    638628    io = tr_peerIoNewOutgoing( manager->handle,
    639629                               &peer->in_addr,
     
    799789    t->peers = tr_ptrArrayNew( );
    800790    t->chokeTimer = tr_timerNew( manager->handle, chokePulse, t, RECHOKE_PERIOD_SECONDS );
    801 fprintf( stderr, "timer CHOKE %p is new\n", t->chokeTimer );
     791    t->refillTimer = tr_timerNew( t->manager->handle, refillPulse, t, REFILL_PERIOD_MSEC );
    802792
    803793    memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
     
    929919        int i, size;
    930920        tr_peer ** peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
    931         for( i=0; i<size; ++i ) {
     921        for( i=0; i<size; ++i )
    932922            peers[i]->pexEnabled = disable ? 0 : 1;
    933             peers[i]->lastPexTime = 0;
    934         }
    935923
    936924        tor->pexDisabled = disable;
     
    947935    float rate;
    948936    int randomKey;
     937    int preferred;
     938    int doUnchoke;
    949939}
    950940ChokeData;
     
    955945    const ChokeData * a = ( const ChokeData * ) va;
    956946    const ChokeData * b = ( const ChokeData * ) vb;
    957     if( a->rate > b->rate ) return -1;
    958     if( a->rate < b->rate ) return 1;
    959     return a->randomKey - b->randomKey;
     947
     948    if( a->preferred != b->preferred )
     949        return a->preferred ? -1 : 1;
     950
     951    if( a->preferred )
     952    {
     953        if( a->rate > b->rate ) return -1;
     954        if( a->rate < b->rate ) return 1;
     955        return 0;
     956    }
     957    else
     958    {
     959        return a->randomKey - b->randomKey;
     960    }
     961}
     962
     963static int
     964clientIsSnubbedBy( const tr_peer * peer )
     965{
     966    assert( peer != NULL );
     967
     968    return peer->peerSentDataAt < (time(NULL) - 30);
     969}
     970
     971static void
     972rechokeLeech( Torrent * t )
     973{
     974    int i, size, unchoked=0;
     975    tr_peer ** peers = getConnectedPeers( t, &size );
     976    ChokeData * choke = tr_new0( ChokeData, size );
     977
     978    /* sort the peers by preference and rate */
     979    for( i=0; i<size; ++i ) {
     980        tr_peer * peer = peers[i];
     981        ChokeData * node = &choke[i];
     982        node->peer = peer;
     983        node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
     984        node->randomKey = tr_rand( INT_MAX );
     985        node->rate = tr_peerIoGetRateToClient( peer->io );
     986    }
     987    qsort( choke, size, sizeof(ChokeData), compareChoke );
     988
     989    for( i=0; i<size && i<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
     990        choke[i].doUnchoke = 1;
     991        ++unchoked;
     992    }
     993
     994    for( ; i<size; ++i ) {
     995        choke[i].doUnchoke = 1;
     996        ++unchoked;
     997        if( choke[i].peer->peerIsInterested )
     998            break;
     999    }
     1000
     1001    for( i=0; i<size; ++i )
     1002        tr_peerMsgsSetChoke( choke[i].peer->msgs, !choke[i].doUnchoke );
     1003
     1004    /* cleanup */
     1005    tr_free( choke );
     1006    tr_free( peers );
     1007}
     1008
     1009static void
     1010rechokeSeed( Torrent * t )
     1011{
     1012    int i, size;
     1013    tr_peer ** peers = getConnectedPeers( t, &size );
     1014
     1015    /* FIXME */
     1016    for( i=0; i<size; ++i )
     1017        tr_peerMsgsSetChoke( peers[i]->msgs, FALSE );
     1018
     1019    tr_free( peers );
    9601020}
    9611021
     
    9631023chokePulse( void * vtorrent )
    9641024{
    965     Torrent * t = (Torrent *) vtorrent;
    966     int i, size, unchoked;
     1025    Torrent * t = vtorrent;
    9671026    const int done = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
    968     ChokeData * data;
    969     tr_peer ** peers = getConnectedPeers( t, &size );
    970     const time_t now = time( NULL );
    971     int optimistic = FALSE;
    972 
    973 fprintf( stderr, "[%s] rechoking torrent %p, with %d peers\n", ctime(&now), t, size );
    974 
    975     if( size < 1 ) {
    976         tr_free( peers );
    977         return TRUE;
    978     }
    979 
    980     data = tr_new( ChokeData, size );
    981     for( i=0; i<size; ++i ) {
    982         data[i].peer = peers[i];
    983         data[i].randomKey = tr_rand( INT_MAX );
    984         data[i].rate = done ? tr_peerIoGetRateToPeer( peers[i]->io )
    985                             : tr_peerIoGetRateToClient( peers[i]->io );
    986     }
    987 
    988     /* find the best peers and unchoke them */
    989     qsort( data, size, sizeof(ChokeData), compareChoke );
    990     for( i=unchoked=0; i<size && unchoked<NUM_UNCHOKED_PEERS_PER_TORRENT; ++i ) {
    991         tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
    992         ++unchoked;
    993     }
    994     memmove( data, data+i, sizeof(ChokeData)*(size-i) );
    995     size -= i;
    996 
    997     /* of those remaining, optimistically the first interested one
    998        (the were randomized in qsort already) and choke the rest */
    999     for( i=0; i<size; ++i ) {
    1000         if( optimistic || !data[i].peer->peerIsInterested )
    1001             tr_peerMsgsSetChoke( data[i].peer->msgs, TRUE );
    1002         else {
    1003             optimistic = TRUE;
    1004             tr_peerMsgsSetChoke( data[i].peer->msgs, FALSE );
    1005         }
    1006     }
    1007 
    1008     /* cleanup */
    1009     tr_free( data );
    1010     tr_free( peers );
     1027    if( done )
     1028        rechokeLeech( vtorrent );
     1029    else
     1030        rechokeSeed( vtorrent );
    10111031    return TRUE;
    10121032}
  • branches/encryption/libtransmission/peer-msgs.c

    r3065 r3070  
    9191struct tr_peermsgs
    9292{
     93    uint8_t state;
     94
    9395    tr_peer * info;
    9496
     
    110112    unsigned int  notListening        : 1;
    111113
    112     struct peer_request blockToUs;
    113 
    114     int state;
     114    struct peer_request blockToUs; /* the block currntly being sent to us */
    115115
    116116    uint32_t incomingMessageLength;
    117117
    118     uint64_t gotKeepAliveTime;
    119 
    120     uint8_t ut_pex;
     118    time_t gotKeepAliveTime;
     119    time_t clientSentPexAt;
     120
     121    uint8_t ut_pex_id;
    121122    uint16_t listeningPort;
    122123
     124    uint16_t pexCount;
    123125    tr_pex * pex;
    124     int pexCount;
    125126};
    126127
     
    190191{
    191192    publishEvent( peer, TR_PEERMSG_GOT_ERROR );
    192 }
    193 
    194 static void
    195 fireBlocksRunningLow( tr_peermsgs * peer )
    196 {
    197     publishEvent( peer, TR_PEERMSG_BLOCKS_RUNNING_LOW );
    198193}
    199194
     
    219214
    220215static int
    221 isPeerInteresting( const tr_peermsgs * peer )
    222 {
    223     int i;
    224     const tr_torrent * torrent = peer->torrent;
    225     const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
    226 
    227     if( !peer->info->have ) /* We don't know what this peer has */
     216isPeerInteresting( const tr_peermsgs * msgs )
     217{
     218    const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
     219    const int peerIsSeed = msgs->info->progress >= 1.0;
     220
     221    if( peerIsSeed )
     222    {
     223        return !clientIsSeed;
     224    }
     225    else if( clientIsSeed )
     226    {
     227        return !peerIsSeed;
     228    }
     229    else /* we're both leeches... */
     230    {
     231        int i;
     232        const tr_torrent * torrent = msgs->torrent;
     233        const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
     234
     235        if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
     236            return TRUE;
     237
     238        assert( bitfield->len == msgs->info->have->len );
     239        for( i=0; i<torrent->info.pieceCount; ++i )
     240            if( isPieceInteresting( msgs, i ) )
     241                return TRUE;
     242
    228243        return FALSE;
    229 
    230     assert( bitfield->len == peer->info->have->len );
    231 
    232     for( i=0; i<torrent->info.pieceCount; ++i )
    233         if( isPieceInteresting( peer, i ) )
    234             return TRUE;
    235 
    236     return FALSE;
    237 }
    238 
    239 static void
    240 sendInterest( tr_peermsgs * peer, int weAreInterested )
     244    }
     245}
     246
     247static void
     248sendInterest( tr_peermsgs * msgs, int weAreInterested )
    241249{
    242250    const uint32_t len = sizeof(uint8_t);
    243251    const uint8_t bt_msgid = weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED;
    244252
    245     fprintf( stderr, "peer %p: sending an %s message\n", peer, (weAreInterested ? "INTERESTED" : "NOT_INTERESTED") );
    246     tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
    247     tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
    248 }
    249 
    250 static void
    251 updateInterest( tr_peermsgs * peer )
    252 {
    253     const int i = isPeerInteresting( peer );
    254     if( i != peer->info->clientIsInterested )
    255         sendInterest( peer, i );
     253    fprintf( stderr, "peer %p: sending an %s message\n", msgs, (weAreInterested ? "INTERESTED" : "NOT_INTERESTED") );
     254    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
     255    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
     256}
     257
     258static void
     259updateInterest( tr_peermsgs * msgs )
     260{
     261    const int i = isPeerInteresting( msgs );
     262    if( i != msgs->info->clientIsInterested )
     263        sendInterest( msgs, i );
    256264}
    257265
    258266void
    259 tr_peerMsgsSetChoke( tr_peermsgs * peer, int choke )
    260 {
    261     assert( peer != NULL );
    262     assert( peer->info != NULL );
    263 
    264     if( peer->info->peerIsChoked != !!choke )
     267tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
     268{
     269    assert( msgs != NULL );
     270    assert( msgs->info != NULL );
     271
     272    if( msgs->info->peerIsChoked != !!choke )
    265273    {
    266274        const uint32_t len = sizeof(uint8_t);
    267275        const uint8_t bt_msgid = choke ? BT_CHOKE : BT_UNCHOKE;
    268276
    269         peer->info->peerIsChoked = choke ? 1 : 0;
    270         if( peer->info )
     277        msgs->info->peerIsChoked = choke ? 1 : 0;
     278        if( msgs->info )
    271279        {
    272             tr_list_foreach( peer->peerAskedFor, tr_free );
    273             tr_list_free( &peer->peerAskedFor );
     280            tr_list_foreach( msgs->peerAskedFor, tr_free );
     281            tr_list_free( &msgs->peerAskedFor );
    274282        }
    275283
    276         fprintf( stderr, "peer %p: sending a %s message\n", peer, (choke ? "CHOKE" : "UNCHOKE") );
    277         tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
    278         tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
     284        fprintf( stderr, "peer %p: sending a %s message\n", msgs, (choke ? "CHOKE" : "UNCHOKE") );
     285        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
     286        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
    279287    }
    280288}
     
    306314        const uint8_t bt_msgid = BT_CANCEL;
    307315        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
    308         fprintf( stderr, "w00t peer %p: cancelling req for piece %u, offset %u\n", msgs, (unsigned)pieceIndex, (unsigned)offset );
    309316        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
    310317        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
     
    332339    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
    333340    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
     341
     342    updateInterest( msgs );
    334343}
    335344
     
    355364        return TR_ADDREQ_MISSING;
    356365
    357     maxSize = 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10);
    358     if( maxSize > 100 )
    359         maxSize = 100;
    360     fprintf( stderr, "peer %p has a max request queue size of %d\n", msgs, maxSize );
     366    maxSize = MIN( 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10), 100 );
    361367    if( tr_list_size( msgs->clientAskedFor) >= maxSize )
    362368        return TR_ADDREQ_FULL;
     369    fprintf( stderr, "w00t peer %p has a max request queue size of %d\n", msgs, maxSize );
    363370
    364371    /* queue the request */
     
    368375    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
    369376    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
    370     fprintf( stderr, "w00t peer %p: requesting a block from piece %u, offset %u, length %u\n",
    371              msgs, (unsigned int)index, (unsigned int)offset, (unsigned int)length );
    372377
    373378    /* add it to our `requests sent' list */
     
    378383    req->time_requested = time( NULL );
    379384    tr_list_append( &msgs->clientAskedFor, req );
    380     fprintf( stderr, "w00t added a request; peer %p's clientAskedFor.size() is now %d\n", msgs, tr_list_size(msgs->clientAskedFor));
    381385
    382386    return TR_ADDREQ_OK;
     
    407411        sub = tr_bencDictFind( sub, "ut_pex" );
    408412        if( tr_bencIsInt( sub ) ) {
    409             peer->ut_pex = (uint8_t) sub->val.i;
    410             fprintf( stderr, "peer->ut_pex is %d\n", peer->ut_pex );
     413            peer->ut_pex_id = (uint8_t) sub->val.i;
     414            fprintf( stderr, "peer->ut_pex is %d\n", (int)peer->ut_pex_id );
    411415        }
    412416    }
     
    470474
    471475static void
    472 parseLtep( tr_peermsgs * peer, int msglen, struct evbuffer * inbuf )
     476sendPex( tr_peermsgs * msgs );
     477
     478static void
     479parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
    473480{
    474481    uint8_t ltep_msgid;
    475482
    476     tr_peerIoReadBytes( peer->io, inbuf, &ltep_msgid, 1 );
     483    tr_peerIoReadBytes( msgs->io, inbuf, &ltep_msgid, 1 );
    477484    msglen--;
    478485
     
    480487    {
    481488        fprintf( stderr, "got ltep handshake\n" );
    482         parseLtepHandshake( peer, msglen, inbuf );
    483     }
    484     else if( ltep_msgid == peer->ut_pex )
     489        parseLtepHandshake( msgs, msglen, inbuf );
     490    }
     491    else if( ltep_msgid == msgs->ut_pex_id )
    485492    {
    486493        fprintf( stderr, "got ut pex\n" );
    487         parseUtPex( peer, msglen, inbuf );
     494        msgs->info->pexEnabled = 1;
     495        parseUtPex( msgs, msglen, inbuf );
     496        //sendPex( msgs );
    488497    }
    489498    else
     
    495504
    496505static int
    497 readBtLength( tr_peermsgs * peer, struct evbuffer * inbuf )
     506readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
    498507{
    499508    uint32_t len;
     
    503512        return READ_MORE;
    504513
    505     tr_peerIoReadUint32( peer->io, inbuf, &len );
     514    tr_peerIoReadUint32( msgs->io, inbuf, &len );
    506515
    507516    if( len == 0 ) { /* peer sent us a keepalive message */
    508         fprintf( stderr, "peer sent us a keepalive message...\n" );
    509         peer->gotKeepAliveTime = tr_date( );
     517        fprintf( stderr, "peer %p sent us a keepalive message...\n", msgs );
     518        msgs->gotKeepAliveTime = time( NULL );
    510519    } else {
    511         fprintf( stderr, "peer is sending us a message with %"PRIu64" bytes...\n", (uint64_t)len );
    512         peer->incomingMessageLength = len;
    513         fprintf( stderr, "peer is sending us a message with %"PRIu64" bytes...\n", (uint64_t)peer->incomingMessageLength );
    514         peer->state = AWAITING_BT_MESSAGE;
     520        fprintf( stderr, "peer %p is sending us a message with %"PRIu64" bytes...\n", msgs, (uint64_t)len );
     521        msgs->incomingMessageLength = len;
     522        msgs->state = AWAITING_BT_MESSAGE;
    515523    } return READ_AGAIN;
    516524}
     
    528536    tr_peerIoReadBytes( msgs->io, inbuf, &id, 1 );
    529537    msglen--;
    530     fprintf( stderr, "got a message from the peer... "
    531                      "bt id number is %d, and remaining len is %d\n", (int)id, (int)msglen );
     538    fprintf( stderr, "peer %p sent us a message... "
     539                     "bt id number is %d, and remaining len is %d\n", msgs, (int)id, (int)msglen );
    532540
    533541    switch( id )
     
    596604            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
    597605            if( !msgs->info->peerIsChoked )
    598                 tr_list_prepend( &msgs->peerAskedFor, req );
     606                tr_list_append( &msgs->peerAskedFor, req );
    599607            break;
    600608        }
     
    624632            msgs->blockToUs.length = msglen - 8;
    625633            assert( msgs->blockToUs.length > 0 );
    626             evbuffer_drain( msgs->inBlock, ~0 );
     634            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
     635            //evbuffer_drain( msgs->inBlock, ~0 );
    627636            return READ_AGAIN;
    628637            break;
     
    787796    fireGotBlock( msgs, index, offset, length );
    788797
    789     fireBlocksRunningLow( msgs );
    790 
    791798    /**
    792799    ***  Handle if this was the last block in the piece
     
    827834        msgs->blockToUs.length -= len;
    828835        clientGotBytes( msgs, len );
     836        msgs->info->peerSentDataAt = time( NULL );
    829837fprintf( stderr, "got %"PRIu64"; left to read is [%"PRIu64"]\n", (uint64_t)len, (uint64_t)msgs->blockToUs.length );
    830838
     
    887895    /* if we froze out a downloaded block because of speed limits,
    888896       start listening to the peer again */
     897#if 0
    889898    if( msgs->notListening )
    890899    {
     
    893902        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
    894903    }
     904#endif
    895905
    896906    if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
     
    908918    {
    909919        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
    910         evbuffer_drain( msgs->outMessages, ~0 );
    911920    }
    912921    else if(( msgs->peerAskedFor ))
     
    925934    }
    926935
    927     fireBlocksRunningLow( msgs );
    928 
    929936    return TRUE; /* loop forever */
    930937}
     
    10071014}
    10081015
    1009 static int
    1010 pexPulse( void * vpeer )
    1011 {
    1012     tr_peermsgs * peer = (tr_peermsgs *) vpeer;
    1013 
    1014     if( peer->info->pexEnabled )
     1016static void
     1017sendPex( tr_peermsgs * msgs )
     1018{
     1019    if( msgs->info->pexEnabled )
    10151020    {
    10161021        int i;
    10171022        tr_pex * newPex = NULL;
    1018         const int newCount = tr_peerMgrGetPeers( peer->handle->peerMgr, peer->torrent->info.hash, &newPex );
     1023        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
    10191024        PexDiffs diffs;
    10201025        benc_val_t val, *added, *dropped, *flags;
     
    10231028        int bencLen;
    10241029        const uint8_t bt_msgid = BT_LTEP;
    1025         const uint8_t ltep_msgid = peer->ut_pex;
     1030        const uint8_t ltep_msgid = msgs->ut_pex_id;
    10261031
    10271032        /* build the diffs */
    10281033        diffs.added = tr_new( tr_pex, newCount );
    10291034        diffs.addedCount = 0;
    1030         diffs.dropped = tr_new( tr_pex, peer->pexCount );
     1035        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
    10311036        diffs.droppedCount = 0;
    1032         diffs.elements = tr_new( tr_pex, newCount + peer->pexCount );
     1037        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
    10331038        diffs.elementCount = 0;
    10341039        diffs.diffCount = 0;
    1035         tr_set_compare( peer->pex, peer->pexCount,
     1040        tr_set_compare( msgs->pex, msgs->pexCount,
    10361041                        newPex, newCount,
    10371042                        tr_pexCompare, sizeof(tr_pex),
    10381043                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
    1039         fprintf( stderr, "pex: old peer count %d, new peer count %d, added %d, removed %d\n", peer->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
     1044        fprintf( stderr, "pex: old peer count %d, new peer count %d, added %d, removed %d\n", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
    10401045
    10411046        /* update peer */
    1042         tr_free( peer->pex );
    1043         peer->pex = diffs.elements;
    1044         peer->pexCount = diffs.elementCount;
     1047        tr_free( msgs->pex );
     1048        msgs->pex = diffs.elements;
     1049        msgs->pexCount = diffs.elementCount;
    10451050
    10461051        /* build the pex payload */
     
    10781083        /* write the pex message */
    10791084        benc = tr_bencSaveMalloc( &val, &bencLen );
    1080         tr_peerIoWriteUint32( peer->io, peer->outMessages, 1 + 1 + bencLen );
    1081         tr_peerIoWriteBytes ( peer->io, peer->outMessages, &bt_msgid, 1 );
    1082         tr_peerIoWriteBytes ( peer->io, peer->outMessages, &ltep_msgid, 1 );
    1083         tr_peerIoWriteBytes ( peer->io, peer->outMessages, benc, bencLen );
     1085        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 1 + 1 + bencLen );
     1086        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &bt_msgid, 1 );
     1087        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &ltep_msgid, 1 );
     1088        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
    10841089
    10851090        /* cleanup */
     
    10891094        tr_free( diffs.dropped );
    10901095        tr_free( newPex );
    1091     }
    1092 
     1096
     1097        msgs->clientSentPexAt = time( NULL );
     1098    }
     1099}
     1100
     1101static int
     1102pexPulse( void * vpeer )
     1103{
     1104    sendPex( vpeer );
    10931105    return TRUE;
    10941106}
  • branches/encryption/libtransmission/peer-msgs.h

    r3065 r3070  
    6262    TR_PEERMSG_CLIENT_BLOCK,
    6363    TR_PEERMSG_GOT_PEX,
    64     TR_PEERMSG_GOT_ERROR,
    65     TR_PEERMSG_BLOCKS_RUNNING_LOW,
     64    TR_PEERMSG_GOT_ERROR
    6665}
    6766PeerMsgsEventType;
Note: See TracChangeset for help on using the changeset viewer.