source: trunk/libtransmission/tr-dht.c @ 12545

Last change on this file since 12545 was 12543, checked in by jordan, 10 years ago

(trunk libT) #4351 "DHT can't be uninitialized during a session" -- don't process incoming DHT messages if DHT is disabled. Don't process incoming UTP messages if UTP is disabled.

  • Property svn:keywords set to Date Rev Author Id
File size: 18.9 KB
Line 
1/*
2 * Copyright (c) 2009-2010 by Juliusz Chroboczek
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 * THE SOFTWARE.
21 *
22 * $Id: tr-dht.c 12543 2011-07-12 12:26:24Z jordan $
23 *
24 */
25
26/* ansi */
27#include <errno.h>
28#include <stdio.h>
29#include <string.h> /* memcpy(), memset(), memchr(), strlen() */
30#include <stdlib.h> /* atoi() */
31
32/* posix */
33#include <signal.h> /* sig_atomic_t */
34#include <sys/time.h>
35#include <unistd.h> /* close() */
36#ifdef WIN32
37  #include <inttypes.h>
38  #define _WIN32_WINNT  0x0501  /* freeaddrinfo(),getaddrinfo(),getnameinfo() */
39  #include <ws2tcpip.h>
40#else
41  #include <sys/types.h>
42  #include <sys/socket.h> /* socket(), bind() */
43  #include <netdb.h>
44  #include <netinet/in.h> /* sockaddr_in */
45#endif
46
47/* third party */
48#include <event2/event.h>
49#include <dht/dht.h>
50
51/* libT */
52#include "transmission.h"
53#include "bencode.h"
54#include "crypto.h"
55#include "net.h"
56#include "peer-mgr.h" /* tr_peerMgrCompactToPex() */
57#include "platform.h" /* tr_threadNew() */
58#include "session.h"
59#include "torrent.h" /* tr_torrentFindFromHash() */
60#include "tr-dht.h"
61#include "trevent.h" /* tr_runInEventThread() */
62#include "utils.h"
63
64static struct event *dht_timer = NULL;
65static unsigned char myid[20];
66static tr_session *session = NULL;
67
68static void timer_callback(int s, short type, void *ignore);
69
70struct bootstrap_closure {
71    tr_session *session;
72    uint8_t *nodes;
73    uint8_t *nodes6;
74    size_t len, len6;
75};
76
77static int
78bootstrap_done( tr_session *session, int af )
79{
80    int status;
81
82    if(af == 0)
83        return
84            bootstrap_done(session, AF_INET) &&
85            bootstrap_done(session, AF_INET6);
86
87    status = tr_dhtStatus(session, af, NULL);
88    return status == TR_DHT_STOPPED || status >= TR_DHT_FIREWALLED;
89}
90
91static void
92nap( int roughly_sec )
93{
94    const int roughly_msec = roughly_sec * 1000;
95    const int msec = roughly_msec/2 + tr_cryptoWeakRandInt(roughly_msec);
96    tr_wait_msec( msec );
97}
98
99static int
100bootstrap_af(tr_session *session)
101{
102    if( bootstrap_done(session, AF_INET6) )
103        return AF_INET;
104    else if ( bootstrap_done(session, AF_INET) )
105        return AF_INET6;
106    else
107        return 0;
108}
109
110static void
111bootstrap_from_name( const char *name, tr_port port, int af )
112{
113    struct addrinfo hints, *info, *infop;
114    char pp[10];
115    int rc;
116
117    memset(&hints, 0, sizeof(hints));
118    hints.ai_socktype = SOCK_DGRAM;
119    hints.ai_family = af;
120    /* No, just passing p + 1 to gai won't work. */
121    tr_snprintf(pp, sizeof(pp), "%d", (int)port);
122
123    rc = getaddrinfo(name, pp, &hints, &info);
124    if(rc != 0) {
125        tr_nerr("DHT", "%s:%s: %s", name, pp, gai_strerror(rc));
126        return;
127    }
128
129    infop = info;
130    while(infop) {
131        dht_ping_node(infop->ai_addr, infop->ai_addrlen);
132
133        nap(15);
134
135        if(bootstrap_done(session, af))
136            break;
137        infop = infop->ai_next;
138    }
139    freeaddrinfo(info);
140}
141
142static void
143dht_bootstrap(void *closure)
144{
145    struct bootstrap_closure *cl = closure;
146    int i;
147    int num = cl->len / 6, num6 = cl->len6 / 18;
148
149    if(session != cl->session)
150        return;
151
152    if(cl->len > 0)
153        tr_ninf( "DHT", "Bootstrapping from %d nodes", num );
154
155    if(cl->len6 > 0)
156        tr_ninf( "DHT", "Bootstrapping from %d IPv6 nodes", num6 );
157
158
159    for(i = 0; i < MAX(num, num6); i++) {
160        if( i < num && !bootstrap_done(cl->session, AF_INET) ) {
161            tr_port port;
162            struct tr_address addr;
163
164            memset(&addr, 0, sizeof(addr));
165            addr.type = TR_AF_INET;
166            memcpy(&addr.addr.addr4, &cl->nodes[i * 6], 4);
167            memcpy(&port, &cl->nodes[i * 6 + 4], 2);
168            port = ntohs(port);
169            tr_dhtAddNode(cl->session, &addr, port, 1);
170        }
171        if( i < num6 && !bootstrap_done(cl->session, AF_INET6) ) {
172            tr_port port;
173            struct tr_address addr;
174
175            memset(&addr, 0, sizeof(addr));
176            addr.type = TR_AF_INET6;
177            memcpy(&addr.addr.addr6, &cl->nodes6[i * 18], 16);
178            memcpy(&port, &cl->nodes6[i * 18 + 16], 2);
179            port = ntohs(port);
180            tr_dhtAddNode(cl->session, &addr, port, 1);
181        }
182
183        /* Our DHT code is able to take up to 9 nodes in a row without
184           dropping any. After that, it takes some time to split buckets.
185           So ping the first 8 nodes quickly, then slow down. */
186        if(i < 8)
187            nap(2);
188        else
189            nap(15);
190
191        if(bootstrap_done( session, 0 ))
192            break;
193    }
194
195    if(!bootstrap_done(cl->session, 0)) {
196        char *bootstrap_file;
197        FILE *f = NULL;
198
199        bootstrap_file =
200            tr_buildPath(cl->session->configDir, "dht.bootstrap", NULL);
201
202        if(bootstrap_file)
203            f = fopen(bootstrap_file, "rb");
204        if(f != NULL) {
205            tr_ninf("DHT", "Attempting manual bootstrap");
206            for(;;) {
207                char buf[201];
208                char *p;
209                int port = 0;
210
211                p = fgets(buf, 200, f);
212                if( p == NULL )
213                    break;
214
215                p = memchr(buf, ' ', strlen(buf));
216                if(p != NULL)
217                    port = atoi(p + 1);
218                if(p == NULL || port <= 0 || port >= 0x10000) {
219                    tr_nerr("DHT", "Couldn't parse %s", buf);
220                    continue;
221                }
222
223                *p = '\0';
224
225                bootstrap_from_name( buf, port, bootstrap_af(session) );
226
227                if(bootstrap_done(cl->session, 0))
228                    break;
229            }
230            fclose( f );
231        }
232
233        tr_free( bootstrap_file );
234    }
235
236    if(!bootstrap_done(cl->session, 0)) {
237        for(i = 0; i < 6; i++) {
238            /* We don't want to abuse our bootstrap nodes, so be very
239               slow.  The initial wait is to give other nodes a chance
240               to contact us before we attempt to contact a bootstrap
241               node, for example because we've just been restarted. */
242            nap(40);
243            if(bootstrap_done(cl->session, 0))
244                break;
245            if(i == 0)
246                tr_ninf("DHT",
247                        "Attempting bootstrap from dht.transmissionbt.com");
248            bootstrap_from_name( "dht.transmissionbt.com", 6881,
249                                 bootstrap_af(session) );
250        }
251    }
252
253    if( cl->nodes )
254        tr_free( cl->nodes );
255    if( cl->nodes6 )
256        tr_free( cl->nodes6 );
257    tr_free( closure );
258    tr_ndbg( "DHT", "Finished bootstrapping" );
259}
260
261int
262tr_dhtInit(tr_session *ss)
263{
264    tr_benc benc;
265    int rc;
266    bool have_id = false;
267    char * dat_file;
268    uint8_t * nodes = NULL, * nodes6 = NULL;
269    const uint8_t * raw;
270    size_t len, len6;
271    struct bootstrap_closure * cl;
272
273    if( session ) /* already initialized */
274        return -1;
275
276    tr_ndbg( "DHT", "Initializing DHT" );
277
278    if( getenv( "TR_DHT_VERBOSE" ) != NULL )
279        dht_debug = stderr;
280
281    dat_file = tr_buildPath( ss->configDir, "dht.dat", NULL );
282    rc = tr_bencLoadFile( &benc, TR_FMT_BENC, dat_file );
283    tr_free( dat_file );
284    if(rc == 0) {
285        have_id = tr_bencDictFindRaw(&benc, "id", &raw, &len);
286        if( have_id && len==20 )
287            memcpy( myid, raw, len );
288        if( ss->udp_socket >= 0 &&
289            tr_bencDictFindRaw( &benc, "nodes", &raw, &len ) && !(len%6) ) {
290                nodes = tr_memdup( raw, len );
291        }
292        if( ss->udp6_socket > 0 &&
293            tr_bencDictFindRaw( &benc, "nodes6", &raw, &len6 ) && !(len6%18) ) {
294            nodes6 = tr_memdup( raw, len6 );
295        }
296        tr_bencFree( &benc );
297    }
298
299    if(nodes == NULL)
300        len = 0;
301    if(nodes6 == NULL)
302        len6 = 0;
303
304    if( have_id )
305        tr_ninf( "DHT", "Reusing old id" );
306    else {
307        /* Note that DHT ids need to be distributed uniformly,
308         * so it should be something truly random. */
309        tr_ninf( "DHT", "Generating new id" );
310        tr_cryptoRandBuf( myid, 20 );
311    }
312
313    rc = dht_init( ss->udp_socket, ss->udp6_socket, myid, NULL );
314    if( rc < 0 )
315        goto fail;
316
317    session = ss;
318
319    cl = tr_new( struct bootstrap_closure, 1 );
320    cl->session = session;
321    cl->nodes = nodes;
322    cl->nodes6 = nodes6;
323    cl->len = len;
324    cl->len6 = len6;
325    tr_threadNew( dht_bootstrap, cl );
326
327    dht_timer = evtimer_new( session->event_base, timer_callback, session );
328    tr_timerAdd( dht_timer, 0, tr_cryptoWeakRandInt( 1000000 ) );
329
330    tr_ndbg( "DHT", "DHT initialized" );
331
332    return 1;
333
334 fail:
335    tr_ndbg( "DHT", "DHT initialization failed (errno = %d)", errno );
336    session = NULL;
337    return -1;
338}
339
340void
341tr_dhtUninit(tr_session *ss)
342{
343    if(session != ss)
344        return;
345
346    tr_ndbg( "DHT", "Uninitializing DHT" );
347
348    if( dht_timer != NULL ) {
349        event_free( dht_timer );
350        dht_timer = NULL;
351    }
352
353    /* Since we only save known good nodes, avoid erasing older data if we
354       don't know enough nodes. */
355    if(tr_dhtStatus(ss, AF_INET, NULL) < TR_DHT_FIREWALLED)
356        tr_ninf( "DHT", "Not saving nodes, DHT not ready" );
357    else {
358        tr_benc benc;
359        struct sockaddr_in sins[300];
360        struct sockaddr_in6 sins6[300];
361        char compact[300 * 6], compact6[300 * 18];
362        char *dat_file;
363        int i, j, num = 300, num6 = 300;
364        int n = dht_get_nodes(sins, &num, sins6, &num6);
365
366        tr_ninf( "DHT", "Saving %d (%d + %d) nodes", n, num, num6 );
367
368        j = 0;
369        for( i=0; i<num; ++i ) {
370            memcpy( compact + j, &sins[i].sin_addr, 4 );
371            memcpy( compact + j + 4, &sins[i].sin_port, 2 );
372            j += 6;
373        }
374        j = 0;
375        for( i=0; i<num6; ++i ) {
376            memcpy( compact6 + j, &sins6[i].sin6_addr, 16 );
377            memcpy( compact6 + j + 16, &sins6[i].sin6_port, 2 );
378            j += 18;
379        }
380        tr_bencInitDict( &benc, 3 );
381        tr_bencDictAddRaw( &benc, "id", myid, 20 );
382        if(num > 0)
383            tr_bencDictAddRaw( &benc, "nodes", compact, num * 6 );
384        if(num6 > 0)
385            tr_bencDictAddRaw( &benc, "nodes6", compact6, num6 * 18 );
386        dat_file = tr_buildPath( ss->configDir, "dht.dat", NULL );
387        tr_bencToFile( &benc, TR_FMT_BENC, dat_file );
388        tr_bencFree( &benc );
389        tr_free( dat_file );
390    }
391
392    dht_uninit();
393    tr_ndbg("DHT", "Done uninitializing DHT");
394
395    session = NULL;
396}
397
398bool
399tr_dhtEnabled( const tr_session * ss )
400{
401    return ss && ( ss == session );
402}
403
404struct getstatus_closure
405{
406    int af;
407    sig_atomic_t status;
408    sig_atomic_t count;
409};
410
411static void
412getstatus( void * cl )
413{
414    struct getstatus_closure * closure = cl;
415    int good, dubious, incoming;
416
417    dht_nodes( closure->af, &good, &dubious, NULL, &incoming );
418
419    closure->count = good + dubious;
420
421    if( good < 4 || good + dubious <= 8 )
422        closure->status = TR_DHT_BROKEN;
423    else if( good < 40 )
424        closure->status = TR_DHT_POOR;
425    else if( incoming < 8 )
426        closure->status = TR_DHT_FIREWALLED;
427    else
428        closure->status = TR_DHT_GOOD;
429}
430
431int
432tr_dhtStatus( tr_session * session, int af, int * nodes_return )
433{
434    struct getstatus_closure closure = { af, -1, -1 };
435
436    if( !tr_dhtEnabled( session ) ||
437        (af == AF_INET && session->udp_socket < 0) ||
438        (af == AF_INET6 && session->udp6_socket < 0) ) {
439        if( nodes_return )
440            *nodes_return = 0;
441        return TR_DHT_STOPPED;
442    }
443
444    tr_runInEventThread( session, getstatus, &closure );
445    while( closure.status < 0 )
446        tr_wait_msec( 50 /*msec*/ );
447
448    if( nodes_return )
449        *nodes_return = closure.count;
450
451    return closure.status;
452}
453
454tr_port
455tr_dhtPort( tr_session *ss )
456{
457    return tr_dhtEnabled( ss ) ? ss->udp_port : 0;
458}
459
460int
461tr_dhtAddNode( tr_session       * ss,
462               const tr_address * address,
463               tr_port            port,
464               bool            bootstrap )
465{
466    int af = address->type == TR_AF_INET ? AF_INET : AF_INET6;
467
468    if( !tr_dhtEnabled( ss ) )
469        return 0;
470
471    /* Since we don't want to abuse our bootstrap nodes,
472     * we don't ping them if the DHT is in a good state. */
473
474    if(bootstrap) {
475        if(tr_dhtStatus(ss, af, NULL) >= TR_DHT_FIREWALLED)
476            return 0;
477    }
478
479    if( address->type == TR_AF_INET ) {
480        struct sockaddr_in sin;
481        memset(&sin, 0, sizeof(sin));
482        sin.sin_family = AF_INET;
483        memcpy(&sin.sin_addr, &address->addr.addr4, 4);
484        sin.sin_port = htons(port);
485        dht_ping_node((struct sockaddr*)&sin, sizeof(sin));
486        return 1;
487    } else if( address->type == TR_AF_INET6 ) {
488        struct sockaddr_in6 sin6;
489        memset(&sin6, 0, sizeof(sin6));
490        sin6.sin6_family = AF_INET6;
491        memcpy(&sin6.sin6_addr, &address->addr.addr6, 16);
492        sin6.sin6_port = htons(port);
493        dht_ping_node((struct sockaddr*)&sin6, sizeof(sin6));
494        return 1;
495    }
496
497    return 0;
498}
499
500const char *
501tr_dhtPrintableStatus(int status)
502{
503    switch(status) {
504    case TR_DHT_STOPPED: return "stopped";
505    case TR_DHT_BROKEN: return "broken";
506    case TR_DHT_POOR: return "poor";
507    case TR_DHT_FIREWALLED: return "firewalled";
508    case TR_DHT_GOOD: return "good";
509    default: return "???";
510    }
511}
512
513static void
514callback( void *ignore UNUSED, int event,
515          unsigned char *info_hash, void *data, size_t data_len )
516{
517    if( event == DHT_EVENT_VALUES || event == DHT_EVENT_VALUES6 ) {
518        tr_torrent *tor;
519        tr_sessionLock( session );
520        tor = tr_torrentFindFromHash( session, info_hash );
521        if( tor && tr_torrentAllowsDHT( tor ))
522        {
523            size_t i, n;
524            tr_pex * pex;
525            if( event == DHT_EVENT_VALUES )
526                pex = tr_peerMgrCompactToPex(data, data_len, NULL, 0, &n);
527            else
528                pex = tr_peerMgrCompact6ToPex(data, data_len, NULL, 0, &n);
529            for( i=0; i<n; ++i )
530                tr_peerMgrAddPex( tor, TR_PEER_FROM_DHT, pex+i, -1 );
531            tr_free(pex);
532            tr_tordbg(tor, "Learned %d%s peers from DHT",
533                      (int)n,
534                      event == DHT_EVENT_VALUES6 ? " IPv6" : "");
535        }
536        tr_sessionUnlock( session );
537    } else if( event == DHT_EVENT_SEARCH_DONE ||
538               event == DHT_EVENT_SEARCH_DONE6) {
539        tr_torrent * tor = tr_torrentFindFromHash( session, info_hash );
540        if( tor ) {
541            if( event == DHT_EVENT_SEARCH_DONE ) {
542                tr_torinf(tor, "DHT announce done");
543                tor->dhtAnnounceInProgress = 0;
544            } else {
545                tr_torinf(tor, "IPv6 DHT announce done");
546                tor->dhtAnnounce6InProgress = 0;
547            }
548        }
549    }
550}
551
552static int
553tr_dhtAnnounce(tr_torrent *tor, int af, bool announce)
554{
555    int rc, status, numnodes, ret = 0;
556
557    if( !tr_torrentAllowsDHT( tor ) )
558        return -1;
559
560    status = tr_dhtStatus( tor->session, af, &numnodes );
561
562    if( status == TR_DHT_STOPPED ) {
563        /* Let the caller believe everything is all right. */
564        return 1;
565    }
566
567    if(status >= TR_DHT_POOR ) {
568        rc = dht_search( tor->info.hash,
569                         announce ? tr_sessionGetPeerPort(session) : 0,
570                         af, callback, NULL);
571        if( rc >= 1 ) {
572            tr_torinf(tor, "Starting%s DHT announce (%s, %d nodes)",
573                      af == AF_INET6 ? " IPv6" : "",
574                      tr_dhtPrintableStatus(status), numnodes);
575            if(af == AF_INET)
576                tor->dhtAnnounceInProgress = true;
577            else
578                tor->dhtAnnounce6InProgress = true;
579            ret = 1;
580        } else {
581            tr_torerr(tor, "%sDHT announce failed (%s, %d nodes): %s",
582                      af == AF_INET6 ? "IPv6 " : "",
583                      tr_dhtPrintableStatus(status), numnodes,
584                      tr_strerror( errno ) );
585        }
586    } else {
587        tr_tordbg(tor, "%sDHT not ready (%s, %d nodes)",
588                  af == AF_INET6 ? "IPv6 " : "",
589                  tr_dhtPrintableStatus(status), numnodes);
590    }
591
592    return ret;
593}
594
595void
596tr_dhtUpkeep( tr_session * session )
597{
598    tr_torrent * tor = NULL;
599    const time_t now = tr_time( );
600
601    while(( tor = tr_torrentNext( session, tor )))
602    {
603        if( !tor->isRunning || !tr_torrentAllowsDHT( tor ) )
604            continue;
605
606        if( tor->dhtAnnounceAt <= now )
607        {
608            const int rc = tr_dhtAnnounce(tor, AF_INET, 1);
609
610            tor->dhtAnnounceAt = now + ((rc == 0)
611                                     ? 5 + tr_cryptoWeakRandInt( 5 )
612                                     : 25 * 60 + tr_cryptoWeakRandInt( 3*60 ));
613        }
614
615        if( tor->dhtAnnounce6At <= now )
616        {
617            const int rc = tr_dhtAnnounce(tor, AF_INET6, 1);
618
619            tor->dhtAnnounce6At = now + ((rc == 0)
620                                      ? 5 + tr_cryptoWeakRandInt( 5 )
621                                      : 25 * 60 + tr_cryptoWeakRandInt( 3*60 ));
622        }
623    }
624}
625
626void
627tr_dhtCallback(unsigned char *buf, int buflen,
628               struct sockaddr *from, socklen_t fromlen,
629               void *sv )
630{
631    time_t tosleep;
632    int rc;
633
634    assert(tr_isSession(sv));
635
636    if(sv != session)
637        return;
638
639    rc = dht_periodic( buf, buflen, from, fromlen,
640                       &tosleep, callback, NULL);
641    if(rc < 0) {
642        if(errno == EINTR) {
643            tosleep = 0;
644        } else {
645            tr_nerr( "DHT", "dht_periodic failed: %s", tr_strerror( errno ) );
646            if(errno == EINVAL || errno == EFAULT)
647                    abort();
648            tosleep = 1;
649        }
650    }
651
652    /* Being slightly late is fine,
653       and has the added benefit of adding some jitter. */
654    tr_timerAdd( dht_timer, tosleep, tr_cryptoWeakRandInt( 1000000 ) );
655}
656
657static void
658timer_callback(int s UNUSED, short type UNUSED, void *session )
659{
660    tr_dhtCallback(NULL, 0, NULL, 0, session);
661}
662
663
664void
665dht_hash(void *hash_return, int hash_size,
666         const void *v1, int len1,
667         const void *v2, int len2,
668         const void *v3, int len3)
669{
670    unsigned char sha1[SHA_DIGEST_LENGTH];
671    tr_sha1( sha1, v1, len1, v2, len2, v3, len3, NULL );
672    memset( hash_return, 0, hash_size );
673    memcpy( hash_return, sha1, MIN( hash_size, SHA_DIGEST_LENGTH ) );
674}
675
676int
677dht_random_bytes( void * buf, size_t size )
678{
679    tr_cryptoRandBuf( buf, size );
680    return size;
681}
Note: See TracBrowser for help on using the repository browser.