source: trunk/libtransmission/webseed.c @ 12918

Last change on this file since 12918 was 12860, checked in by jordan, 10 years ago

(trunk web) small refactoring of webseed's batch processing.

  1. move the callback structs' declarations next to the callback functions where they're used.
  2. rename the callback structs to clarify their link to the callback functions.
  3. constify some of the callback functions' local variables.
  4. clarify some of the comments.
  • Property svn:keywords set to Date Rev Author Id
File size: 17.6 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: webseed.c 12860 2011-09-12 21:46:15Z jordan $
11 */
12
13#include <string.h> /* strlen() */
14
15#include <event2/buffer.h>
16#include <event2/event.h>
17
18#include "transmission.h"
19#include "bandwidth.h"
20#include "cache.h"
21#include "inout.h" /* tr_ioFindFileLocation() */
22#include "list.h"
23#include "peer-mgr.h"
24#include "torrent.h"
25#include "trevent.h" /* tr_runInEventThread() */
26#include "utils.h"
27#include "web.h"
28#include "webseed.h"
29
30struct tr_webseed_task
31{
32    struct evbuffer    * content;
33    struct tr_webseed  * webseed;
34    tr_block_index_t     block;
35    tr_piece_index_t     piece_index;
36    uint32_t             piece_offset;
37    uint32_t             length;
38    tr_block_index_t     blocks_done;
39    uint32_t             block_size;
40    struct tr_web_task * web_task;
41    long                 response_code;
42};
43
44struct tr_webseed
45{
46    tr_peer              parent;
47    tr_bandwidth         bandwidth;
48    tr_session         * session;
49    tr_peer_callback   * callback;
50    void               * callback_data;
51    tr_list            * tasks;
52    struct event       * timer;
53    char               * base_url;
54    size_t               base_url_len;
55    int                  torrent_id;
56    bool                 is_stopping;
57    int                  consecutive_failures;
58    int                  retry_tickcount;
59    int                  retry_challenge;
60    int                  idle_connections;
61    int                  active_transfers;
62    char              ** file_urls;
63};
64
65enum
66{
67    TR_IDLE_TIMER_MSEC = 2000,
68
69    FAILURE_RETRY_INTERVAL = 150,
70
71    MAX_CONSECUTIVE_FAILURES = 5,
72
73    MAX_WEBSEED_CONNECTIONS = 4
74};
75
76static void
77webseed_free( struct tr_webseed * w )
78{
79    tr_torrent * tor = tr_torrentFindFromId( w->session, w->torrent_id );
80    const tr_info * inf = tr_torrentInfo( tor );
81    tr_file_index_t i;
82
83    for( i=0; i<inf->fileCount; ++i )
84        tr_free( w->file_urls[i] );
85    tr_free( w->file_urls );
86
87    /* webseed destruct */
88    event_free( w->timer );
89    tr_bandwidthDestruct( &w->bandwidth );
90    tr_free( w->base_url );
91
92    /* parent class destruct */
93    tr_peerDestruct( tor, &w->parent );
94
95    tr_free( w );
96}
97
98/***
99****
100***/
101
102static void
103publish( tr_webseed * w, tr_peer_event * e )
104{
105    if( w->callback != NULL )
106        w->callback( &w->parent, e, w->callback_data );
107}
108
109static void
110fire_client_got_rejs( tr_torrent * tor, tr_webseed * w,
111                      tr_block_index_t block, tr_block_index_t count )
112{
113    tr_block_index_t i;
114    tr_peer_event e = TR_PEER_EVENT_INIT;
115    e.eventType = TR_PEER_CLIENT_GOT_REJ;
116    tr_torrentGetBlockLocation( tor, block, &e.pieceIndex, &e.offset, &e.length );
117    for( i = 1; i <= count; i++ ) {
118        if( i == count )
119            e.length = tr_torBlockCountBytes( tor, block + count - 1 );
120        publish( w, &e );
121        e.offset += e.length;
122    }
123}
124
125static void
126fire_client_got_blocks( tr_torrent * tor, tr_webseed * w,
127                        tr_block_index_t block, tr_block_index_t count )
128{
129    tr_block_index_t i;
130    tr_peer_event e = TR_PEER_EVENT_INIT;
131    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
132    tr_torrentGetBlockLocation( tor, block, &e.pieceIndex, &e.offset, &e.length );
133    for( i = 1; i <= count; i++ ) {
134        if( i == count )
135            e.length = tr_torBlockCountBytes( tor, block + count - 1 );
136        publish( w, &e );
137        e.offset += e.length;
138    }
139}
140
141static void
142fire_client_got_data( tr_webseed * w, uint32_t length )
143{
144    tr_peer_event e = TR_PEER_EVENT_INIT;
145    e.eventType = TR_PEER_CLIENT_GOT_DATA;
146    e.length = length;
147    e.wasPieceData = true;
148    publish( w, &e );
149}
150
151/***
152****
153***/
154
155struct write_block_data
156{
157    struct tr_webseed  * webseed;
158    struct evbuffer    * content;
159    tr_piece_index_t     piece_index;
160    tr_block_index_t     block_index;
161    tr_block_index_t     count;
162    uint32_t             block_offset;
163};
164
165static void
166write_block_func( void * vdata )
167{
168    struct write_block_data * data = vdata;
169    struct tr_webseed * w = data->webseed;
170    struct evbuffer * buf = data->content;
171    struct tr_torrent * tor;
172
173    tor = tr_torrentFindFromId( w->session, w->torrent_id );
174    if( tor )
175    {
176        const uint32_t block_size = tor->blockSize;
177        uint32_t len = evbuffer_get_length( buf );
178        const uint32_t offset_end = data->block_offset + len;
179        tr_cache * cache = w->session->cache;
180        const tr_piece_index_t piece = data->piece_index;
181
182        while( len > 0 )
183        {
184            const uint32_t bytes_this_pass = MIN( len, block_size );
185            tr_cacheWriteBlock( cache, tor, piece, offset_end - len, bytes_this_pass, buf );
186            len -= bytes_this_pass;
187        }
188
189        fire_client_got_blocks( tor, w, data->block_index, data->count );
190    }
191
192    evbuffer_free( buf );
193    tr_free( data );
194}
195
196/***
197****
198***/
199
200struct connection_succeeded_data
201{
202    struct tr_webseed  * webseed;
203    char               * real_url;
204    tr_piece_index_t     piece_index;
205    uint32_t             piece_offset;
206};
207
208static void
209connection_succeeded( void * vdata )
210{
211    tr_torrent * tor;
212    struct connection_succeeded_data * data = vdata;
213    struct tr_webseed * w = data->webseed;
214
215    if( ++w->active_transfers >= w->retry_challenge && w->retry_challenge )
216        /* the server seems to be accepting more connections now */
217        w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0;
218
219    if( data->real_url &&
220        (tor = tr_torrentFindFromId( w->session, w->torrent_id )))
221    {
222        uint64_t file_offset;
223        tr_file_index_t file_index;
224
225        tr_ioFindFileLocation( tor, data->piece_index, data->piece_offset,
226                               &file_index, &file_offset );
227        tr_free( w->file_urls[file_index] );
228        w->file_urls[file_index] = data->real_url;
229    }
230}
231
232/***
233****
234***/
235
236static void
237on_content_changed( struct evbuffer                * buf,
238                    const struct evbuffer_cb_info  * info,
239                    void                           * vtask )
240{
241    uint32_t len;
242    const size_t n_added = info->n_added;
243    struct tr_webseed_task * task = vtask;
244    struct tr_webseed * w = task->webseed;
245
246    if( n_added <= 0 )
247        return;
248
249    if( !w->is_stopping )
250    {
251        tr_bandwidthUsed( &w->bandwidth, TR_DOWN, n_added, true, tr_time_msec( ) );
252        fire_client_got_data( w, n_added );
253    }
254
255    len = evbuffer_get_length( buf );
256
257    if( !task->response_code )
258    {
259        tr_webGetTaskInfo( task->web_task, TR_WEB_GET_CODE, &task->response_code );
260
261        if( task->response_code == 206 )
262        {
263            const char * url;
264            struct connection_succeeded_data * data;
265
266            url = NULL;
267            tr_webGetTaskInfo( task->web_task, TR_WEB_GET_REAL_URL, &url );
268
269            data = tr_new( struct connection_succeeded_data, 1 );
270            data->webseed = w;
271            data->real_url = tr_strdup( url );
272            data->piece_index = task->piece_index;
273            data->piece_offset = task->piece_offset
274                               + (task->blocks_done * task->block_size)
275                               + (len - 1);
276
277            /* processing this uses a tr_torrent pointer,
278               so push the work to the libevent thread... */
279            tr_runInEventThread( w->session, connection_succeeded, data );
280        }
281    }
282
283    if( ( task->response_code == 206 ) && ( len >= task->block_size ) )
284    {
285        /* once we've got at least one full block, save it */
286
287        struct write_block_data * data;
288        const uint32_t block_size = task->block_size;
289        const tr_block_index_t completed = len / block_size;
290
291        data = tr_new( struct write_block_data, 1 );
292        data->webseed = task->webseed;
293        data->piece_index = task->piece_index;
294        data->block_index = task->block + task->blocks_done;
295        data->count = completed;
296        data->block_offset = task->piece_offset + task->blocks_done * block_size;
297        data->content = evbuffer_new( );
298
299        /* we don't use locking on this evbuffer so we must copy out the data
300        that will be needed when writing the block in a different thread */
301        evbuffer_remove_buffer( task->content, data->content,
302                                block_size * completed );
303
304        tr_runInEventThread( w->session, write_block_func, data );
305        task->blocks_done += completed;
306    }
307}
308
309static void task_request_next_chunk( struct tr_webseed_task * task );
310
311static bool
312webseed_has_tasks( const tr_webseed * w )
313{
314    return w->tasks != NULL;
315}
316
317
318static void
319on_idle( tr_webseed * w )
320{
321    tr_torrent * tor = tr_torrentFindFromId( w->session, w->torrent_id );
322    int want, running_tasks = tr_list_size( w->tasks );
323
324    if( w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES ) {
325        want = w->idle_connections;
326
327        if( w->retry_tickcount >= FAILURE_RETRY_INTERVAL ) {
328            /* some time has passed since our connection attempts failed. try again */
329            ++want;
330            /* if this challenge is fulfilled we will reset consecutive_failures */
331            w->retry_challenge = running_tasks + want;
332        }
333    }
334    else {
335        want = MAX_WEBSEED_CONNECTIONS - running_tasks;
336        w->retry_challenge = running_tasks + w->idle_connections + 1;
337    }
338
339    if( w->is_stopping && !webseed_has_tasks( w ) )
340    {
341        webseed_free( w );
342    }
343    else if( !w->is_stopping && tor
344                             && tor->isRunning
345                             && !tr_torrentIsSeed( tor )
346                             && want )
347    {
348        int i;
349        int got = 0;
350        tr_block_index_t * blocks = NULL;
351
352        blocks = tr_new( tr_block_index_t, want*2 );
353        tr_peerMgrGetNextRequests( tor, &w->parent, want, blocks, &got, true );
354
355        w->idle_connections -= MIN( w->idle_connections, got );
356        if( w->retry_tickcount >= FAILURE_RETRY_INTERVAL && got == want )
357            w->retry_tickcount = 0;
358
359        for( i=0; i<got; ++i )
360        {
361            const tr_block_index_t b = blocks[i*2];
362            const tr_block_index_t be = blocks[i*2+1];
363            struct tr_webseed_task * task = tr_new( struct tr_webseed_task, 1 );
364            task->webseed = w;
365            task->block = b;
366            task->piece_index = tr_torBlockPiece( tor, b );
367            task->piece_offset = ( tor->blockSize * b )
368                                - ( tor->info.pieceSize * task->piece_index );
369            task->length = (be - b) * tor->blockSize + tr_torBlockCountBytes( tor, be );
370            task->blocks_done = 0;
371            task->response_code = 0;
372            task->block_size = tor->blockSize;
373            task->content = evbuffer_new( );
374            evbuffer_add_cb( task->content, on_content_changed, task );
375            tr_list_append( &w->tasks, task );
376            task_request_next_chunk( task );
377        }
378
379        tr_free( blocks );
380    }
381}
382
383
384static void
385web_response_func( tr_session    * session,
386                   bool            did_connect UNUSED,
387                   bool            did_timeout UNUSED,
388                   long            response_code,
389                   const void    * response UNUSED,
390                   size_t          response_byte_count UNUSED,
391                   void          * vtask )
392{
393    struct tr_webseed_task * t = vtask;
394    tr_webseed * w = t->webseed;
395    tr_torrent * tor = tr_torrentFindFromId( session, w->torrent_id );
396    const int success = ( response_code == 206 );
397
398    if( tor )
399    {
400        /* active_transfers was only increased if the connection was successful */
401        if( t->response_code == 206 )
402            --w->active_transfers;
403
404        if( !success )
405        {
406            const tr_block_index_t blocks_remain = (t->length + tor->blockSize - 1)
407                                                   / tor->blockSize - t->blocks_done;
408
409            if( blocks_remain )
410                fire_client_got_rejs( tor, w, t->block + t->blocks_done, blocks_remain );
411
412            if( t->blocks_done )
413                ++w->idle_connections;
414            else if( ++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && !w->retry_tickcount )
415                /* now wait a while until retrying to establish a connection */
416                ++w->retry_tickcount;
417
418            tr_list_remove_data( &w->tasks, t );
419            evbuffer_free( t->content );
420            tr_free( t );
421        }
422        else
423        {
424            const uint32_t bytes_done = t->blocks_done * tor->blockSize;
425            const uint32_t buf_len = evbuffer_get_length( t->content );
426
427            if( bytes_done + buf_len < t->length )
428            {
429                /* request finished successfully but there's still data missing. that
430                means we've reached the end of a file and need to request the next one */
431                t->response_code = 0;
432                task_request_next_chunk( t );
433            }
434            else
435            {
436                if( buf_len ) {
437                    /* on_content_changed() will not write a block if it is smaller than
438                    the torrent's block size, i.e. the torrent's very last block */
439                    tr_cacheWriteBlock( session->cache, tor,
440                                        t->piece_index, t->piece_offset + bytes_done,
441                                        buf_len, t->content );
442
443                    fire_client_got_blocks( tor, t->webseed,
444                                            t->block + t->blocks_done, 1 );
445                }
446
447                ++w->idle_connections;
448
449                tr_list_remove_data( &w->tasks, t );
450                evbuffer_free( t->content );
451                tr_free( t );
452
453                on_idle( w );
454            }
455        }
456    }
457}
458
459static struct evbuffer *
460make_url( tr_webseed * w, const tr_file * file )
461{
462    struct evbuffer * buf = evbuffer_new( );
463
464    evbuffer_add( buf, w->base_url, w->base_url_len );
465
466    /* if url ends with a '/', add the torrent name */
467    if( w->base_url[w->base_url_len - 1] == '/' && file->name )
468        tr_http_escape( buf, file->name, strlen(file->name), false );
469
470    return buf;
471}
472
473static void
474task_request_next_chunk( struct tr_webseed_task * t )
475{
476    tr_webseed * w = t->webseed;
477    tr_torrent * tor = tr_torrentFindFromId( w->session, w->torrent_id );
478    if( tor != NULL )
479    {
480        char range[64];
481        char ** urls = t->webseed->file_urls;
482
483        const tr_info * inf = tr_torrentInfo( tor );
484        const uint32_t remain = t->length - t->blocks_done * tor->blockSize
485                                - evbuffer_get_length( t->content );
486
487        const uint64_t total_offset = inf->pieceSize * t->piece_index
488                                    + t->piece_offset + t->length - remain;
489        const tr_piece_index_t step_piece = total_offset / inf->pieceSize;
490        const uint32_t step_piece_offset
491                               = total_offset - ( inf->pieceSize * step_piece );
492
493        tr_file_index_t file_index;
494        uint64_t file_offset;
495        const tr_file * file;
496        uint32_t this_pass;
497
498        tr_ioFindFileLocation( tor, step_piece, step_piece_offset,
499                                    &file_index, &file_offset );
500        file = &inf->files[file_index];
501        this_pass = MIN( remain, file->length - file_offset );
502
503        if( !urls[file_index] )
504            urls[file_index] = evbuffer_free_to_str( make_url( t->webseed, file ) );
505
506        tr_snprintf( range, sizeof range, "%"PRIu64"-%"PRIu64,
507                     file_offset, file_offset + this_pass - 1 );
508        t->web_task = tr_webRunWithBuffer( w->session, urls[file_index],
509                                           range, NULL, web_response_func, t, t->content );
510    }
511}
512
513bool
514tr_webseedGetSpeed_Bps( const tr_webseed * w, uint64_t now, int * setme_Bps )
515{
516    const bool is_active = webseed_has_tasks( w );
517    *setme_Bps = is_active ? tr_bandwidthGetPieceSpeed_Bps( &w->bandwidth, now, TR_DOWN ) : 0;
518    return is_active;
519}
520
521bool
522tr_webseedIsActive( const tr_webseed * w )
523{
524    int Bps = 0;
525    return tr_webseedGetSpeed_Bps( w, tr_time_msec(), &Bps ) && ( Bps > 0 );
526}
527
528/***
529****
530***/
531
532static void
533webseed_timer_func( evutil_socket_t foo UNUSED, short bar UNUSED, void * vw )
534{
535    tr_webseed * w = vw;
536    if( w->retry_tickcount )
537        ++w->retry_tickcount;
538    on_idle( w );
539    tr_timerAddMsec( w->timer, TR_IDLE_TIMER_MSEC );
540}
541
542tr_webseed*
543tr_webseedNew( struct tr_torrent  * tor,
544               const char         * url,
545               tr_peer_callback   * callback,
546               void               * callback_data )
547{
548    tr_webseed * w = tr_new0( tr_webseed, 1 );
549    tr_peer * peer = &w->parent;
550    const tr_info * inf = tr_torrentInfo( tor );
551
552    /* construct parent class */
553    tr_peerConstruct( peer );
554    peer->peerIsChoked = true;
555    peer->clientIsInterested = !tr_torrentIsSeed( tor );
556    peer->client = tr_strdup( "webseed" );
557    tr_bitfieldSetHasAll( &peer->have );
558    tr_peerUpdateProgress( tor, peer );
559
560    w->torrent_id = tr_torrentId( tor );
561    w->session = tor->session;
562    w->base_url_len = strlen( url );
563    w->base_url = tr_strndup( url, w->base_url_len );
564    w->callback = callback;
565    w->callback_data = callback_data;
566    w->file_urls = tr_new0( char *, inf->fileCount );
567    //tr_rcConstruct( &w->download_rate );
568    tr_bandwidthConstruct( &w->bandwidth, tor->session, &tor->bandwidth );
569    w->timer = evtimer_new( w->session->event_base, webseed_timer_func, w );
570    tr_timerAddMsec( w->timer, TR_IDLE_TIMER_MSEC );
571    return w;
572}
573
574void
575tr_webseedFree( tr_webseed * w )
576{
577    if( w )
578    {
579        if( webseed_has_tasks( w ) )
580            w->is_stopping = true;
581        else
582            webseed_free( w );
583    }
584}
Note: See TracBrowser for help on using the repository browser.