source: trunk/libtransmission/tr-dht.c @ 8880

Last change on this file since 8880 was 8880, checked in by charles, 13 years ago

(trunk libT) #2281: add logging related to DHT (Patch from Juliusz)

File size: 12.4 KB
Line 
1/*
2Copyright (c) 2009 by Juliusz Chroboczek
3
4Permission is hereby granted, free of charge, to any person obtaining a copy
5of this software and associated documentation files (the "Software"), to deal
6in the Software without restriction, including without limitation the rights
7to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8copies of the Software, and to permit persons to whom the Software is
9furnished to do so, subject to the following conditions:
10
11The above copyright notice and this permission notice shall be included in
12all copies or substantial portions of the Software.
13
14THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
17AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20THE SOFTWARE.
21*/
22
23/* ansi */
24#include <errno.h>
25#include <stdio.h>
26
27/* posix */
28#include <netinet/in.h> /* sockaddr_in */
29#include <signal.h> /* sig_atomic_t */
30#include <sys/time.h>
31#include <sys/types.h>
32#include <sys/socket.h> /* socket(), bind() */
33#include <unistd.h> /* close() */
34
35/* third party */
36#include <event.h>
37#include <dht/dht.h>
38
39/* libT */
40#include "transmission.h"
41#include "bencode.h"
42#include "crypto.h"
43#include "net.h"
44#include "peer-mgr.h" /* tr_peerMgrCompactToPex() */
45#include "platform.h" /* tr_threadNew() */
46#include "session.h"
47#include "torrent.h" /* tr_torrentFindFromHash() */
48#include "tr-dht.h"
49#include "trevent.h" /* tr_runInEventThread() */
50#include "utils.h"
51#include "version.h"
52
53#ifdef WITHOUT_DHT
54
55  /* These are the stubs for when we're building without DHT support */
56  int tr_dhtInit( tr_session * session UNUSED ) { return TR_DHT_STOPPED; }
57  void tr_dhtUninit( tr_session * session UNUSED ) { }
58  tr_bool tr_dhtEnabled( const tr_session * session UNUSED ) { return FALSE; }
59  tr_port tr_dhtPort ( const tr_session * sesssion UNUSED ) { return 0; }
60  int tr_dhtStatus( tr_session * session     UNUSED,
61                    int        * setmeCount  UNUSED ) { return TR_DHT_STOPPED; }
62  int tr_dhtAddNode( tr_session * session    UNUSED,
63                     tr_address * addr       UNUSED,
64                     tr_port      port       UNUSED,
65                     tr_bool      bootstrap  UNUSED ) { return 0; }
66  int tr_dhtAnnounce( tr_torrent * session UNUSED,
67                      tr_bool announce UNUSED ) { return -1; }
68
69
70#else
71
72static int dht_socket;
73static struct event dht_event;
74static tr_port dht_port;
75static unsigned char myid[20];
76static tr_session *session = NULL;
77
78static void event_callback(int s, short type, void *ignore);
79
80struct bootstrap_closure {
81    tr_session *session;
82    uint8_t *nodes;
83    size_t len;
84};
85
86static void
87dht_bootstrap(void *closure)
88{
89    struct bootstrap_closure *cl = closure;
90    size_t i;
91
92    if(session != cl->session)
93        return;
94
95    for(i = 0; i < cl->len; i += 6)
96    {
97        struct timeval tv;
98        tr_port port;
99        struct tr_address addr;
100        int status;
101
102        memset(&addr, 0, sizeof(addr));
103        addr.type = TR_AF_INET;
104        memcpy(&addr.addr.addr4, &cl->nodes[i], 4);
105        memcpy(&port, &cl->nodes[i + 4], 2);
106        port = ntohs(port);
107        /* There's no race here -- if we uninit between the test and the
108           AddNode, the AddNode will be ignored. */
109        status = tr_dhtStatus(cl->session, NULL);
110        if(status == TR_DHT_STOPPED || status >= TR_DHT_FIREWALLED)
111            break;
112        tr_dhtAddNode(cl->session, &addr, port, 1);
113        tr_timevalSet( &tv, 2 + tr_cryptoWeakRandInt( 5 ), tr_cryptoWeakRandInt( 1000000 ) );
114        select( 0, NULL, NULL, NULL, &tv );
115    }
116    tr_free( cl->nodes );
117    tr_free( closure );
118    tr_ndbg( "DHT", "Finished bootstrapping" );
119}
120
121int
122tr_dhtInit(tr_session *ss)
123{
124    struct sockaddr_in sin;
125    tr_benc benc;
126    int rc;
127    tr_bool have_id = FALSE;
128    char * dat_file;
129    uint8_t * nodes = NULL;
130    const uint8_t * raw;
131    size_t len;
132    char v[5];
133
134    if( session ) /* already initialized */
135        return -1;
136
137    dht_port = tr_sessionGetPeerPort(ss);
138    if(dht_port <= 0)
139        return -1;
140
141    tr_ndbg( "DHT", "Initialising DHT" );
142
143    dht_socket = socket(PF_INET, SOCK_DGRAM, 0);
144    if(dht_socket < 0)
145        goto fail;
146
147    memset(&sin, 0, sizeof(sin));
148    sin.sin_family = AF_INET;
149    sin.sin_port = htons(dht_port);
150    rc = bind(dht_socket, (struct sockaddr*)&sin, sizeof(sin));
151    if(rc < 0)
152        goto fail;
153
154    if( getenv( "TR_DHT_VERBOSE" ) != NULL )
155        dht_debug = stderr;
156
157    dat_file = tr_buildPath( ss->configDir, "dht.dat", NULL );
158    rc = tr_bencLoadFile( &benc, TR_FMT_BENC, dat_file );
159    tr_free( dat_file );
160    if(rc == 0) {
161        if(( have_id = tr_bencDictFindRaw( &benc, "id", &raw, &len ) && len==20 ))
162            memcpy( myid, raw, len );
163        if( tr_bencDictFindRaw( &benc, "nodes", &raw, &len ) && !(len%6) ) {
164            nodes = tr_memdup( raw, len );
165            tr_ninf( "DHT", "Bootstrapping from %d old nodes", (int)(len/6) );
166        }
167        tr_bencFree( &benc );
168    }
169
170    if( have_id )
171        tr_ninf( "DHT", "Reusing old id" );
172    else {
173        /* Note that DHT ids need to be distributed uniformly,
174         * so it should be something truly random. */
175        tr_ninf( "DHT", "Generating new id" );
176        tr_cryptoRandBuf( myid, 20 );
177    }
178
179    v[0] = 'T';
180    v[1] = 'R';
181    v[2] = (SVN_REVISION_NUM >> 8) & 0xFF; 
182    v[3] = SVN_REVISION_NUM & 0xFF; 
183    rc = dht_init( dht_socket, myid, (const unsigned char*)v );
184    if(rc < 0)
185        goto fail;
186
187    session = ss;
188
189    if(nodes) {
190        struct bootstrap_closure * cl = tr_new( struct bootstrap_closure, 1 );
191        cl->session = session;
192        cl->nodes = nodes;
193        cl->len = len;
194        tr_threadNew( dht_bootstrap, cl );
195    }
196
197    event_set( &dht_event, dht_socket, EV_READ, event_callback, NULL );
198    tr_timerAdd( &dht_event, 0, tr_cryptoWeakRandInt( 1000000 ) );
199
200    tr_ndbg( "DHT", "DHT initialised" );
201
202    return 1;
203
204    fail:
205    {
206        const int save = errno;
207        close(dht_socket);
208        dht_socket = -1;
209        session = NULL;
210        tr_ndbg( "DHT", "DHT initialisation failed (errno = %d)", save );
211        errno = save;
212    }
213
214    return -1;
215}
216
217void
218tr_dhtUninit(tr_session *ss)
219{
220    if(session != ss)
221        return;
222
223    tr_ndbg( "DHT", "Uninitialising DHT" );
224
225    event_del(&dht_event);
226
227    /* Since we only save known good nodes, avoid erasing older data if we
228       don't know enough nodes. */
229    if(tr_dhtStatus(ss, NULL) < TR_DHT_FIREWALLED)
230        tr_ninf( "DHT", "Not saving nodes, DHT not ready" );
231    else {
232        tr_benc benc;
233        struct sockaddr_in sins[300];
234        char compact[300 * 6];
235        char *dat_file;
236        int i;
237        int n = dht_get_nodes(sins, 300);
238        int j = 0;
239
240        tr_ninf( "DHT", "Saving %d nodes", n );
241        for( i=0; i<n; ++i ) {
242            memcpy( compact + j, &sins[i].sin_addr, 4 );
243            memcpy( compact + j + 4, &sins[i].sin_port, 2 );
244            j += 6;
245        }
246        tr_bencInitDict( &benc, 2 );
247        tr_bencDictAddRaw( &benc, "id", myid, 20 );
248        tr_bencDictAddRaw( &benc, "nodes", compact, j );
249        dat_file = tr_buildPath( ss->configDir, "dht.dat", NULL );
250        tr_bencToFile( &benc, TR_FMT_BENC, dat_file );
251        tr_bencFree( &benc );
252        tr_free( dat_file );
253    }
254
255    dht_uninit( dht_socket, 0 );
256    EVUTIL_CLOSESOCKET( dht_socket );
257
258    tr_ndbg("DHT", "Done uninitialising DHT");
259
260    session = NULL;
261}
262
263tr_bool
264tr_dhtEnabled( const tr_session * ss )
265{
266    return ss && ( ss == session );
267}
268
/* Result slot filled in by getstatus() and polled by tr_dhtStatus(). */
struct getstatus_closure
{
    /* one of the TR_DHT_* states; tr_dhtStatus() seeds it with -1 and
     * waits for it to become non-negative */
    sig_atomic_t status;

    /* number of good plus dubious nodes */
    sig_atomic_t count;
};
274
275static void
276getstatus( void * closure )
277{
278    struct getstatus_closure * ret = closure;
279    int good, dubious, incoming;
280
281    dht_nodes( &good, &dubious, NULL, &incoming );
282
283    ret->count = good + dubious;
284
285    if( good < 4 || good + dubious <= 8 )
286        ret->status = TR_DHT_BROKEN;
287    else if( good < 40 )
288        ret->status = TR_DHT_POOR;
289    else if( incoming < 8 )
290        ret->status = TR_DHT_FIREWALLED;
291    else
292        ret->status = TR_DHT_GOOD;
293}
294
295int
296tr_dhtStatus( tr_session * ss, int * nodes_return )
297{
298    struct getstatus_closure ret = { -1, - 1 };
299
300    if( !tr_dhtEnabled( ss ) )
301        return TR_DHT_STOPPED;
302
303    tr_runInEventThread( ss, getstatus, &ret );
304    while( ret.status < 0 )
305        tr_wait( 10 /*msec*/ );
306
307    if( nodes_return )
308        *nodes_return = ret.count;
309
310    return ret.status;
311}
312
313tr_port
314tr_dhtPort( const tr_session *ss )
315{
316    return tr_dhtEnabled( ss ) ? dht_port : 0;
317}
318
319int
320tr_dhtAddNode(tr_session *ss, tr_address *address, tr_port port,
321              tr_bool bootstrap)
322{
323    struct sockaddr_in sin;
324
325    if( !tr_dhtEnabled( ss ) )
326        return 0;
327
328    if( address->type != TR_AF_INET )
329        return 0;
330
331    /* Since we don't want to abuse our bootstrap nodes,
332     * we don't ping them if the DHT is in a good state. */
333    if(bootstrap) {
334        if(tr_dhtStatus(ss, NULL) >= TR_DHT_FIREWALLED)
335            return 0;
336    }
337
338    memset(&sin, 0, sizeof(sin));
339    sin.sin_family = AF_INET;
340    memcpy(&sin.sin_addr, &address->addr.addr4, 4);
341    sin.sin_port = htons(port);
342    dht_ping_node(dht_socket, &sin);
343
344    return 1;
345}
346
347const char *
348tr_dhtPrintableStatus(int status)
349{
350    switch(status) {
351    case TR_DHT_STOPPED: return "stopped";
352    case TR_DHT_BROKEN: return "broken";
353    case TR_DHT_POOR: return "poor";
354    case TR_DHT_FIREWALLED: return "firewalled";
355    case TR_DHT_GOOD: return "good";
356    default: return "???";
357    }
358}
359
360static void
361callback( void *ignore UNUSED, int event,
362          unsigned char *info_hash, void *data, size_t data_len )
363{
364    if( event == DHT_EVENT_VALUES )
365    {
366        tr_torrent *tor;
367        tr_globalLock( session );
368        tor = tr_torrentFindFromHash( session, info_hash );
369        if( tor && tr_torrentAllowsDHT( tor ))
370        {
371            size_t i, n;
372            tr_pex * pex = tr_peerMgrCompactToPex(data, data_len, NULL, 0, &n);
373            for( i=0; i<n; ++i )
374                tr_peerMgrAddPex( tor, TR_PEER_FROM_DHT, pex+i );
375            tr_free(pex);
376            tr_torinf(tor, "Learned %d peers from DHT", (int)n);
377        }
378        tr_globalUnlock( session );
379    }
380    else if( event == DHT_EVENT_SEARCH_DONE )
381    {
382        tr_torrent * tor = tr_torrentFindFromHash( session, info_hash );
383        if( tor ) {
384            tr_torinf(tor, "DHT announce done");
385            tor->dhtAnnounceInProgress = 0;
386        }
387    }
388}
389
390int
391tr_dhtAnnounce(tr_torrent *tor, tr_bool announce)
392{
393    int rc, status, numnodes;
394
395    if( !tr_torrentAllowsDHT( tor ) )
396        return -1;
397
398    status = tr_dhtStatus( tor->session, &numnodes );
399    if(status < TR_DHT_POOR ) {
400        tr_tordbg(tor, "DHT not ready (%s, %d nodes)",
401                  tr_dhtPrintableStatus(status), numnodes);
402        return 0;
403    }
404
405    rc = dht_search( dht_socket, tor->info.hash,
406                     announce ? tr_sessionGetPeerPort(session) : 0,
407                     callback, NULL);
408
409    if( rc >= 1 ) {
410        tr_torinf(tor, "Starting DHT announce (%s, %d nodes)",
411                  tr_dhtPrintableStatus(status), numnodes);
412        tor->dhtAnnounceInProgress = TRUE;
413    } else {
414        tr_torerr(tor, "DHT announce failed, errno = %d (%s, %d nodes)",
415                  errno, tr_dhtPrintableStatus(status), numnodes);
416    }
417
418    return 1;
419}
420
421static void
422event_callback(int s, short type, void *ignore UNUSED )
423{
424    time_t tosleep;
425
426    if( dht_periodic(s, type == EV_READ, &tosleep, callback, NULL) < 0 ) {
427        if(errno == EINTR) {
428            tosleep = 0;
429        } else {
430            tr_nerr("DHT", "dht_periodic failed (errno = %d)", errno);
431            if(errno == EINVAL || errno == EFAULT)
432                    abort();
433            tosleep = 1;
434        }
435    }
436
437    /* Being slightly late is fine,
438       and has the added benefit of adding some jitter. */
439    tr_timerAdd( &dht_event, tosleep, tr_cryptoWeakRandInt( 1000000 ) );
440}
441
442void
443dht_hash(void *hash_return, int hash_size,
444         const void *v1, int len1,
445         const void *v2, int len2,
446         const void *v3, int len3)
447{
448    unsigned char sha1[SHA_DIGEST_LENGTH];
449    tr_sha1( sha1, v1, len1, v2, len2, v3, len3, NULL );
450    memset( hash_return, 0, hash_size );
451    memcpy( hash_return, sha1, MIN( hash_size, SHA_DIGEST_LENGTH ) );
452}
453
/*
 * Randomness hook with external linkage, presumably called by the dht
 * library (confirm against dht.h).
 *
 * Fills `buf' with `size' random bytes and returns `size'.
 */
int
dht_random_bytes( void * buf, size_t size )
{
    tr_cryptoRandBuf( buf, size );

    return (int) size;
}
460
461#endif
Note: See TracBrowser for help on using the repository browser.