source: trunk/libtransmission/cache.c @ 13909

Last change on this file since 13909 was 13909, checked in by jordan, 8 years ago

(libT) add assertions to check that block-writing functions are only invoked in the libtransmission thread

  • Property svn:keywords set to Date Rev Author Id
File size: 11.2 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: cache.c 13909 2013-01-31 17:39:06Z jordan $
11 */
12
#include <limits.h> /* INT_MAX, INT_MIN */
#include <stdlib.h> /* qsort () */

#include <event2/buffer.h>

#include "transmission.h"
#include "cache.h"
#include "inout.h"
#include "log.h"
#include "peer-common.h" /* MAX_BLOCK_SIZE */
#include "ptrarray.h"
#include "torrent.h"
#include "trevent.h"
#include "utils.h"
26
/* Name under which this file's log messages are filed. */
#define MY_NAME "Cache"

/* Emit a "deep" log message tagged with the current file, line and MY_NAME.
 * The arguments are only forwarded when deep logging is enabled. */
#define dbgmsg(...) \
  do { \
    if (tr_logGetDeepEnabled ()) \
      tr_logAddDeep (__FILE__, __LINE__, MY_NAME, __VA_ARGS__); \
  } while (0)
36
37/****
38*****
39****/
40
41struct cache_block
42{
43  tr_torrent * tor;
44
45  tr_piece_index_t piece;
46  uint32_t offset;
47  uint32_t length;
48
49  time_t time;
50  tr_block_index_t block;
51
52  struct evbuffer * evbuf;
53};
54
55struct tr_cache
56{
57  tr_ptrArray blocks;
58  int max_blocks;
59  size_t max_bytes;
60
61  size_t disk_writes;
62  size_t disk_write_bytes;
63  size_t cache_writes;
64  size_t cache_write_bytes;
65};
66
67/****
68*****
69****/
70
/* Description of one run of contiguous cached blocks, see getBlockRun (). */
struct run_info
{
  /* index of the run's first block inside tr_cache.blocks */
  int pos;
  /* flush priority assigned by calcRuns (); higher is flushed sooner */
  int rank;
  /* write time of the run's last block */
  time_t last_block_time;
  /* true when the run's last block is in a different piece than its first */
  bool is_multi_piece;
  /* true when the piece holding the run's last block is complete */
  bool is_piece_done;
  /* number of blocks in the run */
  unsigned len;
};
80
81
82/* return a count of how many contiguous blocks there are starting at this pos */
83static int
84getBlockRun (const tr_cache * cache, int pos, struct run_info * info)
85{
86  int i;
87  const int n = tr_ptrArraySize (&cache->blocks);
88  const struct cache_block ** blocks = (const struct cache_block**) tr_ptrArrayBase (&cache->blocks);
89  const struct cache_block * ref = blocks[pos];
90  tr_block_index_t block = ref->block;
91
92  for (i=pos; i<n; ++i, ++block)
93    {
94      const struct cache_block * b = blocks[i];
95      if (b->block != block)
96        break;
97      if (b->tor != ref->tor)
98        break;
99      //fprintf (stderr, "pos %d tor %d block %zu time %zu\n", i, b->tor->uniqueId, (size_t)b->block, (size_t)b->time);
100    }
101
102  //fprintf (stderr, "run is %d long from [%d to %d)\n", (int)(i-pos), i, (int)pos);
103
104  if (info != NULL)
105    {
106      const struct cache_block * b = blocks[i-1];
107      info->last_block_time = b->time;
108      info->is_piece_done = tr_cpPieceIsComplete (&b->tor->completion, b->piece);
109      info->is_multi_piece = b->piece != blocks[pos]->piece ? true : false;
110      info->len = i - pos;
111      info->pos = pos;
112    }
113
114  return i-pos;
115}
116
117/* higher rank comes before lower rank */
118static int
119compareRuns (const void * va, const void * vb)
120{
121  const struct run_info * a = va;
122  const struct run_info * b = vb;
123  return b->rank - a->rank;
124}
125
/* Priority bits OR'ed into run_info.rank on top of the length/age score. */
enum
{
  /* run spans more than one piece */
  MULTIFLAG   = 1 << 12,
  /* run ends in a piece that is already complete */
  DONEFLAG    = 1 << 13,
  /* set by tr_cacheFlushDone () on the runs it is about to flush */
  SESSIONFLAG = 1 << 14
};
132
133/* Calculte runs
134 *   - Stale runs, runs sitting in cache for a long time or runs not growing, get priority.
135 *     Returns number of runs.
136 */
137static int
138calcRuns (tr_cache * cache, struct run_info * runs)
139{
140  const int n = tr_ptrArraySize (&cache->blocks);
141  int i = 0, pos;
142  const time_t now = tr_time ();
143
144  for (pos = 0; pos < n; pos += runs[i++].len)
145    {
146      int rank = getBlockRun (cache, pos, &runs[i]);
147
148      /* This adds ~2 to the relative length of a run for every minute it has
149       * languished in the cache. */
150      rank += (now - runs[i].last_block_time) / 32;
151
152      /* Flushing stale blocks should be a top priority as the probability of them
153       * growing is very small, for blocks on piece boundaries, and nonexistant for
154       * blocks inside pieces. */
155      rank |= runs[i].is_piece_done ? DONEFLAG : 0;
156
157      /* Move the multi piece runs higher */
158      rank |= runs[i].is_multi_piece ? MULTIFLAG : 0;
159
160      runs[i].rank = rank;
161
162      //fprintf (stderr,"block run at pos %d of length %d and age %ld adjusted +%d\n",runs[i].pos,runs[i].len,now-runs[i].last_block_time,rank-runs[i].len);
163    }
164
165  //fprintf (stderr, "%d block runs\n", i);
166  qsort (runs, i, sizeof (struct run_info), compareRuns);
167  return i;
168}
169
170static int
171flushContiguous (tr_cache * cache, int pos, int n)
172{
173  int i;
174  int err = 0;
175  uint8_t * buf = tr_new (uint8_t, n * MAX_BLOCK_SIZE);
176  uint8_t * walk = buf;
177  struct cache_block ** blocks = (struct cache_block**) tr_ptrArrayBase (&cache->blocks);
178
179  struct cache_block * b = blocks[pos];
180  tr_torrent * tor = b->tor;
181  const tr_piece_index_t piece = b->piece;
182  const uint32_t offset = b->offset;
183
184  for (i=pos; i<pos+n; ++i)
185    {
186      b = blocks[i];
187      evbuffer_copyout (b->evbuf, walk, b->length);
188      walk += b->length;
189      evbuffer_free (b->evbuf);
190      tr_free (b);
191    }
192  tr_ptrArrayErase (&cache->blocks, pos, pos+n);
193
194  err = tr_ioWrite (tor, piece, offset, walk-buf, buf);
195  tr_free (buf);
196
197  ++cache->disk_writes;
198  cache->disk_write_bytes += walk-buf;
199  return err;
200}
201
202static int
203flushRuns (tr_cache * cache, struct run_info * runs, int n)
204{
205  int i;
206  int err = 0;
207
208  for (i=0; !err && i<n; i++)
209    {
210      int j;
211
212      err = flushContiguous (cache, runs[i].pos, runs[i].len);
213
214      for (j=i+1; j<n; j++)
215        if (runs[j].pos > runs[i].pos)
216          runs[j].pos -= runs[i].len;
217    }
218
219  return err;
220}
221
222static int
223cacheTrim (tr_cache * cache)
224{
225  int err = 0;
226
227  if (tr_ptrArraySize (&cache->blocks) > cache->max_blocks)
228    {
229      /* Amount of cache that should be removed by the flush. This influences how large
230       * runs can grow as well as how often flushes will happen. */
231      const int cacheCutoff = 1 + cache->max_blocks / 4;
232      struct run_info * runs = tr_new (struct run_info, tr_ptrArraySize (&cache->blocks));
233      int i=0, j=0;
234
235      calcRuns (cache, runs);
236      while (j < cacheCutoff)
237        j += runs[i++].len;
238      err = flushRuns (cache, runs, i);
239      tr_free (runs);
240    }
241
242  return err;
243}
244
245/***
246****
247***/
248
249static int
250getMaxBlocks (int64_t max_bytes)
251{
252  return max_bytes / (double)MAX_BLOCK_SIZE;
253}
254
255int
256tr_cacheSetLimit (tr_cache * cache, int64_t max_bytes)
257{
258  char buf[128];
259
260  cache->max_bytes = max_bytes;
261  cache->max_blocks = getMaxBlocks (max_bytes);
262
263  tr_formatter_mem_B (buf, cache->max_bytes, sizeof (buf));
264  tr_logAddNamedDbg (MY_NAME, "Maximum cache size set to %s (%d blocks)", buf, cache->max_blocks);
265
266  return cacheTrim (cache);
267}
268
269int64_t
270tr_cacheGetLimit (const tr_cache * cache)
271{
272  return cache->max_bytes;
273}
274
275tr_cache *
276tr_cacheNew (int64_t max_bytes)
277{
278  tr_cache * cache = tr_new0 (tr_cache, 1);
279  cache->blocks = TR_PTR_ARRAY_INIT;
280  cache->max_bytes = max_bytes;
281  cache->max_blocks = getMaxBlocks (max_bytes);
282  return cache;
283}
284
285void
286tr_cacheFree (tr_cache * cache)
287{
288  assert (tr_ptrArrayEmpty (&cache->blocks));
289  tr_ptrArrayDestruct (&cache->blocks, NULL);
290  tr_free (cache);
291}
292
293/***
294****
295***/
296
297static int
298cache_block_compare (const void * va, const void * vb)
299{
300  const struct cache_block * a = va;
301  const struct cache_block * b = vb;
302
303  /* primary key: torrent id */
304  if (a->tor->uniqueId != b->tor->uniqueId)
305    return a->tor->uniqueId < b->tor->uniqueId ? -1 : 1;
306
307  /* secondary key: block # */
308  if (a->block != b->block)
309    return a->block < b->block ? -1 : 1;
310
311  /* they're equal */
312  return 0;
313}
314
315static struct cache_block *
316findBlock (tr_cache           * cache,
317           tr_torrent         * torrent,
318           tr_piece_index_t     piece,
319           uint32_t             offset)
320{
321  struct cache_block key;
322  key.tor = torrent;
323  key.block = _tr_block (torrent, piece, offset);
324  return tr_ptrArrayFindSorted (&cache->blocks, &key, cache_block_compare);
325}
326
327int
328tr_cacheWriteBlock (tr_cache         * cache,
329                    tr_torrent       * torrent,
330                    tr_piece_index_t   piece,
331                    uint32_t           offset,
332                    uint32_t           length,
333                    struct evbuffer  * writeme)
334{
335  struct cache_block * cb = findBlock (cache, torrent, piece, offset);
336
337  assert (tr_amInEventThread (torrent->session));
338
339  if (cb == NULL)
340    {
341      cb = tr_new (struct cache_block, 1);
342      cb->tor = torrent;
343      cb->piece = piece;
344      cb->offset = offset;
345      cb->length = length;
346      cb->block = _tr_block (torrent, piece, offset);
347      cb->evbuf = evbuffer_new ();
348      tr_ptrArrayInsertSorted (&cache->blocks, cb, cache_block_compare);
349    }
350
351  cb->time = tr_time ();
352
353  assert (cb->length == length);
354  evbuffer_drain (cb->evbuf, evbuffer_get_length (cb->evbuf));
355  evbuffer_remove_buffer (writeme, cb->evbuf, cb->length);
356
357  cache->cache_writes++;
358  cache->cache_write_bytes += cb->length;
359
360  return cacheTrim (cache);
361}
362
363int
364tr_cacheReadBlock (tr_cache         * cache,
365                   tr_torrent       * torrent,
366                   tr_piece_index_t   piece,
367                   uint32_t           offset,
368                   uint32_t           len,
369                   uint8_t          * setme)
370{
371  int err = 0;
372  struct cache_block * cb = findBlock (cache, torrent, piece, offset);
373
374  if (cb)
375    evbuffer_copyout (cb->evbuf, setme, len);
376  else
377    err = tr_ioRead (torrent, piece, offset, len, setme);
378
379  return err;
380}
381
382int
383tr_cachePrefetchBlock (tr_cache         * cache,
384                       tr_torrent       * torrent,
385                       tr_piece_index_t   piece,
386                       uint32_t           offset,
387                       uint32_t           len)
388{
389  int err = 0;
390  struct cache_block * cb = findBlock (cache, torrent, piece, offset);
391
392  if (cb == NULL)
393    err = tr_ioPrefetch (torrent, piece, offset, len);
394
395  return err;
396}
397
398/***
399****
400***/
401
402static int
403findBlockPos (tr_cache * cache, tr_torrent * torrent, tr_piece_index_t block)
404{
405  struct cache_block key;
406  key.tor = torrent;
407  key.block = block;
408  return tr_ptrArrayLowerBound (&cache->blocks, &key, cache_block_compare, NULL);
409}
410
411int tr_cacheFlushDone (tr_cache * cache)
412{
413  int err = 0;
414
415  if (tr_ptrArraySize (&cache->blocks) > 0)
416    {
417      int i, n;
418      struct run_info * runs;
419
420      runs = tr_new (struct run_info, tr_ptrArraySize (&cache->blocks));
421      i = 0;
422      n = calcRuns (cache, runs);
423
424      while (i < n && (runs[i].is_piece_done || runs[i].is_multi_piece))
425        runs[i++].rank |= SESSIONFLAG;
426
427      err = flushRuns (cache, runs, i);
428      tr_free (runs);
429    }
430
431  return err;
432}
433
434int
435tr_cacheFlushFile (tr_cache * cache, tr_torrent * torrent, tr_file_index_t i)
436{
437  int pos;
438  int err = 0;
439  tr_block_index_t first;
440  tr_block_index_t last;
441
442  tr_torGetFileBlockRange (torrent, i, &first, &last);
443  pos = findBlockPos (cache, torrent, first);
444  dbgmsg ("flushing file %d from cache to disk: blocks [%zu...%zu]", (int)i, (size_t)first, (size_t)last);
445
446  /* flush out all the blocks in that file */
447  while (!err && (pos < tr_ptrArraySize (&cache->blocks)))
448    {
449      const struct cache_block * b = tr_ptrArrayNth (&cache->blocks, pos);
450
451      if (b->tor != torrent)
452        break;
453      if ((b->block < first) || (b->block > last))
454        break;
455
456      err = flushContiguous (cache, pos, getBlockRun (cache, pos, NULL));
457    }
458
459    return err;
460}
461
462int
463tr_cacheFlushTorrent (tr_cache * cache, tr_torrent * torrent)
464{
465  int err = 0;
466  const int pos = findBlockPos (cache, torrent, 0);
467
468  /* flush out all the blocks in that torrent */
469  while (!err && (pos < tr_ptrArraySize (&cache->blocks)))
470    {
471      const struct cache_block * b = tr_ptrArrayNth (&cache->blocks, pos);
472
473      if (b->tor != torrent)
474        break;
475
476      err = flushContiguous (cache, pos, getBlockRun (cache, pos, NULL));
477    }
478
479  return err;
480}
Note: See TracBrowser for help on using the repository browser.