source: trunk/libtransmission/webseed.c

Last change on this file was 14619, checked in by mikedld, 5 years ago

Optionally return result length from evbuffer_free_to_str()

  • Property svn:keywords set to Date Rev Author Id
File size: 17.8 KB
Line 
1/*
2 * This file Copyright (C) 2008-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: webseed.c 14619 2015-12-13 10:23:22Z mikedld $
8 */
9
10#include <string.h> /* strlen () */
11
12#include <event2/buffer.h>
13#include <event2/event.h>
14
15#include "transmission.h"
16#include "bandwidth.h"
17#include "cache.h"
18#include "inout.h" /* tr_ioFindFileLocation () */
19#include "list.h"
20#include "peer-mgr.h"
21#include "torrent.h"
22#include "trevent.h" /* tr_runInEventThread () */
23#include "utils.h"
24#include "web.h"
25#include "webseed.h"
26
27struct tr_webseed_task
28{
29  bool                 dead;
30  struct evbuffer    * content;
31  struct tr_webseed  * webseed;
32  tr_session         * session;
33  tr_block_index_t     block;
34  tr_piece_index_t     piece_index;
35  uint32_t             piece_offset;
36  uint32_t             length;
37  tr_block_index_t     blocks_done;
38  uint32_t             block_size;
39  struct tr_web_task * web_task;
40  long                 response_code;
41};
42
43struct tr_webseed
44{
45  tr_peer              parent;
46  tr_bandwidth         bandwidth;
47  tr_session         * session;
48  tr_peer_callback     callback;
49  void               * callback_data;
50  tr_list            * tasks;
51  struct event       * timer;
52  char               * base_url;
53  size_t               base_url_len;
54  int                  torrent_id;
55  int                  consecutive_failures;
56  int                  retry_tickcount;
57  int                  retry_challenge;
58  int                  idle_connections;
59  int                  active_transfers;
60  char              ** file_urls;
61};
62
63enum
64{
65  TR_IDLE_TIMER_MSEC = 2000,
66
67  FAILURE_RETRY_INTERVAL = 150,
68
69  MAX_CONSECUTIVE_FAILURES = 5,
70
71  MAX_WEBSEED_CONNECTIONS = 4
72};
73
74/***
75****
76***/
77
78static void
79publish (tr_webseed * w, tr_peer_event * e)
80{
81    if (w->callback != NULL)
82        w->callback (&w->parent, e, w->callback_data);
83}
84
85static void
86fire_client_got_rejs (tr_torrent        * tor,
87                      tr_webseed        * w,
88                      tr_block_index_t    block,
89                      tr_block_index_t    count)
90{
91  tr_block_index_t i;
92  tr_peer_event e = TR_PEER_EVENT_INIT;
93  e.eventType = TR_PEER_CLIENT_GOT_REJ;
94  tr_torrentGetBlockLocation (tor, block, &e.pieceIndex, &e.offset, &e.length);
95  for (i = 1; i <= count; i++)
96    {
97      if (i == count)
98        e.length = tr_torBlockCountBytes (tor, block + count - 1);
99      publish (w, &e);
100      e.offset += e.length;
101    }
102}
103
104static void
105fire_client_got_blocks (tr_torrent        * tor,
106                        tr_webseed        * w,
107                        tr_block_index_t    block,
108                        tr_block_index_t    count)
109{
110  tr_block_index_t i;
111  tr_peer_event e = TR_PEER_EVENT_INIT;
112  e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
113  tr_torrentGetBlockLocation (tor, block, &e.pieceIndex, &e.offset, &e.length);
114  for (i = 1; i <= count; i++)
115    {
116      if (i == count)
117        e.length = tr_torBlockCountBytes (tor, block + count - 1);
118      publish (w, &e);
119      e.offset += e.length;
120    }
121}
122
123static void
124fire_client_got_piece_data (tr_webseed * w, uint32_t length)
125{
126  tr_peer_event e = TR_PEER_EVENT_INIT;
127  e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
128  e.length = length;
129  publish (w, &e);
130}
131
132/***
133****
134***/
135
136struct write_block_data
137{
138  tr_session         * session;
139  int                  torrent_id;
140  struct tr_webseed  * webseed;
141  struct evbuffer    * content;
142  tr_piece_index_t     piece_index;
143  tr_block_index_t     block_index;
144  tr_block_index_t     count;
145  uint32_t             block_offset;
146};
147
148static void
149write_block_func (void * vdata)
150{
151  struct write_block_data * data = vdata;
152  struct tr_webseed * w = data->webseed;
153  struct evbuffer * buf = data->content;
154  struct tr_torrent * tor;
155
156  tor = tr_torrentFindFromId (data->session, data->torrent_id);
157  if (tor != NULL)
158    {
159      const uint32_t block_size = tor->blockSize;
160      uint32_t len = evbuffer_get_length (buf);
161      const uint32_t offset_end = data->block_offset + len;
162      tr_cache * cache = data->session->cache;
163      const tr_piece_index_t piece = data->piece_index;
164
165      if (!tr_torrentPieceIsComplete (tor, piece))
166        {
167          while (len > 0)
168            {
169              const uint32_t bytes_this_pass = MIN (len, block_size);
170              tr_cacheWriteBlock (cache, tor, piece, offset_end - len, bytes_this_pass, buf);
171              len -= bytes_this_pass;
172            }
173
174          fire_client_got_blocks (tor, w, data->block_index, data->count);
175        }
176    }
177
178  evbuffer_free (buf);
179  tr_free (data);
180}
181
182/***
183****
184***/
185
186struct connection_succeeded_data
187{
188  struct tr_webseed  * webseed;
189  char               * real_url;
190  tr_piece_index_t     piece_index;
191  uint32_t             piece_offset;
192};
193
194static void
195connection_succeeded (void * vdata)
196{
197  tr_torrent * tor;
198  struct connection_succeeded_data * data = vdata;
199  struct tr_webseed * w = data->webseed;
200
201  if (++w->active_transfers >= w->retry_challenge && w->retry_challenge)
202    /* the server seems to be accepting more connections now */
203    w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0;
204
205  if (data->real_url && (tor = tr_torrentFindFromId (w->session, w->torrent_id)))
206    {
207      uint64_t file_offset;
208      tr_file_index_t file_index;
209
210      tr_ioFindFileLocation (tor, data->piece_index, data->piece_offset,
211                             &file_index, &file_offset);
212      tr_free (w->file_urls[file_index]);
213      w->file_urls[file_index] = data->real_url;
214      data->real_url = NULL;
215    }
216
217  tr_free (data->real_url);
218  tr_free (data);
219}
220
221/***
222****
223***/
224
225static void
226on_content_changed (struct evbuffer                * buf,
227                    const struct evbuffer_cb_info  * info,
228                    void                           * vtask)
229{
230  const size_t n_added = info->n_added;
231  struct tr_webseed_task * task = vtask;
232  tr_session * session = task->session;
233
234  tr_sessionLock (session);
235
236  if (!task->dead && (n_added>0))
237    {
238      uint32_t len;
239      struct tr_webseed * w = task->webseed;
240
241      tr_bandwidthUsed (&w->bandwidth, TR_DOWN, n_added, true, tr_time_msec ());
242      fire_client_got_piece_data (w, n_added);
243      len = evbuffer_get_length (buf);
244
245      if (!task->response_code)
246        {
247          tr_webGetTaskInfo (task->web_task, TR_WEB_GET_CODE, &task->response_code);
248
249          if (task->response_code == 206)
250            {
251              const char * url;
252              struct connection_succeeded_data * data;
253
254              url = NULL;
255              tr_webGetTaskInfo (task->web_task, TR_WEB_GET_REAL_URL, &url);
256
257              data = tr_new (struct connection_succeeded_data, 1);
258              data->webseed = w;
259              data->real_url = tr_strdup (url);
260              data->piece_index = task->piece_index;
261              data->piece_offset = task->piece_offset + (task->blocks_done * task->block_size) + (len - 1);
262
263              /* processing this uses a tr_torrent pointer,
264                 so push the work to the libevent thread... */
265              tr_runInEventThread (w->session, connection_succeeded, data);
266            }
267        }
268
269      if ((task->response_code == 206) && (len >= task->block_size))
270        {
271          /* once we've got at least one full block, save it */
272
273          struct write_block_data * data;
274          const uint32_t block_size = task->block_size;
275          const tr_block_index_t completed = len / block_size;
276
277          data = tr_new (struct write_block_data, 1);
278          data->webseed = task->webseed;
279          data->piece_index = task->piece_index;
280          data->block_index = task->block + task->blocks_done;
281          data->count = completed;
282          data->block_offset = task->piece_offset + task->blocks_done * block_size;
283          data->content = evbuffer_new ();
284          data->torrent_id = w->torrent_id;
285          data->session = w->session;
286
287          /* we don't use locking on this evbuffer so we must copy out the data
288             that will be needed when writing the block in a different thread */
289          evbuffer_remove_buffer (task->content, data->content,
290                                  block_size * completed);
291
292          tr_runInEventThread (w->session, write_block_func, data);
293          task->blocks_done += completed;
294        }
295    }
296
297  tr_sessionUnlock (session);
298}
299
300static void task_request_next_chunk (struct tr_webseed_task * task);
301
302static void
303on_idle (tr_webseed * w)
304{
305  int want;
306  int running_tasks = tr_list_size (w->tasks);
307  tr_torrent * tor = tr_torrentFindFromId (w->session, w->torrent_id);
308
309  if (w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES)
310    {
311      want = w->idle_connections;
312
313      if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL)
314        {
315          /* some time has passed since our connection attempts failed. try again */
316          ++want;
317          /* if this challenge is fulfilled we will reset consecutive_failures */
318          w->retry_challenge = running_tasks + want;
319        }
320    }
321  else
322    {
323      want = MAX_WEBSEED_CONNECTIONS - running_tasks;
324      w->retry_challenge = running_tasks + w->idle_connections + 1;
325    }
326
327  if (tor && tor->isRunning && !tr_torrentIsSeed (tor) && (want > 0))
328    {
329      int i;
330      int got = 0;
331      tr_block_index_t * blocks = NULL;
332
333      blocks = tr_new (tr_block_index_t, want*2);
334      tr_peerMgrGetNextRequests (tor, &w->parent, want, blocks, &got, true);
335
336      w->idle_connections -= MIN (w->idle_connections, got);
337      if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL && got == want)
338        w->retry_tickcount = 0;
339
340      for (i=0; i<got; ++i)
341        {
342          const tr_block_index_t b = blocks[i*2];
343          const tr_block_index_t be = blocks[i*2+1];
344          struct tr_webseed_task * task;
345
346          task = tr_new0 (struct tr_webseed_task, 1);
347          task->session = tor->session;
348          task->webseed = w;
349          task->block = b;
350          task->piece_index = tr_torBlockPiece (tor, b);
351          task->piece_offset = (tor->blockSize * b) - (tor->info.pieceSize * task->piece_index);
352          task->length = (be - b) * tor->blockSize + tr_torBlockCountBytes (tor, be);
353          task->blocks_done = 0;
354          task->response_code = 0;
355          task->block_size = tor->blockSize;
356          task->content = evbuffer_new ();
357          evbuffer_add_cb (task->content, on_content_changed, task);
358          tr_list_append (&w->tasks, task);
359          task_request_next_chunk (task);
360        }
361
362      tr_free (blocks);
363    }
364}
365
366
367static void
368web_response_func (tr_session    * session,
369                   bool            did_connect UNUSED,
370                   bool            did_timeout UNUSED,
371                   long            response_code,
372                   const void    * response UNUSED,
373                   size_t          response_byte_count UNUSED,
374                   void          * vtask)
375{
376  tr_webseed * w;
377  tr_torrent * tor;
378  struct tr_webseed_task * t = vtask;
379  const int success = (response_code == 206);
380
381  if (t->dead)
382    {
383      evbuffer_free (t->content);
384      tr_free (t);
385      return;
386    }
387
388  w = t->webseed;
389  tor = tr_torrentFindFromId (session, w->torrent_id);
390  if (tor != NULL)
391    {
392      /* active_transfers was only increased if the connection was successful */
393      if (t->response_code == 206)
394        --w->active_transfers;
395
396      if (!success)
397        {
398          const tr_block_index_t blocks_remain = (t->length + tor->blockSize - 1)
399                                                   / tor->blockSize - t->blocks_done;
400
401          if (blocks_remain)
402            fire_client_got_rejs (tor, w, t->block + t->blocks_done, blocks_remain);
403
404          if (t->blocks_done)
405            ++w->idle_connections;
406          else if (++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && !w->retry_tickcount)
407            /* now wait a while until retrying to establish a connection */
408            ++w->retry_tickcount;
409
410          tr_list_remove_data (&w->tasks, t);
411          evbuffer_free (t->content);
412          tr_free (t);
413        }
414        else
415        {
416          const uint32_t bytes_done = t->blocks_done * tor->blockSize;
417          const uint32_t buf_len = evbuffer_get_length (t->content);
418
419          if (bytes_done + buf_len < t->length)
420            {
421              /* request finished successfully but there's still data missing. that
422                 means we've reached the end of a file and need to request the next one */
423              t->response_code = 0;
424              task_request_next_chunk (t);
425            }
426            else
427            {
428              if (buf_len && !tr_torrentPieceIsComplete (tor, t->piece_index))
429                {
430                  /* on_content_changed () will not write a block if it is smaller than
431                     the torrent's block size, i.e. the torrent's very last block */
432                  tr_cacheWriteBlock (session->cache, tor,
433                                      t->piece_index, t->piece_offset + bytes_done,
434                                      buf_len, t->content);
435
436                  fire_client_got_blocks (tor, t->webseed,
437                                          t->block + t->blocks_done, 1);
438                }
439
440              ++w->idle_connections;
441
442              tr_list_remove_data (&w->tasks, t);
443              evbuffer_free (t->content);
444              tr_free (t);
445
446              on_idle (w);
447            }
448        }
449    }
450}
451
452static struct evbuffer *
453make_url (tr_webseed * w, const tr_file * file)
454{
455  struct evbuffer * buf = evbuffer_new ();
456
457  evbuffer_add (buf, w->base_url, w->base_url_len);
458
459  /* if url ends with a '/', add the torrent name */
460  if (w->base_url[w->base_url_len - 1] == '/' && file->name)
461    tr_http_escape (buf, file->name, strlen (file->name), false);
462
463  return buf;
464}
465
466static void
467task_request_next_chunk (struct tr_webseed_task * t)
468{
469  tr_webseed * w = t->webseed;
470  tr_torrent * tor = tr_torrentFindFromId (w->session, w->torrent_id);
471  if (tor != NULL)
472    {
473      char range[64];
474      char ** urls = t->webseed->file_urls;
475
476      const tr_info * inf = tr_torrentInfo (tor);
477      const uint64_t remain = t->length - t->blocks_done * tor->blockSize
478                            - evbuffer_get_length (t->content);
479
480      const uint64_t total_offset = tr_pieceOffset (tor, t->piece_index,
481                                                         t->piece_offset,
482                                                         t->length - remain);
483      const tr_piece_index_t step_piece = total_offset / inf->pieceSize;
484      const uint64_t step_piece_offset = total_offset - (inf->pieceSize * step_piece);
485
486      tr_file_index_t file_index;
487      const tr_file * file;
488      uint64_t file_offset;
489      uint64_t this_pass;
490
491      tr_ioFindFileLocation (tor, step_piece, step_piece_offset,
492                             &file_index, &file_offset);
493      file = &inf->files[file_index];
494      this_pass = MIN (remain, file->length - file_offset);
495
496      if (!urls[file_index])
497        urls[file_index] = evbuffer_free_to_str (make_url (t->webseed, file), NULL);
498
499      tr_snprintf (range, sizeof range, "%"PRIu64"-%"PRIu64,
500                   file_offset, file_offset + this_pass - 1);
501
502      t->web_task = tr_webRunWebseed (tor, urls[file_index], range,
503                                      web_response_func, t, t->content);
504    }
505}
506
507/***
508****
509***/
510
511static void
512webseed_timer_func (evutil_socket_t foo UNUSED, short bar UNUSED, void * vw)
513{
514  tr_webseed * w = vw;
515
516  if (w->retry_tickcount)
517    ++w->retry_tickcount;
518
519  on_idle (w);
520
521  tr_timerAddMsec (w->timer, TR_IDLE_TIMER_MSEC);
522}
523
524/***
525****  tr_peer virtual functions
526***/
527
528static bool
529webseed_is_transferring_pieces (const tr_peer * peer,
530                                uint64_t        now,
531                                tr_direction    direction,
532                                unsigned int  * setme_Bps)
533{
534  unsigned int Bps = 0;
535  bool is_active = false;
536
537  if (direction == TR_DOWN)
538    {
539      const tr_webseed * w = (const tr_webseed *) peer;
540      is_active = w->tasks != NULL;
541      Bps = tr_bandwidthGetPieceSpeed_Bps (&w->bandwidth, now, direction);
542    }
543
544  if (setme_Bps != NULL)
545    *setme_Bps = Bps;
546
547  return is_active;
548}
549
550static void
551webseed_destruct (tr_peer * peer)
552{
553  tr_list * l;
554  tr_webseed * w = (tr_webseed *) peer;
555
556  /* flag all the pending tasks as dead */
557  for (l=w->tasks; l!=NULL; l=l->next)
558    {
559      struct tr_webseed_task * task = l->data;
560      task->dead = true;
561    }
562  tr_list_free (&w->tasks, NULL);
563
564  /* if we have an array of file URLs, free it */
565  if (w->file_urls != NULL)
566    {
567      tr_file_index_t i;
568      tr_torrent * tor = tr_torrentFindFromId (w->session, w->torrent_id);
569      const tr_info * inf = tr_torrentInfo (tor);
570
571      for (i=0; i<inf->fileCount; ++i)
572        tr_free (w->file_urls[i]);
573      tr_free (w->file_urls);
574    }
575
576  /* webseed destruct */
577  event_free (w->timer);
578  tr_bandwidthDestruct (&w->bandwidth);
579  tr_free (w->base_url);
580
581  /* parent class destruct */
582  tr_peerDestruct (&w->parent);
583}
584
585static const struct tr_peer_virtual_funcs my_funcs =
586{
587  .destruct = webseed_destruct,
588  .is_transferring_pieces = webseed_is_transferring_pieces
589};
590
591/***
592****
593***/
594
595tr_webseed*
596tr_webseedNew (struct tr_torrent  * tor,
597               const char         * url,
598               tr_peer_callback     callback,
599               void               * callback_data)
600{
601  tr_webseed * w = tr_new0 (tr_webseed, 1);
602  tr_peer * peer = &w->parent;
603  const tr_info * inf = tr_torrentInfo (tor);
604
605  /* construct parent class */
606  tr_peerConstruct (peer, tor);
607  peer->client = TR_KEY_webseeds;
608  peer->funcs = &my_funcs;
609  tr_bitfieldSetHasAll (&peer->have);
610  tr_peerUpdateProgress (tor, peer);
611
612  w->torrent_id = tr_torrentId (tor);
613  w->session = tor->session;
614  w->base_url_len = strlen (url);
615  w->base_url = tr_strndup (url, w->base_url_len);
616  w->callback = callback;
617  w->callback_data = callback_data;
618  w->file_urls = tr_new0 (char *, inf->fileCount);
619  //tr_rcConstruct (&w->download_rate);
620  tr_bandwidthConstruct (&w->bandwidth, tor->session, &tor->bandwidth);
621  w->timer = evtimer_new (w->session->event_base, webseed_timer_func, w);
622  tr_timerAddMsec (w->timer, TR_IDLE_TIMER_MSEC);
623  return w;
624}
Note: See TracBrowser for help on using the repository browser.