source: trunk/libtransmission/webseed.c @ 12555

Last change on this file since 12555 was 12555, checked in by jordan, 10 years ago

(trunk libT) remove trailing spaces from a couple of lines of code

  • Property svn:keywords set to Date Rev Author Id
File size: 17.9 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: webseed.c 12555 2011-07-17 18:11:34Z jordan $
11 */
12
13#include <string.h> /* strlen() */
14
15#include <event2/buffer.h>
16#include <event2/event.h>
17
18#include "transmission.h"
19#include "bandwidth.h"
20#include "cache.h"
21#include "inout.h" /* tr_ioFindFileLocation() */
22#include "list.h"
23#include "peer-mgr.h"
24#include "torrent.h"
25#include "trevent.h" /* tr_runInEventThread() */
26#include "utils.h"
27#include "web.h"
28#include "webseed.h"
29
30struct tr_webseed_task
31{
32    tr_session         * session;
33    struct evbuffer    * content;
34    struct tr_webseed  * webseed;
35    tr_block_index_t     block;
36    tr_piece_index_t     piece_index;
37    uint32_t             piece_offset;
38    uint32_t             length;
39    tr_block_index_t     blocks_done;
40    uint32_t             block_size;
41    struct tr_web_task * web_task;
42    long                 response_code;
43    int                  torrent_id;
44};
45
46struct tr_webseed
47{
48    tr_peer              parent;
49    tr_bandwidth         bandwidth;
50    tr_session         * session;
51    tr_peer_callback   * callback;
52    void               * callback_data;
53    tr_list            * tasks;
54    struct event       * timer;
55    char               * base_url;
56    size_t               base_url_len;
57    int                  torrent_id;
58    bool                 is_stopping;
59    int                  consecutive_failures;
60    int                  retry_tickcount;
61    int                  retry_challenge;
62    int                  idle_connections;
63    int                  active_transfers;
64    char              ** file_urls;
65};
66
67struct tr_blockwrite_info
68{
69    struct tr_webseed  * webseed;
70    struct evbuffer    * data;
71    tr_piece_index_t     piece_index;
72    tr_block_index_t     block_index;
73    tr_block_index_t     count;
74    uint32_t             block_offset;
75};
76
77struct tr_http_info
78{
79    struct tr_webseed  * webseed;
80    char               * redirect_url;
81    tr_piece_index_t     piece_index;
82    uint32_t             piece_offset;
83};
84
85enum
86{
87    TR_IDLE_TIMER_MSEC = 2000,
88
89    FAILURE_RETRY_INTERVAL = 150,
90
91    MAX_CONSECUTIVE_FAILURES = 5,
92
93    MAX_WEBSEED_CONNECTIONS = 4
94};
95
96static void
97webseed_free( struct tr_webseed * w )
98{
99    tr_torrent * tor = tr_torrentFindFromId( w->session, w->torrent_id );
100    const tr_info * inf = tr_torrentInfo( tor );
101    tr_file_index_t i;
102
103    for( i = 0; i < inf->fileCount; i++ ) {
104        if( w->file_urls[i] )
105            tr_free( w->file_urls[i] );
106    }
107    tr_free( w->file_urls );
108
109    /* webseed destruct */
110    event_free( w->timer );
111    tr_bandwidthDestruct( &w->bandwidth );
112    tr_free( w->base_url );
113
114    /* parent class destruct */
115    tr_peerDestruct( tor, &w->parent );
116
117    tr_free( w );
118}
119
120/***
121****
122***/
123
124static void
125publish( tr_webseed * w, tr_peer_event * e )
126{
127    if( w->callback != NULL )
128        w->callback( &w->parent, e, w->callback_data );
129}
130
131static void
132fire_client_got_rejs( tr_torrent * tor, tr_webseed * w,
133                      tr_block_index_t block, tr_block_index_t count )
134{
135    tr_block_index_t i;
136    tr_peer_event e = TR_PEER_EVENT_INIT;
137    e.eventType = TR_PEER_CLIENT_GOT_REJ;
138    tr_torrentGetBlockLocation( tor, block, &e.pieceIndex, &e.offset, &e.length );
139    for( i = 1; i <= count; i++ ) {
140        if( i == count )
141            e.length = tr_torBlockCountBytes( tor, block + count - 1 );
142        publish( w, &e );
143        e.offset += e.length;
144    }
145}
146
147static void
148fire_client_got_blocks( tr_torrent * tor, tr_webseed * w,
149                        tr_block_index_t block, tr_block_index_t count )
150{
151    tr_block_index_t i;
152    tr_peer_event e = TR_PEER_EVENT_INIT;
153    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
154    tr_torrentGetBlockLocation( tor, block, &e.pieceIndex, &e.offset, &e.length );
155    for( i = 1; i <= count; i++ ) {
156        if( i == count )
157            e.length = tr_torBlockCountBytes( tor, block + count - 1 );
158        publish( w, &e );
159        e.offset += e.length;
160    }
161}
162
163static void
164fire_client_got_data( tr_webseed * w, uint32_t length )
165{
166    tr_peer_event e = TR_PEER_EVENT_INIT;
167    e.eventType = TR_PEER_CLIENT_GOT_DATA;
168    e.length = length;
169    e.wasPieceData = true;
170    publish( w, &e );
171}
172
173/***
174****
175***/
176
177static void
178write_block_func( void * vblock )
179{
180    struct tr_blockwrite_info * block = vblock;
181    struct tr_webseed * w = block->webseed;
182    struct evbuffer * buf = block->data;
183    struct tr_torrent * tor;
184
185    tor = tr_torrentFindFromId( w->session, w->torrent_id );
186    if( tor )
187    {
188        const uint32_t block_size = tor->blockSize;
189        uint32_t len = evbuffer_get_length( buf );
190        uint32_t offset_end = block->block_offset + len;
191        tr_cache * cache = w->session->cache;
192        tr_piece_index_t piece = block->piece_index;
193
194        while( true )
195        {
196            if( len > block_size) {
197                tr_cacheWriteBlock( cache, tor, piece, offset_end - len,
198                                    block_size, buf );
199                len -= block_size;
200            }
201            else {
202                tr_cacheWriteBlock( cache, tor, piece, offset_end - len,
203                                    len, buf );
204                break;
205            }
206        }
207        fire_client_got_blocks( tor, w, block->block_index, block->count );
208    }
209
210    evbuffer_free( buf );
211    tr_free( block );
212}
213
214static void
215connection_succeeded( void * vinf )
216{
217    struct tr_http_info * inf = vinf;
218    struct tr_webseed * w = inf->webseed;
219    struct tr_torrent * tor;
220
221    if( ++w->active_transfers >= w->retry_challenge && w->retry_challenge )
222        /* the server seems to be accepting more connections now */
223        w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0;
224
225    if( inf->redirect_url &&
226        (tor = tr_torrentFindFromId( w->session, w->torrent_id )))
227    {
228        uint64_t file_offset;
229        tr_file_index_t file_index;
230
231        tr_ioFindFileLocation( tor, inf->piece_index, inf->piece_offset,
232                               &file_index, &file_offset );
233        tr_free( w->file_urls[file_index] );
234        w->file_urls[file_index] = inf->redirect_url;
235    }
236}
237
238static void
239on_content_changed( struct evbuffer                * buf,
240                    const struct evbuffer_cb_info  * info,
241                    void                           * vtask )
242{
243    struct tr_webseed_task * task = vtask;
244    struct tr_webseed * w = task->webseed;
245    uint32_t len;
246
247    if( info->n_added <= 0 )
248        return;
249
250    if( !w->is_stopping )
251    {
252        tr_bandwidthUsed( &w->bandwidth, TR_DOWN, info->n_added, true, tr_time_msec( ) );
253        fire_client_got_data( w, info->n_added );
254    }
255
256    if( !task->response_code ) {
257        tr_webGetTaskInfo( task->web_task, TR_WEB_GET_CODE, &task->response_code );
258
259        if( task->response_code == 206 ) {
260            struct tr_http_info * inf = tr_new( struct tr_http_info, 1 );
261            long redirects;
262
263            inf->webseed = w;
264            inf->piece_index = task->piece_index;
265            inf->piece_offset = task->piece_offset;
266            tr_webGetTaskInfo( task->web_task, TR_WEB_GET_REDIRECTS, &redirects );
267            if( redirects ) {
268                char * redirect_url;
269                tr_webGetTaskInfo( task->web_task, TR_WEB_GET_REAL_URL, &redirect_url );
270                inf->redirect_url = tr_strdup( redirect_url );
271            }
272            else
273                inf->redirect_url = NULL;
274            /* run this in the webseed thread to avoid tampering with mutexes and to
275            not cost the web thrad too much time */
276            tr_runInEventThread( task->session, connection_succeeded, inf );
277        }
278    }
279
280    len = evbuffer_get_length( buf );
281
282    if( task->response_code == 206 && len >= task->block_size )
283    {
284        /* one (ore more) block(s) received. write to hd */
285        const uint32_t block_size = task->block_size;
286        const tr_block_index_t completed = len / block_size;
287        struct tr_blockwrite_info * b = tr_new( struct tr_blockwrite_info, 1 );
288
289        b->webseed = task->webseed;
290        b->piece_index = task->piece_index;
291        b->block_index = task->block + task->blocks_done;
292        b->count = completed;
293        b->block_offset = task->piece_offset + task->blocks_done * block_size;
294        b->data = evbuffer_new( );
295
296        /* we don't use locking on this evbuffer so we must copy out the data
297        that will be needed when writing the block in a different thread */
298        evbuffer_remove_buffer( task->content, b->data,
299                                block_size * completed );
300
301        tr_runInEventThread( task->session, write_block_func, b );
302        task->blocks_done += completed;
303    }
304}
305
306static void task_request_next_chunk( struct tr_webseed_task * task );
307
308static bool
309webseed_has_tasks( const tr_webseed * w )
310{
311    return w->tasks != NULL;
312}
313
314
315static void
316on_idle( tr_webseed * w )
317{
318    tr_torrent * tor = tr_torrentFindFromId( w->session, w->torrent_id );
319    int want, running_tasks = tr_list_size( w->tasks );
320
321    if( w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES ) {
322        want = w->idle_connections;
323
324        if( w->retry_tickcount >= FAILURE_RETRY_INTERVAL ) {
325            /* some time has passed since our connection attempts failed. try again */
326            ++want;
327            /* if this challenge is fulfilled we will reset consecutive_failures */
328            w->retry_challenge = running_tasks + want;
329        }
330    }
331    else {
332        want = MAX_WEBSEED_CONNECTIONS - running_tasks;
333        w->retry_challenge = running_tasks + w->idle_connections + 1;
334    }
335
336    if( w->is_stopping && !webseed_has_tasks( w ) )
337    {
338        webseed_free( w );
339    }
340    else if( !w->is_stopping && tor
341                             && tor->isRunning
342                             && !tr_torrentIsSeed( tor )
343                             && want )
344    {
345        int i;
346        int got = 0;
347        tr_block_index_t * blocks = NULL;
348
349        blocks = tr_new( tr_block_index_t, want*2 );
350        tr_peerMgrGetNextRequests( tor, &w->parent, want, blocks, &got, true );
351
352        w->idle_connections -= MIN( w->idle_connections, got );
353        if( w->retry_tickcount >= FAILURE_RETRY_INTERVAL && got == want )
354            w->retry_tickcount = 0;
355
356        for( i=0; i<got; ++i )
357        {
358            const tr_block_index_t b = blocks[i*2];
359            const tr_block_index_t be = blocks[i*2+1];
360            struct tr_webseed_task * task = tr_new( struct tr_webseed_task, 1 );
361            task->webseed = w;
362            task->session = w->session;
363            task->torrent_id = w->torrent_id;
364            task->block = b;
365            task->piece_index = tr_torBlockPiece( tor, b );
366            task->piece_offset = ( tor->blockSize * b )
367                                - ( tor->info.pieceSize * task->piece_index );
368            task->length = (be - b) * tor->blockSize + tr_torBlockCountBytes( tor, be );
369            task->blocks_done = 0;
370            task->response_code = 0;
371            task->block_size = tor->blockSize;
372            task->content = evbuffer_new( );
373            evbuffer_add_cb( task->content, on_content_changed, task );
374            tr_list_append( &w->tasks, task );
375            task_request_next_chunk( task );
376        }
377
378        tr_free( blocks );
379    }
380}
381
382
383static void
384web_response_func( tr_session    * session,
385                   bool            did_connect UNUSED,
386                   bool            did_timeout UNUSED,
387                   long            response_code,
388                   const void    * response UNUSED,
389                   size_t          response_byte_count UNUSED,
390                   void          * vtask )
391{
392    struct tr_webseed_task * t = vtask;
393    tr_webseed * w = t->webseed;
394    tr_torrent * tor = tr_torrentFindFromId( session, t->torrent_id );
395    const int success = ( response_code == 206 );
396
397    if( tor )
398    {
399        /* active_transfers was only increased if the connection was successful */
400        if( t->response_code == 206 )
401            --w->active_transfers;
402
403        if( !success )
404        {
405            const tr_block_index_t blocks_remain = (t->length + tor->blockSize - 1)
406                                                   / tor->blockSize - t->blocks_done;
407
408            if( blocks_remain )
409                fire_client_got_rejs( tor, w, t->block + t->blocks_done, blocks_remain );
410
411            if( t->blocks_done )
412                ++w->idle_connections;
413            else if( ++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && !w->retry_tickcount )
414                /* now wait a while until retrying to establish a connection */
415                ++w->retry_tickcount;
416
417            tr_list_remove_data( &w->tasks, t );
418            evbuffer_free( t->content );
419            tr_free( t );
420        }
421        else
422        {
423            const uint32_t bytes_done = t->blocks_done * tor->blockSize;
424            const uint32_t buf_len = evbuffer_get_length( t->content );
425
426            if( bytes_done + buf_len < t->length )
427            {
428                /* request finished successfully but there's still data missing. that
429                means we've reached the end of a file and need to request the next one */
430                t->response_code = 0;
431                task_request_next_chunk( t );
432            }
433            else
434            {
435                if( buf_len ) {
436                    /* on_content_changed() will not write a block if it is smaller than
437                    the torrent's block size, i.e. the torrent's very last block */
438                    tr_cacheWriteBlock( session->cache, tor,
439                                        t->piece_index, t->piece_offset + bytes_done,
440                                        buf_len, t->content );
441
442                    fire_client_got_blocks( tor, t->webseed,
443                                            t->block + t->blocks_done, 1 );
444                }
445
446                ++w->idle_connections;
447
448                tr_list_remove_data( &w->tasks, t );
449                evbuffer_free( t->content );
450                tr_free( t );
451
452                on_idle( w );
453            }
454        }
455    }
456}
457
458static struct evbuffer *
459make_url( tr_webseed * w, const tr_file * file )
460{
461    struct evbuffer * buf = evbuffer_new( );
462
463    evbuffer_add( buf, w->base_url, w->base_url_len );
464
465    /* if url ends with a '/', add the torrent name */
466    if( w->base_url[w->base_url_len - 1] == '/' && file->name )
467        tr_http_escape( buf, file->name, strlen(file->name), false );
468
469    return buf;
470}
471
472static void
473task_request_next_chunk( struct tr_webseed_task * t )
474{
475    tr_torrent * tor = tr_torrentFindFromId( t->session, t->torrent_id );
476    if( tor != NULL )
477    {
478        char range[64];
479        char ** urls = t->webseed->file_urls;
480
481        const tr_info * inf = tr_torrentInfo( tor );
482        const uint32_t remain = t->length - t->blocks_done * tor->blockSize
483                                - evbuffer_get_length( t->content );
484
485        const uint64_t total_offset = inf->pieceSize * t->piece_index
486                                    + t->piece_offset + t->length - remain;
487        const tr_piece_index_t step_piece = total_offset / inf->pieceSize;
488        const uint32_t step_piece_offset
489                               = total_offset - ( inf->pieceSize * step_piece );
490
491        tr_file_index_t file_index;
492        uint64_t file_offset;
493        const tr_file * file;
494        uint32_t this_pass;
495
496        tr_ioFindFileLocation( tor, step_piece, step_piece_offset,
497                                    &file_index, &file_offset );
498        file = &inf->files[file_index];
499        this_pass = MIN( remain, file->length - file_offset );
500
501        if( !urls[file_index] )
502            urls[file_index] = evbuffer_free_to_str( make_url( t->webseed, file ) );
503
504        tr_snprintf( range, sizeof range, "%"PRIu64"-%"PRIu64,
505                     file_offset, file_offset + this_pass - 1 );
506        t->web_task = tr_webRunWithBuffer( t->session, urls[file_index],
507                                           range, NULL, web_response_func, t, t->content );
508    }
509}
510
511bool
512tr_webseedGetSpeed_Bps( const tr_webseed * w, uint64_t now, int * setme_Bps )
513{
514    const bool is_active = webseed_has_tasks( w );
515    *setme_Bps = is_active ? tr_bandwidthGetPieceSpeed_Bps( &w->bandwidth, now, TR_DOWN ) : 0;
516    return is_active;
517}
518
519bool
520tr_webseedIsActive( const tr_webseed * w )
521{
522    int Bps = 0;
523    return tr_webseedGetSpeed_Bps( w, tr_time_msec(), &Bps ) && ( Bps > 0 );
524}
525
526/***
527****
528***/
529
530static void
531webseed_timer_func( evutil_socket_t foo UNUSED, short bar UNUSED, void * vw )
532{
533    tr_webseed * w = vw;
534    if( w->retry_tickcount )
535        ++w->retry_tickcount;
536    on_idle( w );
537    tr_timerAddMsec( w->timer, TR_IDLE_TIMER_MSEC );
538}
539
540tr_webseed*
541tr_webseedNew( struct tr_torrent  * tor,
542               const char         * url,
543               tr_peer_callback   * callback,
544               void               * callback_data )
545{
546    tr_webseed * w = tr_new0( tr_webseed, 1 );
547    tr_peer * peer = &w->parent;
548    const tr_info * inf = tr_torrentInfo( tor );
549
550    /* construct parent class */
551    tr_peerConstruct( peer );
552    peer->peerIsChoked = true;
553    peer->clientIsInterested = !tr_torrentIsSeed( tor );
554    peer->client = tr_strdup( "webseed" );
555    tr_bitfieldSetHasAll( &peer->have );
556    tr_peerUpdateProgress( tor, peer );
557
558    w->torrent_id = tr_torrentId( tor );
559    w->session = tor->session;
560    w->base_url_len = strlen( url );
561    w->base_url = tr_strndup( url, w->base_url_len );
562    w->callback = callback;
563    w->callback_data = callback_data;
564    w->file_urls = tr_new0( char *, inf->fileCount );
565    //tr_rcConstruct( &w->download_rate );
566    tr_bandwidthConstruct( &w->bandwidth, tor->session, &tor->bandwidth );
567    w->timer = evtimer_new( w->session->event_base, webseed_timer_func, w );
568    tr_timerAddMsec( w->timer, TR_IDLE_TIMER_MSEC );
569    return w;
570}
571
572void
573tr_webseedFree( tr_webseed * w )
574{
575    if( w )
576    {
577        if( webseed_has_tasks( w ) )
578            w->is_stopping = true;
579        else
580            webseed_free( w );
581    }
582}
Note: See TracBrowser for help on using the repository browser.