Changeset 7824


Ignore:
Timestamp:
Feb 4, 2009, 4:58:52 PM (12 years ago)
Author:
charles
Message:

(trunk libT) #1748: possible fix for the kqueue corruption errors by consolidating the three per-torrent libevent timers into three session-wide timers. Since most people reporting this error have lots of torrents loaded, consider a hypothetical example: if you had 500 torrents, this patch will reduce 1,500 libevent timers down to just three timers. On top of that, those three have simpler life cycles too...

Location:
trunk/libtransmission
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-common.h

    r7658 r7824  
    4848    TR_PEER_ERROR,
    4949    TR_PEER_CANCEL,
    50     TR_PEER_UPLOAD_ONLY,
    51     TR_PEER_NEED_REQ
     50    TR_PEER_UPLOAD_ONLY
    5251}
    5352PeerEventType;
  • trunk/libtransmission/peer-mgr.c

    r7792 r7824  
    1 
    21/*
    32 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
     
    4847
    4948    /* minimum interval for refilling peers' request lists */
    50     REFILL_PERIOD_MSEC = 333,
    51 
    52     /* when many peers are available, keep idle ones this long */
    53     MIN_UPLOAD_IDLE_SECS = ( 30 ),
    54 
    55     /* when few peers are available, keep idle ones this long */
    56     MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
    57 
    58     /* how frequently to decide which peers live and die */
    59     RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
     49    REFILL_PERIOD_MSEC = 400,
    6050   
    6151    /* how frequently to reallocate bandwidth */
    6252    BANDWIDTH_PERIOD_MSEC = 500,
    6353
     54    /* how frequently to decide which peers live and die */
     55    RECONNECT_PERIOD_MSEC = 500,
     56
     57    /* when many peers are available, keep idle ones this long */
     58    MIN_UPLOAD_IDLE_SECS = ( 30 ),
     59
     60    /* when few peers are available, keep idle ones this long */
     61    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
     62
    6463    /* max # of peers to ask fer per torrent per reconnect pulse */
    6564    MAX_RECONNECTIONS_PER_PULSE = 16,
     
    7574    MYFLAG_BANNED = 1,
    7675
     76    /* use for bitwise operations w/peer_atom.myflags */
    7777    /* unreachable for now... but not banned.
    7878     * if they try to connect to us it's okay */
     
    127127    tr_ptrArray     peers; /* tr_peer */
    128128    tr_ptrArray     webseeds; /* tr_webseed */
    129     tr_timer *      reconnectTimer;
    130     tr_timer *      rechokeTimer;
    131     tr_timer *      refillTimer;
    132129    tr_torrent *    tor;
    133130    tr_peer *       optimistic; /* the optimistic peer, or NULL if none */
     
    142139    tr_ptrArray       incomingHandshakes; /* tr_handshake */
    143140    tr_timer        * bandwidthTimer;
     141    tr_timer        * rechokeTimer;
     142    tr_timer        * refillTimer;
     143    tr_timer        * reconnectTimer;
    144144};
    145145
     
    380380
    381381    memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
    382 
    383     tr_timerFree( &t->reconnectTimer );
    384     tr_timerFree( &t->rechokeTimer );
    385     tr_timerFree( &t->refillTimer );
    386382
    387383    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
     
    425421
    426422
    427 static int bandwidthPulse( void * vmgr );
    428 
     423static int bandwidthPulse ( void * vmgr );
     424static int rechokePulse   ( void * vmgr );
     425static int refillPulse    ( void * vmgr );
     426static int reconnectPulse ( void * vmgr );
    429427
    430428tr_peerMgr*
     
    436434    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
    437435    m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
     436    m->rechokeTimer   = tr_timerNew( session, rechokePulse,   m, RECHOKE_PERIOD_MSEC );
     437    m->refillTimer    = tr_timerNew( session, refillPulse,    m, REFILL_PERIOD_MSEC );
     438    m->reconnectTimer = tr_timerNew( session, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
     439
     440    rechokePulse( m );
     441
    438442    return m;
    439443}
     
    444448    managerLock( manager );
    445449
     450    tr_timerFree( &manager->reconnectTimer );
     451    tr_timerFree( &manager->refillTimer );
     452    tr_timerFree( &manager->rechokeTimer );
    446453    tr_timerFree( &manager->bandwidthTimer );
    447454
     
    728735}
    729736
    730 static int
    731 refillPulse( void * vtorrent )
     737static void
     738refillTorrent( Torrent * t )
    732739{
    733740    tr_block_index_t block;
     
    737744    tr_webseed ** webseeds;
    738745    struct tr_blockIterator * blockIterator;
    739     Torrent * t = vtorrent;
    740746    tr_torrent * tor = t->tor;
    741747
    742748    if( !t->isRunning )
    743         return TRUE;
     749        return;
    744750    if( tr_torrentIsSeed( t->tor ) )
    745         return TRUE;
    746 
    747     torrentLock( t );
     751        return;
     752
    748753    tordbg( t, "Refilling Request Buffers..." );
    749754
     
    817822    tr_free( webseeds );
    818823    tr_free( peers );
    819 
    820     t->refillTimer = NULL;
    821     torrentUnlock( t );
    822     return FALSE;
     824}
     825
     826static int
     827refillPulse( void * vmgr )
     828{
     829    tr_torrent * tor = NULL;
     830    tr_peerMgr * mgr = vmgr;
     831    managerLock( mgr );
     832
     833    while(( tor = tr_torrentNext( mgr->session, tor )))
     834        if( tor->isRunning && !tr_torrentIsSeed( tor ) )
     835            refillTorrent( tor->torrentPeers );
     836
     837    managerUnlock( mgr );
     838    return TRUE;
    823839}
    824840
     
    867883    tor->corruptCur += byteCount;
    868884    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
    869 }
    870 
    871 static void
    872 refillSoon( Torrent * t )
    873 {
    874     if( t->refillTimer == NULL )
    875         t->refillTimer = tr_timerNew( t->manager->session,
    876                                       refillPulse, t,
    877                                       REFILL_PERIOD_MSEC );
    878885}
    879886
     
    943950                a->uploadOnly = e->uploadOnly ? UPLOAD_ONLY_YES : UPLOAD_ONLY_NO;
    944951            }
    945             break;
    946 
    947         case TR_PEER_NEED_REQ:
    948             refillSoon( t );
    949952            break;
    950953
     
    15071510}
    15081511
    1509 static int reconnectPulse( void * vtorrent );
    1510 
    1511 static int rechokePulse( void * vtorrent );
    1512 
    15131512void
    15141513tr_peerMgrStartTorrent( tr_torrent * tor )
    15151514{
    1516     Torrent * t = tor->torrentPeers;
    1517 
    1518     managerLock( t->manager );
    1519 
    1520     assert( t );
    1521     assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
    1522     assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
    1523 
    1524     if( !t->isRunning )
    1525     {
    1526         t->isRunning = 1;
    1527 
    1528         t->reconnectTimer = tr_timerNew( t->manager->session,
    1529                                          reconnectPulse, t,
    1530                                          RECONNECT_PERIOD_MSEC );
    1531 
    1532         t->rechokeTimer = tr_timerNew( t->manager->session,
    1533                                        rechokePulse, t,
    1534                                        RECHOKE_PERIOD_MSEC );
    1535 
    1536         reconnectPulse( t );
    1537 
    1538         rechokePulse( t );
    1539 
    1540         if( !tr_ptrArrayEmpty( &t->webseeds ) )
    1541             refillSoon( t );
    1542     }
    1543 
    1544     managerUnlock( t->manager );
     1515    tor->torrentPeers->isRunning = TRUE;
    15451516}
    15461517
     
    15501521    assert( torrentIsLocked( t ) );
    15511522
    1552     t->isRunning = 0;
    1553     tr_timerFree( &t->rechokeTimer );
    1554     tr_timerFree( &t->reconnectTimer );
     1523    t->isRunning = FALSE;
    15551524
    15561525    /* disconnect the peers. */
     
    18681837
    18691838static void
    1870 rechoke( Torrent * t )
     1839rechokeTorrent( Torrent * t )
    18711840{
    18721841    int i, size, unchokedInterested;
     
    19681937
    19691938static int
    1970 rechokePulse( void * vtorrent )
    1971 {
    1972     Torrent * t = vtorrent;
    1973 
    1974     torrentLock( t );
    1975     rechoke( t );
    1976     torrentUnlock( t );
     1939rechokePulse( void * vmgr )
     1940{
     1941    tr_torrent * tor = NULL;
     1942    tr_peerMgr * mgr = vmgr;
     1943    managerLock( mgr );
     1944
     1945    while(( tor = tr_torrentNext( mgr->session, tor )))
     1946        if( tor->isRunning )
     1947            rechokeTorrent( tor->torrentPeers );
     1948
     1949    managerUnlock( mgr );
    19771950    return TRUE;
    19781951}
     
    22162189}
    22172190
    2218 static int
    2219 reconnectPulse( void * vtorrent )
    2220 {
    2221     Torrent *     t = vtorrent;
     2191static void
     2192reconnectTorrent( Torrent * t )
     2193{
    22222194    static time_t prevTime = 0;
    22232195    static int    newConnectionsThisSecond = 0;
    22242196    time_t        now;
    2225 
    2226     torrentLock( t );
    22272197
    22282198    now = time( NULL );
     
    23272297        tr_free( canClose );
    23282298    }
    2329 
    2330     torrentUnlock( t );
     2299}
     2300
     2301static int
     2302reconnectPulse( void * vmgr )
     2303{
     2304    tr_torrent * tor = NULL;
     2305    tr_peerMgr * mgr = vmgr;
     2306    managerLock( mgr );
     2307
     2308    while(( tor = tr_torrentNext( mgr->session, tor )))
     2309        if( tor->isRunning )
     2310            reconnectTorrent( tor->torrentPeers );
     2311
     2312    managerUnlock( mgr );
    23312313    return TRUE;
    23322314}
  • trunk/libtransmission/peer-msgs.c

    r7769 r7824  
    421421
    422422static void
    423 fireNeedReq( tr_peermsgs * msgs )
    424 {
    425     tr_peer_event e = blankEvent;
    426     e.eventType = TR_PEER_NEED_REQ;
    427     publish( msgs, &e );
    428 }
    429 
    430 static void
    431423firePeerProgress( tr_peermsgs * msgs )
    432424{
     
    654646    if( i != msgs->peer->clientIsInterested )
    655647        sendInterest( msgs, i );
    656     if( i )
    657         fireNeedReq( msgs );
    658648}
    659649
     
    820810        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
    821811                sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
    822 
    823     if( len < max )
    824         fireNeedReq( msgs );
    825812}
    826813
     
    13671354            dbgmsg( msgs, "got Unchoke" );
    13681355            msgs->peer->clientIsChoked = 0;
    1369             fireNeedReq( msgs );
    13701356            break;
    13711357
     
    13931379
    13941380        case BT_BITFIELD:
    1395         {
    13961381            dbgmsg( msgs, "got a bitfield" );
    13971382            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
    13981383            updatePeerProgress( msgs );
    1399             fireNeedReq( msgs );
    14001384            break;
    1401         }
    14021385
    14031386        case BT_REQUEST:
  • trunk/libtransmission/session.c

    r7812 r7824  
    360360static void tr_sessionInitImpl( void * );
    361361
     362struct init_data
     363{
     364    tr_session  * session;
     365    const char  * configDir;
     366    tr_bool       messageQueuingEnabled;
     367    tr_benc     * clientSettings;
     368};
     369
    362370tr_session *
    363371tr_sessionInit( const char  * tag,
     
    366374                tr_benc     * clientSettings )
    367375{
     376    tr_session * session;
     377    struct init_data data;
     378
     379    assert( tr_bencIsDict( clientSettings ) );
     380
     381    /* initialize the bare skeleton of the session object */
     382    session = tr_new0( tr_session, 1 );
     383    session->bandwidth = tr_bandwidthNew( session, NULL );
     384    session->lock = tr_lockNew( );
     385    session->tag = tr_strdup( tag );
     386    session->magicNumber = SESSION_MAGIC_NUMBER;
     387
     388    /* start the libtransmission thread */
     389    tr_netInit( ); /* must go before tr_eventInit */
     390    tr_eventInit( session );
     391    assert( session->events != NULL );
     392
     393    /* run the rest in the libtransmission thread */
     394    session->isWaiting = TRUE;
     395    data.session = session;
     396    data.configDir = configDir;
     397    data.messageQueuingEnabled = messageQueuingEnabled;
     398    data.clientSettings = clientSettings;
     399    tr_runInEventThread( session, tr_sessionInitImpl, &data );
     400    while( session->isWaiting )
     401        tr_wait( 100 );
     402
     403    return session;
     404}
     405
     406static void
     407tr_sessionInitImpl( void * vdata )
     408{
    368409    int64_t i;
    369410    int64_t j;
     
    371412    const char * str;
    372413    tr_benc settings;
    373     tr_session * session;
    374414    char * filename;
    375 
     415    struct init_data * data = vdata;
     416    tr_benc * clientSettings = data->clientSettings;
     417    tr_session * session = data->session;
     418
     419    assert( tr_amInEventThread( session ) );
    376420    assert( tr_bencIsDict( clientSettings ) );
    377 
    378     session = tr_new0( tr_session, 1 );
    379     session->bandwidth = tr_bandwidthNew( session, NULL );
    380     session->lock = tr_lockNew( );
    381     session->tag = tr_strdup( tag );
    382     session->magicNumber = SESSION_MAGIC_NUMBER;
    383421
    384422    dbgmsg( "tr_sessionInit: the session's top-level bandwidth object is %p", session->bandwidth );
     
    400438    assert( found );
    401439    tr_setMessageLevel( i );
    402     tr_setMessageQueuing( messageQueuingEnabled );
     440    tr_setMessageQueuing( data->messageQueuingEnabled );
    403441 
    404442 
     
    456494    session->so_rcvbuf = 8192;
    457495 
    458     tr_setConfigDir( session, configDir );
    459 
    460     tr_netInit( ); /* must go before tr_eventInit */
    461     tr_eventInit( session );
    462     assert( session->events != NULL );
     496    tr_setConfigDir( session, data->configDir );
     497
     498    tr_trackerSessionInit( session );
    463499
    464500    session->peerMgr = tr_peerMgrNew( session );
     
    531567
    532568    tr_bencFree( &settings );
    533 
    534     session->isWaiting = TRUE;
    535     tr_runInEventThread( session, tr_sessionInitImpl, session );
    536     while( session->isWaiting )
    537         tr_wait( 100 );
    538 
    539     return session;
    540 }
    541 static void
    542 tr_sessionInitImpl( void * vsession )
    543 {
    544     tr_session * session = vsession;
    545569
    546570    assert( tr_isSession( session ) );
  • trunk/libtransmission/tracker.c

    r7718 r7824  
    842842static int trackerPulse( void * vsession );
    843843
    844 static void
    845 ensureGlobalsExist( tr_session * session )
    846 {
    847     if( session->tracker == NULL )
    848     {
    849         session->tracker = tr_new0( struct tr_tracker_handle, 1 );
    850         session->tracker->pulseTimer =
    851             tr_timerNew( session, trackerPulse, session,
    852                          PULSE_INTERVAL_MSEC );
    853         dbgmsg( NULL, "creating tracker timer" );
    854     }
     844void
     845tr_trackerSessionInit( tr_session * session )
     846{
     847    assert( tr_isSession( session ) );
     848
     849    session->tracker = tr_new0( struct tr_tracker_handle, 1 );
     850    session->tracker->pulseTimer = tr_timerNew( session, trackerPulse, session, PULSE_INTERVAL_MSEC );
     851    dbgmsg( NULL, "creating tracker timer" );
    855852}
    856853
     
    10351032    const tr_info * info = &torrent->info;
    10361033    tr_tracker *    t;
    1037 
    1038     ensureGlobalsExist( torrent->session );
    10391034
    10401035    t = tr_new0( tr_tracker, 1 );
  • trunk/libtransmission/tracker.h

    r7658 r7824  
    3131
    3232void         tr_trackerFree( tr_tracker * );
     33
     34/**
     35***
     36**/
     37
     38void         tr_trackerSessionInit( tr_session * );
    3339
    3440void         tr_trackerSessionClose( tr_session * );
  • trunk/libtransmission/trevent.c

    r7812 r7824  
    192192        }
    193193
    194         case 't': /* create timer */
    195         {
    196             tr_timer *    timer;
    197             const size_t  nwant = sizeof( timer );
    198             const ssize_t ngot = piperead( fd, &timer, nwant );
    199             if( !eh->die && ( ngot == (ssize_t)nwant ) )
    200             {
    201                 dbgmsg( "adding timer in libevent thread" );
    202                 evtimer_add( &timer->event, &timer->tv );
    203             }
    204             break;
    205         }
    206 
    207194        case '\0': /* eof */
    208195        {
     
    295282{
    296283    assert( tr_isSession( session ) );
    297     assert( session->events );
     284    assert( session->events != NULL );
    298285
    299286    return tr_amInThread( session->events->thread );
     
    342329
    343330tr_timer*
    344 tr_timerNew( tr_session * session,
    345              timer_func   func,
    346              void       * user_data,
    347              uint64_t     interval_milliseconds )
     331tr_timerNew( tr_session  * session,
     332             timer_func    func,
     333             void        * user_data,
     334             uint64_t      interval_milliseconds )
    348335{
    349336    tr_timer * timer;
    350337
    351     assert( tr_isSession( session ) );
    352     assert( session->events != NULL );
     338    assert( tr_amInEventThread( session ) );
    353339
    354340    timer = tr_new0( tr_timer, 1 );
    355     tr_timevalMsec( interval_milliseconds, &timer->tv );
    356341    timer->func = func;
    357342    timer->user_data = user_data;
    358343    timer->eh = session->events;
     344
     345    tr_timevalMsec( interval_milliseconds, &timer->tv );
    359346    evtimer_set( &timer->event, timerCallback, timer );
    360 
    361     if( tr_amInThread( session->events->thread ) )
    362     {
    363         evtimer_add( &timer->event,  &timer->tv );
    364     }
    365     else
    366     {
    367         const char ch = 't';
    368         int        fd = session->events->fds[1];
    369         tr_lock *  lock = session->events->lock;
    370 
    371         tr_lockLock( lock );
    372         pipewrite( fd, &ch, 1 );
    373         pipewrite( fd, &timer, sizeof( timer ) );
    374         tr_lockUnlock( lock );
    375     }
     347    evtimer_add( &timer->event,  &timer->tv );
    376348
    377349    return timer;
  • trunk/libtransmission/webseed.c

    r7658 r7824  
    6060    if( w->callback )
    6161        w->callback( NULL, e, w->callback_userdata );
    62 }
    63 
    64 static void
    65 fireNeedReq( tr_webseed * w )
    66 {
    67     tr_peer_event e = blankEvent;
    68     e.eventType = TR_PEER_NEED_REQ;
    69     publish( w, &e );
    7062}
    7163
     
    181173            if( w->dead )
    182174                tr_webseedFree( w );
    183             else  {
     175            else
    184176                fireClientGotBlock( w, w->pieceIndex, w->pieceOffset, w->byteCount );
    185                 fireNeedReq( w );
    186             }
    187177        }
    188178    }
Note: See TracChangeset for help on using the changeset viewer.