source: trunk/libtransmission/webseed.c @ 12740

Last change on this file since 12740 was 12740, checked in by jordan, 10 years ago

(trunk libT) #4437 "Multi file webseeds don't work" -- fixed.

  • Property svn:keywords set to Date Rev Author Id
File size: 18.1 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: webseed.c 12740 2011-08-25 09:08:50Z jordan $
11 */
12
13#include <string.h> /* strlen() */
14
15#include <event2/buffer.h>
16#include <event2/event.h>
17
18#include "transmission.h"
19#include "bandwidth.h"
20#include "cache.h"
21#include "inout.h" /* tr_ioFindFileLocation() */
22#include "list.h"
23#include "peer-mgr.h"
24#include "torrent.h"
25#include "trevent.h" /* tr_runInEventThread() */
26#include "utils.h"
27#include "web.h"
28#include "webseed.h"
29
30struct tr_webseed_task
31{
32    tr_session         * session;
33    struct evbuffer    * content;
34    struct tr_webseed  * webseed;
35    tr_block_index_t     block;
36    tr_piece_index_t     piece_index;
37    uint32_t             piece_offset;
38    uint32_t             length;
39    tr_block_index_t     blocks_done;
40    uint32_t             block_size;
41    struct tr_web_task * web_task;
42    long                 response_code;
43    int                  torrent_id;
44    bool                 bad_range;
45};
46
47struct tr_webseed
48{
49    tr_peer              parent;
50    tr_bandwidth         bandwidth;
51    tr_session         * session;
52    tr_peer_callback   * callback;
53    void               * callback_data;
54    tr_list            * tasks;
55    struct event       * timer;
56    char               * base_url;
57    size_t               base_url_len;
58    int                  torrent_id;
59    bool                 is_stopping;
60    int                  consecutive_failures;
61    int                  retry_tickcount;
62    int                  retry_challenge;
63    int                  idle_connections;
64    int                  active_transfers;
65    char              ** file_urls;
66};
67
68struct tr_blockwrite_info
69{
70    struct tr_webseed  * webseed;
71    struct evbuffer    * data;
72    tr_piece_index_t     piece_index;
73    tr_block_index_t     block_index;
74    tr_block_index_t     count;
75    uint32_t             block_offset;
76};
77
78struct tr_http_info
79{
80    struct tr_webseed  * webseed;
81    char               * redirect_url;
82    tr_piece_index_t     piece_index;
83    uint32_t             piece_offset;
84};
85
86enum
87{
88    TR_IDLE_TIMER_MSEC = 2000,
89
90    FAILURE_RETRY_INTERVAL = 150,
91
92    MAX_CONSECUTIVE_FAILURES = 5,
93
94    MAX_WEBSEED_CONNECTIONS = 4
95};
96
97static void
98webseed_free( struct tr_webseed * w )
99{
100    tr_torrent * tor = tr_torrentFindFromId( w->session, w->torrent_id );
101    const tr_info * inf = tr_torrentInfo( tor );
102    tr_file_index_t i;
103
104    for( i = 0; i < inf->fileCount; i++ ) {
105        if( w->file_urls[i] )
106            tr_free( w->file_urls[i] );
107    }
108    tr_free( w->file_urls );
109
110    /* webseed destruct */
111    event_free( w->timer );
112    tr_bandwidthDestruct( &w->bandwidth );
113    tr_free( w->base_url );
114
115    /* parent class destruct */
116    tr_peerDestruct( tor, &w->parent );
117
118    tr_free( w );
119}
120
121/***
122****
123***/
124
125static void
126publish( tr_webseed * w, tr_peer_event * e )
127{
128    if( w->callback != NULL )
129        w->callback( &w->parent, e, w->callback_data );
130}
131
132static void
133fire_client_got_rejs( tr_torrent * tor, tr_webseed * w,
134                      tr_block_index_t block, tr_block_index_t count )
135{
136    tr_block_index_t i;
137    tr_peer_event e = TR_PEER_EVENT_INIT;
138    e.eventType = TR_PEER_CLIENT_GOT_REJ;
139    tr_torrentGetBlockLocation( tor, block, &e.pieceIndex, &e.offset, &e.length );
140    for( i = 1; i <= count; i++ ) {
141        if( i == count )
142            e.length = tr_torBlockCountBytes( tor, block + count - 1 );
143        publish( w, &e );
144        e.offset += e.length;
145    }
146}
147
148static void
149fire_client_got_blocks( tr_torrent * tor, tr_webseed * w,
150                        tr_block_index_t block, tr_block_index_t count )
151{
152    tr_block_index_t i;
153    tr_peer_event e = TR_PEER_EVENT_INIT;
154    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
155    tr_torrentGetBlockLocation( tor, block, &e.pieceIndex, &e.offset, &e.length );
156    for( i = 1; i <= count; i++ ) {
157        if( i == count )
158            e.length = tr_torBlockCountBytes( tor, block + count - 1 );
159        publish( w, &e );
160        e.offset += e.length;
161    }
162}
163
164static void
165fire_client_got_data( tr_webseed * w, uint32_t length )
166{
167    tr_peer_event e = TR_PEER_EVENT_INIT;
168    e.eventType = TR_PEER_CLIENT_GOT_DATA;
169    e.length = length;
170    e.wasPieceData = true;
171    publish( w, &e );
172}
173
174/***
175****
176***/
177
178static void
179write_block_func( void * vblock )
180{
181    struct tr_blockwrite_info * block = vblock;
182    struct tr_webseed * w = block->webseed;
183    struct evbuffer * buf = block->data;
184    struct tr_torrent * tor;
185
186    tor = tr_torrentFindFromId( w->session, w->torrent_id );
187    if( tor )
188    {
189        const uint32_t block_size = tor->blockSize;
190        uint32_t len = evbuffer_get_length( buf );
191        uint32_t offset_end = block->block_offset + len;
192        tr_cache * cache = w->session->cache;
193        tr_piece_index_t piece = block->piece_index;
194
195        while( true )
196        {
197            if( len > block_size) {
198                tr_cacheWriteBlock( cache, tor, piece, offset_end - len,
199                                    block_size, buf );
200                len -= block_size;
201            }
202            else {
203                tr_cacheWriteBlock( cache, tor, piece, offset_end - len,
204                                    len, buf );
205                break;
206            }
207        }
208        fire_client_got_blocks( tor, w, block->block_index, block->count );
209    }
210
211    evbuffer_free( buf );
212    tr_free( block );
213}
214
215static void
216connection_succeeded( void * vinf )
217{
218    struct tr_http_info * inf = vinf;
219    struct tr_webseed * w = inf->webseed;
220    struct tr_torrent * tor;
221
222    if( ++w->active_transfers >= w->retry_challenge && w->retry_challenge )
223        /* the server seems to be accepting more connections now */
224        w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0;
225
226    if( inf->redirect_url &&
227        (tor = tr_torrentFindFromId( w->session, w->torrent_id )))
228    {
229        uint64_t file_offset;
230        tr_file_index_t file_index;
231
232        tr_ioFindFileLocation( tor, inf->piece_index, inf->piece_offset,
233                               &file_index, &file_offset );
234        tr_free( w->file_urls[file_index] );
235        w->file_urls[file_index] = inf->redirect_url;
236    }
237}
238
239static void
240on_content_changed( struct evbuffer                * buf,
241                    const struct evbuffer_cb_info  * info,
242                    void                           * vtask )
243{
244    struct tr_webseed_task * task = vtask;
245    struct tr_webseed * w = task->webseed;
246    uint32_t len;
247
248    if( info->n_added <= 0 )
249        return;
250
251    if( !w->is_stopping )
252    {
253        tr_bandwidthUsed( &w->bandwidth, TR_DOWN, info->n_added, true, tr_time_msec( ) );
254        fire_client_got_data( w, info->n_added );
255    }
256
257    if( !task->response_code ) {
258        tr_webGetTaskInfo( task->web_task, TR_WEB_GET_CODE, &task->response_code );
259
260        if( task->response_code == 206 ) {
261            struct tr_http_info * inf = tr_new( struct tr_http_info, 1 );
262            long redirects;
263
264            inf->webseed = w;
265            inf->piece_index = task->piece_index;
266            inf->piece_offset = task->piece_offset;
267            tr_webGetTaskInfo( task->web_task, TR_WEB_GET_REDIRECTS, &redirects );
268            if( redirects ) {
269                char * redirect_url;
270                tr_webGetTaskInfo( task->web_task, TR_WEB_GET_REAL_URL, &redirect_url );
271                inf->redirect_url = tr_strdup( redirect_url );
272            }
273            else
274                inf->redirect_url = NULL;
275            /* run this in the webseed thread to avoid tampering with mutexes and to
276            not cost the web thrad too much time */
277            tr_runInEventThread( task->session, connection_succeeded, inf );
278        }
279    }
280
281    len = evbuffer_get_length( buf );
282
283    if( task->response_code == 206 && len >= task->block_size )
284    {
285        /* one (ore more) block(s) received. write to hd */
286        const uint32_t block_size = task->block_size;
287        const tr_block_index_t completed = len / block_size;
288        struct tr_blockwrite_info * b = tr_new( struct tr_blockwrite_info, 1 );
289
290        b->webseed = task->webseed;
291        b->piece_index = task->piece_index;
292        b->block_index = task->block + task->blocks_done;
293        b->count = completed;
294        b->block_offset = task->piece_offset + task->blocks_done * block_size;
295        b->data = evbuffer_new( );
296
297        /* we don't use locking on this evbuffer so we must copy out the data
298        that will be needed when writing the block in a different thread */
299        evbuffer_remove_buffer( task->content, b->data,
300                                block_size * completed );
301
302        tr_runInEventThread( task->session, write_block_func, b );
303        task->blocks_done += completed;
304    }
305}
306
307static void task_request_next_chunk( struct tr_webseed_task * task );
308
309static bool
310webseed_has_tasks( const tr_webseed * w )
311{
312    return w->tasks != NULL;
313}
314
315
316static void
317on_idle( tr_webseed * w )
318{
319    tr_torrent * tor = tr_torrentFindFromId( w->session, w->torrent_id );
320    int want, running_tasks = tr_list_size( w->tasks );
321
322    if( w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES ) {
323        want = w->idle_connections;
324
325        if( w->retry_tickcount >= FAILURE_RETRY_INTERVAL ) {
326            /* some time has passed since our connection attempts failed. try again */
327            ++want;
328            /* if this challenge is fulfilled we will reset consecutive_failures */
329            w->retry_challenge = running_tasks + want;
330        }
331    }
332    else {
333        want = MAX_WEBSEED_CONNECTIONS - running_tasks;
334        w->retry_challenge = running_tasks + w->idle_connections + 1;
335    }
336
337    if( w->is_stopping && !webseed_has_tasks( w ) )
338    {
339        webseed_free( w );
340    }
341    else if( !w->is_stopping && tor
342                             && tor->isRunning
343                             && !tr_torrentIsSeed( tor )
344                             && want )
345    {
346        int i;
347        int got = 0;
348        tr_block_index_t * blocks = NULL;
349
350        blocks = tr_new( tr_block_index_t, want*2 );
351        tr_peerMgrGetNextRequests( tor, &w->parent, want, blocks, &got, true );
352
353        w->idle_connections -= MIN( w->idle_connections, got );
354        if( w->retry_tickcount >= FAILURE_RETRY_INTERVAL && got == want )
355            w->retry_tickcount = 0;
356
357        for( i=0; i<got; ++i )
358        {
359            const tr_block_index_t b = blocks[i*2];
360            const tr_block_index_t be = blocks[i*2+1];
361            struct tr_webseed_task * task = tr_new( struct tr_webseed_task, 1 );
362            task->webseed = w;
363            task->session = w->session;
364            task->torrent_id = w->torrent_id;
365            task->block = b;
366            task->piece_index = tr_torBlockPiece( tor, b );
367            task->piece_offset = ( tor->blockSize * b )
368                                - ( tor->info.pieceSize * task->piece_index );
369            task->length = (be - b) * tor->blockSize + tr_torBlockCountBytes( tor, be );
370            task->blocks_done = 0;
371            task->response_code = 0;
372            task->block_size = tor->blockSize;
373            task->content = evbuffer_new( );
374            evbuffer_add_cb( task->content, on_content_changed, task );
375            tr_list_append( &w->tasks, task );
376            task_request_next_chunk( task );
377        }
378
379        tr_free( blocks );
380    }
381}
382
383
384static void
385web_response_func( tr_session    * session,
386                   bool            did_connect UNUSED,
387                   bool            did_timeout UNUSED,
388                   long            response_code,
389                   const void    * response UNUSED,
390                   size_t          response_byte_count UNUSED,
391                   void          * vtask )
392{
393    struct tr_webseed_task * t = vtask;
394    tr_webseed * w = t->webseed;
395    tr_torrent * tor = tr_torrentFindFromId( session, t->torrent_id );
396    const int success = ( response_code == 206 );
397
398    if( tor )
399    {
400        /* active_transfers was only increased if the connection was successful */
401        if( t->response_code == 206 )
402            --w->active_transfers;
403
404        if( !success )
405        {
406            const tr_block_index_t blocks_remain = (t->length + tor->blockSize - 1)
407                                                   / tor->blockSize - t->blocks_done;
408
409            if( blocks_remain )
410                fire_client_got_rejs( tor, w, t->block + t->blocks_done, blocks_remain );
411
412            if( t->blocks_done )
413                ++w->idle_connections;
414            else if( ++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && !w->retry_tickcount )
415                /* now wait a while until retrying to establish a connection */
416                ++w->retry_tickcount;
417
418            t->bad_range = response_code == 416;
419
420            tr_list_remove_data( &w->tasks, t );
421            evbuffer_free( t->content );
422            tr_free( t );
423        }
424        else
425        {
426            const uint32_t bytes_done = t->blocks_done * tor->blockSize;
427            const uint32_t buf_len = evbuffer_get_length( t->content );
428
429            if( bytes_done + buf_len < t->length )
430            {
431                /* request finished successfully but there's still data missing. that
432                means we've reached the end of a file and need to request the next one */
433                t->response_code = 0;
434                task_request_next_chunk( t );
435            }
436            else
437            {
438                if( buf_len ) {
439                    /* on_content_changed() will not write a block if it is smaller than
440                    the torrent's block size, i.e. the torrent's very last block */
441                    tr_cacheWriteBlock( session->cache, tor,
442                                        t->piece_index, t->piece_offset + bytes_done,
443                                        buf_len, t->content );
444
445                    fire_client_got_blocks( tor, t->webseed,
446                                            t->block + t->blocks_done, 1 );
447                }
448
449                ++w->idle_connections;
450
451                tr_list_remove_data( &w->tasks, t );
452                evbuffer_free( t->content );
453                tr_free( t );
454
455                on_idle( w );
456            }
457        }
458    }
459}
460
461static struct evbuffer *
462make_url( struct tr_webseed_task * t, const tr_file * file )
463{
464    struct evbuffer * buf = evbuffer_new( );
465    struct tr_webseed * w = t->webseed;
466
467    evbuffer_add( buf, w->base_url, w->base_url_len );
468
469    if( t->bad_range || ( w->base_url[w->base_url_len - 1] == '/' && file->name ) ) {
470        if( t->bad_range )
471            evbuffer_add( buf, "/", 1 );
472        tr_http_escape( buf, file->name, strlen(file->name), false );
473    }
474
475    return buf;
476}
477
478static void
479task_request_next_chunk( struct tr_webseed_task * t )
480{
481    tr_torrent * tor = tr_torrentFindFromId( t->session, t->torrent_id );
482    if( tor != NULL )
483    {
484        char range[64];
485        char ** urls = t->webseed->file_urls;
486
487        const tr_info * inf = tr_torrentInfo( tor );
488        const uint32_t remain = t->length - t->blocks_done * tor->blockSize
489                                - evbuffer_get_length( t->content );
490
491        const uint64_t total_offset = inf->pieceSize * t->piece_index
492                                    + t->piece_offset + t->length - remain;
493        const tr_piece_index_t step_piece = total_offset / inf->pieceSize;
494        const uint32_t step_piece_offset
495                               = total_offset - ( inf->pieceSize * step_piece );
496
497        tr_file_index_t file_index;
498        uint64_t file_offset;
499        const tr_file * file;
500        uint32_t this_pass;
501
502        tr_ioFindFileLocation( tor, step_piece, step_piece_offset,
503                                    &file_index, &file_offset );
504        file = &inf->files[file_index];
505        this_pass = MIN( remain, file->length - file_offset );
506
507        if( !urls[file_index] )
508            urls[file_index] = evbuffer_free_to_str( make_url( t, file ) );
509
510        tr_snprintf( range, sizeof range, "%"PRIu64"-%"PRIu64,
511                     file_offset, file_offset + this_pass - 1 );
512        t->web_task = tr_webRunWithBuffer( t->session, urls[file_index],
513                                           range, NULL, web_response_func, t, t->content );
514    }
515}
516
517bool
518tr_webseedGetSpeed_Bps( const tr_webseed * w, uint64_t now, int * setme_Bps )
519{
520    const bool is_active = webseed_has_tasks( w );
521    *setme_Bps = is_active ? tr_bandwidthGetPieceSpeed_Bps( &w->bandwidth, now, TR_DOWN ) : 0;
522    return is_active;
523}
524
525bool
526tr_webseedIsActive( const tr_webseed * w )
527{
528    int Bps = 0;
529    return tr_webseedGetSpeed_Bps( w, tr_time_msec(), &Bps ) && ( Bps > 0 );
530}
531
532/***
533****
534***/
535
536static void
537webseed_timer_func( evutil_socket_t foo UNUSED, short bar UNUSED, void * vw )
538{
539    tr_webseed * w = vw;
540    if( w->retry_tickcount )
541        ++w->retry_tickcount;
542    on_idle( w );
543    tr_timerAddMsec( w->timer, TR_IDLE_TIMER_MSEC );
544}
545
546tr_webseed*
547tr_webseedNew( struct tr_torrent  * tor,
548               const char         * url,
549               tr_peer_callback   * callback,
550               void               * callback_data )
551{
552    tr_webseed * w = tr_new0( tr_webseed, 1 );
553    tr_peer * peer = &w->parent;
554    const tr_info * inf = tr_torrentInfo( tor );
555
556    /* construct parent class */
557    tr_peerConstruct( peer );
558    peer->peerIsChoked = true;
559    peer->clientIsInterested = !tr_torrentIsSeed( tor );
560    peer->client = tr_strdup( "webseed" );
561    tr_bitfieldSetHasAll( &peer->have );
562    tr_peerUpdateProgress( tor, peer );
563
564    w->torrent_id = tr_torrentId( tor );
565    w->session = tor->session;
566    w->base_url_len = strlen( url );
567    w->base_url = tr_strndup( url, w->base_url_len );
568    w->callback = callback;
569    w->callback_data = callback_data;
570    w->file_urls = tr_new0( char *, inf->fileCount );
571    //tr_rcConstruct( &w->download_rate );
572    tr_bandwidthConstruct( &w->bandwidth, tor->session, &tor->bandwidth );
573    w->timer = evtimer_new( w->session->event_base, webseed_timer_func, w );
574    tr_timerAddMsec( w->timer, TR_IDLE_TIMER_MSEC );
575    return w;
576}
577
578void
579tr_webseedFree( tr_webseed * w )
580{
581    if( w )
582    {
583        if( webseed_has_tasks( w ) )
584            w->is_stopping = true;
585        else
586            webseed_free( w );
587    }
588}
Note: See TracBrowser for help on using the repository browser.