Ignore:
Timestamp:
Nov 26, 2009, 5:13:58 AM (11 years ago)
Author:
charles
Message:

(trunk libT) #2430 "Peer atom pool grows too large" -- add an atom expiration system along the lines of the suggestions in this ticket's comments. jch and KyleK, let me know if you think anything in this commit needs improvement.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/peer-mgr.c

    r9553 r9582  
    4242enum
    4343{
     44    /* how frequently to cull old atoms */
     45    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
     46
    4447    /* how frequently to change which peers are choked */
    4548    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
     
    121124    time_t      time;        /* when the peer's connection status last changed */
    122125    time_t      piece_data_time;
     126
     127    /* similar to a TTL field, but less rigid --
     128     * if the swarm is small, the atom will be kept past this date. */
     129    time_t      shelf_date;
    123130};
    124131
     
    186193    tr_timer        * reconnectTimer;
    187194    tr_timer        * refillUpkeepTimer;
     195    tr_timer        * atomTimer;
    188196};
    189197
     
    268276
    269277static int
    270 comparePeerAtoms( const void * va, const void * vb )
     278compareAtomsByAddress( const void * va, const void * vb )
    271279{
    272280    const struct peer_atom * b = vb;
     
    464472}
    465473
    466 
    467 static int bandwidthPulse ( void * vmgr );
    468 static int rechokePulse   ( void * vmgr );
    469 static int reconnectPulse ( void * vmgr );
    470 static int refillUpkeep   ( void * vmgr );
    471 
    472474tr_peerMgr*
    473475tr_peerMgrNew( tr_session * session )
     
    482484deleteTimers( struct tr_peerMgr * m )
    483485{
     486    if( m->atomTimer )
     487        tr_timerFree( &m->atomTimer );
     488
    484489    if( m->bandwidthTimer )
    485490        tr_timerFree( &m->bandwidthTimer );
     
    13861391}
    13871392
    1388 static void
    1389 ensureAtomExists( Torrent          * t,
    1390                   const tr_address * addr,
    1391                   tr_port            port,
    1392                   uint8_t            flags,
    1393                   uint8_t            from )
     1393static int
     1394getDefaultShelfLife( uint8_t from )
     1395{
     1396    /* in general, peers obtained from firsthand contact
     1397     * are better than those from secondhand, etc etc */
     1398    switch( from )
     1399    {
     1400        case TR_PEER_FROM_INCOMING : return 60 * 60 * 8;
     1401        case TR_PEER_FROM_LTEP     : return 60 * 60 * 8;
     1402        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 4;
     1403        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
     1404        case TR_PEER_FROM_RESUME   : return 60 * 60;
     1405        case TR_PEER_FROM_DHT      : return 60 * 40;
     1406        default                    : return 60 * 60;           
     1407    }
     1408}
     1409
     1410
     1411static void
     1412ensureAtomExists( Torrent           * t,
     1413                  const time_t        now,
     1414                  const tr_address  * addr,
     1415                  const tr_port       port,
     1416                  const uint8_t       flags,
     1417                  const uint8_t       from )
    13941418{
    13951419    assert( tr_isAddress( addr ) );
     
    13991423    {
    14001424        struct peer_atom * a;
     1425        const int jitter = tr_cryptoWeakRandInt( 120 );
     1426
    14011427        a = tr_new0( struct peer_atom, 1 );
    14021428        a->addr = *addr;
     
    14041430        a->flags = flags;
    14051431        a->from = from;
     1432        a->shelf_date = now + getDefaultShelfLife( from ) + jitter;
     1433        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
     1434
    14061435        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
    1407         tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
    14081436    }
    14091437}
     
    14731501    {
    14741502        struct peer_atom * atom;
    1475         ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
     1503        const time_t now = time( NULL );
     1504
     1505        ensureAtomExists( t, now, addr, port, 0, TR_PEER_FROM_INCOMING );
    14761506        atom = getExistingAtom( t, addr );
    1477         atom->time = time( NULL );
     1507        atom->time = now;
    14781508        atom->piece_data_time = 0;
    14791509
     
    15851615        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
    15861616            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
    1587                 ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
     1617                ensureAtomExists( t, time( NULL ), &pex->addr, pex->port, pex->flags, from );
    15881618
    15891619        managerUnlock( t->manager );
     
    18251855}
    18261856
     1857static int atomPulse      ( void * vmgr );
     1858static int bandwidthPulse ( void * vmgr );
     1859static int rechokePulse   ( void * vmgr );
     1860static int reconnectPulse ( void * vmgr );
     1861
    18271862static void
    18281863ensureMgrTimersExist( struct tr_peerMgr * m )
    18291864{
    18301865    tr_session * s = m->session;
     1866
     1867    if( m->atomTimer == NULL )
     1868        m->atomTimer = tr_timerNew( s, atomPulse, m, ATOM_PERIOD_MSEC );
    18311869
    18321870    if( m->bandwidthTimer == NULL )
     
    29082946    return TRUE;
    29092947}
     2948
     2949/***
     2950****
     2951***/
     2952
     2953static int
     2954compareAtomPtrsByAddress( const void * va, const void *vb )
     2955{
     2956    const struct peer_atom * a = * (const struct peer_atom**) va;
     2957    const struct peer_atom * b = * (const struct peer_atom**) vb;
     2958
     2959    assert( tr_isAtom( a ) );
     2960    assert( tr_isAtom( b ) );
     2961
     2962    return tr_compareAddresses( &a->addr, &b->addr );
     2963}
     2964
     2965static time_t tr_now = 0;
     2966
     2967/* best come first, worst go last */
     2968static int
     2969compareAtomPtrsByShelfDate( const void * va, const void *vb )
     2970{
     2971    time_t atime;
     2972    time_t btime;
     2973    const struct peer_atom * a = * (const struct peer_atom**) va;
     2974    const struct peer_atom * b = * (const struct peer_atom**) vb;
     2975    const int data_time_cutoff_secs = 60 * 60;
     2976
     2977    assert( tr_isAtom( a ) );
     2978    assert( tr_isAtom( b ) );
     2979
     2980    /* primary key: the last piece data time *if* it was within the last hour */
     2981    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
     2982    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
     2983    if( atime != btime )
     2984        return atime > btime ? -1 : 1;
     2985
     2986    /* secondary key: shelf date. */
     2987    if( a->shelf_date != b->shelf_date )
     2988        return a->shelf_date > b->shelf_date ? -1 : 1;
     2989
     2990    return 0;
     2991}
     2992
     2993static int
     2994getMaxAtomCount( const tr_torrent * tor )
     2995{
     2996    /* FIXME: this curve should be smoother... */
     2997    const int n = tor->maxConnectedPeers;
     2998    if( n >= 200 ) return n * 1.5;
     2999    if( n >= 100 ) return n * 2;
     3000    if( n >=  50 ) return n * 3;
     3001    if( n >=  20 ) return n * 5;
     3002    return n * 10;
     3003}
     3004
     3005static int
     3006atomPulse( void * vmgr )
     3007{
     3008    tr_torrent * tor = NULL;
     3009    tr_peerMgr * mgr = vmgr;
     3010    managerLock( mgr );
     3011
     3012    while(( tor = tr_torrentNext( mgr->session, tor )))
     3013    {
     3014        int atomCount;
     3015        Torrent * t = tor->torrentPeers;
     3016        const int maxAtomCount = getMaxAtomCount( tor );
     3017        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
     3018
     3019        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
     3020        {
     3021            int i;
     3022            int keepCount = 0;
     3023            int testCount = 0;
     3024            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
     3025            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
     3026            tordbg( t, "max atom count is %d; have %d.. pruning\n", maxAtomCount, atomCount );
     3027
     3028            /* keep the ones that are in use */
     3029            for( i=0; i<atomCount; ++i ) {
     3030                struct peer_atom * atom = atoms[i];
     3031                if( peerIsInUse( t, &atom->addr ) )
     3032                    keep[keepCount++] = atom;
     3033                else
     3034                    test[testCount++] = atom;
     3035            }
     3036
     3037            /* if there's room, keep the best of what's left */
     3038            i = 0;
     3039            if( keepCount < maxAtomCount ) {
     3040                tr_now = time( NULL );
     3041                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
     3042                while( i<testCount && keepCount<maxAtomCount )
     3043                    keep[keepCount++] = test[i++];
     3044            }
     3045
     3046            /* free the culled atoms */
     3047            while( i<testCount )
     3048                tr_free( test[i++] );
     3049
     3050            /* rebuild Torrent.pool with what's left */
     3051            tr_ptrArrayDestruct( &t->pool, NULL );
     3052            t->pool = TR_PTR_ARRAY_INIT;
     3053            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
     3054            for( i=0; i<keepCount; ++i ) {
     3055                if( i+1<keepCount )
     3056                    assert( tr_compareAddresses( &keep[i]->addr, &keep[i+1]->addr ) < 0 );
     3057                tr_ptrArrayAppend( &t->pool, keep[i] );
     3058            }
     3059
     3060            /* cleanup */
     3061            tr_free( test );
     3062            tr_free( keep );
     3063        }
     3064    }
     3065
     3066    managerUnlock( mgr );
     3067    return TRUE;
     3068}
Note: See TracChangeset for help on using the changeset viewer.