source: trunk/libtransmission/cache.c @ 11782

Last change on this file since 11782 was 11782, checked in by jordan, 11 years ago

(trunk libT) memory cache should use evbuffers to avoid unnecessary calls to memcpy -- done.

/*
 * This file Copyright (C) Mnemosyne LLC
 *
 * This file is licensed by the GPL version 2. Works owned by the
 * Transmission project are granted a special exemption to clause 2(b)
 * so that the bulk of its code can remain under the MIT license.
 * This exemption does not extend to derived works not owned by
 * the Transmission project.
 *
 * $Id: cache.c 11782 2011-01-29 18:56:53Z jordan $
 */

#include <assert.h> /* assert() */
#include <stdlib.h> /* qsort() */

#include <event2/buffer.h>

#include "transmission.h"
#include "cache.h"
#include "inout.h"
#include "peer-common.h" /* MAX_BLOCK_SIZE */
#include "ptrarray.h"
#include "torrent.h"
#include "utils.h"

#define MY_NAME "Cache"

#define dbgmsg( ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, MY_NAME, __VA_ARGS__ ); \
    } while( 0 )


/****
*****
****/

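/* One cached block: a single BitTorrent block's payload, held in an
 * evbuffer until it is flushed to disk.  Blocks are stored in
 * tr_cache.blocks, a tr_ptrArray kept sorted by cache_block_compare()
 * (torrent id first, then block index), so blocks that are contiguous
 * on disk are adjacent in the array. */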
struct cache_block
{
    tr_torrent * tor;

    tr_piece_index_t piece;
    uint32_t offset;
    uint32_t length;

    time_t time;
    tr_block_index_t block;

    struct evbuffer * evbuf;
};

struct tr_cache
{
    tr_ptrArray blocks;
    int max_blocks;
    size_t max_bytes;

    size_t disk_writes;
    size_t disk_write_bytes;
    size_t cache_writes;
    size_t cache_write_bytes;
};

/****
*****
****/

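/* A "run" is a stretch of cached blocks that are contiguous on disk:
 * same torrent, consecutive block indices.  Runs are ranked, and each
 * selected run is flushed to disk with a single write. */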
struct run_info
{
  int       pos;
  int       rank;
  time_t    last_block_time;
  tr_bool   is_multi_piece;
  tr_bool   is_piece_done;
  unsigned  len;
};


/* return a count of how many contiguous blocks there are starting at this pos */
static int
getBlockRun( const tr_cache * cache, int pos, struct run_info * info )
{
    int i;
    const int n = tr_ptrArraySize( &cache->blocks );
    const struct cache_block ** blocks = (const struct cache_block**) tr_ptrArrayBase( &cache->blocks );
    const struct cache_block * ref = blocks[pos];
    tr_block_index_t block = ref->block;

    for( i=pos; i<n; ++i, ++block ) {
        const struct cache_block * b = blocks[i];
        if( b->block != block ) break;
        if( b->tor != ref->tor ) break;
//fprintf( stderr, "pos %d tor %d block %zu time %zu\n", i, b->tor->uniqueId, (size_t)b->block, (size_t)b->time );
    }

//fprintf( stderr, "run is %d long from [%d to %d)\n", (int)(i-pos), i, (int)pos );
    if( info != NULL ) {
        const struct cache_block * b = blocks[i-1];
        info->last_block_time = b->time;
        info->is_piece_done = tr_cpPieceIsComplete( &b->tor->completion, b->piece );
        info->is_multi_piece = b->piece != blocks[pos]->piece ? TRUE : FALSE;
        info->len = i - pos;
        info->pos = pos;
    }

    return i-pos;
}

static int
compareRuns( const void * va, const void * vb )
{
    const struct run_info a = *(const struct run_info*)va;
    const struct run_info b = *(const struct run_info*)vb;
    return b.rank - a.rank;
}

enum
{
    MULTIFLAG   = 0x1000,
    DONEFLAG    = 0x2000,
    SESSIONFLAG = 0x4000
};
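
/* Rank layout: the low bits hold a run's length plus an age bonus (see
 * calcRuns); DONEFLAG and MULTIFLAG sit above them so that runs from
 * completed pieces and multi-piece runs sort ahead of everything else in
 * compareRuns().  SESSIONFLAG marks the runs picked by tr_cacheFlushDone(). */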
/* Calculate runs.
 *   - Stale runs, i.e. runs that have sat in the cache for a long time or have
 *     stopped growing, get priority.
 *   Returns the number of runs.
 */
static int
calcRuns( tr_cache * cache, struct run_info * runs )
{
    const int n = tr_ptrArraySize( &cache->blocks );
    int i = 0, pos;
    const time_t now = tr_time();

    for( pos = 0; pos < n; pos += runs[i++].len )
    {
        int rank = getBlockRun( cache, pos, &runs[i] );

        /* This adds ~2 to the relative length of a run for every minute it has
         * languished in the cache. */
        rank += ( now - runs[i].last_block_time ) / 32;

        /* Flushing stale blocks should be a top priority: the probability of them
         * growing is very small for blocks on piece boundaries, and nonexistent for
         * blocks inside pieces. */
        rank |= runs[i].is_piece_done ? DONEFLAG : 0;

        /* Move the multi-piece runs higher */
        rank |= runs[i].is_multi_piece ? MULTIFLAG : 0;

        runs[i].rank = rank;
//fprintf(stderr,"block run at pos %d of length %d and age %ld adjusted +%d\n",runs[i].pos,runs[i].len,now-runs[i].last_block_time,rank-runs[i].len);
    }

    //fprintf( stderr, "%d block runs\n", i );
    qsort( runs, i, sizeof( struct run_info ), compareRuns );
    return i;
}

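/* Flush one run of contiguous cached blocks to disk: copy each block's
 * evbuffer into a flat temporary buffer, remove the blocks from the
 * cache, and write the whole run with a single tr_ioWrite() call. */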
static int
flushContiguous( tr_cache * cache, int pos, int n )
{
    int i;
    int err = 0;
    uint8_t * buf = tr_new( uint8_t, n * MAX_BLOCK_SIZE );
    uint8_t * walk = buf;
    struct cache_block ** blocks = (struct cache_block**) tr_ptrArrayBase( &cache->blocks );

    struct cache_block * b = blocks[pos];
    tr_torrent * tor             = b->tor;
    const tr_piece_index_t piece = b->piece;
    const uint32_t offset        = b->offset;

//fprintf( stderr, "flushing %d contiguous blocks [%d-%d) from cache to disk\n", n, pos, n+pos );

    for( i=pos; i<pos+n; ++i ) {
        b = blocks[i];
        evbuffer_copyout( b->evbuf, walk, b->length );
        walk += b->length;
        evbuffer_free( b->evbuf );
        tr_free( b );
    }
    tr_ptrArrayErase( &cache->blocks, pos, pos+n );

#if 0
    tr_tordbg( tor, "Writing to disk piece %d, offset %d, len %d", (int)piece, (int)offset, (int)(walk-buf) );
    tr_ndbg( MY_NAME, "Removing %d blocks from cache, rank: %d - %d left", n, rank, tr_ptrArraySize(&cache->blocks) );
    fprintf( stderr, "%s - Writing to disk piece %d, offset %d, len %d\n", tr_torrentName(tor), (int)piece, (int)offset, (int)(walk-buf) );
    fprintf( stderr, "%s - Removing %d blocks from cache; %d left\n", MY_NAME, n, tr_ptrArraySize(&cache->blocks) );
#endif

    err = tr_ioWrite( tor, piece, offset, walk-buf, buf );
    tr_free( buf );

    ++cache->disk_writes;
    cache->disk_write_bytes += walk-buf;
    return err;
}

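/* Flush each run in turn.  Flushing a run erases its blocks from the
 * array, so the recorded positions of runs that started after it are
 * shifted down by the flushed run's length. */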
static int
flushRuns( tr_cache * cache, struct run_info * runs, int n )
{
    int i, j, err = 0;

    for( i = 0; !err && i < n; ++i )
    {
        err = flushContiguous( cache, runs[i].pos, runs[i].len );
        for( j = i + 1; j < n; ++j )
            if( runs[j].pos > runs[i].pos )
                runs[j].pos -= runs[i].len;
    }

    return err;
}

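/* If the cache has outgrown its block limit, flush roughly a quarter of
 * it, taking the highest-ranked runs first. */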
static int
cacheTrim( tr_cache * cache )
{
    int err = 0;

    if( tr_ptrArraySize( &cache->blocks ) > cache->max_blocks )
    {
        /* Amount of cache that should be removed by the flush. This influences how large
         * runs can grow as well as how often flushes will happen. */
        const int cacheCutoff = 1 + cache->max_blocks / 4;
        struct run_info * runs = tr_new( struct run_info, tr_ptrArraySize( &cache->blocks ) );
        int i = 0, j = 0;

        calcRuns( cache, runs );
        while( j < cacheCutoff )
            j += runs[i++].len;
        err = flushRuns( cache, runs, i );
        tr_free( runs );
    }

    return err;
}

/***
****
***/

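/* Convert the configured byte limit into a block-count limit, using
 * MAX_BLOCK_SIZE as the size of each cached block. */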
static int
getMaxBlocks( int64_t max_bytes )
{
    return max_bytes / (double)MAX_BLOCK_SIZE;
}

int
tr_cacheSetLimit( tr_cache * cache, int64_t max_bytes )
{
    char buf[128];

    cache->max_bytes = max_bytes;
    cache->max_blocks = getMaxBlocks( max_bytes );

    tr_formatter_mem_B( buf, cache->max_bytes, sizeof( buf ) );
    tr_ndbg( MY_NAME, "Maximum cache size set to %s (%d blocks)", buf, cache->max_blocks );

    return cacheTrim( cache );
}

int64_t
tr_cacheGetLimit( const tr_cache * cache )
{
    return cache->max_bytes;
}

tr_cache *
tr_cacheNew( int64_t max_bytes )
{
    tr_cache * cache = tr_new0( tr_cache, 1 );
    cache->blocks = TR_PTR_ARRAY_INIT;
    cache->max_bytes = max_bytes;
    cache->max_blocks = getMaxBlocks( max_bytes );
    return cache;
}

void
tr_cacheFree( tr_cache * cache )
{
    assert( tr_ptrArrayEmpty( &cache->blocks ) );
    tr_ptrArrayDestruct( &cache->blocks, NULL );
    tr_free( cache );
}

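/* Illustrative sketch only -- not how Transmission actually wires this
 * cache up elsewhere in libtransmission.  It shows how a caller might
 * exercise the public API defined in this file.  The torrent pointer,
 * the piece/offset/length values, and the 4 MiB limit are made-up
 * placeholders; `payload' is assumed to hold at least one full block. */
#if 0
static void
example_cache_usage( tr_torrent * tor, struct evbuffer * payload )
{
    uint8_t readback[MAX_BLOCK_SIZE];
    tr_cache * cache = tr_cacheNew( 4 * 1024 * 1024 ); /* 4 MiB byte limit */

    /* queue one block; its bytes are moved out of `payload' */
    tr_cacheWriteBlock( cache, tor, 0, 0, MAX_BLOCK_SIZE, payload );

    /* read it back -- served from the cache if it hasn't been flushed yet */
    tr_cacheReadBlock( cache, tor, 0, 0, MAX_BLOCK_SIZE, readback );

    /* push everything for this torrent to disk, then clean up */
    tr_cacheFlushTorrent( cache, tor );
    tr_cacheFree( cache );
}
#endif
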
/***
****
***/

static int
cache_block_compare( const void * va, const void * vb )
{
    const struct cache_block * a = va;
    const struct cache_block * b = vb;

    /* primary key: torrent id */
    if( a->tor->uniqueId != b->tor->uniqueId )
        return a->tor->uniqueId < b->tor->uniqueId ? -1 : 1;

    /* secondary key: block # */
    if( a->block != b->block )
        return a->block < b->block ? -1 : 1;

    /* they're equal */
    return 0;
}

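/* Look up a cached block by (torrent, piece, offset).  Only the fields
 * that cache_block_compare() inspects need to be set in the stack key. */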
static struct cache_block *
findBlock( tr_cache           * cache,
           tr_torrent         * torrent,
           tr_piece_index_t     piece,
           uint32_t             offset )
{
    struct cache_block key;
    key.tor = torrent;
    key.block = _tr_block( torrent, piece, offset );
    return tr_ptrArrayFindSorted( &cache->blocks, &key, cache_block_compare );
}

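/* Store an incoming block in the cache.  The payload is moved out of
 * `writeme' with evbuffer_remove_buffer(), which can transfer whole
 * buffer chains rather than copying the bytes -- the point of keeping
 * each cached block in an evbuffer. */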
int
tr_cacheWriteBlock( tr_cache         * cache,
                    tr_torrent       * torrent,
                    tr_piece_index_t   piece,
                    uint32_t           offset,
                    uint32_t           length,
                    struct evbuffer  * writeme )
{
    struct cache_block * cb = findBlock( cache, torrent, piece, offset );

    if( cb == NULL )
    {
        cb = tr_new( struct cache_block, 1 );
        cb->tor = torrent;
        cb->piece = piece;
        cb->offset = offset;
        cb->length = length;
        cb->block = _tr_block( torrent, piece, offset );
        cb->evbuf = evbuffer_new( );
        tr_ptrArrayInsertSorted( &cache->blocks, cb, cache_block_compare );
    }

    cb->time = tr_time();

    assert( cb->length == length );
    evbuffer_drain( cb->evbuf, evbuffer_get_length( cb->evbuf ) );
    evbuffer_remove_buffer( writeme, cb->evbuf, cb->length );

    ++cache->cache_writes;
    cache->cache_write_bytes += cb->length;

    return cacheTrim( cache );
}

int
tr_cacheReadBlock( tr_cache         * cache,
                   tr_torrent       * torrent,
                   tr_piece_index_t   piece,
                   uint32_t           offset,
                   uint32_t           len,
                   uint8_t          * setme )
{
    int err = 0;
    struct cache_block * cb = findBlock( cache, torrent, piece, offset );

    if( cb )
        evbuffer_copyout( cb->evbuf, setme, len );
    else
        err = tr_ioRead( torrent, piece, offset, len, setme );

    return err;
}

int
tr_cachePrefetchBlock( tr_cache         * cache,
                       tr_torrent       * torrent,
                       tr_piece_index_t   piece,
                       uint32_t           offset,
                       uint32_t           len )
{
    int err = 0;
    struct cache_block * cb = findBlock( cache, torrent, piece, offset );

    if( cb == NULL )
        err = tr_ioPrefetch( torrent, piece, offset, len );

    return err;
}

/***
****
***/

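/* Return the array index of the first cached block belonging to this
 * piece -- or, if no such block is cached, the index where one would go. */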
static int
findPiece( tr_cache * cache, tr_torrent * torrent, tr_piece_index_t piece )
{
    struct cache_block key;
    key.tor = torrent;
    key.block = tr_torPieceFirstBlock( torrent, piece );
    return tr_ptrArrayLowerBound( &cache->blocks, &key, cache_block_compare, NULL );
}

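/* Flush the runs whose pieces are already complete, plus the multi-piece
 * runs, since neither kind is likely to keep growing in the cache. */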
int tr_cacheFlushDone( tr_cache * cache )
{
    int err = 0;

    if( tr_ptrArraySize( &cache->blocks ) > 0 )
    {
        struct run_info * runs = tr_new( struct run_info, tr_ptrArraySize( &cache->blocks ) );
        int i = 0, n;

        n = calcRuns( cache, runs );
        while( i < n && ( runs[i].is_piece_done || runs[i].is_multi_piece ) )
            runs[i++].rank |= SESSIONFLAG;
        err = flushRuns( cache, runs, i );
        tr_free( runs );
    }

    return err;
}

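/* Flush every cached block that falls inside the given file's block range. */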
int
tr_cacheFlushFile( tr_cache * cache, tr_torrent * torrent, tr_file_index_t i )
{
    int err = 0;
    const tr_file * file = &torrent->info.files[i];
    const tr_block_index_t begin = tr_torPieceFirstBlock( torrent, file->firstPiece );
    const tr_block_index_t end  = tr_torPieceFirstBlock( torrent, file->lastPiece ) + tr_torPieceCountBlocks( torrent, file->lastPiece );
    const int pos = findPiece( cache, torrent, file->firstPiece );
    dbgmsg( "flushing file %d from cache to disk: blocks [%zu...%zu)", (int)i, (size_t)begin, (size_t)end );

    /* flush out all the blocks in that file */
    while( !err && ( pos < tr_ptrArraySize( &cache->blocks ) ) )
    {
        const struct cache_block * b = tr_ptrArrayNth( &cache->blocks, pos );
        if( b->tor != torrent ) break;
        if( ( b->block < begin ) || ( b->block >= end ) ) break;
        err = flushContiguous( cache, pos, getBlockRun( cache, pos, NULL ) );
    }

    return err;
}

int
tr_cacheFlushTorrent( tr_cache * cache, tr_torrent * torrent )
{
    int err = 0;
    const int pos = findPiece( cache, torrent, 0 );

    /* flush out all the blocks in that torrent */
    while( !err && ( pos < tr_ptrArraySize( &cache->blocks ) ) )
    {
        const struct cache_block * b = tr_ptrArrayNth( &cache->blocks, pos );
        if( b->tor != torrent ) break;
        err = flushContiguous( cache, pos, getBlockRun( cache, pos, NULL ) );
    }

    return err;
}