source: branches/encryption/libtransmission/peer-msgs.c @ 2985

Last change on this file since 2985 was 2985, checked in by charles, 15 years ago

work on incoming connections, and better deciding of which pieces to request first.

  • Property svn:keywords set to Date Rev Author Id
File size: 29.3 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 2985 2007-09-07 20:55:38Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "timer.h"
35#include "utils.h"
36
37/**
38***
39**/
40
/* converts a number of minutes into milliseconds */
#define MINUTES_TO_MSEC(N) ((N) * 60 * 1000)

/* pex attempts are made this frequently (interval handed to the pex timer) */
#define PEX_INTERVAL (MINUTES_TO_MSEC(1))

/* the most requests we'll batch up for this peer */
#define OUT_REQUESTS_MAX 10

/* when we get down to this many requests, we ask the manager for more */
#define OUT_REQUESTS_LOW 4
51
/* Message ids.  The BT_* values are the one-byte ids that readBtMessage()
   switches on and that the send functions write after the length prefix.
   LTEP_HANDSHAKE is the sub-id that parseLtep() compares against the first
   byte of a BT_LTEP payload. */
enum
{
    BT_CHOKE           = 0,
    BT_UNCHOKE         = 1,
    BT_INTERESTED      = 2,
    BT_NOT_INTERESTED  = 3,
    BT_HAVE            = 4,
    BT_BITFIELD        = 5,
    BT_REQUEST         = 6,
    BT_PIECE           = 7,
    BT_CANCEL          = 8,
    BT_PORT            = 9,
    BT_LTEP            = 20,

    LTEP_HANDSHAKE     = 0
};
68
/* Values of tr_peermsgs.state; canRead() dispatches on them. */
enum
{
    AWAITING_BT_LENGTH,   /* next: readBtLength() reads the 4-byte length prefix */
    AWAITING_BT_MESSAGE,  /* next: readBtMessage() reads the id and payload */
    READING_BT_PIECE      /* next: readBtPiece() collects block data */
};
75
76static const char *
77getStateName( int state )
78{
79    switch( state )
80    {
81        case AWAITING_BT_LENGTH: return "awaiting bt length";
82        case AWAITING_BT_MESSAGE: return "awaiting bt message";
83        case READING_BT_PIECE: return "reading bt piece";
84    }
85
86    fprintf (stderr, "PeerManager::getStateName: unhandled state %d\n", state );
87    abort( );
88}
89
/* One block request: `length' bytes starting `offset' bytes into piece `index'. */
struct peer_request
{
    uint32_t index;
    uint32_t offset;
    uint32_t length;
};

/* Three-way comparison of two unsigned values: returns -1, 0 or 1. */
static int
compareUint32( uint32_t a, uint32_t b )
{
    return ( a > b ) - ( a < b );
}

/**
 * Orders two peer_request structs by index, then offset, then length.
 *
 * @param va, vb  pointers to struct peer_request
 * @return negative, zero or positive when *va sorts before, equal to,
 *         or after *vb
 *
 * The fields are compared instead of subtracted: the difference of two
 * uint32_t values converted to int has the wrong sign whenever the operands
 * are more than INT_MAX apart.
 */
static int
peer_request_compare( const void * va, const void * vb )
{
    const struct peer_request * a = va;
    const struct peer_request * b = vb;
    int cmp;

    if(( cmp = compareUint32( a->index, b->index ) ))
        return cmp;
    if(( cmp = compareUint32( a->offset, b->offset ) ))
        return cmp;
    return compareUint32( a->length, b->length );
}
107
/* Per-peer messaging state: the buffers, request queues and parser state
   used by the functions in this file. */
struct tr_peermsgs
{
    tr_peer * info;      /* shared peer record: choke/interest flags, `have' bitfield, client name */

    tr_handle * handle;  /* torrent->handle, cached by tr_peerMsgsNew() */
    tr_torrent * torrent;
    tr_peerIo * io;      /* info->io, cached by tr_peerMsgsNew() */

    tr_publisher_t * publisher; /* delivers tr_peermsgs_event to subscribers */

    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
    struct evbuffer * outBlock;    /* the block we're currently sending */
    struct evbuffer * inBlock;     /* the block we're currently receiving */
    tr_list * peerAskedFor;        /* peer_requests received via BT_REQUEST */
    tr_list * clientAskedFor;      /* peer_requests sent by tr_peerMsgsAddRequest() */

    tr_timer_tag pulseTag;         /* timer that runs pulse() */
    tr_timer_tag pexTag;           /* timer that runs pexPulse() */

    unsigned int  notListening        : 1; /* set by readBtPiece() when reading is paused; cleared by pulse() */

    struct peer_request blockToUs; /* index/offset of the incoming block; length counts the bytes still missing */

    int state;                     /* AWAITING_BT_LENGTH, AWAITING_BT_MESSAGE or READING_BT_PIECE */

    uint32_t incomingMessageLength; /* length prefix saved by readBtLength() */

    uint64_t gotKeepAliveTime;     /* tr_date() of the most recent keepalive */

    uint8_t ut_pex;                /* ltep id the peer's handshake assigned to "ut_pex" */
    uint16_t listeningPort;        /* from the ltep handshake's "p" entry or a BT_PORT message */

    tr_pex * pex;                  /* peer list snapshot kept by pexPulse() for diffing */
    int pexCount;                  /* number of entries in `pex' */
};
143
144/**
145***  EVENTS
146**/
147
148static const tr_peermsgs_event blankEvent = { 0, 0, NULL };
149
150static void
151publishEvent( tr_peermsgs * peer, int eventType )
152{
153    tr_peermsgs_event e = blankEvent;
154    e.eventType = eventType;
155    tr_publisherPublish( peer->publisher, peer, &e );
156}
157
/* Publishes a TR_PEERMSG_GOT_PEX event; called after a ut_pex message is parsed. */
static void
fireGotPex( tr_peermsgs * peer )
{
    publishEvent( peer, TR_PEERMSG_GOT_PEX );
}
163
164static void
165fireGotBitfield( tr_peermsgs * peer, const tr_bitfield * bitfield )
166{
167    tr_peermsgs_event e = blankEvent;
168    e.eventType = TR_PEERMSG_GOT_BITFIELD;
169    e.bitfield = bitfield;
170    tr_publisherPublish( peer->publisher, peer, &e );
171}
172
173static void
174fireGotHave( tr_peermsgs * peer, uint32_t pieceIndex )
175{
176    tr_peermsgs_event e = blankEvent;
177    e.eventType = TR_PEERMSG_GOT_HAVE;
178    e.pieceIndex = pieceIndex;
179    tr_publisherPublish( peer->publisher, peer, &e );
180}
181
/* Publishes a TR_PEERMSG_GOT_ERROR event; used by the io error callback. */
static void
fireGotError( tr_peermsgs * peer )
{
    publishEvent( peer, TR_PEERMSG_GOT_ERROR );
}
187
/* Publishes a TR_PEERMSG_BLOCKS_RUNNING_LOW event; gotBlock() fires it once
   OUT_REQUESTS_LOW or fewer of our requests are still outstanding. */
static void
fireBlocksRunningLow( tr_peermsgs * peer )
{
    publishEvent( peer, TR_PEERMSG_BLOCKS_RUNNING_LOW );
}
193
194/**
195***  INTEREST
196**/
197
198static int
199isPieceInteresting( const tr_peermsgs   * peer,
200                    int                   piece )
201{
202    const tr_torrent * torrent = peer->torrent;
203    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
204        return FALSE;
205    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
206        return FALSE;
207    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
208        return FALSE;
209    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
210        return FALSE;
211    return TRUE;
212}
213
214static int
215isPeerInteresting( const tr_peermsgs * peer )
216{
217    int i;
218    const tr_torrent * torrent = peer->torrent;
219    const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
220
221    if( !peer->info->have ) /* We don't know what this peer has */
222        return FALSE;
223
224    assert( bitfield->len == peer->info->have->len );
225
226    for( i=0; i<torrent->info.pieceCount; ++i )
227        if( isPieceInteresting( peer, i ) )
228            return TRUE;
229
230    return FALSE;
231}
232
233static void
234sendInterest( tr_peermsgs * peer, int weAreInterested )
235{
236    const uint32_t len = sizeof(uint8_t);
237    const uint8_t bt_msgid = weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED;
238
239    fprintf( stderr, "peer %p: enqueueing an %s message\n", peer, (weAreInterested ? "interested" : "not interested") );
240    tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
241    tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
242}
243
244static void
245updateInterest( tr_peermsgs * peer )
246{
247    const int i = isPeerInteresting( peer );
248    if( i != peer->info->clientIsInterested )
249        sendInterest( peer, i );
250}
251
252void
253tr_peerMsgsSetChoke( tr_peermsgs * peer, int choke )
254{
255    assert( peer != NULL );
256
257    if( peer->info->peerIsChoked != !!choke )
258    {
259        const uint32_t len = sizeof(uint8_t);
260        const uint8_t bt_msgid = choke ? BT_CHOKE : BT_UNCHOKE;
261
262        peer->info->peerIsChoked = choke ? 1 : 0;
263        if( peer->info )
264        {
265            tr_list_foreach( peer->peerAskedFor, tr_free );
266            tr_list_free( &peer->peerAskedFor );
267        }
268
269        fprintf( stderr, "peer %p: enqueuing a %s message\n", peer, (choke ? "choke" : "unchoke") );
270        tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
271        tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
272    }
273}
274
275/**
276***
277**/
278
279int
280tr_peerMsgsAddRequest( tr_peermsgs * peer,
281                       uint32_t      index, 
282                       uint32_t      offset, 
283                       uint32_t      length )
284{
285    int ret =-1;
286
287    if( tr_list_size(peer->clientAskedFor) < OUT_REQUESTS_MAX )
288    {
289        const uint8_t bt_msgid = BT_REQUEST;
290        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
291        struct peer_request * req = tr_new( struct peer_request, 1 );
292
293        tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
294        tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
295        tr_peerIoWriteUint32( peer->io, peer->outMessages, index );
296        tr_peerIoWriteUint32( peer->io, peer->outMessages, offset );
297        tr_peerIoWriteUint32( peer->io, peer->outMessages, length );
298        fprintf( stderr, "peer %p: requesting a block from piece %u, offset %u, length %u\n",
299                         peer, (unsigned int)index, (unsigned int)offset, (unsigned int)length );
300
301        req->index = index;
302        req->offset = offset;
303        req->length = length;
304        tr_list_append( &peer->clientAskedFor, req );
305
306        ret = 0;
307    }
308
309    return ret;
310}
311
312/**
313***
314**/
315
316static void
317parseLtepHandshake( tr_peermsgs * peer, int len, struct evbuffer * inbuf )
318{
319    benc_val_t val, * sub;
320    uint8_t * tmp = tr_new( uint8_t, len );
321    evbuffer_remove( inbuf, tmp, len );
322
323    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
324        fprintf( stderr, "GET  extended-handshake, couldn't get dictionary\n" );
325        tr_free( tmp );
326        return;
327    }
328
329    tr_bencPrint( &val );
330
331    /* check supported messages for utorrent pex */
332    sub = tr_bencDictFind( &val, "m" );
333    if( tr_bencIsDict( sub ) ) {
334        sub = tr_bencDictFind( sub, "ut_pex" );
335        if( tr_bencIsInt( sub ) ) {
336            peer->ut_pex = (uint8_t) sub->val.i;
337            fprintf( stderr, "peer->ut_pex is %d\n", peer->ut_pex );
338        }
339    }
340
341    /* get peer's client name */
342    sub = tr_bencDictFind( &val, "v" );
343    if( tr_bencIsStr( sub ) ) {
344int i;
345        tr_free( peer->info->client );
346        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
347        peer->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
348for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
349                                  if( (int)peer->info->client[i]==-75 ) peer->info->client[i]='u'; }
350        fprintf( stderr, "peer->client is now [%s]\n", peer->info->client );
351    }
352
353    /* get peer's listening port */
354    sub = tr_bencDictFind( &val, "p" );
355    if( tr_bencIsInt( sub ) ) {
356        peer->listeningPort = htons( (uint16_t)sub->val.i );
357        fprintf( stderr, "peer->port is now %hd\n", peer->listeningPort );
358    }
359
360    tr_bencFree( &val );
361    tr_free( tmp );
362}
363
364static void
365parseUtPex( tr_peermsgs * peer, int msglen, struct evbuffer * inbuf )
366{
367    benc_val_t val, * sub;
368    uint8_t * tmp;
369
370    if( !peer->info->pexEnabled ) /* no sharing! */
371        return;
372
373    tmp = tr_new( uint8_t, msglen );
374    evbuffer_remove( inbuf, tmp, msglen );
375
376    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
377        fprintf( stderr, "GET can't read extended-pex dictionary\n" );
378        tr_free( tmp );
379        return;
380    }
381
382    sub = tr_bencDictFind( &val, "added" );
383    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
384        const int n = sub->val.s.i / 6 ;
385        fprintf( stderr, "got %d peers from uT pex\n", n );
386        tr_peerMgrAddPeers( peer->handle->peerMgr,
387                            peer->torrent->info.hash,
388                            TR_PEER_FROM_PEX,
389                            (uint8_t*)sub->val.s.s, n );
390    }
391
392    fireGotPex( peer );
393
394    tr_bencFree( &val );
395    tr_free( tmp );
396}
397
398static void
399parseLtep( tr_peermsgs * peer, int msglen, struct evbuffer * inbuf )
400{
401    uint8_t ltep_msgid;
402
403    tr_peerIoReadBytes( peer->io, inbuf, &ltep_msgid, 1 );
404    msglen--;
405
406    if( ltep_msgid == LTEP_HANDSHAKE )
407    {
408        fprintf( stderr, "got ltep handshake\n" );
409        parseLtepHandshake( peer, msglen, inbuf );
410    }
411    else if( ltep_msgid == peer->ut_pex )
412    {
413        fprintf( stderr, "got ut pex\n" );
414        parseUtPex( peer, msglen, inbuf );
415    }
416    else
417    {
418        fprintf( stderr, "skipping unknown ltep message (%d)\n", (int)ltep_msgid );
419        evbuffer_drain( inbuf, msglen );
420    }
421}
422
423static int
424readBtLength( tr_peermsgs * peer, struct evbuffer * inbuf )
425{
426    uint32_t len;
427    const size_t needlen = sizeof(uint32_t);
428
429    if( EVBUFFER_LENGTH(inbuf) < needlen )
430        return READ_MORE;
431
432    tr_peerIoReadUint32( peer->io, inbuf, &len );
433
434    if( len == 0 ) { /* peer sent us a keepalive message */
435        fprintf( stderr, "peer sent us a keepalive message...\n" );
436        peer->gotKeepAliveTime = tr_date( );
437    } else {
438        fprintf( stderr, "peer is sending us a message with %d bytes...\n", (int)len );
439        peer->incomingMessageLength = len;
440        peer->state = AWAITING_BT_MESSAGE;
441    } return READ_AGAIN;
442}
443
444static int
445readBtMessage( tr_peermsgs * peer, struct evbuffer * inbuf )
446{
447    uint8_t id;
448    uint32_t ui32;
449    size_t msglen = peer->incomingMessageLength;
450
451    if( EVBUFFER_LENGTH(inbuf) < msglen )
452        return READ_MORE;
453
454    tr_peerIoReadBytes( peer->io, inbuf, &id, 1 );
455    msglen--;
456    fprintf( stderr, "got a message from the peer... "
457                     "bt id number is %d, and remaining len is %d\n", (int)id, (int)msglen );
458
459    switch( id )
460    {
461        case BT_CHOKE:
462            assert( msglen == 0 );
463            fprintf( stderr, "got a BT_CHOKE\n" );
464            peer->info->clientIsChoked = 1;
465            tr_list_foreach( peer->peerAskedFor, tr_free );
466            tr_list_free( &peer->peerAskedFor );
467            /* FIXME: maybe choke them */
468            /* FIXME: unmark anything we'd requested from them... */
469            break;
470
471        case BT_UNCHOKE:
472            assert( msglen == 0 );
473            fprintf( stderr, "got a BT_UNCHOKE\n" );
474            peer->info->clientIsChoked = 0;
475            /* FIXME: maybe unchoke them */
476            /* FIXME: maybe send them requests */
477            break;
478
479        case BT_INTERESTED:
480            assert( msglen == 0 );
481            fprintf( stderr, "got a BT_INTERESTED\n" );
482            peer->info->peerIsInterested = 1;
483            /* FIXME: maybe unchoke them */
484            break;
485
486        case BT_NOT_INTERESTED:
487            assert( msglen == 0 );
488            fprintf( stderr, "got a BT_NOT_INTERESTED\n" );
489            peer->info->peerIsInterested = 0;
490            /* FIXME: maybe choke them */
491            break;
492
493        case BT_HAVE:
494            assert( msglen == 4 );
495            fprintf( stderr, "got a BT_HAVE\n" );
496            tr_peerIoReadUint32( peer->io, inbuf, &ui32 );
497            tr_bitfieldAdd( peer->info->have, ui32 );
498            peer->info->progress = tr_bitfieldCountTrueBits( peer->info->have ) / (float)peer->torrent->info.pieceCount;
499            fireGotHave( peer, ui32 );
500            updateInterest( peer );
501            break;
502
503        case BT_BITFIELD:
504            assert( msglen == peer->info->have->len );
505            fprintf( stderr, "got a BT_BITFIELD\n" );
506            tr_peerIoReadBytes( peer->io, inbuf, peer->info->have->bits, msglen );
507            peer->info->progress = tr_bitfieldCountTrueBits( peer->info->have ) / (float)peer->torrent->info.pieceCount;
508            fprintf( stderr, "peer progress is %f\n", peer->info->progress );
509            fireGotBitfield( peer, peer->info->have );
510            updateInterest( peer );
511            /* FIXME: maybe unchoke */
512            break;
513
514        case BT_REQUEST: {
515            struct peer_request * req;
516            assert( msglen == 12 );
517            fprintf( stderr, "got a BT_REQUEST\n" );
518            req = tr_new( struct peer_request, 1 );
519            tr_peerIoReadUint32( peer->io, inbuf, &req->index );
520            tr_peerIoReadUint32( peer->io, inbuf, &req->offset );
521            tr_peerIoReadUint32( peer->io, inbuf, &req->length );
522            if( !peer->info->peerIsChoked )
523                tr_list_append( &peer->peerAskedFor, req );
524            break;
525        }
526
527        case BT_CANCEL: {
528            struct peer_request req;
529            tr_list * node;
530            assert( msglen == 12 );
531            fprintf( stderr, "got a BT_CANCEL\n" );
532            tr_peerIoReadUint32( peer->io, inbuf, &req.index );
533            tr_peerIoReadUint32( peer->io, inbuf, &req.offset );
534            tr_peerIoReadUint32( peer->io, inbuf, &req.length );
535            node = tr_list_find( peer->peerAskedFor, &req, peer_request_compare );
536            if( node != NULL ) {
537                fprintf( stderr, "found the req that peer is cancelling... cancelled.\n" );
538                tr_list_remove_data( &peer->peerAskedFor, node->data );
539            }
540            break;
541        }
542
543        case BT_PIECE: {
544            fprintf( stderr, "got a BT_PIECE\n" );
545            assert( peer->blockToUs.length == 0 );
546            peer->state = READING_BT_PIECE;
547            tr_peerIoReadUint32( peer->io, inbuf, &peer->blockToUs.index );
548            tr_peerIoReadUint32( peer->io, inbuf, &peer->blockToUs.offset );
549            peer->blockToUs.length = msglen - 8;
550            assert( peer->blockToUs.length > 0 );
551            evbuffer_drain( peer->inBlock, ~0 );
552            break;
553        }
554
555        case BT_PORT: {
556            assert( msglen == 2 );
557            fprintf( stderr, "got a BT_PORT\n" );
558            tr_peerIoReadUint16( peer->io, inbuf, &peer->listeningPort );
559            break;
560        }
561
562        case BT_LTEP:
563            fprintf( stderr, "got a BT_LTEP\n" );
564            parseLtep( peer, msglen, inbuf );
565            break;
566
567        default:
568            fprintf( stderr, "got an unknown BT message type: %d\n", (int)id );
569            tr_peerIoDrain( peer->io, inbuf, msglen );
570            assert( 0 );
571    }
572
573    peer->incomingMessageLength = -1;
574    peer->state = AWAITING_BT_LENGTH;
575    return READ_AGAIN;
576}
577
578static int
579canDownload( const tr_peermsgs * peer )
580{
581    tr_torrent * tor = peer->torrent;
582
583#if 0
584    /* FIXME: was swift worth it?  did anyone notice a difference? */
585    if( SWIFT_ENABLED && !isSeeding && (peer->credit<0) )
586        return FALSE;
587#endif
588
589    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
590        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
591
592    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
593        return tr_rcCanTransfer( tor->download );
594
595    return TRUE;
596}
597
598static void
599gotBlock( tr_peermsgs * peer, int index, int offset, struct evbuffer * inbuf )
600{
601    tr_torrent * tor = peer->torrent;
602    const size_t length = EVBUFFER_LENGTH( inbuf );
603    const int block = _tr_block( tor, index, offset );
604    struct peer_request key, *req;
605
606    /* sanity clause */
607    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
608        tr_dbg( "have this block already..." );
609        return;
610    }
611    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
612        tr_dbg( "block is the wrong length..." );
613        return;
614    }
615
616    /* remove it from our `we asked for this' list */
617    key.index = index;
618    key.offset = offset;
619    key.length = length;
620    req = (struct peer_request*) tr_list_find( peer->clientAskedFor, &key,
621                                               peer_request_compare );
622    if( req == NULL ) {
623        tr_dbg( "we didn't ask the peer for this message..." );
624        return;
625    }
626    tr_list_remove_data( &peer->clientAskedFor, req );
627    tr_free( req );
628
629    /* write to disk */
630    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
631        return;
632
633    /* make a note that this peer helped us with this piece */
634    if( !peer->info->blame )
635         peer->info->blame = tr_bitfieldNew( tor->info.pieceCount );
636    tr_bitfieldAdd( peer->info->blame, index );
637
638    tr_cpBlockAdd( tor->completion, block );
639
640    tor->downloadedCur += length;
641    tr_rcTransferred( tor->download, length );
642    tr_rcTransferred( tor->handle->download, length );
643
644    if( tr_list_size(peer->clientAskedFor) <= OUT_REQUESTS_LOW )
645        fireBlocksRunningLow( peer );
646}
647
648
649static ReadState
650readBtPiece( tr_peermsgs * peer, struct evbuffer * inbuf )
651{
652    assert( peer->blockToUs.length > 0 );
653
654    if( !canDownload( peer ) )
655    {
656        peer->notListening = 1;
657        tr_peerIoSetIOMode ( peer->io, 0, EV_READ );
658        return READ_DONE;
659    }
660    else
661    {
662        /* inbuf ->  inBlock */
663        const uint32_t len = MIN( EVBUFFER_LENGTH(inbuf), peer->blockToUs.length );
664        uint8_t * tmp = tr_new( uint8_t, len );
665        tr_peerIoReadBytes( peer->io, inbuf, tmp, len );
666        evbuffer_add( peer->inBlock, tmp, len );
667        tr_free( tmp );
668        peer->blockToUs.length -= len;
669
670        if( !peer->blockToUs.length )
671        {
672            gotBlock( peer, peer->blockToUs.index,
673                            peer->blockToUs.offset,
674                            peer->inBlock );
675            evbuffer_drain( peer->outBlock, ~0 );
676            peer->state = AWAITING_BT_LENGTH;
677        }
678
679        return READ_AGAIN;
680    }
681}
682
683static ReadState
684canRead( struct bufferevent * evin, void * vpeer )
685{
686    ReadState ret;
687    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
688    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
689    fprintf( stderr, "peer %p got a canRead; state is [%s]\n", peer, getStateName(peer->state) );
690
691    switch( peer->state )
692    {
693        case AWAITING_BT_LENGTH:  ret = readBtLength  ( peer, inbuf ); break;
694        case AWAITING_BT_MESSAGE: ret = readBtMessage ( peer, inbuf ); break;
695        case READING_BT_PIECE:    ret = readBtPiece   ( peer, inbuf ); break;
696        default: assert( 0 );
697    }
698    return ret;
699}
700
701/**
702***
703**/
704
705static int
706canUpload( const tr_peermsgs * peer )
707{
708    const tr_torrent * tor = peer->torrent;
709
710    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
711        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
712
713    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
714        return tr_rcCanTransfer( tor->upload );
715
716    return TRUE;
717}
718
719static int
720pulse( void * vpeer )
721{
722    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
723    size_t len;
724
725    /* if we froze out a downloaded block because of speed limits,
726       start listening to the peer again */
727    if( peer->notListening )
728    {
729        fprintf( stderr, "peer %p thawing out...\n", peer );
730        peer->notListening = 0;
731        tr_peerIoSetIOMode ( peer->io, EV_READ, 0 );
732    }
733
734    if(( len = EVBUFFER_LENGTH( peer->outBlock ) ))
735    {
736        if( canUpload( peer ) )
737        {
738            const size_t outlen = MIN( len, 4096 );
739fprintf( stderr, "peer %p outblock writing %d bytes...\n", peer, (int)outlen );
740            tr_peerIoWrite( peer->io, EVBUFFER_DATA(peer->outBlock), outlen );
741            evbuffer_drain( peer->outBlock, outlen );
742
743            peer->torrent->uploadedCur += outlen;
744            tr_rcTransferred( peer->torrent->upload, outlen );
745            tr_rcTransferred( peer->handle->upload, outlen );
746        }
747    }
748    else if(( len = EVBUFFER_LENGTH( peer->outMessages ) ))
749    {
750        fprintf( stderr, "peer %p pulse is writing %d bytes worth of messages...\n", peer, (int)len );
751        tr_peerIoWriteBuf( peer->io, peer->outMessages );
752        evbuffer_drain( peer->outMessages, ~0 );
753    }
754    else if(( peer->peerAskedFor ))
755    {
756        struct peer_request * req = (struct peer_request*) peer->peerAskedFor->data;
757        uint8_t * tmp = tr_new( uint8_t, req->length );
758        const uint8_t msgid = BT_PIECE;
759        const uint32_t msglen = sizeof(uint8_t) + sizeof(uint32_t)*2 + req->length;
760fprintf( stderr, "peer %p starting to upload a block...\n", peer );
761        tr_ioRead( peer->torrent, req->index, req->offset, req->length, tmp );
762        tr_peerIoWriteUint32( peer->io, peer->outBlock, msglen );
763        tr_peerIoWriteBytes ( peer->io, peer->outBlock, &msgid, 1 );
764        tr_peerIoWriteUint32( peer->io, peer->outBlock, req->index );
765        tr_peerIoWriteUint32( peer->io, peer->outBlock, req->offset );
766        tr_peerIoWriteBytes ( peer->io, peer->outBlock, tmp, req->length );
767        tr_free( tmp );
768    }
769
770    return TRUE; /* loop forever */
771}
772
773static void
774didWrite( struct bufferevent * evin UNUSED, void * vpeer )
775{
776    pulse( (tr_peermsgs *) vpeer );
777}
778
779static void
780gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
781{
782    fireGotError( (tr_peermsgs*)vpeer );
783}
784
785static void
786sendBitfield( tr_peermsgs * peer )
787{
788    const tr_bitfield * bitfield = tr_cpPieceBitfield( peer->torrent->completion );
789    const uint32_t len = sizeof(uint8_t) + bitfield->len;
790    const uint8_t bt_msgid = BT_BITFIELD;
791
792    fprintf( stderr, "peer %p: enqueueing a bitfield message\n", peer );
793    tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
794    tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
795    tr_peerIoWriteBytes( peer->io, peer->outMessages, bitfield->bits, bitfield->len );
796}
797
798/**
799***
800**/
801
/* the most differences the pex callbacks will record in one pass */
#define MAX_DIFFS 50

/* Accumulator filled by the pex callbacks while pexPulse() compares the old
   and new peer lists.  pexPulse() allocates the three arrays. */
typedef struct
{
    tr_pex * added;     /* peers recorded by pexAddedCb() */
    tr_pex * dropped;   /* peers recorded by pexRemovedCb() */
    tr_pex * elements;  /* peers that make up the next snapshot */
    int addedCount;     /* entries used in `added' */
    int droppedCount;   /* entries used in `dropped' */
    int elementCount;   /* entries used in `elements' */
    int diffCount;      /* running total compared against MAX_DIFFS */
}
PexDiffs;
815
816static void pexAddedCb( void * vpex, void * userData )
817{
818    PexDiffs * diffs = (PexDiffs *) userData;
819    tr_pex * pex = (tr_pex *) vpex;
820    if( diffs->diffCount < MAX_DIFFS )
821    {
822        diffs->diffCount++;
823        diffs->added[diffs->addedCount++] = *pex;
824        diffs->elements[diffs->elementCount++] = *pex;
825    }
826}
827
828static void pexRemovedCb( void * vpex, void * userData )
829{
830    PexDiffs * diffs = (PexDiffs *) userData;
831    tr_pex * pex = (tr_pex *) vpex;
832    if( diffs->diffCount < MAX_DIFFS )
833    {
834        diffs->diffCount++;
835        diffs->dropped[diffs->droppedCount++] = *pex;
836    }
837}
838
839static void pexElementCb( void * vpex, void * userData )
840{
841    PexDiffs * diffs = (PexDiffs *) userData;
842    tr_pex * pex = (tr_pex *) vpex;
843    if( diffs->diffCount < MAX_DIFFS )
844    {
845        diffs->diffCount++;
846        diffs->elements[diffs->elementCount++] = *pex;
847    }
848}
849
850static int
851pexPulse( void * vpeer )
852{
853    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
854
855    if( peer->info->pexEnabled )
856    {
857        int i;
858        tr_pex * newPex = NULL;
859        const int newCount = tr_peerMgrGetPeers( peer->handle->peerMgr, peer->torrent->info.hash, &newPex );
860        PexDiffs diffs;
861        benc_val_t val, *added, *dropped, *flags;
862        uint8_t *tmp, *walk;
863        char * benc;
864        int bencLen;
865        const uint8_t bt_msgid = BT_LTEP;
866        const uint8_t ltep_msgid = peer->ut_pex;
867
868        /* build the diffs */
869        diffs.added = tr_new( tr_pex, newCount );
870        diffs.addedCount = 0;
871        diffs.dropped = tr_new( tr_pex, peer->pexCount );
872        diffs.droppedCount = 0;
873        diffs.elements = tr_new( tr_pex, newCount + peer->pexCount );
874        diffs.elementCount = 0;
875        diffs.diffCount = 0;
876        tr_set_compare( peer->pex, peer->pexCount,
877                        newPex, newCount,
878                        tr_pexCompare, sizeof(tr_pex),
879                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
880        fprintf( stderr, "pex: old peer count %d, new peer count %d, added %d, removed %d\n", peer->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
881
882        /* update peer */
883        tr_free( peer->pex );
884        peer->pex = diffs.elements;
885        peer->pexCount = diffs.elementCount;
886
887       
888        /* build the pex payload */
889        tr_bencInit( &val, TYPE_DICT );
890        tr_bencDictReserve( &val, 3 );
891
892        /* "added" */
893        added = tr_bencDictAdd( &val, "added" );
894        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
895        for( i=0; i<diffs.addedCount; ++i ) {
896            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
897            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
898        }
899        assert( ( walk - tmp ) == diffs.addedCount * 6 );
900        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
901
902        /* "added.f" */
903        flags = tr_bencDictAdd( &val, "added.f" );
904        tmp = walk = tr_new( uint8_t, diffs.addedCount );
905        for( i=0; i<diffs.addedCount; ++i )
906            *walk++ = diffs.added[i].flags;
907        assert( ( walk - tmp ) == diffs.addedCount );
908        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
909
910        /* "dropped" */
911        dropped = tr_bencDictAdd( &val, "dropped" );
912        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
913        for( i=0; i<diffs.droppedCount; ++i ) {
914            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
915            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
916        }
917        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
918        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
919
920        /* write the pex message */
921        benc = tr_bencSaveMalloc( &val, &bencLen );
922        tr_peerIoWriteUint32( peer->io, peer->outBlock, 1 + 1 + bencLen );
923        tr_peerIoWriteBytes ( peer->io, peer->outBlock, &bt_msgid, 1 );
924        tr_peerIoWriteBytes ( peer->io, peer->outBlock, &ltep_msgid, 1 );
925        tr_peerIoWriteBytes ( peer->io, peer->outBlock, benc, bencLen );
926
927        /* cleanup */
928        tr_free( benc );
929        tr_bencFree( &val );
930        tr_free( diffs.added );
931        tr_free( diffs.dropped );
932        tr_free( newPex );
933    }
934
935    return TRUE;
936}
937
938/**
939***
940**/
941
942tr_peermsgs*
943tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
944{
945    tr_peermsgs * peer;
946
947    assert( info != NULL );
948    assert( info->io != NULL );
949
950    peer = tr_new0( tr_peermsgs, 1 );
951    peer->publisher = tr_publisherNew( );
952    peer->info = info;
953    peer->handle = torrent->handle;
954    peer->torrent = torrent;
955    peer->io = info->io;
956    peer->info->clientIsChoked = 1;
957    peer->info->peerIsChoked = 1;
958    peer->info->clientIsInterested = 0;
959    peer->info->peerIsInterested = 0;
960    peer->info->have = tr_bitfieldNew( torrent->info.pieceCount );
961    peer->pulseTag = tr_timerNew( peer->handle, pulse, peer, NULL, 500 );
962fprintf( stderr, "peer %p pulseTag %p\n", peer, peer->pulseTag );
963    peer->pexTag = tr_timerNew( peer->handle, pexPulse, peer, NULL, PEX_INTERVAL );
964    peer->outMessages = evbuffer_new( );
965    peer->outBlock = evbuffer_new( );
966    peer->inBlock = evbuffer_new( );
967
968    tr_peerIoSetIOFuncs( peer->io, canRead, didWrite, gotError, peer );
969    tr_peerIoSetIOMode( peer->io, EV_READ|EV_WRITE, 0 );
970
971    sendBitfield( peer );
972
973    return peer;
974}
975
976void
977tr_peerMsgsFree( tr_peermsgs* p )
978{
979    if( p != NULL )
980    {
981fprintf( stderr, "peer %p destroying its pulse tag\n", p );
982        tr_publisherFree( &p->publisher );
983        tr_timerFree( &p->pulseTag );
984        tr_timerFree( &p->pexTag );
985        evbuffer_free( p->outMessages );
986        evbuffer_free( p->outBlock );
987        evbuffer_free( p->inBlock );
988        tr_free( p );
989    }
990}
991
/**
 * Registers `func' to receive the events this peer publishes.
 *
 * @param peer      the peer to listen to
 * @param func      callback handed to the peer's publisher
 * @param userData  passed through to the publisher along with `func'
 * @return the tag from tr_publisherSubscribe(); pass it to
 *         tr_peerMsgsUnsubscribe() to stop listening
 */
tr_publisher_tag
tr_peerMsgsSubscribe( tr_peermsgs       * peer,
                      tr_delivery_func    func,
                      void              * userData )
{
    return tr_publisherSubscribe( peer->publisher, func, userData );
}
999
/**
 * Cancels a subscription made with tr_peerMsgsSubscribe().
 *
 * @param peer  the peer the subscription was made on
 * @param tag   the value tr_peerMsgsSubscribe() returned
 */
void
tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
                        tr_publisher_tag    tag )
{
    tr_publisherUnsubscribe( peer->publisher, tag );
}
Note: See TracBrowser for help on using the repository browser.