source: branches/encryption/libtransmission/peer-msgs.c @ 2982

Last change on this file since 2982 was 2982, checked in by charles, 15 years ago

requests

  • Property svn:keywords set to Date Rev Author Id
File size: 27.8 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 2982 2007-09-06 21:00:39Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "timer.h"
35#include "utils.h"
36
37/**
38***
39**/
40
/* converts a number of minutes into milliseconds */
#define MINUTES_TO_MSEC(N) ((N) * 60 * 1000)

/* interval, in milliseconds, between pex attempts */
#define PEX_INTERVAL (MINUTES_TO_MSEC(1))

/* ceiling on the number of requests we keep batched up for one peer */
#define MAX_OUT_REQUESTS 10

/* Message ids.  The BT_* values are the one-byte id that follows a
   message's length prefix; LTEP_HANDSHAKE is the sub-id that follows
   BT_LTEP inside an extended message. */
enum
{
    BT_CHOKE           = 0,
    BT_UNCHOKE         = 1,
    BT_INTERESTED      = 2,
    BT_NOT_INTERESTED  = 3,
    BT_HAVE            = 4,
    BT_BITFIELD        = 5,
    BT_REQUEST         = 6,
    BT_PIECE           = 7,
    BT_CANCEL          = 8,
    BT_PORT            = 9,
    BT_LTEP            = 20,

    LTEP_HANDSHAKE     = 0
};
65
/* states of the incoming-message parser */
enum
{
    AWAITING_BT_LENGTH,
    AWAITING_BT_MESSAGE,
    READING_BT_PIECE
};

/**
 * Returns a human-readable label for a parser state, for debug output.
 * A state that is not one of the three above is reported on stderr
 * and the process is aborted.
 */
static const char *
getStateName( int state )
{
    if( state == AWAITING_BT_LENGTH )
        return "awaiting bt length";

    if( state == AWAITING_BT_MESSAGE )
        return "awaiting bt message";

    if( state == READING_BT_PIECE )
        return "reading bt piece";

    fprintf (stderr, "PeerManager::getStateName: unhandled state %d\n", state );
    abort( );
}
86
/* describes one block: the piece it lives in, the byte offset inside
   that piece, and the number of bytes */
struct peer_request
{
    uint32_t index;
    uint32_t offset;
    uint32_t length;
};

/* three-way comparison of two unsigned values: -1, 0 or 1 */
static int
compareUint32( uint32_t a, uint32_t b )
{
    return ( a > b ) - ( a < b );
}

/**
 * Orders two peer_requests by index, then offset, then length.
 *
 * The fields are compared rather than subtracted: the difference of two
 * uint32_t values wraps around, so squeezing it into an int can report
 * the wrong sign when the operands are far apart.
 *
 * @return negative, zero or positive, in the style of strcmp()
 */
static int
peer_request_compare_func( const void * va, const void * vb )
{
    const struct peer_request * a = va;
    const struct peer_request * b = vb;
    int i;

    if(( i = compareUint32( a->index, b->index ) ))
        return i;
    if(( i = compareUint32( a->offset, b->offset ) ))
        return i;
    return compareUint32( a->length, b->length );
}
104
105struct tr_peermsgs
106{
107    tr_peer * info;
108
109    tr_handle * handle;
110    tr_torrent * torrent;
111    tr_peerIo * io;
112
113    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
114    struct evbuffer * outBlock;    /* the block we're currently sending */
115    struct evbuffer * inBlock;     /* the block we're currently receiving */
116    tr_list * peerAskedFor;
117    tr_list * clientAskedFor;
118
119    tr_timer_tag pulseTag;
120    tr_timer_tag pexTag;
121
122    unsigned int  notListening        : 1;
123
124    struct peer_request blockToUs;
125
126    int state;
127
128    uint32_t incomingMessageLength;
129
130    uint64_t gotKeepAliveTime;
131
132    uint8_t ut_pex;
133    uint16_t listeningPort;
134
135    tr_pex * pex;
136    int pexCount;
137};
138
139/**
140***  INTEREST
141**/
142
143static int
144isPieceInteresting( const tr_peermsgs   * peer,
145                    int                   piece )
146{
147    const tr_torrent * torrent = peer->torrent;
148    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
149        return FALSE;
150    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
151        return FALSE;
152    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
153        return FALSE;
154    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
155        return FALSE;
156    return TRUE;
157}
158
159static int
160isInteresting( const tr_peermsgs * peer )
161{
162    int i;
163    const tr_torrent * torrent = peer->torrent;
164    const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
165
166    if( !peer->info->have ) /* We don't know what this peer has */
167        return FALSE;
168
169    assert( bitfield->len == peer->info->have->len );
170
171    for( i=0; i<torrent->info.pieceCount; ++i )
172        if( isPieceInteresting( peer, i ) )
173            return TRUE;
174
175    return FALSE;
176}
177
178static void
179sendInterest( tr_peermsgs * peer, int weAreInterested )
180{
181    const uint32_t len = sizeof(uint8_t);
182    const uint8_t bt_msgid = weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED;
183
184    fprintf( stderr, "peer %p: enqueueing an %s message\n", peer, (weAreInterested ? "interested" : "not interested") );
185    tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
186    tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
187}
188
189static void
190updateInterest( tr_peermsgs * peer )
191{
192    const int i = isInteresting( peer );
193    if( i != peer->info->clientIsInterested )
194        sendInterest( peer, i );
195}
196
197void
198tr_peerMsgsSetChoke( tr_peermsgs * peer, int choke )
199{
200    assert( peer != NULL );
201
202    if( peer->info->peerIsChoked != !!choke )
203    {
204        const uint32_t len = sizeof(uint8_t);
205        const uint8_t bt_msgid = choke ? BT_CHOKE : BT_UNCHOKE;
206
207        peer->info->peerIsChoked = choke ? 1 : 0;
208        if( peer->info )
209        {
210            tr_list_foreach( peer->peerAskedFor, tr_free );
211            tr_list_free( &peer->peerAskedFor );
212        }
213
214        fprintf( stderr, "peer %p: enqueuing a %s message\n", peer, (choke ? "choke" : "unchoke") );
215        tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
216        tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
217    }
218}
219
220/**
221***
222**/
223
224int
225tr_peerMsgsAddRequest( tr_peermsgs * peer,
226                       uint32_t      index, 
227                       uint32_t      begin, 
228                       uint32_t      length )
229{
230    int ret =-1;
231
232    if( tr_list_size(peer->clientAskedFor) < MAX_OUT_REQUESTS )
233    {
234        const uint8_t bt_msgid = BT_REQUEST;
235        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
236        struct peer_request * req = tr_new( peer_request, 1 );
237
238        tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
239        tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
240        tr_peerIoWriteUint32( peer->io, peer->outMessages, index );
241        tr_peerIoWriteUint32( peer->io, peer->outMessages, begin );
242        tr_peerIoWriteUint32( peer->io, peer->outMessages, length );
243        fprintf( stderr, "peer %p: requesting a block from piece %u, begin %u, length %u\n",
244                         peer, (unsigned int)index, (unsigned int)begin, (unsigned int)length );
245
246        req->index = index;
247        req->begin = begin;
248        req->length = length;
249        tr_list_append( &peer->clientAskedFor, req );
250
251        ret = 0;
252    }
253
254    return ret;
255}
256
257/**
258***
259**/
260
261static void
262parseLtepHandshake( tr_peermsgs * peer, int len, struct evbuffer * inbuf )
263{
264    benc_val_t val, * sub;
265    uint8_t * tmp = tr_new( uint8_t, len );
266    evbuffer_remove( inbuf, tmp, len );
267
268    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
269        fprintf( stderr, "GET  extended-handshake, couldn't get dictionary\n" );
270        tr_free( tmp );
271        return;
272    }
273
274    tr_bencPrint( &val );
275
276    /* check supported messages for utorrent pex */
277    sub = tr_bencDictFind( &val, "m" );
278    if( tr_bencIsDict( sub ) ) {
279        sub = tr_bencDictFind( sub, "ut_pex" );
280        if( tr_bencIsInt( sub ) ) {
281            peer->ut_pex = (uint8_t) sub->val.i;
282            fprintf( stderr, "peer->ut_pex is %d\n", peer->ut_pex );
283        }
284    }
285
286    /* get peer's client name */
287    sub = tr_bencDictFind( &val, "v" );
288    if( tr_bencIsStr( sub ) ) {
289int i;
290        tr_free( peer->info->client );
291        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
292        peer->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
293for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
294                                  if( (int)peer->info->client[i]==-75 ) peer->info->client[i]='u'; }
295        fprintf( stderr, "peer->client is now [%s]\n", peer->info->client );
296    }
297
298    /* get peer's listening port */
299    sub = tr_bencDictFind( &val, "p" );
300    if( tr_bencIsInt( sub ) ) {
301        peer->listeningPort = htons( (uint16_t)sub->val.i );
302        fprintf( stderr, "peer->port is now %hd\n", peer->listeningPort );
303    }
304
305    tr_bencFree( &val );
306    tr_free( tmp );
307}
308
309static void
310parseUtPex( tr_peermsgs * peer, int msglen, struct evbuffer * inbuf )
311{
312    benc_val_t val, * sub;
313    uint8_t * tmp;
314
315    if( !peer->info->pexEnabled ) /* no sharing! */
316        return;
317
318    tmp = tr_new( uint8_t, msglen );
319    evbuffer_remove( inbuf, tmp, msglen );
320
321    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
322        fprintf( stderr, "GET can't read extended-pex dictionary\n" );
323        tr_free( tmp );
324        return;
325    }
326
327    sub = tr_bencDictFind( &val, "added" );
328    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
329        const int n = sub->val.s.i / 6 ;
330        fprintf( stderr, "got %d peers from uT pex\n", n );
331        tr_peerMgrAddPeers( peer->handle->peerMgr,
332                            peer->torrent->info.hash,
333                            TR_PEER_FROM_PEX,
334                            (uint8_t*)sub->val.s.s, n );
335    }
336
337    tr_bencFree( &val );
338    tr_free( tmp );
339}
340
341static void
342parseLtep( tr_peermsgs * peer, int msglen, struct evbuffer * inbuf )
343{
344    uint8_t ltep_msgid;
345
346    tr_peerIoReadBytes( peer->io, inbuf, &ltep_msgid, 1 );
347    msglen--;
348
349    if( ltep_msgid == LTEP_HANDSHAKE )
350    {
351        fprintf( stderr, "got ltep handshake\n" );
352        parseLtepHandshake( peer, msglen, inbuf );
353    }
354    else if( ltep_msgid == peer->ut_pex )
355    {
356        fprintf( stderr, "got ut pex\n" );
357        parseUtPex( peer, msglen, inbuf );
358    }
359    else
360    {
361        fprintf( stderr, "skipping unknown ltep message (%d)\n", (int)ltep_msgid );
362        evbuffer_drain( inbuf, msglen );
363    }
364}
365
366static int
367readBtLength( tr_peermsgs * peer, struct evbuffer * inbuf )
368{
369    uint32_t len;
370    const size_t needlen = sizeof(uint32_t);
371
372    if( EVBUFFER_LENGTH(inbuf) < needlen )
373        return READ_MORE;
374
375    tr_peerIoReadUint32( peer->io, inbuf, &len );
376
377    if( len == 0 ) { /* peer sent us a keepalive message */
378        fprintf( stderr, "peer sent us a keepalive message...\n" );
379        peer->gotKeepAliveTime = tr_date( );
380    } else {
381        fprintf( stderr, "peer is sending us a message with %d bytes...\n", (int)len );
382        peer->incomingMessageLength = len;
383        peer->state = AWAITING_BT_MESSAGE;
384    } return READ_AGAIN;
385}
386
387static int
388readBtMessage( tr_peermsgs * peer, struct evbuffer * inbuf )
389{
390    uint8_t id;
391    uint32_t ui32;
392    size_t msglen = peer->incomingMessageLength;
393
394    if( EVBUFFER_LENGTH(inbuf) < msglen )
395        return READ_MORE;
396
397    tr_peerIoReadBytes( peer->io, inbuf, &id, 1 );
398    msglen--;
399    fprintf( stderr, "got a message from the peer... "
400                     "bt id number is %d, and remaining len is %d\n", (int)id, (int)msglen );
401
402    switch( id )
403    {
404        case BT_CHOKE:
405            assert( msglen == 0 );
406            fprintf( stderr, "got a BT_CHOKE\n" );
407            peer->info->clientIsChoked = 1;
408            tr_list_foreach( peer->peerAskedFor, tr_free );
409            tr_list_free( &peer->peerAskedFor );
410            /* FIXME: maybe choke them */
411            /* FIXME: unmark anything we'd requested from them... */
412            break;
413
414        case BT_UNCHOKE:
415            assert( msglen == 0 );
416            fprintf( stderr, "got a BT_UNCHOKE\n" );
417            peer->info->clientIsChoked = 0;
418            /* FIXME: maybe unchoke them */
419            /* FIXME: maybe send them requests */
420            break;
421
422        case BT_INTERESTED:
423            assert( msglen == 0 );
424            fprintf( stderr, "got a BT_INTERESTED\n" );
425            peer->info->peerIsInterested = 1;
426            /* FIXME: maybe unchoke them */
427            break;
428
429        case BT_NOT_INTERESTED:
430            assert( msglen == 0 );
431            fprintf( stderr, "got a BT_NOT_INTERESTED\n" );
432            peer->info->peerIsInterested = 0;
433            /* FIXME: maybe choke them */
434            break;
435
436        case BT_HAVE:
437            assert( msglen == 4 );
438            fprintf( stderr, "got a BT_HAVE\n" );
439            tr_peerIoReadUint32( peer->io, inbuf, &ui32 );
440            tr_bitfieldAdd( peer->info->have, ui32 );
441            peer->info->progress = tr_bitfieldCountTrueBits( peer->info->have ) / (float)peer->torrent->info.pieceCount;
442            updateInterest( peer );
443            break;
444
445        case BT_BITFIELD:
446            assert( msglen == peer->info->have->len );
447            fprintf( stderr, "got a BT_BITFIELD\n" );
448            tr_peerIoReadBytes( peer->io, inbuf, peer->info->have->bits, msglen );
449            peer->info->progress = tr_bitfieldCountTrueBits( peer->info->have ) / (float)peer->torrent->info.pieceCount;
450            fprintf( stderr, "peer progress is %f\n", peer->info->progress );
451            updateInterest( peer );
452            /* FIXME: maybe unchoke */
453            break;
454
455        case BT_REQUEST: {
456            struct peer_request * req;
457            assert( msglen == 12 );
458            fprintf( stderr, "got a BT_REQUEST\n" );
459            req = tr_new( struct peer_request, 1 );
460            tr_peerIoReadUint32( peer->io, inbuf, &req->index );
461            tr_peerIoReadUint32( peer->io, inbuf, &req->offset );
462            tr_peerIoReadUint32( peer->io, inbuf, &req->length );
463            if( !peer->info->peerIsChoked )
464                tr_list_append( &peer->peerAskedFor, req );
465            break;
466        }
467
468        case BT_CANCEL: {
469            struct peer_request req;
470            tr_list * node;
471            assert( msglen == 12 );
472            fprintf( stderr, "got a BT_CANCEL\n" );
473            tr_peerIoReadUint32( peer->io, inbuf, &req.index );
474            tr_peerIoReadUint32( peer->io, inbuf, &req.offset );
475            tr_peerIoReadUint32( peer->io, inbuf, &req.length );
476            node = tr_list_find( peer->peerAskedFor, &req, peer_request_compare_func );
477            if( node != NULL ) {
478                fprintf( stderr, "found the req that peer is cancelling... cancelled.\n" );
479                tr_list_remove_data( &peer->peerAskedFor, node->data );
480            }
481            break;
482        }
483
484        case BT_PIECE: {
485            fprintf( stderr, "got a BT_PIECE\n" );
486            assert( peer->blockToUs.length == 0 );
487            peer->state = READING_BT_PIECE;
488            tr_peerIoReadUint32( peer->io, inbuf, &peer->blockToUs.index );
489            tr_peerIoReadUint32( peer->io, inbuf, &peer->blockToUs.offset );
490            peer->blockToUs.length = msglen - 8;
491            assert( peer->blockToUs.length > 0 );
492            evbuffer_drain( peer->inBlock, ~0 );
493            break;
494        }
495
496        case BT_PORT: {
497            assert( msglen == 2 );
498            fprintf( stderr, "got a BT_PORT\n" );
499            tr_peerIoReadUint16( peer->io, inbuf, &peer->listeningPort );
500            break;
501        }
502
503        case BT_LTEP:
504            fprintf( stderr, "got a BT_LTEP\n" );
505            parseLtep( peer, msglen, inbuf );
506            break;
507
508        default:
509            fprintf( stderr, "got an unknown BT message type: %d\n", (int)id );
510            tr_peerIoDrain( peer->io, inbuf, msglen );
511            assert( 0 );
512    }
513
514    peer->incomingMessageLength = -1;
515    peer->state = AWAITING_BT_LENGTH;
516    return READ_AGAIN;
517}
518
519static int
520canDownload( const tr_peermsgs * peer )
521{
522    tr_torrent * tor = peer->torrent;
523
524#if 0
525    /* FIXME: was swift worth it?  did anyone notice a difference? */
526    if( SWIFT_ENABLED && !isSeeding && (peer->credit<0) )
527        return FALSE;
528#endif
529
530    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
531        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
532
533    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
534        return tr_rcCanTransfer( tor->download );
535
536    return TRUE;
537}
538
539static int
540weAskedForThisBlock( const tr_peermsgs * peer, uint32_t index, uint32_t offset, uint32_t
541
542static void
543gotBlock( tr_peermsgs * peer, int index, int offset, struct evbuffer * inbuf )
544{
545    tr_torrent * tor = peer->torrent;
546    const size_t len = EVBUFFER_LENGTH( inbuf );
547    const int block = _tr_block( tor, index, offset );
548
549    /* sanity clause */
550    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
551        tr_dbg( "have this block already..." );
552        return;
553    }
554    if( (int)len != tr_torBlockCountBytes( tor, block ) ) {
555        tr_dbg( "block is the wrong length..." );
556        return;
557    }
558
559cc
560struct peer_request
561{
562    uint32_t index;
563    uint32_t offset;
564    uint32_t length;
565};
566
567
568        tr_list_append( &peer->clientAskedFor, req );
569
570ccc
571    --peer->outReqCount;
572
573    /* write to disk */
574    if( tr_ioWrite( tor, index, offset, len, EVBUFFER_DATA( inbuf )))
575        return;
576
577    /* make a note that this peer helped us with this piece */
578    if( !peer->info->blame )
579         peer->info->blame = tr_bitfieldNew( tor->info.pieceCount );
580    tr_bitfieldAdd( peer->info->blame, index );
581
582    tr_cpBlockAdd( tor->completion, block );
583
584    tor->downloadedCur += len;
585    tr_rcTransferred( tor->download, len );
586    tr_rcTransferred( tor->handle->download, len );
587
588//    broadcastCancel( tor, index, begin, len - 8 );
589}
590
591
592static ReadState
593readBtPiece( tr_peermsgs * peer, struct evbuffer * inbuf )
594{
595    assert( peer->blockToUs.length > 0 );
596
597    if( !canDownload( peer ) )
598    {
599        peer->notListening = 1;
600        tr_peerIoSetIOMode ( peer->io, 0, EV_READ );
601        return READ_DONE;
602    }
603    else
604    {
605        /* inbuf ->  inBlock */
606        const uint32_t len = MIN( EVBUFFER_LENGTH(inbuf), peer->blockToUs.length );
607        uint8_t * tmp = tr_new( uint8_t, len );
608        tr_peerIoReadBytes( peer->io, inbuf, tmp, len );
609        evbuffer_add( peer->inBlock, tmp, len );
610        tr_free( tmp );
611        peer->blockToUs.length -= len;
612
613        if( !peer->blockToUs.length )
614        {
615            gotBlock( peer, peer->blockToUs.index,
616                            peer->blockToUs.offset,
617                            peer->inBlock );
618            evbuffer_drain( peer->outBlock, ~0 );
619            peer->state = AWAITING_BT_LENGTH;
620        }
621
622        return READ_AGAIN;
623    }
624}
625
626static ReadState
627canRead( struct bufferevent * evin, void * vpeer )
628{
629    ReadState ret;
630    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
631    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
632    fprintf( stderr, "peer %p got a canRead; state is [%s]\n", peer, getStateName(peer->state) );
633
634    switch( peer->state )
635    {
636        case AWAITING_BT_LENGTH:  ret = readBtLength  ( peer, inbuf ); break;
637        case AWAITING_BT_MESSAGE: ret = readBtMessage ( peer, inbuf ); break;
638        case READING_BT_PIECE:    ret = readBtPiece   ( peer, inbuf ); break;
639        default: assert( 0 );
640    }
641    return ret;
642}
643
644/**
645***
646**/
647
648static int
649canUpload( const tr_peermsgs * peer )
650{
651    const tr_torrent * tor = peer->torrent;
652
653    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
654        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
655
656    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
657        return tr_rcCanTransfer( tor->upload );
658
659    return TRUE;
660}
661
662static int
663pulse( void * vpeer )
664{
665    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
666    size_t len;
667
668fprintf( stderr, "peer %p pulse... notlistening %d, outblock size: %d, outmessages size %d, peerAskedFor %p\n",
669         vpeer,
670         (int)peer->notListening,
671         (int)EVBUFFER_LENGTH( peer->outBlock ),
672         (int)EVBUFFER_LENGTH( peer->outMessages ),
673         peer->peerAskedFor );
674
675    /* if we froze out a downloaded block because of speed limits,
676       start listening to the peer again */
677    if( peer->notListening )
678    {
679        fprintf( stderr, "peer %p thawing out...\n", peer );
680        peer->notListening = 0;
681        tr_peerIoSetIOMode ( peer->io, EV_READ, 0 );
682    }
683
684    if(( len = EVBUFFER_LENGTH( peer->outBlock ) ))
685    {
686fprintf( stderr, "peer %p needing to upload... canUpload %d\n", peer, canUpload(peer) );
687        if( canUpload( peer ) )
688        {
689            const size_t outlen = MIN( len, 2048 );
690fprintf( stderr, "peer %p writing %d bytes...\n", peer, (int)outlen );
691            tr_peerIoWrite( peer->io, EVBUFFER_DATA(peer->outBlock), outlen );
692            evbuffer_drain( peer->outBlock, outlen );
693
694            peer->torrent->uploadedCur += outlen;
695            tr_rcTransferred( peer->torrent->upload, outlen );
696            tr_rcTransferred( peer->handle->upload, outlen );
697        }
698    }
699    else if(( len = EVBUFFER_LENGTH( peer->outMessages ) ))
700    {
701        fprintf( stderr, "peer %p pulse is writing %d bytes worth of messages...\n", peer, (int)len );
702        tr_peerIoWriteBuf( peer->io, peer->outMessages );
703        evbuffer_drain( peer->outMessages, ~0 );
704    }
705    else if(( peer->peerAskedFor ))
706    {
707        struct peer_request * req = (struct peer_request*) peer->peerAskedFor->data;
708        uint8_t * tmp = tr_new( uint8_t, req->length );
709        const uint8_t msgid = BT_PIECE;
710        const uint32_t msglen = sizeof(uint8_t) + sizeof(uint32_t)*2 + req->length;
711fprintf( stderr, "peer %p starting to upload a block...\n", peer );
712        tr_ioRead( peer->torrent, req->index, req->offset, req->length, tmp );
713        tr_peerIoWriteUint32( peer->io, peer->outBlock, msglen );
714        tr_peerIoWriteBytes ( peer->io, peer->outBlock, &msgid, 1 );
715        tr_peerIoWriteUint32( peer->io, peer->outBlock, req->index );
716        tr_peerIoWriteUint32( peer->io, peer->outBlock, req->offset );
717        tr_peerIoWriteBytes ( peer->io, peer->outBlock, tmp, req->length );
718        tr_free( tmp );
719    }
720
721    return TRUE; /* loop forever */
722}
723
724static void
725didWrite( struct bufferevent * evin UNUSED, void * vpeer )
726{
727    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
728    fprintf( stderr, "peer %p got a didWrite...\n", peer );
729    pulse( vpeer );
730}
731
732static void
733gotError( struct bufferevent * evbuf UNUSED, short what, void * vpeer )
734{
735    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
736    fprintf( stderr, "peer %p got an error in %d\n", peer, (int)what );
737}
738
739static void
740sendBitfield( tr_peermsgs * peer )
741{
742    const tr_bitfield * bitfield = tr_cpPieceBitfield( peer->torrent->completion );
743    const uint32_t len = sizeof(uint8_t) + bitfield->len;
744    const uint8_t bt_msgid = BT_BITFIELD;
745
746    fprintf( stderr, "peer %p: enqueueing a bitfield message\n", peer );
747    tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
748    tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
749    tr_peerIoWriteBytes( peer->io, peer->outMessages, bitfield->bits, bitfield->len );
750}
751
752/**
753***
754**/
755
756#define MAX_DIFFS 50
757
758typedef struct
759{
760    tr_pex * added;
761    tr_pex * dropped;
762    tr_pex * elements;
763    int addedCount;
764    int droppedCount;
765    int elementCount;
766    int diffCount;
767}
768PexDiffs;
769
770static void pexAddedCb( void * vpex, void * userData )
771{
772    PexDiffs * diffs = (PexDiffs *) userData;
773    tr_pex * pex = (tr_pex *) vpex;
774    if( diffs->diffCount < MAX_DIFFS )
775    {
776        diffs->diffCount++;
777        diffs->added[diffs->addedCount++] = *pex;
778        diffs->elements[diffs->elementCount++] = *pex;
779    }
780}
781
782static void pexRemovedCb( void * vpex, void * userData )
783{
784    PexDiffs * diffs = (PexDiffs *) userData;
785    tr_pex * pex = (tr_pex *) vpex;
786    if( diffs->diffCount < MAX_DIFFS )
787    {
788        diffs->diffCount++;
789        diffs->dropped[diffs->droppedCount++] = *pex;
790    }
791}
792
793static void pexElementCb( void * vpex, void * userData )
794{
795    PexDiffs * diffs = (PexDiffs *) userData;
796    tr_pex * pex = (tr_pex *) vpex;
797    if( diffs->diffCount < MAX_DIFFS )
798    {
799        diffs->diffCount++;
800        diffs->elements[diffs->elementCount++] = *pex;
801    }
802}
803
804static int
805pexPulse( void * vpeer )
806{
807    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
808
809    if( peer->info->pexEnabled )
810    {
811        int i;
812        tr_pex * newPex = NULL;
813        const int newCount = tr_peerMgrGetPeers( peer->handle->peerMgr, peer->torrent->info.hash, &newPex );
814        PexDiffs diffs;
815        benc_val_t val, *added, *dropped, *flags;
816        uint8_t *tmp, *walk;
817        char * benc;
818        int bencLen;
819        const uint8_t bt_msgid = BT_LTEP;
820        const uint8_t ltep_msgid = peer->ut_pex;
821
822        /* build the diffs */
823        diffs.added = tr_new( tr_pex, newCount );
824        diffs.addedCount = 0;
825        diffs.dropped = tr_new( tr_pex, peer->pexCount );
826        diffs.droppedCount = 0;
827        diffs.elements = tr_new( tr_pex, newCount + peer->pexCount );
828        diffs.elementCount = 0;
829        diffs.diffCount = 0;
830        tr_set_compare( peer->pex, peer->pexCount,
831                        newPex, newCount,
832                        tr_pexCompare, sizeof(tr_pex),
833                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
834        fprintf( stderr, "pex: old peer count %d, new peer count %d, added %d, removed %d\n", peer->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
835
836        /* update peer */
837        tr_free( peer->pex );
838        peer->pex = diffs.elements;
839        peer->pexCount = diffs.elementCount;
840
841       
842        /* build the pex payload */
843        tr_bencInit( &val, TYPE_DICT );
844        tr_bencDictReserve( &val, 3 );
845
846        /* "added" */
847        added = tr_bencDictAdd( &val, "added" );
848        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
849        for( i=0; i<diffs.addedCount; ++i ) {
850            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
851            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
852        }
853        assert( ( walk - tmp ) == diffs.addedCount * 6 );
854        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
855
856        /* "added.f" */
857        flags = tr_bencDictAdd( &val, "added.f" );
858        tmp = walk = tr_new( uint8_t, diffs.addedCount );
859        for( i=0; i<diffs.addedCount; ++i )
860            *walk++ = diffs.added[i].flags;
861        assert( ( walk - tmp ) == diffs.addedCount );
862        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
863
864        /* "dropped" */
865        dropped = tr_bencDictAdd( &val, "dropped" );
866        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
867        for( i=0; i<diffs.droppedCount; ++i ) {
868            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
869            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
870        }
871        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
872        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
873
874        /* write the pex message */
875        benc = tr_bencSaveMalloc( &val, &bencLen );
876        tr_peerIoWriteUint32( peer->io, peer->outBlock, 1 + 1 + bencLen );
877        tr_peerIoWriteBytes ( peer->io, peer->outBlock, &bt_msgid, 1 );
878        tr_peerIoWriteBytes ( peer->io, peer->outBlock, &ltep_msgid, 1 );
879        tr_peerIoWriteBytes ( peer->io, peer->outBlock, benc, bencLen );
880
881        /* cleanup */
882        tr_free( benc );
883        tr_bencFree( &val );
884        tr_free( diffs.added );
885        tr_free( diffs.dropped );
886        tr_free( newPex );
887    }
888
889    return TRUE;
890}
891
892/**
893***
894**/
895
896tr_peermsgs*
897tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
898{
899    tr_peermsgs * peer;
900
901    assert( info != NULL );
902    assert( info->io != NULL );
903
904    peer = tr_new0( tr_peermsgs, 1 );
905    peer->info = info;
906    peer->handle = torrent->handle;
907    peer->torrent = torrent;
908    peer->io = info->io;
909    peer->info->clientIsChoked = 1;
910    peer->info->peerIsChoked = 1;
911    peer->info->clientIsInterested = 0;
912    peer->info->peerIsInterested = 0;
913    peer->info->have = tr_bitfieldNew( torrent->info.pieceCount );
914    peer->pulseTag = tr_timerNew( peer->handle, pulse, peer, NULL, 500 );
915fprintf( stderr, "peer %p pulseTag %p\n", peer, peer->pulseTag );
916    peer->pexTag = tr_timerNew( peer->handle, pexPulse, peer, NULL, PEX_INTERVAL );
917    peer->outMessages = evbuffer_new( );
918    peer->outBlock = evbuffer_new( );
919    peer->inBlock = evbuffer_new( );
920
921    tr_peerIoSetIOFuncs( peer->io, canRead, didWrite, gotError, peer );
922    tr_peerIoSetIOMode( peer->io, EV_READ|EV_WRITE, 0 );
923
924    sendBitfield( peer );
925
926    return peer;
927}
928
929void
930tr_peerMsgsFree( tr_peermsgs* p )
931{
932    if( p != NULL )
933    {
934fprintf( stderr, "peer %p destroying its pulse tag\n", p );
935        tr_timerFree( &p->pulseTag );
936        tr_timerFree( &p->pexTag );
937        evbuffer_free( p->outMessages );
938        evbuffer_free( p->outBlock );
939        evbuffer_free( p->inBlock );
940        tr_free( p );
941    }
942}
Note: See TracBrowser for help on using the repository browser.