Changeset 4004


Ignore:
Timestamp:
Nov 29, 2007, 12:43:58 AM (14 years ago)
Author:
charles
Message:

rewrite the tracker code. this should improve and/or fix a number of bugs, including "too many open files", "router death", "slow internet", and the mutex release crash.

Location:
trunk/libtransmission
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/libtransmission/internal.h

    r3976 r4004  
    196196
    197197    struct tr_stats_handle   * sessionStats;
     198    struct tr_tracker_handle * tracker;
    198199};
    199200
  • trunk/libtransmission/tracker.c

    r3991 r4004  
    2525#include "bencode.h"
    2626#include "completion.h"
     27#include "list.h"
    2728#include "net.h"
    2829#include "publish.h"
     
    3435enum
    3536{
     37    /* seconds between tracker pulses */
     38    PULSE_INTERVAL_SEC = 1,
     39
     40    /* maximum number of concurrent tracker socket connections */
     41    MAX_TRACKER_SOCKETS = 16,
     42
    3643    /* unless the tracker says otherwise, rescrape this frequently */
    3744    DEFAULT_SCRAPE_INTERVAL_SEC = (60 * 15),
     
    6067***
    6168**/
     69
     70enum
     71{
     72    TR_REQ_STARTED,
     73    TR_REQ_COMPLETED,
     74    TR_REQ_STOPPED,
     75    TR_REQ_REANNOUNCE,
     76    TR_REQ_COUNT
     77};
    6278
    6379struct tr_tracker
     
    107123
    108124    time_t manualAnnounceAllowedAt;
    109 
    110     tr_timer * scrapeTimer;
    111     tr_timer * reannounceTimer;
    112 
    113     unsigned int isRunning : 1;
     125    time_t reannounceAt;
     126    time_t scrapeAt;
     127
     128    unsigned int isRunning     : 1;
    114129};
    115130
     
    145160#define dbgmsg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
    146161
    147 
    148162/***
    149 ****  Connections that know how to clean up after themselves
     163****
    150164***/
    151165
     166static tr_tracker_info *
     167getCurrentAddress( const tr_tracker * t )
     168{
     169    assert( t->addresses != NULL );
     170    assert( t->addressIndex >= 0 );
     171    assert( t->addressIndex < t->addressCount );
     172
     173    return t->redirect ? t->redirect
     174                       : t->addresses + t->addressIndex;
     175}
     176
    152177static int
    153 freeConnection( void * evcon )
    154 {
    155     evhttp_connection_free( evcon );
    156     return FALSE;
    157 }
    158 
    159 static void
    160 connectionClosedCB( struct evhttp_connection * evcon, void * handle )
    161 {
    162     /* libevent references evcon right after calling this function,
    163        so we can't free it yet... defer it to after this call chain
    164        has played out */
    165     tr_timerNew( handle, freeConnection, evcon, 100 );
    166 }
    167 
    168 static struct evhttp_connection*
    169 getConnection( tr_tracker * t, const char * address, int port )
    170 {
    171     struct evhttp_connection * c = evhttp_connection_new( address, port );
    172     evhttp_connection_set_closecb( c, connectionClosedCB, t->handle );
    173     return c;
     178trackerSupportsScrape( const tr_tracker * t )
     179{
     180    const tr_tracker_info * info = getCurrentAddress( t );
     181
     182    return ( info != NULL )
     183        && ( info->scrape != NULL )
     184        && ( info->scrape[0] != '\0' );
     185}
     186
     187/***
     188****
     189***/
     190
     191struct torrent_hash
     192{
     193    tr_handle * handle;
     194    uint8_t hash[SHA_DIGEST_LENGTH];
     195};
     196
     197static struct torrent_hash*
     198torrentHashNew( tr_handle * handle, const tr_tracker * t )
     199{
     200    struct torrent_hash * data = tr_new( struct torrent_hash, 1 );
     201    data->handle = handle;
     202    memcpy( data->hash, t->hash, SHA_DIGEST_LENGTH );
     203    return data;
     204}
     205
     206tr_tracker *
     207findTrackerFromHash( struct torrent_hash * data )
     208{
     209    tr_torrent * torrent = tr_torrentFindFromHash( data->handle, data->hash );
     210    return torrent ? torrent->tracker : NULL;
    174211}
    175212
     
    221258
    222259/***
    223 ****  LIFE CYCLE
     260****
    224261***/
    225262
    226263static void
    227 generateKeyParam( char * msg, int len )
    228 {
    229     int i;
    230     const char * pool = "abcdefghijklmnopqrstuvwxyz0123456789";
    231     const int poolSize = strlen( pool );
    232     for( i=0; i<len; ++i )
    233         *msg++ = pool[tr_rand(poolSize)];
    234     *msg = '\0';
    235 }
    236 
    237 static void
    238 escape( char * out, const uint8_t * in, int in_len ) /* rfc2396 */
    239 {
    240     const uint8_t *end = in + in_len;
    241     while( in != end )
    242         if( isalnum(*in) )
    243             *out++ = (char) *in++;
    244         else
    245             out += snprintf( out, 4, "%%%02X", (unsigned int)*in++ );
    246     *out = '\0';
    247 }
    248 
    249 static int onScrapeNow( void * vt );
    250 
    251 tr_tracker *
    252 tr_trackerNew( const tr_torrent * torrent )
    253 {
    254     const tr_info * info = &torrent->info;
    255     int i, j, sum, *iwalk;
    256     tr_tracker_info * nwalk;
    257     tr_tracker * t;
    258 
    259     tr_dbg( "making a new tracker for \"%s\"", info->primaryAddress );
    260 
    261     t = tr_new0( tr_tracker, 1 );
    262     t->handle = torrent->handle;
    263     t->scrapeIntervalSec       = DEFAULT_SCRAPE_INTERVAL_SEC;
    264     t->announceIntervalSec     = DEFAULT_ANNOUNCE_INTERVAL_SEC;
    265     t->announceMinIntervalSec  = DEFAULT_ANNOUNCE_MIN_INTERVAL_SEC;
    266     generateKeyParam( t->key_param, KEYLEN );
    267 
    268     t->publisher = tr_publisherNew( );
    269     t->timesDownloaded = -1;
    270     t->seederCount = -1;
    271     t->leecherCount = -1;
    272     t->manualAnnounceAllowedAt = ~(time_t)0;
    273     t->name = tr_strdup( info->name );
    274     memcpy( t->hash, info->hash, SHA_DIGEST_LENGTH );
    275     escape( t->escaped, info->hash, SHA_DIGEST_LENGTH );
    276 
    277     for( sum=i=0; i<info->trackerTiers; ++i )
    278          sum += info->trackerList[i].count;
    279     t->addresses = nwalk = tr_new0( tr_tracker_info, sum );
    280     t->addressIndex = 0;
    281     t->addressCount = sum;
    282     t->tierFronts = iwalk = tr_new0( int, sum );
    283 
    284     for( i=0; i<info->trackerTiers; ++i )
    285     {
    286         const int tierFront = nwalk - t->addresses;
    287 
    288         for( j=0; j<info->trackerList[i].count; ++j )
    289         {
    290             const tr_tracker_info * src = &info->trackerList[i].list[j];
    291             nwalk->address = tr_strdup( src->address );
    292             nwalk->port = src->port;
    293             nwalk->announce = tr_strdup( src->announce );
    294             nwalk->scrape = tr_strdup( src->scrape );
    295             ++nwalk;
    296 
    297             *iwalk++ = tierFront;
    298         }
    299     }
    300 
    301     assert( nwalk - t->addresses == sum );
    302     assert( iwalk - t->tierFronts == sum );
    303 
    304     /* scrape sometime in the next two minutes.
    305        scrapes are staggered out like this to prevent
    306        hundreds of scrapes from going out at the same time */
    307     t->scrapeTimer = tr_timerNew( t->handle,
    308                                   onScrapeNow, t,
    309                                   tr_rand(120)*1000 );
    310 
    311     return t;
    312 }
    313 
    314 static void
    315 onTrackerFreeNow( void * vt )
    316 {
    317     int i;
    318     tr_tracker * t = vt;
    319 
    320     tr_timerFree( &t->scrapeTimer );
    321     tr_timerFree( &t->reannounceTimer );
    322     tr_publisherFree( &t->publisher );
    323     tr_free( t->name );
    324     tr_free( t->trackerID );
    325     tr_free( t->lastRequest );
    326 
    327     /* addresses... */
    328     for( i=0; i<t->addressCount; ++i )
    329         tr_trackerInfoClear( &t->addresses[i] );
    330     tr_free( t->addresses );
    331     tr_free( t->tierFronts );
    332 
    333     /* redirect... */
    334     if( t->redirect ) {
    335         tr_trackerInfoClear( t->redirect );
    336         tr_free( t->redirect );
    337     }
    338 
    339     tr_free( t );
    340 }
    341 
    342 void
    343 tr_trackerFree( tr_tracker * t )
    344 {
    345     tr_runInEventThread( t->handle, onTrackerFreeNow, t );
    346 }
    347 
    348 /***
    349 ****  UTIL
    350 ***/
     264onStoppedResponse( struct evhttp_request * req UNUSED, void * handle UNUSED )
     265{
     266    dbgmsg( NULL, "got a response to some `stop' message" );
     267}
    351268
    352269static int
     
    427344}
    428345
    429 static tr_tracker_info *
    430 getCurrentAddress( const tr_tracker * t )
    431 {
    432     assert( t->addresses != NULL );
    433     assert( t->addressIndex >= 0 );
    434     assert( t->addressIndex < t->addressCount );
    435 
    436     return t->redirect ? t->redirect
    437                        : t->addresses + t->addressIndex;
    438 }
    439 static int
    440 trackerSupportsScrape( const tr_tracker * t )
    441 {
    442     const tr_tracker_info * info = getCurrentAddress( t );
    443 
    444     return ( info != NULL )
    445         && ( info->scrape != NULL )
    446         && ( info->scrape[0] != '\0' );
    447 }
    448 
    449 
    450 static void
    451 addCommonHeaders( const tr_tracker * t,
    452                   struct evhttp_request * req )
    453 {
    454     char buf[1024];
    455     const tr_tracker_info * address = getCurrentAddress( t );
    456     snprintf( buf, sizeof(buf), "%s:%d", address->address, address->port );
    457     evhttp_add_header( req->output_headers, "Host", buf );
    458     evhttp_add_header( req->output_headers, "Connection", "close" );
    459     evhttp_add_header( req->output_headers, "Content-Length", "0" );
    460     evhttp_add_header( req->output_headers, "User-Agent",
    461                                          TR_NAME "/" LONG_VERSION_STRING );
    462 }
    463 
    464 /**
    465 ***
    466 **/
    467 
    468 struct torrent_hash
    469 {
    470     tr_handle * handle;
    471     uint8_t hash[SHA_DIGEST_LENGTH];
    472 };
    473 
    474 static struct torrent_hash*
    475 torrentHashNew( tr_tracker * t )
    476 {
    477     struct torrent_hash * data = tr_new( struct torrent_hash, 1 );
    478     data->handle = t->handle;
    479     memcpy( data->hash, t->hash, SHA_DIGEST_LENGTH );
    480     return data;
    481 }
    482 
    483 tr_tracker *
    484 findTrackerFromHash( struct torrent_hash * data )
    485 {
    486     tr_torrent * torrent = tr_torrentFindFromHash( data->handle, data->hash );
    487     return torrent ? torrent->tracker : NULL;
    488 }
    489 
    490 /***
    491 ****
    492 ****  SCRAPE
    493 ****
    494 ***/
     346/* Convert to compact form */
     347static uint8_t *
     348parseOldPeers( benc_val_t * bePeers, int * setmePeerCount )
     349{
     350    int i;
     351    uint8_t *compact, *walk;
     352    const int peerCount = bePeers->val.l.count;
     353
     354    assert( bePeers->type == TYPE_LIST );
     355
     356    compact = tr_new( uint8_t, peerCount*6 );
     357
     358    for( i=0, walk=compact; i<peerCount; ++i )
     359    {
     360        struct in_addr addr;
     361        tr_port_t port;
     362        benc_val_t * val;
     363        benc_val_t * peer = &bePeers->val.l.vals[i];
     364
     365        val = tr_bencDictFind( peer, "ip" );
     366        if( !val || val->type!=TYPE_STR || tr_netResolve(val->val.s.s, &addr) )
     367            continue;
     368
     369        memcpy( walk, &addr, 4 );
     370        walk += 4;
     371
     372        val = tr_bencDictFind( peer, "port" );
     373        if( !val || val->type!=TYPE_INT || val->val.i<0 || val->val.i>0xffff )
     374            continue;
     375
     376        port = htons( val->val.i );
     377        memcpy( walk, &port, 2 );
     378        walk += 2;
     379    }
     380
     381    *setmePeerCount = peerCount;
     382    return compact;
     383}
     384
     385static void
     386onTrackerResponse( struct evhttp_request * req, void * torrent_hash )
     387{
     388    const char * warning;
     389    tr_tracker * t;
     390    int responseCode;
     391
     392    t = findTrackerFromHash( torrent_hash );
     393    tr_free( torrent_hash );
     394    if( t == NULL ) /* tracker has been closed */
     395        return;
     396
     397    dbgmsg( t, "got response from tracker: \"%s\"",
     398            ( req && req->response_code_line ) ?  req->response_code_line
     399                                               : "(null)" );
     400
     401    tr_inf( "Torrent \"%s\" tracker response: %s",
     402            t->name,
     403            ( req ? req->response_code_line : "(null)") );
     404
     405    if( req && ( req->response_code == HTTP_OK ) )
     406    {
     407        benc_val_t benc;
     408        const int bencLoaded = !parseBencResponse( req, &benc );
     409
     410        publishErrorClear( t );
     411
     412        if( bencLoaded && benc.type==TYPE_DICT )
     413        {
     414            benc_val_t * tmp;
     415
     416            if(( tmp = tr_bencDictFind( &benc, "failure reason" ))) {
     417                dbgmsg( t, "got failure message [%s]", tmp->val.s.s );
     418                publishErrorMessage( t, tmp->val.s.s );
     419            }
     420
     421            if(( tmp = tr_bencDictFind( &benc, "warning message" ))) {
     422                dbgmsg( t, "got warning message [%s]", tmp->val.s.s );
     423                publishWarning( t, tmp->val.s.s );
     424            }
     425
     426            if(( tmp = tr_bencDictFind( &benc, "interval" ))) {
     427                dbgmsg( t, "setting interval to %d", tmp->val.i );
     428                t->announceIntervalSec = tmp->val.i;
     429            }
     430
     431            if(( tmp = tr_bencDictFind( &benc, "min interval" ))) {
     432                dbgmsg( t, "setting min interval to %d", tmp->val.i );
     433                t->announceMinIntervalSec = tmp->val.i;
     434            }
     435
     436            if(( tmp = tr_bencDictFind( &benc, "tracker id" )))
     437                t->trackerID = tr_strndup( tmp->val.s.s, tmp->val.s.i );
     438
     439            if(( tmp = tr_bencDictFind( &benc, "complete" )))
     440                t->seederCount = tmp->val.i;
     441
     442            if(( tmp = tr_bencDictFind( &benc, "incomplete" )))
     443                t->leecherCount = tmp->val.i;
     444
     445            if(( tmp = tr_bencDictFind( &benc, "peers" )))
     446            {
     447                int peerCount = 0;
     448                uint8_t * peerCompact = NULL;
     449
     450                if( tmp->type == TYPE_LIST ) /* original protocol */
     451                {
     452                    if( tmp->val.l.count > 0 )
     453                        peerCompact = parseOldPeers( tmp, &peerCount );
     454                }
     455                else if( tmp->type == TYPE_STR ) /* "compact" extension */
     456                {
     457                    if( tmp->val.s.i >= 6 )
     458                    {
     459                        peerCount = tmp->val.s.i / 6;
     460                        peerCompact = tr_new( uint8_t, tmp->val.s.i );
     461                        memcpy( peerCompact, tmp->val.s.s, tmp->val.s.i );
     462                    }
     463                }
     464
     465                publishNewPeers( t, peerCount, peerCompact );
     466                tr_free( peerCompact );
     467            }
     468        }
     469
     470        if( bencLoaded )
     471            tr_bencFree( &benc );
     472    }
     473
     474    if (( warning = updateAddresses( t, req ) )) {
     475        publishWarning( t, warning );
     476        tr_err( warning );
     477    }
     478
     479    /**
     480    ***
     481    **/
     482
     483    responseCode = req ? req->response_code : 503;
     484
     485    if( 200<=responseCode && responseCode<=299 )
     486    {
     487        dbgmsg( t, "request succeeded. reannouncing in %d seconds",
     488                   t->announceIntervalSec );
     489        t->reannounceAt = time(NULL) + t->announceIntervalSec;
     490        t->manualAnnounceAllowedAt = time(NULL) + t->announceMinIntervalSec;
     491    }
     492    else if( 300<=responseCode && responseCode<=399 )
     493    {
     494        dbgmsg( t, "got a redirect; retrying immediately" );
     495
     496        /* it's a redirect... updateAddresses() has already
     497         * parsed the redirect, all that's left is to retry */
     498        t->reannounceAt = time(NULL);
     499        t->manualAnnounceAllowedAt = time(NULL) + t->announceMinIntervalSec;
     500    }
     501    else if( 400<=responseCode && responseCode<=499 )
     502    {
     503        dbgmsg( t, "got a 4xx error." );
     504
     505        /* The request could not be understood by the server due to
     506         * malformed syntax. The client SHOULD NOT repeat the
     507         * request without modifications. */
     508        if( req && req->response_code_line )
     509            publishErrorMessage( t, req->response_code_line );
     510        t->manualAnnounceAllowedAt = ~(time_t)0;
     511        t->reannounceAt = 0;
     512    }
     513    else if( 500<=responseCode && responseCode<=599 )
     514    {
     515        dbgmsg( t, "Got a 5xx error... retrying in 15 seconds." );
     516
     517        /* Response status codes beginning with the digit "5" indicate
     518         * cases in which the server is aware that it has erred or is
     519         * incapable of performing the request.  So we pause a bit and
     520         * try again. */
     521        if( req && req->response_code_line )
     522            publishWarning( t, req->response_code_line );
     523        t->manualAnnounceAllowedAt = ~(time_t)0;
     524        t->reannounceAt = time(NULL) + 15;
     525    }
     526    else
     527    {
     528        dbgmsg( t, "Invalid response from tracker... retrying in 60 seconds." );
     529
     530        /* WTF did we get?? */
     531        if( req && req->response_code_line )
     532            publishErrorMessage( t, req->response_code_line );
     533        t->manualAnnounceAllowedAt = ~(time_t)0;
     534        t->reannounceAt = time(NULL) + 60;
     535    }
     536}
    495537
    496538static void
     
    562604    }
    563605
    564     tr_timerFree( &t->scrapeTimer );
    565 
    566     t->scrapeTimer = tr_timerNew( t->handle,
    567                                   onScrapeNow, t,
    568                                   nextScrapeSec*1000 );
    569 }
    570 
    571 static int
    572 onScrapeNow( void * vt )
    573 {
    574     tr_tracker * t = vt;
    575     const tr_tracker_info * address = getCurrentAddress( t );
    576 
    577     if( trackerSupportsScrape( t ) )
    578     {
    579         char * uri;
    580         struct evhttp_connection * evcon;
    581         struct evhttp_request *req;
    582         struct evbuffer * buf = evbuffer_new( );
    583 
    584         evbuffer_add_printf( buf, "%s%sinfo_hash=%s",
    585                              address->scrape,
    586                              ( strchr(address->scrape, '?') == NULL ? "?" : "&" ),
    587                              t->escaped );
    588         uri = tr_strdup( (char*) EVBUFFER_DATA( buf ) );
    589         evbuffer_free( buf );
    590 
    591         tr_inf( "Sending scrape to tracker %s:%d: %s",
    592                 address->address, address->port, uri );
    593 
    594         evcon = getConnection( t, address->address, address->port );
    595         evhttp_connection_set_timeout( evcon, TIMEOUT_INTERVAL_SEC );
    596         req = evhttp_request_new( onScrapeResponse, torrentHashNew( t ) );
    597         addCommonHeaders( t, req );
    598         tr_evhttp_make_request( t->handle, evcon, req, EVHTTP_REQ_GET, uri );
    599     }
    600 
    601     t->scrapeTimer = NULL;
    602     return FALSE;
     606    t->scrapeAt = time(NULL) + nextScrapeSec;
    603607}
    604608
    605609/***
    606610****
    607 ****  TRACKER REQUESTS
    608 ****
    609611***/
     612
     613struct tr_tracker_request
     614{
     615    int port;
     616    int timeout;
     617    char * address;
     618    char * uri;
     619    struct evhttp_request * req;
     620};
     621
     622static void
     623freeRequest( struct tr_tracker_request ** req )
     624{
     625    tr_free( (*req)->address );
     626    tr_free( (*req)->uri );
     627    tr_free( (*req) );
     628    *req = NULL;
     629}
     630
     631static void
     632addCommonHeaders( const tr_tracker * t,
     633                  struct evhttp_request * req )
     634{
     635    char buf[1024];
     636    const tr_tracker_info * address = getCurrentAddress( t );
     637    snprintf( buf, sizeof(buf), "%s:%d", address->address, address->port );
     638    evhttp_add_header( req->output_headers, "Host", buf );
     639    evhttp_add_header( req->output_headers, "Connection", "close" );
     640    evhttp_add_header( req->output_headers, "Content-Length", "0" );
     641    evhttp_add_header( req->output_headers, "User-Agent",
     642                                         TR_NAME "/" LONG_VERSION_STRING );
     643}
    610644
    611645static char*
     
    658692}
    659693
    660 /* Convert to compact form */
    661 static uint8_t *
    662 parseOldPeers( benc_val_t * bePeers, int * setmePeerCount )
     694static struct tr_tracker_request*
     695createRequest( tr_handle * handle, const tr_tracker * tracker, int reqtype )
     696{
     697    static const char* strings[TR_REQ_COUNT] = { "started", "completed", "stopped", "" };
     698    const tr_torrent * torrent = tr_torrentFindFromHash( handle, tracker->hash );
     699    const tr_tracker_info * address = getCurrentAddress( tracker );
     700    const int isStopping = reqtype == TR_REQ_STOPPED;
     701    const char * eventName = strings[reqtype];
     702    struct tr_tracker_request * req;
     703
     704    req = tr_new0( struct tr_tracker_request, 1 );
     705    req->address = tr_strdup( address->address );
     706    req->port = address->port;
     707    req->uri = buildTrackerRequestURI( tracker, torrent, eventName );
     708    req->timeout = isStopping ? STOP_TIMEOUT_INTERVAL_SEC : TIMEOUT_INTERVAL_SEC;
     709    req->req = isStopping
     710        ? evhttp_request_new( onStoppedResponse, handle )
     711        : evhttp_request_new( onTrackerResponse, torrentHashNew(handle, tracker) );
     712    addCommonHeaders( tracker, req->req );
     713
     714    return req;
     715}
     716
     717static struct tr_tracker_request*
     718createScrape( tr_handle * handle, const tr_tracker * tracker )
     719{
     720    const tr_tracker_info * a = getCurrentAddress( tracker );
     721    struct tr_tracker_request * req;
     722
     723    req = tr_new0( struct tr_tracker_request, 1 );
     724    req->address = tr_strdup( a->address );
     725    req->port = a->port;
     726    req->timeout = TIMEOUT_INTERVAL_SEC;
     727    req->req = evhttp_request_new( onScrapeResponse, torrentHashNew( handle, tracker ) );
     728    tr_asprintf( &req->uri, "%s%cinfo_hash=%s", a->scrape, strchr(a->scrape,'?')?'&':'?', tracker->escaped );
     729    addCommonHeaders( tracker, req->req );
     730
     731    return req;
     732}
     733
     734struct tr_tracker_handle
     735{
     736    int socketCount;
     737    tr_timer * pulseTimer;
     738    tr_list * requestQueue;
     739    tr_list * scrapeQueue;
     740};
     741
     742static int pulse( void * vhandle );
     743
     744static void
     745ensureGlobalsExist( tr_handle * handle )
     746{
     747    if( handle->tracker == NULL )
     748    {
     749        handle->tracker = tr_new0( struct tr_tracker_handle, 1 );
     750        handle->tracker->pulseTimer = tr_timerNew( handle, pulse, handle, PULSE_INTERVAL_SEC*1000 );
     751        dbgmsg( NULL, "creating tracker timer" );
     752    }
     753}
     754
     755static int
     756maybeFreeGlobals( tr_handle * handle )
     757{
     758    int globalsExist = handle->tracker != NULL;
     759
     760    if( globalsExist
     761        && ( handle->tracker->socketCount < 1 )
     762        && ( handle->tracker->requestQueue == NULL )
     763        && ( handle->tracker->scrapeQueue == NULL )
     764        && ( handle->torrentList== NULL ) )
     765    {
     766        dbgmsg( NULL, "freeing tracker timer" );
     767        tr_timerFree( &handle->tracker->pulseTimer );
     768        tr_free( handle->tracker );
     769        handle->tracker = NULL;
     770        globalsExist = FALSE;
     771    }
     772
     773    return globalsExist;
     774}
     775
     776/***
     777****
     778***/
     779
     780static int
     781freeConnection( void * evcon )
     782{
     783    evhttp_connection_free( evcon );
     784    return FALSE;
     785}
     786static void
     787connectionClosedCB( struct evhttp_connection * evcon, void * vhandle )
     788{
     789    tr_handle * handle = vhandle;
     790
     791    assert( handle );
     792    assert( handle->tracker );
     793
     794    /* libevent references evcon right after calling this function,
     795       so we can't free it yet... defer it to after this call chain
     796       has played out */
     797    tr_timerNew( handle, freeConnection, evcon, 100 );
     798
     799    --handle->tracker->socketCount;
     800    dbgmsg( NULL, "decrementing socket count to %d", handle->tracker->socketCount );
     801}
     802
     803static struct evhttp_connection*
     804getConnection( tr_handle * handle, const char * address, int port )
     805{
     806    struct evhttp_connection * c = evhttp_connection_new( address, port );
     807    evhttp_connection_set_closecb( c, connectionClosedCB, handle );
     808    return c;
     809}
     810
     811static void
     812invokeRequest( tr_handle * handle, const struct tr_tracker_request * req )
     813{
     814    struct evhttp_connection * evcon = getConnection( handle, req->address, req->port );
     815    ++handle->tracker->socketCount;
     816    dbgmsg( NULL, "incrementing socket count to %d, sending '%s' to tracker %s:%d", handle->tracker->socketCount, req->uri, req->address, req->port );
     817    evhttp_connection_set_timeout( evcon, req->timeout );
     818    evhttp_make_request( evcon, req->req, EVHTTP_REQ_GET, req->uri );
     819}
     820
     821static void
     822invokeNextInQueue( tr_handle * handle, tr_list ** list )
     823{
     824    struct tr_tracker_request * req = tr_list_pop_front( list );
     825    invokeRequest( handle, req );
     826    freeRequest( &req );
     827}
     828
     829static int
     830socketIsAvailable( tr_handle * handle )
     831{
     832    return handle->tracker->socketCount < MAX_TRACKER_SOCKETS;
     833}
     834
     835static void ensureGlobalsExist( tr_handle * );
     836
     837static void
     838enqueueRequest( tr_handle * handle, const tr_tracker * tracker, int reqtype )
     839{
     840    struct tr_tracker_request * req;
     841    ensureGlobalsExist( handle );
     842    req = createRequest( handle, tracker, reqtype );
     843    tr_list_append( &handle->tracker->requestQueue, req );
     844}
     845
     846static void
     847enqueueScrape( tr_handle * handle, const tr_tracker * tracker )
     848{
     849    struct tr_tracker_request * req;
     850    ensureGlobalsExist( handle );
     851    req = createScrape( handle, tracker );
     852    tr_list_append( &handle->tracker->scrapeQueue, req );
     853}
     854
     855static int
     856pulse( void * vhandle )
     857{
     858    tr_handle * handle = vhandle;
     859    struct tr_tracker_handle * th = handle->tracker;
     860    tr_torrent * t;
     861    const time_t now = time( NULL );
     862
     863    if( handle->tracker->socketCount || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) )
     864        dbgmsg( NULL, "tracker pulse... %d sockets, %d reqs left, %d scrapes left", handle->tracker->socketCount, tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) );
     865
     866    /* upkeep: queue periodic rescrape / reannounce */
     867    for( t=handle->torrentList; t; t=t->next ) {
     868        tr_tracker * tracker = t->tracker;
     869        if( tracker->scrapeAt && trackerSupportsScrape( tracker ) && ( now >= tracker->scrapeAt ) ) {
     870            tracker->scrapeAt = 0;
     871            enqueueScrape( handle, tracker );
     872        }
     873        if( tracker->reannounceAt && tracker->isRunning && ( now >= tracker->reannounceAt ) ) {
     874            tracker->reannounceAt = 0;
     875            enqueueRequest( handle, tracker, TR_REQ_REANNOUNCE );
     876        }
     877    }
     878
     879    if( handle->tracker->socketCount || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) )
     880        dbgmsg( NULL, "tracker pulse after upkeep... %d sockets, %d reqs left, %d scrapes left", handle->tracker->socketCount, tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) );
     881
     882    /* look for things to do... process all the requests, then process all the scrapes */
     883    while( th->requestQueue && socketIsAvailable( handle ) )
     884        invokeNextInQueue( handle, &th->requestQueue );
     885    while( th->scrapeQueue && socketIsAvailable( handle ) )
     886        invokeNextInQueue( handle, &th->scrapeQueue );
     887
     888    if( handle->tracker->socketCount || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) )
     889        dbgmsg( NULL, "tracker pulse done... %d sockets, %d reqs left, %d scrapes left", handle->tracker->socketCount, tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) );
     890
     891    return maybeFreeGlobals( handle );
     892}
     893
     894/***
     895****  LIFE CYCLE
     896***/
     897
     898static void
     899generateKeyParam( char * msg, int len )
    663900{
    664901    int i;
    665     uint8_t *compact, *walk;
    666     const int peerCount = bePeers->val.l.count;
    667 
    668     assert( bePeers->type == TYPE_LIST );
    669 
    670     compact = tr_new( uint8_t, peerCount*6 );
    671 
    672     for( i=0, walk=compact; i<peerCount; ++i )
    673     {
    674         struct in_addr addr;
    675         tr_port_t port;
    676         benc_val_t * val;
    677         benc_val_t * peer = &bePeers->val.l.vals[i];
    678 
    679         val = tr_bencDictFind( peer, "ip" );
    680         if( !val || val->type!=TYPE_STR || tr_netResolve(val->val.s.s, &addr) )
    681             continue;
    682 
    683         memcpy( walk, &addr, 4 );
    684         walk += 4;
    685 
    686         val = tr_bencDictFind( peer, "port" );
    687         if( !val || val->type!=TYPE_INT || val->val.i<0 || val->val.i>0xffff )
    688             continue;
    689 
    690         port = htons( val->val.i );
    691         memcpy( walk, &port, 2 );
    692         walk += 2;
    693     }
    694 
    695     *setmePeerCount = peerCount;
    696     return compact;
    697 }
    698 
    699 static int onRetry( void * vt );
    700 static int onReannounce( void * vt );
    701 
    702 static void
    703 onStoppedResponse( struct evhttp_request * req UNUSED, void * handle UNUSED )
    704 {
    705     dbgmsg( NULL, "got a response to some `stop' message" );
    706 }
    707 
    708 static void
    709 onTrackerResponse( struct evhttp_request * req, void * torrent_hash )
    710 {
    711     const char * warning;
     902    const char * pool = "abcdefghijklmnopqrstuvwxyz0123456789";
     903    const int poolSize = strlen( pool );
     904    for( i=0; i<len; ++i )
     905        *msg++ = pool[tr_rand(poolSize)];
     906    *msg = '\0';
     907}
     908
     909static void
     910escape( char * out, const uint8_t * in, int in_len ) /* rfc2396 */
     911{
     912    const uint8_t *end = in + in_len;
     913    while( in != end )
     914        if( isalnum(*in) )
     915            *out++ = (char) *in++;
     916        else
     917            out += snprintf( out, 4, "%%%02X", (unsigned int)*in++ );
     918    *out = '\0';
     919}
     920
     921tr_tracker *
     922tr_trackerNew( const tr_torrent * torrent )
     923{
     924    const tr_info * info = &torrent->info;
     925    int i, j, sum, *iwalk;
     926    tr_tracker_info * nwalk;
    712927    tr_tracker * t;
    713     int responseCode;
    714 
    715     t = findTrackerFromHash( torrent_hash );
    716     tr_free( torrent_hash );
    717     if( t == NULL ) /* tracker has been closed */
    718         return;
    719 
    720     dbgmsg( t, "got response from tracker: \"%s\"",
    721             ( req && req->response_code_line ) ?  req->response_code_line
    722                                                : "(null)" );
    723 
    724     tr_inf( "Torrent \"%s\" tracker response: %s",
    725             t->name,
    726             ( req ? req->response_code_line : "(null)") );
    727 
    728     if( req && ( req->response_code == HTTP_OK ) )
    729     {
    730         benc_val_t benc;
    731         const int bencLoaded = !parseBencResponse( req, &benc );
    732 
    733         publishErrorClear( t );
    734 
    735         if( bencLoaded && benc.type==TYPE_DICT )
     928
     929    tr_dbg( "making a new tracker for \"%s\"", info->primaryAddress );
     930
     931    t = tr_new0( tr_tracker, 1 );
     932    t->handle = torrent->handle;
     933    t->scrapeIntervalSec       = DEFAULT_SCRAPE_INTERVAL_SEC;
     934    t->announceIntervalSec     = DEFAULT_ANNOUNCE_INTERVAL_SEC;
     935    t->announceMinIntervalSec  = DEFAULT_ANNOUNCE_MIN_INTERVAL_SEC;
     936    generateKeyParam( t->key_param, KEYLEN );
     937
     938    t->publisher = tr_publisherNew( );
     939    t->timesDownloaded = -1;
     940    t->seederCount = -1;
     941    t->leecherCount = -1;
     942    t->manualAnnounceAllowedAt = ~(time_t)0;
     943    t->name = tr_strdup( info->name );
     944    memcpy( t->hash, info->hash, SHA_DIGEST_LENGTH );
     945    escape( t->escaped, info->hash, SHA_DIGEST_LENGTH );
     946
     947    for( sum=i=0; i<info->trackerTiers; ++i )
     948         sum += info->trackerList[i].count;
     949    t->addresses = nwalk = tr_new0( tr_tracker_info, sum );
     950    t->addressIndex = 0;
     951    t->addressCount = sum;
     952    t->tierFronts = iwalk = tr_new0( int, sum );
     953
     954    for( i=0; i<info->trackerTiers; ++i )
     955    {
     956        const int tierFront = nwalk - t->addresses;
     957
     958        for( j=0; j<info->trackerList[i].count; ++j )
    736959        {
    737             benc_val_t * tmp;
    738 
    739             if(( tmp = tr_bencDictFind( &benc, "failure reason" ))) {
    740                 dbgmsg( t, "got failure message [%s]", tmp->val.s.s );
    741                 publishErrorMessage( t, tmp->val.s.s );
    742             }
    743 
    744             if(( tmp = tr_bencDictFind( &benc, "warning message" ))) {
    745                 dbgmsg( t, "got warning message [%s]", tmp->val.s.s );
    746                 publishWarning( t, tmp->val.s.s );
    747             }
    748 
    749             if(( tmp = tr_bencDictFind( &benc, "interval" ))) {
    750                 dbgmsg( t, "setting interval to %d", tmp->val.i );
    751                 t->announceIntervalSec = tmp->val.i;
    752             }
    753 
    754             if(( tmp = tr_bencDictFind( &benc, "min interval" ))) {
    755                 dbgmsg( t, "setting min interval to %d", tmp->val.i );
    756                 t->announceMinIntervalSec = tmp->val.i;
    757             }
    758 
    759             if(( tmp = tr_bencDictFind( &benc, "tracker id" )))
    760                 t->trackerID = tr_strndup( tmp->val.s.s, tmp->val.s.i );
    761 
    762             if(( tmp = tr_bencDictFind( &benc, "complete" )))
    763                 t->seederCount = tmp->val.i;
    764 
    765             if(( tmp = tr_bencDictFind( &benc, "incomplete" )))
    766                 t->leecherCount = tmp->val.i;
    767 
    768             if(( tmp = tr_bencDictFind( &benc, "peers" )))
    769             {
    770                 int peerCount = 0;
    771                 uint8_t * peerCompact = NULL;
    772 
    773                 if( tmp->type == TYPE_LIST ) /* original protocol */
    774                 {
    775                     if( tmp->val.l.count > 0 )
    776                         peerCompact = parseOldPeers( tmp, &peerCount );
    777                 }
    778                 else if( tmp->type == TYPE_STR ) /* "compact" extension */
    779                 {
    780                     if( tmp->val.s.i >= 6 )
    781                     {
    782                         peerCount = tmp->val.s.i / 6;
    783                         peerCompact = tr_new( uint8_t, tmp->val.s.i );
    784                         memcpy( peerCompact, tmp->val.s.s, tmp->val.s.i );
    785                     }
    786                 }
    787 
    788                 publishNewPeers( t, peerCount, peerCompact );
    789                 tr_free( peerCompact );
    790             }
     960            const tr_tracker_info * src = &info->trackerList[i].list[j];
     961            nwalk->address = tr_strdup( src->address );
     962            nwalk->port = src->port;
     963            nwalk->announce = tr_strdup( src->announce );
     964            nwalk->scrape = tr_strdup( src->scrape );
     965            ++nwalk;
     966
     967            *iwalk++ = tierFront;
    791968        }
    792 
    793         if( bencLoaded )
    794             tr_bencFree( &benc );
    795     }
    796 
    797     if (( warning = updateAddresses( t, req ) )) {
    798         publishWarning( t, warning );
    799         tr_err( warning );
    800     }
    801 
    802     /**
    803     ***
    804     **/
    805 
    806     responseCode = req ? req->response_code : 503;
    807 
    808     if( 200<=responseCode && responseCode<=299 )
    809     {
    810         dbgmsg( t, "request succeeded. reannouncing in %d seconds",
    811                    t->announceIntervalSec );
    812         t->manualAnnounceAllowedAt = time(NULL)
    813                                    + t->announceMinIntervalSec;
    814         t->reannounceTimer = tr_timerNew( t->handle,
    815                                           onReannounce, t,
    816                                           t->announceIntervalSec * 1000 );
    817     }
    818     else if( 300<=responseCode && responseCode<=399 )
    819     {
    820         dbgmsg( t, "got a redirect; retrying immediately" );
    821 
    822         /* it's a redirect... updateAddresses() has already
    823          * parsed the redirect, all that's left is to retry */
    824         onRetry( t );
    825     }
    826     else if( 400<=responseCode && responseCode<=499 )
    827     {
    828         dbgmsg( t, "got a 4xx error." );
    829 
    830         /* The request could not be understood by the server due to
    831          * malformed syntax. The client SHOULD NOT repeat the
    832          * request without modifications. */
    833         if( req && req->response_code_line )
    834             publishErrorMessage( t, req->response_code_line );
    835         t->manualAnnounceAllowedAt = ~(time_t)0;
    836         t->reannounceTimer = NULL;
    837     }
    838     else if( 500<=responseCode && responseCode<=599 )
    839     {
    840         dbgmsg( t, "Got a 5xx error... retrying in 15 seconds." );
    841 
    842         /* Response status codes beginning with the digit "5" indicate
    843          * cases in which the server is aware that it has erred or is
    844          * incapable of performing the request.  So we pause a bit and
    845          * try again. */
    846         if( req && req->response_code_line )
    847             publishWarning( t, req->response_code_line );
    848         t->manualAnnounceAllowedAt = ~(time_t)0;
    849         t->reannounceTimer = tr_timerNew( t->handle, onRetry, t, 15 * 1000 );
    850     }
    851     else
    852     {
    853         dbgmsg( t, "Invalid response from tracker... retrying in 120 seconds." );
    854 
    855         /* WTF did we get?? */
    856         if( req && req->response_code_line )
    857             publishErrorMessage( t, req->response_code_line );
    858         t->manualAnnounceAllowedAt = ~(time_t)0;
    859         t->reannounceTimer = tr_timerNew( t->handle, onRetry, t, 120 * 1000 );
    860     }
    861 }
    862 
    863 static int
    864 sendTrackerRequest( void * vt, const char * eventName )
    865 {
     969    }
     970
     971    assert( nwalk - t->addresses == sum );
     972    assert( iwalk - t->tierFronts == sum );
     973
     974    if( trackerSupportsScrape( t ) )
     975        enqueueScrape( t->handle, t );
     976
     977    return t;
     978}
     979
     980static void
     981onTrackerFreeNow( void * vt )
     982{
     983    int i;
    866984    tr_tracker * t = vt;
    867     const int isStopping = eventName && !strcmp( eventName, "stopped" );
    868     const tr_tracker_info * address = getCurrentAddress( t );
    869     char * uri;
    870     struct evhttp_connection * evcon;
    871     const tr_torrent * tor;
    872 
    873     tor = tr_torrentFindFromHash( t->handle, t->hash );
    874     if( tor == NULL )
    875         return FALSE;   
    876 
    877     uri = buildTrackerRequestURI( t, tor, eventName );
    878 
    879     tr_inf( "Torrent \"%s\" sending '%s' to tracker %s:%d: %s",
    880             t->name,
    881             (eventName ? eventName : "periodic announce"),
    882             address->address, address->port,
    883             uri );
    884 
    885     /* kill any pending requests */
    886     dbgmsg( t, "clearing announce timer" );
    887     tr_timerFree( &t->reannounceTimer );
    888 
    889     evcon = getConnection( t, address->address, address->port );
    890     if ( !evcon ) {
    891         tr_err( "Can't make a connection to %s:%d", address->address, address->port );
    892         tr_free( uri );
    893     } else {
    894         struct evhttp_request * req;
    895         if( eventName != t->lastRequest ) {
    896             tr_free( t->lastRequest );
    897             t->lastRequest = tr_strdup( eventName );
    898         }
    899         if( isStopping ) {
    900             evhttp_connection_set_timeout( evcon, STOP_TIMEOUT_INTERVAL_SEC );
    901             req = evhttp_request_new( onStoppedResponse, t->handle );
    902         } else {
    903             evhttp_connection_set_timeout( evcon, TIMEOUT_INTERVAL_SEC );
    904             req = evhttp_request_new( onTrackerResponse, torrentHashNew(t) );
    905         }
    906         dbgmsg( t, "sending \"%s\" request to tracker", eventName ? eventName : "reannounce" );
    907 
    908         addCommonHeaders( t, req );
    909         tr_evhttp_make_request( t->handle, evcon,
    910                                 req, EVHTTP_REQ_GET, uri );
    911     }
    912 
    913     return FALSE;
    914 }
    915 
    916 static int
    917 onReannounce( void * vt )
    918 {
    919     tr_tracker * t = vt;
    920     dbgmsg( t, "onReannounce" );
    921     sendTrackerRequest( t, "" );
    922     dbgmsg( t, "onReannounce setting announceTimer to NULL" );
    923     t->reannounceTimer = NULL;
    924     return FALSE;
    925 }
    926 
    927 static int
    928 onRetry( void * vt )
    929 {
    930     tr_tracker * t = vt;
    931     dbgmsg( t, "onRetry" );
    932     sendTrackerRequest( t, t->lastRequest );
    933     dbgmsg( t, "onRetry setting announceTimer to NULL" );
    934     t->reannounceTimer = NULL;
    935     return FALSE;
    936 }
     985
     986    tr_publisherFree( &t->publisher );
     987    tr_free( t->name );
     988    tr_free( t->trackerID );
     989    tr_free( t->lastRequest );
     990
     991    /* addresses... */
     992    for( i=0; i<t->addressCount; ++i )
     993        tr_trackerInfoClear( &t->addresses[i] );
     994    tr_free( t->addresses );
     995    tr_free( t->tierFronts );
     996
     997    /* redirect... */
     998    if( t->redirect ) {
     999        tr_trackerInfoClear( t->redirect );
     1000        tr_free( t->redirect );
     1001    }
     1002
     1003    tr_free( t );
     1004}
     1005
     1006void
     1007tr_trackerFree( tr_tracker * t )
     1008{
     1009    tr_runInEventThread( t->handle, onTrackerFreeNow, t );
     1010}
     1011
    9371012
    9381013/***
     
    9841059}
    9851060
    986 struct request_data
    987 {
    988     tr_tracker * t;
    989     const char * command;
    990 };
    991 
    992 static void
    993 sendRequestFromEventThreadImpl( void * vdata )
    994 {
    995     struct request_data * data = vdata;
    996     sendTrackerRequest( data->t, data->command );
    997     tr_free( data );
    998 }
    999 
    1000 static void
    1001 sendRequestFromEventThread( tr_tracker * t, const char * command )
    1002 {
    1003     struct request_data * data = tr_new( struct request_data, 1 );
    1004     data->t = t;
    1005     data->command = command;
    1006     tr_runInEventThread( t->handle, sendRequestFromEventThreadImpl, data );
    1007 }
    10081061
    10091062void
     
    10111064{
    10121065    tr_peerIdNew( t->peer_id, sizeof(t->peer_id) );
    1013 
    1014     if( !t->reannounceTimer && !t->isRunning )
    1015     {
     1066    if( t->isRunning == 0 ) {
    10161067        t->isRunning = 1;
    1017         sendRequestFromEventThread( t, "started" );
     1068        enqueueRequest( t->handle, t, TR_REQ_STARTED );
    10181069    }
    10191070}
     
    10221073tr_trackerReannounce( tr_tracker * t )
    10231074{
    1024     sendRequestFromEventThread( t, "started" );
     1075    enqueueRequest( t->handle, t, TR_REQ_REANNOUNCE );
    10251076}
    10261077
     
    10281079tr_trackerCompleted( tr_tracker * t )
    10291080{
    1030     sendRequestFromEventThread( t, "completed" );
     1081    enqueueRequest( t->handle, t, TR_REQ_COMPLETED );
    10311082}
    10321083
     
    10341085tr_trackerStop( tr_tracker * t )
    10351086{
    1036     if( t->isRunning )
    1037     {
     1087    if( t->isRunning ) {
    10381088        t->isRunning = 0;
    1039         sendRequestFromEventThread( t, "stopped" );
     1089        enqueueRequest( t->handle, t, TR_REQ_STOPPED );
    10401090    }
    10411091}
  • trunk/libtransmission/transmission.c

    r3989 r4004  
    348348}
    349349
     350#define SHUTDOWN_MAX_SECONDS 30
     351
    350352void
    351353tr_close( tr_handle * h )
    352354{
    353     const int maxwait_msec = 6 * 1000;
     355    const int maxwait_msec = SHUTDOWN_MAX_SECONDS * 1000;
    354356    const uint64_t deadline = tr_date( ) + maxwait_msec;
    355357
  • trunk/libtransmission/trevent.c

    r3884 r4004  
    6464enum mode
    6565{
    66     TR_EV_EVHTTP_MAKE_REQUEST,
    6766    TR_EV_TIMER_ADD,
    6867    TR_EV_EXEC
     
    131130                break;
    132131
    133             case TR_EV_EVHTTP_MAKE_REQUEST:
    134                 evhttp_make_request( cmd->evcon, cmd->req, cmd->evtype, cmd->uri );
    135                 tr_free( cmd->uri );
    136                 break;
    137 
    138132            case TR_EV_EXEC:
    139133                (cmd->func)( cmd->user_data );
     
    241235{
    242236    return tr_amInThread( handle->events->thread );
    243 }
    244 
    245 
    246 void
    247 tr_evhttp_make_request (tr_handle                 * handle,
    248                         struct evhttp_connection  * evcon,
    249                         struct evhttp_request     * req,
    250                         enum   evhttp_cmd_type      type,
    251                         char                      * uri)
    252 {
    253     if( tr_amInThread( handle->events->thread ) ) {
    254         evhttp_make_request( evcon, req, type, uri );
    255         tr_free( uri );
    256     } else {
    257         struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 );
    258         cmd->mode = TR_EV_EVHTTP_MAKE_REQUEST;
    259         cmd->evcon = evcon;
    260         cmd->req = req;
    261         cmd->evtype = type;
    262         cmd->uri = uri;
    263         pushList( handle->events, cmd );
    264     }
    265237}
    266238
  • trunk/libtransmission/trevent.h

    r3833 r4004  
    3232struct bufferevent;
    3333
    34 void tr_evhttp_make_request (struct tr_handle          * tr_handle,
    35                              struct evhttp_connection  * evcon,
    36                              struct evhttp_request     * req,
    37                              enum evhttp_cmd_type        type,
    38                              char                      * uri);
    39 
    4034/**
    4135***
Note: See TracChangeset for help on using the changeset viewer.