source: branches/encryption/libtransmission/peer-msgs.c @ 3095

Last change on this file since 3095 was 3095, checked in by charles, 15 years ago

fix a couple of bugs I introduced yesterday.

  • Property svn:keywords set to Date Rev Author Id
File size: 35.1 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3095 2007-09-17 13:09:48Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41#define PEX_INTERVAL (60 * 1000)
42
43#define PEER_PULSE_INTERVAL (50)
44
45enum
46{
47    BT_CHOKE           = 0,
48    BT_UNCHOKE         = 1,
49    BT_INTERESTED      = 2,
50    BT_NOT_INTERESTED  = 3,
51    BT_HAVE            = 4,
52    BT_BITFIELD        = 5,
53    BT_REQUEST         = 6,
54    BT_PIECE           = 7,
55    BT_CANCEL          = 8,
56    BT_PORT            = 9,
57    BT_LTEP            = 20,
58
59    LTEP_HANDSHAKE     = 0
60};
61
62enum
63{
64    AWAITING_BT_LENGTH,
65    AWAITING_BT_MESSAGE,
66    READING_BT_PIECE
67};
68
69struct peer_request
70{
71    uint32_t index;
72    uint32_t offset;
73    uint32_t length;
74    time_t time_requested;
75};
76
77static int
78peer_request_compare( const void * va, const void * vb )
79{
80    struct peer_request * a = (struct peer_request*) va;
81    struct peer_request * b = (struct peer_request*) vb;
82    if( a->index != b->index ) return a->index - b->index;
83    if( a->offset != b->offset ) return a->offset - b->offset;
84    if( a->length != b->length ) return a->length - b->length;
85    return 0;
86}
87
88struct tr_peermsgs
89{
90    tr_peer * info;
91
92    tr_handle * handle;
93    tr_torrent * torrent;
94    tr_peerIo * io;
95
96    tr_publisher_t * publisher;
97
98    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
99    struct evbuffer * outBlock;    /* the block we're currently sending */
100    struct evbuffer * inBlock;     /* the block we're currently receiving */
101    tr_list * peerAskedFor;
102    tr_list * clientAskedFor;
103
104    tr_timer * pulseTimer;
105    tr_timer * pexTimer;
106
107    struct peer_request blockToUs; /* the block currntly being sent to us */
108
109    time_t lastReqAddedAt;
110    time_t gotKeepAliveTime;
111    time_t clientSentPexAt;
112
113    unsigned int notListening    : 1;
114    unsigned int peerSupportsPex : 1;
115
116    uint8_t state;
117
118    uint8_t ut_pex_id;
119    uint16_t listeningPort;
120
121    uint16_t pexCount;
122
123    uint32_t incomingMessageLength;
124
125    tr_pex * pex;
126};
127
128/**
129***  EVENTS
130**/
131
132static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0 };
133
134static void
135publishEvent( tr_peermsgs * peer, int eventType )
136{
137    tr_peermsgs_event e = blankEvent;
138    e.eventType = eventType;
139    tr_publisherPublish( peer->publisher, peer, &e );
140}
141
142static void
143fireNeedReq( tr_peermsgs * msgs )
144{
145    tr_peermsgs_event e = blankEvent;
146    e.eventType = TR_PEERMSG_NEED_REQ;
147    tr_publisherPublish( msgs->publisher, msgs, &e );
148}
149
150static void
151fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
152{
153    tr_peermsgs_event e = blankEvent;
154    e.eventType = TR_PEERMSG_CLIENT_HAVE;
155    e.pieceIndex = pieceIndex;
156    tr_publisherPublish( msgs->publisher, msgs, &e );
157}
158
159static void
160fireGotBlock( tr_peermsgs * peer, uint32_t pieceIndex, uint32_t offset, uint32_t length )
161{
162    tr_peermsgs_event e = blankEvent;
163    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
164    e.pieceIndex = pieceIndex;
165    e.offset = offset;
166    e.length = length;
167    tr_publisherPublish( peer->publisher, peer, &e );
168}
169
170static void
171fireGotError( tr_peermsgs * peer )
172{
173    publishEvent( peer, TR_PEERMSG_GOT_ERROR );
174}
175
176/**
177***  INTEREST
178**/
179
180static int
181isPieceInteresting( const tr_peermsgs   * peer,
182                    int                   piece )
183{
184    const tr_torrent * torrent = peer->torrent;
185    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
186        return FALSE;
187    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
188        return FALSE;
189    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
190        return FALSE;
191    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
192        return FALSE;
193    return TRUE;
194}
195
196static int
197isPeerInteresting( const tr_peermsgs * msgs )
198{
199    const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
200    const int peerIsSeed = msgs->info->progress >= 1.0;
201
202    if( peerIsSeed )
203    {
204        return !clientIsSeed;
205    }
206    else if( clientIsSeed )
207    {
208        return !peerIsSeed;
209    }
210    else /* we're both leeches... */
211    {
212        int i;
213        const tr_torrent * torrent = msgs->torrent;
214        const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
215
216        if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
217            return TRUE;
218
219        assert( bitfield->len == msgs->info->have->len );
220        for( i=0; i<torrent->info.pieceCount; ++i )
221            if( isPieceInteresting( msgs, i ) )
222                return TRUE;
223
224        return FALSE;
225    }
226}
227
228static void
229sendInterest( tr_peermsgs * msgs, int weAreInterested )
230{
231    const uint32_t len = sizeof(uint8_t);
232    const uint8_t bt_msgid = weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED;
233const time_t now = time( NULL );
234
235    assert( msgs != NULL );
236    assert( weAreInterested==0 || weAreInterested==1 );
237
238    msgs->info->clientIsInterested = weAreInterested;
239    fprintf( stderr, "peer %p: sending an %s message at %s\n", msgs, (weAreInterested ? "INTERESTED" : "NOT_INTERESTED"), ctime(&now) );
240    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
241    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
242}
243
244static void
245updateInterest( tr_peermsgs * msgs )
246{
247    const int i = isPeerInteresting( msgs );
248    if( i != msgs->info->clientIsInterested )
249        sendInterest( msgs, i );
250    if( i )
251        fireNeedReq( msgs );
252}
253
254void
255tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
256{
257    assert( msgs != NULL );
258    assert( msgs->info != NULL );
259    assert( choke==0 || choke==1 );
260const time_t now = time( NULL );
261
262    if( msgs->info->peerIsChoked != choke )
263    {
264        const uint32_t len = sizeof(uint8_t);
265        const uint8_t bt_msgid = choke ? BT_CHOKE : BT_UNCHOKE;
266
267        msgs->info->peerIsChoked = choke ? 1 : 0;
268        if( msgs->info )
269        {
270            tr_list_foreach( msgs->peerAskedFor, tr_free );
271            tr_list_free( &msgs->peerAskedFor );
272        }
273
274        fprintf( stderr, "peer %p: sending a %s message at %s\n", msgs, (choke ? "CHOKE" : "UNCHOKE"), ctime(&now) );
275        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
276        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
277    }
278}
279
280/**
281***
282**/
283
284void
285tr_peerMsgsCancel( tr_peermsgs * msgs,
286                   uint32_t      pieceIndex,
287                   uint32_t      offset,
288                   uint32_t      length )
289{
290    tr_list * node;
291    struct peer_request tmp;
292
293    assert( msgs != NULL );
294    assert( length > 0 );
295
296    tmp.index = pieceIndex;
297    tmp.offset = offset;
298    tmp.length = length;
299
300    node = tr_list_find( msgs->clientAskedFor, &tmp, peer_request_compare );
301    if( node != NULL )
302    {
303        /* cancel the request */
304        const uint8_t bt_msgid = BT_CANCEL;
305        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
306        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
307        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
308        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
309        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
310        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
311
312        /* remove it from our "requested" list */
313        tr_list_remove_data( &msgs->peerAskedFor, node->data );
314    }
315}
316
317/**
318***
319**/
320
321void
322tr_peerMsgsHave( tr_peermsgs * msgs,
323                 uint32_t      pieceIndex )
324{
325    const uint8_t bt_msgid = BT_HAVE;
326    const uint32_t len = sizeof(uint8_t) + sizeof(uint32_t);
327    fprintf( stderr, "peer %p: w00t telling them we HAVE piece #%d\n", msgs, pieceIndex );
328    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
329    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
330    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
331
332    updateInterest( msgs );
333}
334
335/**
336***
337**/
338
339static int
340pulse( void * vmsgs );
341
342int
343tr_peerMsgsAddRequest( tr_peermsgs * msgs,
344                       uint32_t      index, 
345                       uint32_t      offset, 
346                       uint32_t      length )
347{
348    const uint8_t bt_msgid = BT_REQUEST;
349    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
350    struct peer_request * req;
351    int maxSize;
352
353    assert( msgs != NULL );
354    assert( msgs->torrent != NULL );
355    assert( index < ((uint32_t)msgs->torrent->info.pieceCount) );
356    assert( offset < (uint32_t)tr_torPieceCountBytes( msgs->torrent, (int)index ) );
357    assert( (offset + length) <= (uint32_t)tr_torPieceCountBytes( msgs->torrent, (int)index ) );
358
359    if( msgs->info->clientIsChoked )
360        return TR_ADDREQ_CLIENT_CHOKED;
361
362    if( !tr_bitfieldHas( msgs->info->have, index ) )
363        return TR_ADDREQ_MISSING;
364
365    maxSize = MIN( 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10), 100 );
366    //if( ( time(NULL) - msgs->lastReqAddedAt <= 5 ) && ( tr_list_size( msgs->clientAskedFor) >= maxSize ) )
367    if( tr_list_size( msgs->clientAskedFor) >= maxSize )
368        return TR_ADDREQ_FULL;
369
370    fprintf( stderr, "w00t peer %p has a max request queue size of %d... adding request for piece %d, offset %d\n", msgs, maxSize, (int)index, (int)offset );
371
372    /* queue the request */
373    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
374    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
375    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
376    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
377    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
378
379    /* add it to our `requests sent' list */
380    req = tr_new( struct peer_request, 1 );
381    req->index = index;
382    req->offset = offset;
383    req->length = length;
384    req->time_requested = msgs->lastReqAddedAt = time( NULL );
385    tr_list_append( &msgs->clientAskedFor, req );
386    pulse( msgs );
387
388    return TR_ADDREQ_OK;
389}
390
391/**
392***
393**/
394
395static void
396parseLtepHandshake( tr_peermsgs * peer, int len, struct evbuffer * inbuf )
397{
398    benc_val_t val, * sub;
399    uint8_t * tmp = tr_new( uint8_t, len );
400    evbuffer_remove( inbuf, tmp, len );
401
402    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
403        fprintf( stderr, "GET  extended-handshake, couldn't get dictionary\n" );
404        tr_free( tmp );
405        return;
406    }
407
408    tr_bencPrint( &val );
409
410    /* check supported messages for utorrent pex */
411    sub = tr_bencDictFind( &val, "m" );
412    if( tr_bencIsDict( sub ) ) {
413        sub = tr_bencDictFind( sub, "ut_pex" );
414        if( tr_bencIsInt( sub ) ) {
415            peer->peerSupportsPex = 1;
416            peer->ut_pex_id = (uint8_t) sub->val.i;
417            fprintf( stderr, "peer->ut_pex is %d\n", (int)peer->ut_pex_id );
418        }
419    }
420
421#if 0
422    /* get peer's client name */
423    sub = tr_bencDictFind( &val, "v" );
424    if( tr_bencIsStr( sub ) ) {
425int i;
426        tr_free( peer->info->client );
427        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
428        peer->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
429for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
430                                  if( (int)peer->info->client[i]==-75 ) peer->info->client[i]='u'; }
431        fprintf( stderr, "peer->client is now [%s]\n", peer->info->client );
432    }
433#endif
434
435    /* get peer's listening port */
436    sub = tr_bencDictFind( &val, "p" );
437    if( tr_bencIsInt( sub ) ) {
438        peer->listeningPort = htons( (uint16_t)sub->val.i );
439        fprintf( stderr, "peer->port is now %hu\n", peer->listeningPort );
440    }
441
442    tr_bencFree( &val );
443    tr_free( tmp );
444}
445
446static void
447parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
448{
449    benc_val_t val, * sub;
450    uint8_t * tmp;
451
452    if( msgs->torrent->pexDisabled ) /* no sharing! */
453        return;
454
455    tmp = tr_new( uint8_t, msglen );
456    evbuffer_remove( inbuf, tmp, msglen );
457
458    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
459        fprintf( stderr, "GET can't read extended-pex dictionary\n" );
460        tr_free( tmp );
461        return;
462    }
463
464    sub = tr_bencDictFind( &val, "added" );
465    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
466        const int n = sub->val.s.i / 6 ;
467        fprintf( stderr, "got %d peers from uT pex\n", n );
468        tr_peerMgrAddPeers( msgs->handle->peerMgr,
469                            msgs->torrent->info.hash,
470                            TR_PEER_FROM_PEX,
471                            (uint8_t*)sub->val.s.s, n );
472    }
473
474    tr_bencFree( &val );
475    tr_free( tmp );
476}
477
478static void
479sendPex( tr_peermsgs * msgs );
480
481static void
482parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
483{
484    uint8_t ltep_msgid;
485
486    tr_peerIoReadBytes( msgs->io, inbuf, &ltep_msgid, 1 );
487    msglen--;
488
489    if( ltep_msgid == LTEP_HANDSHAKE )
490    {
491        fprintf( stderr, "got ltep handshake\n" );
492        parseLtepHandshake( msgs, msglen, inbuf );
493    }
494    else if( ltep_msgid == msgs->ut_pex_id )
495    {
496        fprintf( stderr, "got ut pex\n" );
497        msgs->peerSupportsPex = 1;
498        parseUtPex( msgs, msglen, inbuf );
499    }
500    else
501    {
502        fprintf( stderr, "skipping unknown ltep message (%d)\n", (int)ltep_msgid );
503        evbuffer_drain( inbuf, msglen );
504    }
505}
506
507static int
508readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
509{
510    uint32_t len;
511    const size_t needlen = sizeof(uint32_t);
512
513    if( EVBUFFER_LENGTH(inbuf) < needlen )
514        return READ_MORE;
515
516    tr_peerIoReadUint32( msgs->io, inbuf, &len );
517
518    if( len == 0 ) { /* peer sent us a keepalive message */
519        fprintf( stderr, "peer %p sent us a keepalive message...\n", msgs );
520        msgs->gotKeepAliveTime = time( NULL );
521    } else {
522        fprintf( stderr, "peer %p is sending us a message with %"PRIu64" bytes...\n", msgs, (uint64_t)len );
523        msgs->incomingMessageLength = len;
524        msgs->state = AWAITING_BT_MESSAGE;
525    } return READ_AGAIN;
526}
527
528static int
529readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
530{
531    uint8_t id;
532    uint32_t ui32;
533    uint32_t msglen = msgs->incomingMessageLength;
534const time_t now = time( NULL );
535
536    if( EVBUFFER_LENGTH(inbuf) < msglen )
537        return READ_MORE;
538
539    tr_peerIoReadBytes( msgs->io, inbuf, &id, 1 );
540    msglen--;
541    fprintf( stderr, "peer %p sent us a message... "
542                     "bt id number is %d, and remaining len is %d\n", msgs, (int)id, (int)msglen );
543
544    switch( id )
545    {
546        case BT_CHOKE:
547            assert( msglen == 0 );
548            fprintf( stderr, "w00t peer-msgs %p sent us a BT_CHOKE at %s\n", msgs, ctime(&now) );
549            msgs->info->clientIsChoked = 1;
550            tr_list_foreach( msgs->peerAskedFor, tr_free );
551            tr_list_free( &msgs->peerAskedFor );
552            tr_list_foreach( msgs->clientAskedFor, tr_free );
553            tr_list_free( &msgs->clientAskedFor );
554            break;
555
556        case BT_UNCHOKE:
557            assert( msglen == 0 );
558            fprintf( stderr, "w00t peer-msgs %p sent us a BT_UNCHOKE at %s\n", msgs, ctime(&now) );
559            msgs->info->clientIsChoked = 0;
560            fireNeedReq( msgs );
561            break;
562
563        case BT_INTERESTED:
564            assert( msglen == 0 );
565            fprintf( stderr, "w00t peer-msgs %p sent us a BT_INTERESTED at %s\n", msgs, ctime(&now) );
566            msgs->info->peerIsInterested = 1;
567            break;
568
569        case BT_NOT_INTERESTED:
570            assert( msglen == 0 );
571            fprintf( stderr, "w00t peer-msgs %p sent us a BT_NOT_INTERESTED at %s\n", msgs, ctime(&now) );
572            msgs->info->peerIsInterested = 0;
573            break;
574
575        case BT_HAVE:
576            assert( msglen == 4 );
577            fprintf( stderr, "peer-msgs %p sent us a BT_HAVE\n", msgs );
578            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
579            tr_bitfieldAdd( msgs->info->have, ui32 );
580            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
581            fprintf( stderr, "after the HAVE message, peer progress is %f\n", msgs->info->progress );
582            updateInterest( msgs );
583            break;
584
585        case BT_BITFIELD:
586            assert( msglen == msgs->info->have->len );
587            fprintf( stderr, "peer-msgs %p sent us a BT_BITFIELD\n", msgs );
588            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
589            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
590            fprintf( stderr, "after the BITFIELD peer progress is %f\n", msgs->info->progress );
591            updateInterest( msgs );
592            /* FIXME: maybe unchoke */
593            break;
594
595        case BT_REQUEST: {
596            struct peer_request * req;
597            assert( msglen == 12 );
598            fprintf( stderr, "got a BT_REQUEST\n" );
599            req = tr_new( struct peer_request, 1 );
600            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
601            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
602            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
603            if( !msgs->info->peerIsChoked )
604                tr_list_append( &msgs->peerAskedFor, req );
605            break;
606        }
607
608        case BT_CANCEL: {
609            struct peer_request req;
610            tr_list * node;
611            assert( msglen == 12 );
612            fprintf( stderr, "got a BT_CANCEL\n" );
613            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
614            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
615            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
616            node = tr_list_find( msgs->peerAskedFor, &req, peer_request_compare );
617            if( node != NULL ) {
618                fprintf( stderr, "found the req that peer is cancelling... cancelled.\n" );
619                tr_list_remove_data( &msgs->peerAskedFor, node->data );
620            }
621            break;
622        }
623
624        case BT_PIECE: {
625            fprintf( stderr, "peer-msgs %p sent us a BT_PIECE\n", msgs );
626            assert( msgs->blockToUs.length == 0 );
627            msgs->state = READING_BT_PIECE;
628            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
629            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
630            msgs->blockToUs.length = msglen - 8;
631            assert( msgs->blockToUs.length > 0 );
632            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
633            //evbuffer_drain( msgs->inBlock, ~0 );
634            return READ_AGAIN;
635            break;
636        }
637
638        case BT_PORT: {
639            assert( msglen == 2 );
640            fprintf( stderr, "peer-msgs %p sent us a BT_PORT\n", msgs );
641            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->listeningPort );
642            break;
643        }
644
645        case BT_LTEP:
646            fprintf( stderr, "peer-msgs %p sent us a BT_LTEP\n", msgs );
647            parseLtep( msgs, msglen, inbuf );
648            break;
649
650        default:
651            fprintf( stderr, "peer-msgs %p sent us an UNKNOWN: %d\n", msgs, (int)id );
652            tr_peerIoDrain( msgs->io, inbuf, msglen );
653            assert( 0 );
654    }
655
656    msgs->incomingMessageLength = -1;
657    msgs->state = AWAITING_BT_LENGTH;
658    return READ_AGAIN;
659}
660
661static void
662clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
663{
664    tr_torrent * tor = msgs->torrent;
665    tor->downloadedCur += byteCount;
666    tr_rcTransferred( tor->download, byteCount );
667    tr_rcTransferred( tor->handle->download, byteCount );
668}
669
670static void
671peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
672{
673    tr_torrent * tor = msgs->torrent;
674    tor->uploadedCur += byteCount;
675    tr_rcTransferred( tor->upload, byteCount );
676    tr_rcTransferred( tor->handle->upload, byteCount );
677}
678
679static int
680canDownload( const tr_peermsgs * msgs UNUSED )
681{
682#if 0
683    tr_torrent * tor = msgs->torrent;
684
685    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
686        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
687
688    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
689        return tr_rcCanTransfer( tor->download );
690#endif
691
692    return TRUE;
693}
694
695static void
696reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
697{
698    tr_torrent * tor = msgs->torrent;
699
700    /* increment the `corrupt' field */
701    tor->corruptCur += byteCount;
702
703    /* decrement the `downloaded' field */
704    if( tor->downloadedCur >= byteCount )
705        tor->downloadedCur -= byteCount;
706    else
707        tor->downloadedCur = 0;
708}
709
710
711static void
712gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
713{
714    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
715    reassignBytesToCorrupt( msgs, byteCount );
716}
717
718static void
719gotUnwantedBlock( tr_peermsgs * msgs, uint32_t index UNUSED, uint32_t offset UNUSED, uint32_t length )
720{
721    reassignBytesToCorrupt( msgs, length );
722}
723
724static void
725addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
726{
727    if( !msgs->info->blame )
728         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
729    tr_bitfieldAdd( msgs->info->blame, index );
730}
731
732static void
733gotBlock( tr_peermsgs      * msgs,
734          struct evbuffer  * inbuf,
735          uint32_t           index,
736          uint32_t           offset,
737          uint32_t           length )
738{
739    tr_torrent * tor = msgs->torrent;
740    const int block = _tr_block( tor, index, offset );
741    struct peer_request key, *req;
742    const time_t now = time( NULL );
743
744    /**
745    *** Remove the block from our `we asked for this' list
746    **/
747
748    key.index = index;
749    key.offset = offset;
750    key.length = length;
751    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
752                                                 peer_request_compare );
753    fprintf( stderr, "w00t got a block from %p. turnaround time for this block was %d seconds... at %s\n",
754                     msgs, (int)(time(NULL) - req->time_requested), ctime(&now) );
755    if( req == NULL ) {
756        gotUnwantedBlock( msgs, index, offset, length );
757        fprintf( stderr, "we didn't ask for this message...\n" );
758        tr_dbg( "we didn't ask the peer for this message..." );
759        return;
760    }
761    tr_free( req );
762    fprintf( stderr, "peer %p now has %d block requests in its outbox\n",
763             msgs, tr_list_size(msgs->clientAskedFor));
764
765    /**
766    *** Error checks
767    **/
768
769    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
770        fprintf( stderr, "have this block already...\n" );
771        tr_dbg( "have this block already..." );
772        gotUnwantedBlock( msgs, index, offset, length );
773        return;
774    }
775
776    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
777        fprintf( stderr, "block is the wrong length..." );
778        tr_dbg( "block is the wrong length..." );
779        gotUnwantedBlock( msgs, index, offset, length );
780        return;
781    }
782
783    /**
784    ***  Write the block
785    **/
786
787    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf ))) {
788        return;
789    }
790
791    tr_cpBlockAdd( tor->completion, block );
792
793    addUsToBlamefield( msgs, index );
794
795    fireGotBlock( msgs, index, offset, length );
796    fireNeedReq( msgs );
797
798    /**
799    ***  Handle if this was the last block in the piece
800    **/
801
802    if( tr_cpPieceIsComplete( tor->completion, index ) )
803    {
804        if( !tr_ioHash( tor, index ) )
805        {
806            gotBadPiece( msgs, index );
807            return;
808        }
809
810        fireClientHave( msgs, index );
811    }
812}
813
814
815static ReadState
816readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
817{
818    uint32_t inlen;
819    uint8_t * tmp;
820
821    assert( msgs != NULL );
822    assert( msgs->blockToUs.length > 0 );
823    assert( inbuf != NULL );
824    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
825
826    /* read from the inbuf into our block buffer */
827    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
828    tmp = tr_new( uint8_t, inlen );
829    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
830    evbuffer_add( msgs->inBlock, tmp, inlen );
831
832    /* update our tables accordingly */
833    assert( inlen >= msgs->blockToUs.length );
834    msgs->blockToUs.length -= inlen;
835    msgs->info->peerSentDataAt = time( NULL );
836    clientGotBytes( msgs, inlen );
837
838    /* if this was the entire block, save it */
839    if( !msgs->blockToUs.length )
840    {
841        fprintf( stderr, "w00t -- got block index %u, offset %u\n", msgs->blockToUs.index, msgs->blockToUs.offset );
842        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
843        gotBlock( msgs, msgs->inBlock,
844                        msgs->blockToUs.index,
845                        msgs->blockToUs.offset,
846                        EVBUFFER_LENGTH( msgs->inBlock ) );
847        evbuffer_drain( msgs->inBlock, ~0 );
848        msgs->state = AWAITING_BT_LENGTH;
849    }
850
851    /* cleanup */
852    tr_free( tmp );
853    return READ_AGAIN;
854}
855
856static ReadState
857canRead( struct bufferevent * evin, void * vpeer )
858{
859    ReadState ret;
860    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
861    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
862
863    switch( peer->state )
864    {
865        case AWAITING_BT_LENGTH:  ret = readBtLength  ( peer, inbuf ); break;
866        case AWAITING_BT_MESSAGE: ret = readBtMessage ( peer, inbuf ); break;
867        case READING_BT_PIECE:    ret = readBtPiece   ( peer, inbuf ); break;
868        default: assert( 0 );
869    }
870    return ret;
871}
872
873/**
874***
875**/
876
877static int
878canWrite( const tr_peermsgs * msgs )
879{
880    /* don't let our outbuffer get too large */
881    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 2048 )
882        return FALSE;
883
884    return TRUE;
885}
886
887static int
888canUpload( const tr_peermsgs * msgs )
889{
890    const tr_torrent * tor = msgs->torrent;
891
892    if( !canWrite( msgs ) )
893        return FALSE;
894
895    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
896        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
897
898    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
899        return tr_rcCanTransfer( tor->upload );
900
901    return TRUE;
902}
903
904static int
905pulse( void * vmsgs )
906{
907    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
908    size_t len;
909
910    /* if we froze out a downloaded block because of speed limits,
911       start listening to the peer again */
912#if 0
913    if( msgs->notListening )
914    {
915        fprintf( stderr, "msgs %p thawing out...\n", msgs );
916        msgs->notListening = 0;
917        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
918    }
919#endif
920
921    if( !canWrite( msgs ) )
922    {
923    }
924    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
925    {
926        while ( len && canUpload( msgs ) )
927        {
928            const size_t outlen = MIN( len, 2048 );
929            tr_peerIoWrite( msgs->io, EVBUFFER_DATA(msgs->outBlock), outlen );
930            evbuffer_drain( msgs->outBlock, outlen );
931            peerGotBytes( msgs, outlen );
932            len -= outlen;
933        }
934    }
935    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
936    {
937        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
938    }
939    else if(( msgs->peerAskedFor ))
940    {
941        struct peer_request * req = (struct peer_request*) msgs->peerAskedFor->data;
942        uint8_t * tmp = tr_new( uint8_t, req->length );
943        const uint8_t msgid = BT_PIECE;
944        const uint32_t msglen = sizeof(uint8_t) + sizeof(uint32_t)*2 + req->length;
945        tr_ioRead( msgs->torrent, req->index, req->offset, req->length, tmp );
946        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, msglen );
947        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, &msgid, 1 );
948        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->index );
949        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->offset );
950        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, tmp, req->length );
951        tr_free( tmp );
952    }
953
954    return TRUE; /* loop forever */
955}
956
957static void
958didWrite( struct bufferevent * evin UNUSED, void * vpeer )
959{
960    pulse( (tr_peermsgs *) vpeer );
961}
962
963static void
964gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
965{
966    fireGotError( (tr_peermsgs*)vpeer );
967}
968
969static void
970sendBitfield( tr_peermsgs * peer )
971{
972    const tr_bitfield * bitfield = tr_cpPieceBitfield( peer->torrent->completion );
973    const uint32_t len = sizeof(uint8_t) + bitfield->len;
974    const uint8_t bt_msgid = BT_BITFIELD;
975
976    fprintf( stderr, "peer %p: enqueueing a bitfield message\n", peer );
977    tr_peerIoWriteUint32( peer->io, peer->outMessages, len );
978    tr_peerIoWriteBytes( peer->io, peer->outMessages, &bt_msgid, 1 );
979    tr_peerIoWriteBytes( peer->io, peer->outMessages, bitfield->bits, bitfield->len );
980}
981
982/**
983***
984**/
985
986#define MAX_DIFFS 50
987
988typedef struct
989{
990    tr_pex * added;
991    tr_pex * dropped;
992    tr_pex * elements;
993    int addedCount;
994    int droppedCount;
995    int elementCount;
996    int diffCount;
997}
998PexDiffs;
999
1000static void
1001pexAddedCb( void * vpex, void * userData )
1002{
1003    PexDiffs * diffs = (PexDiffs *) userData;
1004    tr_pex * pex = (tr_pex *) vpex;
1005    if( diffs->diffCount < MAX_DIFFS )
1006    {
1007        diffs->diffCount++;
1008        diffs->added[diffs->addedCount++] = *pex;
1009        diffs->elements[diffs->elementCount++] = *pex;
1010    }
1011}
1012
1013static void
1014pexRemovedCb( void * vpex, void * userData )
1015{
1016    PexDiffs * diffs = (PexDiffs *) userData;
1017    tr_pex * pex = (tr_pex *) vpex;
1018    if( diffs->diffCount < MAX_DIFFS )
1019    {
1020        diffs->diffCount++;
1021        diffs->dropped[diffs->droppedCount++] = *pex;
1022    }
1023}
1024
1025static void
1026pexElementCb( void * vpex, void * userData )
1027{
1028    PexDiffs * diffs = (PexDiffs *) userData;
1029    tr_pex * pex = (tr_pex *) vpex;
1030    if( diffs->diffCount < MAX_DIFFS )
1031    {
1032        diffs->diffCount++;
1033        diffs->elements[diffs->elementCount++] = *pex;
1034    }
1035}
1036
1037static void
1038sendPex( tr_peermsgs * msgs )
1039{
1040    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1041    {
1042        int i;
1043        tr_pex * newPex = NULL;
1044        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1045        PexDiffs diffs;
1046        benc_val_t val, *added, *dropped, *flags;
1047        uint8_t *tmp, *walk;
1048        char * benc;
1049        int bencLen;
1050        const uint8_t bt_msgid = BT_LTEP;
1051        const uint8_t ltep_msgid = msgs->ut_pex_id;
1052
1053        /* build the diffs */
1054        diffs.added = tr_new( tr_pex, newCount );
1055        diffs.addedCount = 0;
1056        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1057        diffs.droppedCount = 0;
1058        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1059        diffs.elementCount = 0;
1060        diffs.diffCount = 0;
1061        tr_set_compare( msgs->pex, msgs->pexCount,
1062                        newPex, newCount,
1063                        tr_pexCompare, sizeof(tr_pex),
1064                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1065        fprintf( stderr, "pex: old peer count %d, new peer count %d, added %d, removed %d\n", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1066
1067        /* update peer */
1068        tr_free( msgs->pex );
1069        msgs->pex = diffs.elements;
1070        msgs->pexCount = diffs.elementCount;
1071
1072        /* build the pex payload */
1073        tr_bencInit( &val, TYPE_DICT );
1074        tr_bencDictReserve( &val, 3 );
1075
1076        /* "added" */
1077        added = tr_bencDictAdd( &val, "added" );
1078        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1079        for( i=0; i<diffs.addedCount; ++i ) {
1080            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1081            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1082        }
1083        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1084        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1085
1086        /* "added.f" */
1087        flags = tr_bencDictAdd( &val, "added.f" );
1088        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1089        for( i=0; i<diffs.addedCount; ++i ) {
1090            fprintf( stderr, "PEX -->> -->> flag is %d\n", (int)diffs.added[i].flags );
1091            *walk++ = diffs.added[i].flags;
1092        }
1093        assert( ( walk - tmp ) == diffs.addedCount );
1094        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1095
1096        /* "dropped" */
1097        dropped = tr_bencDictAdd( &val, "dropped" );
1098        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1099        for( i=0; i<diffs.droppedCount; ++i ) {
1100            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1101            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1102        }
1103        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1104        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1105
1106        /* write the pex message */
1107        benc = tr_bencSaveMalloc( &val, &bencLen );
1108        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 1 + 1 + bencLen );
1109        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &bt_msgid, 1 );
1110        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &ltep_msgid, 1 );
1111        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1112
1113        /* cleanup */
1114        tr_free( benc );
1115        tr_bencFree( &val );
1116        tr_free( diffs.added );
1117        tr_free( diffs.dropped );
1118        tr_free( newPex );
1119
1120        msgs->clientSentPexAt = time( NULL );
1121    }
1122}
1123
1124static int
1125pexPulse( void * vpeer )
1126{
1127    sendPex( vpeer );
1128    return TRUE;
1129}
1130
1131/**
1132***
1133**/
1134
1135tr_peermsgs*
1136tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
1137{
1138    tr_peermsgs * peer;
1139
1140    assert( info != NULL );
1141    assert( info->io != NULL );
1142
1143    peer = tr_new0( tr_peermsgs, 1 );
1144    peer->publisher = tr_publisherNew( );
1145    peer->info = info;
1146    peer->handle = torrent->handle;
1147    peer->torrent = torrent;
1148    peer->io = info->io;
1149    peer->info->clientIsChoked = 1;
1150    peer->info->peerIsChoked = 1;
1151    peer->info->clientIsInterested = 0;
1152    peer->info->peerIsInterested = 0;
1153    peer->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1154    peer->pulseTimer = tr_timerNew( peer->handle, pulse, peer, PEER_PULSE_INTERVAL );
1155    peer->pexTimer = tr_timerNew( peer->handle, pexPulse, peer, PEX_INTERVAL );
1156    peer->outMessages = evbuffer_new( );
1157    peer->outBlock = evbuffer_new( );
1158    peer->inBlock = evbuffer_new( );
1159
1160    tr_peerIoSetIOFuncs( peer->io, canRead, didWrite, gotError, peer );
1161    tr_peerIoSetIOMode( peer->io, EV_READ|EV_WRITE, 0 );
1162
1163    sendBitfield( peer );
1164    fireNeedReq( peer );
1165
1166    return peer;
1167}
1168
1169void
1170tr_peerMsgsFree( tr_peermsgs* p )
1171{
1172    if( p != NULL )
1173    {
1174        tr_timerFree( &p->pulseTimer );
1175        tr_timerFree( &p->pexTimer );
1176        tr_publisherFree( &p->publisher );
1177        tr_list_foreach( p->clientAskedFor, tr_free );
1178        tr_list_free( &p->clientAskedFor );
1179        tr_list_foreach( p->peerAskedFor, tr_free );
1180        tr_list_free( &p->peerAskedFor );
1181        evbuffer_free( p->outMessages );
1182        evbuffer_free( p->outBlock );
1183        evbuffer_free( p->inBlock );
1184        tr_free( p );
1185    }
1186}
1187
1188tr_publisher_tag
1189tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1190                      tr_delivery_func    func,
1191                      void              * userData )
1192{
1193    return tr_publisherSubscribe( peer->publisher, func, userData );
1194}
1195
1196void
1197tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1198                        tr_publisher_tag    tag )
1199{
1200    tr_publisherUnsubscribe( peer->publisher, tag );
1201}
Note: See TracBrowser for help on using the repository browser.