source: branches/encryption/libtransmission/peer-msgs.c @ 2983

Last change on this file since 2983 was 2983, checked in by charles, 15 years ago

a little more work on requests

  • Property svn:keywords set to Date Rev Author Id
File size: 28.0 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 2983 2007-09-07 04:49:05Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "timer.h"
35#include "utils.h"
36
37/**
38***
39**/
40
/* convert a span of minutes into milliseconds */
#define MINUTES_TO_MSEC(N) ((N) * 60 * 1000)

/* the interval handed to the pex timer: one minute, in milliseconds */
#define PEX_INTERVAL (MINUTES_TO_MSEC(1))

/* upper bound on the requests we keep queued up for this peer */
#define MAX_OUT_REQUESTS 10
48
/* message ids read from, and written to, the peer connection */
enum
{
    BT_CHOKE = 0,
    BT_UNCHOKE = 1,
    BT_INTERESTED = 2,
    BT_NOT_INTERESTED = 3,
    BT_HAVE = 4,
    BT_BITFIELD = 5,
    BT_REQUEST = 6,
    BT_PIECE = 7,
    BT_CANCEL = 8,
    BT_PORT = 9,
    BT_LTEP = 20,

    /* sub-id carried inside a BT_LTEP message: the extended handshake */
    LTEP_HANDSHAKE = 0
};
65
/* which part of an incoming message the reader is waiting on */
enum
{
    AWAITING_BT_LENGTH,  /* the four-byte length prefix */
    AWAITING_BT_MESSAGE, /* the message id and its payload */
    READING_BT_PIECE     /* the data bytes of a BT_PIECE message */
};
72
73static const char *
74getStateName( int state )
75{
76    switch( state )
77    {
78        case AWAITING_BT_LENGTH: return "awaiting bt length";
79        case AWAITING_BT_MESSAGE: return "awaiting bt message";
80        case READING_BT_PIECE: return "reading bt piece";
81    }
82
83    fprintf (stderr, "PeerManager::getStateName: unhandled state %d\n", state );
84    abort( );
85}
86
/* one block request: the piece, the byte offset inside it, and the byte count */
struct peer_request
{
    uint32_t index;
    uint32_t offset;
    uint32_t length;
};

/* three-way comparison of two unsigned values: -1, 0 or 1 */
static int
compareUint32( uint32_t a, uint32_t b )
{
    return ( a > b ) - ( a < b );
}

/**
 * Orders two peer_request structs by index, then offset, then length.
 *
 * Returns a negative value, zero, or a positive value when `va` sorts
 * before, equal to, or after `vb`.  The fields are compared rather than
 * subtracted: an unsigned difference squeezed into an int reports the
 * wrong sign once the two values are more than INT_MAX apart.
 */
static int
peer_request_compare_func( const void * va, const void * vb )
{
    const struct peer_request * a = (const struct peer_request*) va;
    const struct peer_request * b = (const struct peer_request*) vb;
    int c;

    if(( c = compareUint32( a->index, b->index ) ))
        return c;
    if(( c = compareUint32( a->offset, b->offset ) ))
        return c;
    return compareUint32( a->length, b->length );
}
104
105struct tr_peermsgs
106{
107    tr_peer * info;
108
109    tr_handle * handle;
110    tr_torrent * torrent;
111    tr_peerIo * io;
112
113    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
114    struct evbuffer * outBlock;    /* the block we're currently sending */
115    struct evbuffer * inBlock;     /* the block we're currently receiving */
116    tr_list * peerAskedFor;
117    tr_list * clientAskedFor;
118
119    tr_timer_tag pulseTag;
120    tr_timer_tag pexTag;
121
122    unsigned int  notListening        : 1;
123
124    struct peer_request blockToUs;
125
126    int state;
127
128    uint32_t incomingMessageLength;
129
130    uint64_t gotKeepAliveTime;
131
132    uint8_t ut_pex;
133    uint16_t listeningPort;
134
135    tr_pex * pex;
136    int pexCount;
137};
138
139/**
140***  INTEREST
141**/
142
143static int
144isPieceInteresting( const tr_peermsgs   * peer,
145                    int                   piece )
146{
147    const tr_torrent * torrent = peer->torrent;
148    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
149        return FALSE;
150    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
151        return FALSE;
152    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
153        return FALSE;
154    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
155        return FALSE;
156    return TRUE;
157}
158
159static int
160isPeerInteresting( const tr_peermsgs * peer )
161{
162    int i;
163    const tr_torrent * torrent = peer->torrent;
164    const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
165
166    if( !peer->info->have ) /* We don't know what this peer has */
167        return FALSE;
168
169    assert( bitfield->len == peer->info->have->len );
170
171    for( i=0; i<torrent->info.pieceCount; ++i )
172        if( isPieceInteresting( peer, i ) )
173            return TRUE;
174
175    return FALSE;
176}
177
178static void
179sendInterest( tr_peermsgs * peer, int weAreInterested )
180{
181    const uint32_t len = sizeof(uint8_t);
182    const uint8_t bt_msgid = weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED;
183
184    fprintf( stderr, "peer %p: enqueueing an %s message\n", peer, (weAreInterested ? "interested" : "not interested") );
185    tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
186    tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
187}
188
189static void
190updateInterest( tr_peermsgs * peer )
191{
192    const int i = isPeerInteresting( peer );
193    if( i != peer->info->clientIsInterested )
194        sendInterest( peer, i );
195}
196
197void
198tr_peerMsgsSetChoke( tr_peermsgs * peer, int choke )
199{
200    assert( peer != NULL );
201
202    if( peer->info->peerIsChoked != !!choke )
203    {
204        const uint32_t len = sizeof(uint8_t);
205        const uint8_t bt_msgid = choke ? BT_CHOKE : BT_UNCHOKE;
206
207        peer->info->peerIsChoked = choke ? 1 : 0;
208        if( peer->info )
209        {
210            tr_list_foreach( peer->peerAskedFor, tr_free );
211            tr_list_free( &peer->peerAskedFor );
212        }
213
214        fprintf( stderr, "peer %p: enqueuing a %s message\n", peer, (choke ? "choke" : "unchoke") );
215        tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
216        tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
217    }
218}
219
220/**
221***
222**/
223
224int
225tr_peerMsgsAddRequest( tr_peermsgs * peer,
226                       uint32_t      index, 
227                       uint32_t      offset, 
228                       uint32_t      length )
229{
230    int ret =-1;
231
232    if( tr_list_size(peer->clientAskedFor) < MAX_OUT_REQUESTS )
233    {
234        const uint8_t bt_msgid = BT_REQUEST;
235        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
236        struct peer_request * req = tr_new( struct peer_request, 1 );
237
238        tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
239        tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
240        tr_peerIoWriteUint32( peer->io, peer->outMessages, index );
241        tr_peerIoWriteUint32( peer->io, peer->outMessages, offset );
242        tr_peerIoWriteUint32( peer->io, peer->outMessages, length );
243        fprintf( stderr, "peer %p: requesting a block from piece %u, offset %u, length %u\n",
244                         peer, (unsigned int)index, (unsigned int)offset, (unsigned int)length );
245
246        req->index = index;
247        req->offset = offset;
248        req->length = length;
249        tr_list_append( &peer->clientAskedFor, req );
250
251        ret = 0;
252    }
253
254    return ret;
255}
256
257/**
258***
259**/
260
261static void
262parseLtepHandshake( tr_peermsgs * peer, int len, struct evbuffer * inbuf )
263{
264    benc_val_t val, * sub;
265    uint8_t * tmp = tr_new( uint8_t, len );
266    evbuffer_remove( inbuf, tmp, len );
267
268    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
269        fprintf( stderr, "GET  extended-handshake, couldn't get dictionary\n" );
270        tr_free( tmp );
271        return;
272    }
273
274    tr_bencPrint( &val );
275
276    /* check supported messages for utorrent pex */
277    sub = tr_bencDictFind( &val, "m" );
278    if( tr_bencIsDict( sub ) ) {
279        sub = tr_bencDictFind( sub, "ut_pex" );
280        if( tr_bencIsInt( sub ) ) {
281            peer->ut_pex = (uint8_t) sub->val.i;
282            fprintf( stderr, "peer->ut_pex is %d\n", peer->ut_pex );
283        }
284    }
285
286    /* get peer's client name */
287    sub = tr_bencDictFind( &val, "v" );
288    if( tr_bencIsStr( sub ) ) {
289int i;
290        tr_free( peer->info->client );
291        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
292        peer->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
293for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
294                                  if( (int)peer->info->client[i]==-75 ) peer->info->client[i]='u'; }
295        fprintf( stderr, "peer->client is now [%s]\n", peer->info->client );
296    }
297
298    /* get peer's listening port */
299    sub = tr_bencDictFind( &val, "p" );
300    if( tr_bencIsInt( sub ) ) {
301        peer->listeningPort = htons( (uint16_t)sub->val.i );
302        fprintf( stderr, "peer->port is now %hd\n", peer->listeningPort );
303    }
304
305    tr_bencFree( &val );
306    tr_free( tmp );
307}
308
309static void
310parseUtPex( tr_peermsgs * peer, int msglen, struct evbuffer * inbuf )
311{
312    benc_val_t val, * sub;
313    uint8_t * tmp;
314
315    if( !peer->info->pexEnabled ) /* no sharing! */
316        return;
317
318    tmp = tr_new( uint8_t, msglen );
319    evbuffer_remove( inbuf, tmp, msglen );
320
321    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
322        fprintf( stderr, "GET can't read extended-pex dictionary\n" );
323        tr_free( tmp );
324        return;
325    }
326
327    sub = tr_bencDictFind( &val, "added" );
328    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
329        const int n = sub->val.s.i / 6 ;
330        fprintf( stderr, "got %d peers from uT pex\n", n );
331        tr_peerMgrAddPeers( peer->handle->peerMgr,
332                            peer->torrent->info.hash,
333                            TR_PEER_FROM_PEX,
334                            (uint8_t*)sub->val.s.s, n );
335    }
336
337    tr_bencFree( &val );
338    tr_free( tmp );
339}
340
341static void
342parseLtep( tr_peermsgs * peer, int msglen, struct evbuffer * inbuf )
343{
344    uint8_t ltep_msgid;
345
346    tr_peerIoReadBytes( peer->io, inbuf, &ltep_msgid, 1 );
347    msglen--;
348
349    if( ltep_msgid == LTEP_HANDSHAKE )
350    {
351        fprintf( stderr, "got ltep handshake\n" );
352        parseLtepHandshake( peer, msglen, inbuf );
353    }
354    else if( ltep_msgid == peer->ut_pex )
355    {
356        fprintf( stderr, "got ut pex\n" );
357        parseUtPex( peer, msglen, inbuf );
358    }
359    else
360    {
361        fprintf( stderr, "skipping unknown ltep message (%d)\n", (int)ltep_msgid );
362        evbuffer_drain( inbuf, msglen );
363    }
364}
365
366static int
367readBtLength( tr_peermsgs * peer, struct evbuffer * inbuf )
368{
369    uint32_t len;
370    const size_t needlen = sizeof(uint32_t);
371
372    if( EVBUFFER_LENGTH(inbuf) < needlen )
373        return READ_MORE;
374
375    tr_peerIoReadUint32( peer->io, inbuf, &len );
376
377    if( len == 0 ) { /* peer sent us a keepalive message */
378        fprintf( stderr, "peer sent us a keepalive message...\n" );
379        peer->gotKeepAliveTime = tr_date( );
380    } else {
381        fprintf( stderr, "peer is sending us a message with %d bytes...\n", (int)len );
382        peer->incomingMessageLength = len;
383        peer->state = AWAITING_BT_MESSAGE;
384    } return READ_AGAIN;
385}
386
387static int
388readBtMessage( tr_peermsgs * peer, struct evbuffer * inbuf )
389{
390    uint8_t id;
391    uint32_t ui32;
392    size_t msglen = peer->incomingMessageLength;
393
394    if( EVBUFFER_LENGTH(inbuf) < msglen )
395        return READ_MORE;
396
397    tr_peerIoReadBytes( peer->io, inbuf, &id, 1 );
398    msglen--;
399    fprintf( stderr, "got a message from the peer... "
400                     "bt id number is %d, and remaining len is %d\n", (int)id, (int)msglen );
401
402    switch( id )
403    {
404        case BT_CHOKE:
405            assert( msglen == 0 );
406            fprintf( stderr, "got a BT_CHOKE\n" );
407            peer->info->clientIsChoked = 1;
408            tr_list_foreach( peer->peerAskedFor, tr_free );
409            tr_list_free( &peer->peerAskedFor );
410            /* FIXME: maybe choke them */
411            /* FIXME: unmark anything we'd requested from them... */
412            break;
413
414        case BT_UNCHOKE:
415            assert( msglen == 0 );
416            fprintf( stderr, "got a BT_UNCHOKE\n" );
417            peer->info->clientIsChoked = 0;
418            /* FIXME: maybe unchoke them */
419            /* FIXME: maybe send them requests */
420            break;
421
422        case BT_INTERESTED:
423            assert( msglen == 0 );
424            fprintf( stderr, "got a BT_INTERESTED\n" );
425            peer->info->peerIsInterested = 1;
426            /* FIXME: maybe unchoke them */
427            break;
428
429        case BT_NOT_INTERESTED:
430            assert( msglen == 0 );
431            fprintf( stderr, "got a BT_NOT_INTERESTED\n" );
432            peer->info->peerIsInterested = 0;
433            /* FIXME: maybe choke them */
434            break;
435
436        case BT_HAVE:
437            assert( msglen == 4 );
438            fprintf( stderr, "got a BT_HAVE\n" );
439            tr_peerIoReadUint32( peer->io, inbuf, &ui32 );
440            tr_bitfieldAdd( peer->info->have, ui32 );
441            peer->info->progress = tr_bitfieldCountTrueBits( peer->info->have ) / (float)peer->torrent->info.pieceCount;
442            updateInterest( peer );
443            break;
444
445        case BT_BITFIELD:
446            assert( msglen == peer->info->have->len );
447            fprintf( stderr, "got a BT_BITFIELD\n" );
448            tr_peerIoReadBytes( peer->io, inbuf, peer->info->have->bits, msglen );
449            peer->info->progress = tr_bitfieldCountTrueBits( peer->info->have ) / (float)peer->torrent->info.pieceCount;
450            fprintf( stderr, "peer progress is %f\n", peer->info->progress );
451            updateInterest( peer );
452            /* FIXME: maybe unchoke */
453            break;
454
455        case BT_REQUEST: {
456            struct peer_request * req;
457            assert( msglen == 12 );
458            fprintf( stderr, "got a BT_REQUEST\n" );
459            req = tr_new( struct peer_request, 1 );
460            tr_peerIoReadUint32( peer->io, inbuf, &req->index );
461            tr_peerIoReadUint32( peer->io, inbuf, &req->offset );
462            tr_peerIoReadUint32( peer->io, inbuf, &req->length );
463            if( !peer->info->peerIsChoked )
464                tr_list_append( &peer->peerAskedFor, req );
465            break;
466        }
467
468        case BT_CANCEL: {
469            struct peer_request req;
470            tr_list * node;
471            assert( msglen == 12 );
472            fprintf( stderr, "got a BT_CANCEL\n" );
473            tr_peerIoReadUint32( peer->io, inbuf, &req.index );
474            tr_peerIoReadUint32( peer->io, inbuf, &req.offset );
475            tr_peerIoReadUint32( peer->io, inbuf, &req.length );
476            node = tr_list_find( peer->peerAskedFor, &req, peer_request_compare_func );
477            if( node != NULL ) {
478                fprintf( stderr, "found the req that peer is cancelling... cancelled.\n" );
479                tr_list_remove_data( &peer->peerAskedFor, node->data );
480            }
481            break;
482        }
483
484        case BT_PIECE: {
485            fprintf( stderr, "got a BT_PIECE\n" );
486            assert( peer->blockToUs.length == 0 );
487            peer->state = READING_BT_PIECE;
488            tr_peerIoReadUint32( peer->io, inbuf, &peer->blockToUs.index );
489            tr_peerIoReadUint32( peer->io, inbuf, &peer->blockToUs.offset );
490            peer->blockToUs.length = msglen - 8;
491            assert( peer->blockToUs.length > 0 );
492            evbuffer_drain( peer->inBlock, ~0 );
493            break;
494        }
495
496        case BT_PORT: {
497            assert( msglen == 2 );
498            fprintf( stderr, "got a BT_PORT\n" );
499            tr_peerIoReadUint16( peer->io, inbuf, &peer->listeningPort );
500            break;
501        }
502
503        case BT_LTEP:
504            fprintf( stderr, "got a BT_LTEP\n" );
505            parseLtep( peer, msglen, inbuf );
506            break;
507
508        default:
509            fprintf( stderr, "got an unknown BT message type: %d\n", (int)id );
510            tr_peerIoDrain( peer->io, inbuf, msglen );
511            assert( 0 );
512    }
513
514    peer->incomingMessageLength = -1;
515    peer->state = AWAITING_BT_LENGTH;
516    return READ_AGAIN;
517}
518
519static int
520canDownload( const tr_peermsgs * peer )
521{
522    tr_torrent * tor = peer->torrent;
523
524#if 0
525    /* FIXME: was swift worth it?  did anyone notice a difference? */
526    if( SWIFT_ENABLED && !isSeeding && (peer->credit<0) )
527        return FALSE;
528#endif
529
530    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
531        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
532
533    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
534        return tr_rcCanTransfer( tor->download );
535
536    return TRUE;
537}
538
539static int
540weAskedForThisBlock( const tr_peermsgs * peer, uint32_t index, uint32_t offset, uint32_t length )
541{
542    struct peer_request tmp;
543    tmp.index = index;
544    tmp.offset = offset;
545    tmp.length = length;
546
547    return tr_list_find( peer->clientAskedFor, &tmp, peer_request_compare_func ) != NULL;
548}
549
550static void
551gotBlock( tr_peermsgs * peer, int index, int offset, struct evbuffer * inbuf )
552{
553    tr_torrent * tor = peer->torrent;
554    const size_t len = EVBUFFER_LENGTH( inbuf );
555    const int block = _tr_block( tor, index, offset );
556
557    /* sanity clause */
558    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
559        tr_dbg( "have this block already..." );
560        return;
561    }
562    if( (int)len != tr_torBlockCountBytes( tor, block ) ) {
563        tr_dbg( "block is the wrong length..." );
564        return;
565    }
566    if( !weAskedForThisBlock( peer, index, offset, len ) ) {
567        tr_dbg( "we didn't ask the peer for this message..." );
568        return;
569    }
570
571    /* write to disk */
572    if( tr_ioWrite( tor, index, offset, len, EVBUFFER_DATA( inbuf )))
573        return;
574
575    /* make a note that this peer helped us with this piece */
576    if( !peer->info->blame )
577         peer->info->blame = tr_bitfieldNew( tor->info.pieceCount );
578    tr_bitfieldAdd( peer->info->blame, index );
579
580    tr_cpBlockAdd( tor->completion, block );
581
582    tor->downloadedCur += len;
583    tr_rcTransferred( tor->download, len );
584    tr_rcTransferred( tor->handle->download, len );
585}
586
587
588static ReadState
589readBtPiece( tr_peermsgs * peer, struct evbuffer * inbuf )
590{
591    assert( peer->blockToUs.length > 0 );
592
593    if( !canDownload( peer ) )
594    {
595        peer->notListening = 1;
596        tr_peerIoSetIOMode ( peer->io, 0, EV_READ );
597        return READ_DONE;
598    }
599    else
600    {
601        /* inbuf ->  inBlock */
602        const uint32_t len = MIN( EVBUFFER_LENGTH(inbuf), peer->blockToUs.length );
603        uint8_t * tmp = tr_new( uint8_t, len );
604        tr_peerIoReadBytes( peer->io, inbuf, tmp, len );
605        evbuffer_add( peer->inBlock, tmp, len );
606        tr_free( tmp );
607        peer->blockToUs.length -= len;
608
609        if( !peer->blockToUs.length )
610        {
611            gotBlock( peer, peer->blockToUs.index,
612                            peer->blockToUs.offset,
613                            peer->inBlock );
614            evbuffer_drain( peer->outBlock, ~0 );
615            peer->state = AWAITING_BT_LENGTH;
616        }
617
618        return READ_AGAIN;
619    }
620}
621
622static ReadState
623canRead( struct bufferevent * evin, void * vpeer )
624{
625    ReadState ret;
626    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
627    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
628    fprintf( stderr, "peer %p got a canRead; state is [%s]\n", peer, getStateName(peer->state) );
629
630    switch( peer->state )
631    {
632        case AWAITING_BT_LENGTH:  ret = readBtLength  ( peer, inbuf ); break;
633        case AWAITING_BT_MESSAGE: ret = readBtMessage ( peer, inbuf ); break;
634        case READING_BT_PIECE:    ret = readBtPiece   ( peer, inbuf ); break;
635        default: assert( 0 );
636    }
637    return ret;
638}
639
640/**
641***
642**/
643
644static int
645canUpload( const tr_peermsgs * peer )
646{
647    const tr_torrent * tor = peer->torrent;
648
649    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
650        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
651
652    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
653        return tr_rcCanTransfer( tor->upload );
654
655    return TRUE;
656}
657
658static int
659pulse( void * vpeer )
660{
661    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
662    size_t len;
663
664fprintf( stderr, "peer %p pulse... notlistening %d, outblock size: %d, outmessages size %d, peerAskedFor %p\n",
665         vpeer,
666         (int)peer->notListening,
667         (int)EVBUFFER_LENGTH( peer->outBlock ),
668         (int)EVBUFFER_LENGTH( peer->outMessages ),
669         peer->peerAskedFor );
670
671    /* if we froze out a downloaded block because of speed limits,
672       start listening to the peer again */
673    if( peer->notListening )
674    {
675        fprintf( stderr, "peer %p thawing out...\n", peer );
676        peer->notListening = 0;
677        tr_peerIoSetIOMode ( peer->io, EV_READ, 0 );
678    }
679
680    if(( len = EVBUFFER_LENGTH( peer->outBlock ) ))
681    {
682fprintf( stderr, "peer %p needing to upload... canUpload %d\n", peer, canUpload(peer) );
683        if( canUpload( peer ) )
684        {
685            const size_t outlen = MIN( len, 2048 );
686fprintf( stderr, "peer %p writing %d bytes...\n", peer, (int)outlen );
687            tr_peerIoWrite( peer->io, EVBUFFER_DATA(peer->outBlock), outlen );
688            evbuffer_drain( peer->outBlock, outlen );
689
690            peer->torrent->uploadedCur += outlen;
691            tr_rcTransferred( peer->torrent->upload, outlen );
692            tr_rcTransferred( peer->handle->upload, outlen );
693        }
694    }
695    else if(( len = EVBUFFER_LENGTH( peer->outMessages ) ))
696    {
697        fprintf( stderr, "peer %p pulse is writing %d bytes worth of messages...\n", peer, (int)len );
698        tr_peerIoWriteBuf( peer->io, peer->outMessages );
699        evbuffer_drain( peer->outMessages, ~0 );
700    }
701    else if(( peer->peerAskedFor ))
702    {
703        struct peer_request * req = (struct peer_request*) peer->peerAskedFor->data;
704        uint8_t * tmp = tr_new( uint8_t, req->length );
705        const uint8_t msgid = BT_PIECE;
706        const uint32_t msglen = sizeof(uint8_t) + sizeof(uint32_t)*2 + req->length;
707fprintf( stderr, "peer %p starting to upload a block...\n", peer );
708        tr_ioRead( peer->torrent, req->index, req->offset, req->length, tmp );
709        tr_peerIoWriteUint32( peer->io, peer->outBlock, msglen );
710        tr_peerIoWriteBytes ( peer->io, peer->outBlock, &msgid, 1 );
711        tr_peerIoWriteUint32( peer->io, peer->outBlock, req->index );
712        tr_peerIoWriteUint32( peer->io, peer->outBlock, req->offset );
713        tr_peerIoWriteBytes ( peer->io, peer->outBlock, tmp, req->length );
714        tr_free( tmp );
715    }
716
717    return TRUE; /* loop forever */
718}
719
720static void
721didWrite( struct bufferevent * evin UNUSED, void * vpeer )
722{
723    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
724    fprintf( stderr, "peer %p got a didWrite...\n", peer );
725    pulse( vpeer );
726}
727
728static void
729gotError( struct bufferevent * evbuf UNUSED, short what, void * vpeer )
730{
731    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
732    fprintf( stderr, "peer %p got an error in %d\n", peer, (int)what );
733}
734
735static void
736sendBitfield( tr_peermsgs * peer )
737{
738    const tr_bitfield * bitfield = tr_cpPieceBitfield( peer->torrent->completion );
739    const uint32_t len = sizeof(uint8_t) + bitfield->len;
740    const uint8_t bt_msgid = BT_BITFIELD;
741
742    fprintf( stderr, "peer %p: enqueueing a bitfield message\n", peer );
743    tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
744    tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
745    tr_peerIoWriteBytes( peer->io, peer->outMessages, bitfield->bits, bitfield->len );
746}
747
748/**
749***
750**/
751
752#define MAX_DIFFS 50
753
754typedef struct
755{
756    tr_pex * added;
757    tr_pex * dropped;
758    tr_pex * elements;
759    int addedCount;
760    int droppedCount;
761    int elementCount;
762    int diffCount;
763}
764PexDiffs;
765
766static void pexAddedCb( void * vpex, void * userData )
767{
768    PexDiffs * diffs = (PexDiffs *) userData;
769    tr_pex * pex = (tr_pex *) vpex;
770    if( diffs->diffCount < MAX_DIFFS )
771    {
772        diffs->diffCount++;
773        diffs->added[diffs->addedCount++] = *pex;
774        diffs->elements[diffs->elementCount++] = *pex;
775    }
776}
777
778static void pexRemovedCb( void * vpex, void * userData )
779{
780    PexDiffs * diffs = (PexDiffs *) userData;
781    tr_pex * pex = (tr_pex *) vpex;
782    if( diffs->diffCount < MAX_DIFFS )
783    {
784        diffs->diffCount++;
785        diffs->dropped[diffs->droppedCount++] = *pex;
786    }
787}
788
789static void pexElementCb( void * vpex, void * userData )
790{
791    PexDiffs * diffs = (PexDiffs *) userData;
792    tr_pex * pex = (tr_pex *) vpex;
793    if( diffs->diffCount < MAX_DIFFS )
794    {
795        diffs->diffCount++;
796        diffs->elements[diffs->elementCount++] = *pex;
797    }
798}
799
800static int
801pexPulse( void * vpeer )
802{
803    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
804
805    if( peer->info->pexEnabled )
806    {
807        int i;
808        tr_pex * newPex = NULL;
809        const int newCount = tr_peerMgrGetPeers( peer->handle->peerMgr, peer->torrent->info.hash, &newPex );
810        PexDiffs diffs;
811        benc_val_t val, *added, *dropped, *flags;
812        uint8_t *tmp, *walk;
813        char * benc;
814        int bencLen;
815        const uint8_t bt_msgid = BT_LTEP;
816        const uint8_t ltep_msgid = peer->ut_pex;
817
818        /* build the diffs */
819        diffs.added = tr_new( tr_pex, newCount );
820        diffs.addedCount = 0;
821        diffs.dropped = tr_new( tr_pex, peer->pexCount );
822        diffs.droppedCount = 0;
823        diffs.elements = tr_new( tr_pex, newCount + peer->pexCount );
824        diffs.elementCount = 0;
825        diffs.diffCount = 0;
826        tr_set_compare( peer->pex, peer->pexCount,
827                        newPex, newCount,
828                        tr_pexCompare, sizeof(tr_pex),
829                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
830        fprintf( stderr, "pex: old peer count %d, new peer count %d, added %d, removed %d\n", peer->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
831
832        /* update peer */
833        tr_free( peer->pex );
834        peer->pex = diffs.elements;
835        peer->pexCount = diffs.elementCount;
836
837       
838        /* build the pex payload */
839        tr_bencInit( &val, TYPE_DICT );
840        tr_bencDictReserve( &val, 3 );
841
842        /* "added" */
843        added = tr_bencDictAdd( &val, "added" );
844        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
845        for( i=0; i<diffs.addedCount; ++i ) {
846            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
847            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
848        }
849        assert( ( walk - tmp ) == diffs.addedCount * 6 );
850        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
851
852        /* "added.f" */
853        flags = tr_bencDictAdd( &val, "added.f" );
854        tmp = walk = tr_new( uint8_t, diffs.addedCount );
855        for( i=0; i<diffs.addedCount; ++i )
856            *walk++ = diffs.added[i].flags;
857        assert( ( walk - tmp ) == diffs.addedCount );
858        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
859
860        /* "dropped" */
861        dropped = tr_bencDictAdd( &val, "dropped" );
862        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
863        for( i=0; i<diffs.droppedCount; ++i ) {
864            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
865            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
866        }
867        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
868        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
869
870        /* write the pex message */
871        benc = tr_bencSaveMalloc( &val, &bencLen );
872        tr_peerIoWriteUint32( peer->io, peer->outBlock, 1 + 1 + bencLen );
873        tr_peerIoWriteBytes ( peer->io, peer->outBlock, &bt_msgid, 1 );
874        tr_peerIoWriteBytes ( peer->io, peer->outBlock, &ltep_msgid, 1 );
875        tr_peerIoWriteBytes ( peer->io, peer->outBlock, benc, bencLen );
876
877        /* cleanup */
878        tr_free( benc );
879        tr_bencFree( &val );
880        tr_free( diffs.added );
881        tr_free( diffs.dropped );
882        tr_free( newPex );
883    }
884
885    return TRUE;
886}
887
888/**
889***
890**/
891
892tr_peermsgs*
893tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
894{
895    tr_peermsgs * peer;
896
897    assert( info != NULL );
898    assert( info->io != NULL );
899
900    peer = tr_new0( tr_peermsgs, 1 );
901    peer->info = info;
902    peer->handle = torrent->handle;
903    peer->torrent = torrent;
904    peer->io = info->io;
905    peer->info->clientIsChoked = 1;
906    peer->info->peerIsChoked = 1;
907    peer->info->clientIsInterested = 0;
908    peer->info->peerIsInterested = 0;
909    peer->info->have = tr_bitfieldNew( torrent->info.pieceCount );
910    peer->pulseTag = tr_timerNew( peer->handle, pulse, peer, NULL, 500 );
911fprintf( stderr, "peer %p pulseTag %p\n", peer, peer->pulseTag );
912    peer->pexTag = tr_timerNew( peer->handle, pexPulse, peer, NULL, PEX_INTERVAL );
913    peer->outMessages = evbuffer_new( );
914    peer->outBlock = evbuffer_new( );
915    peer->inBlock = evbuffer_new( );
916
917    tr_peerIoSetIOFuncs( peer->io, canRead, didWrite, gotError, peer );
918    tr_peerIoSetIOMode( peer->io, EV_READ|EV_WRITE, 0 );
919
920    sendBitfield( peer );
921
922    return peer;
923}
924
925void
926tr_peerMsgsFree( tr_peermsgs* p )
927{
928    if( p != NULL )
929    {
930fprintf( stderr, "peer %p destroying its pulse tag\n", p );
931        tr_timerFree( &p->pulseTag );
932        tr_timerFree( &p->pexTag );
933        evbuffer_free( p->outMessages );
934        evbuffer_free( p->outBlock );
935        evbuffer_free( p->inBlock );
936        tr_free( p );
937    }
938}
Note: See TracBrowser for help on using the repository browser.