source: trunk/libtransmission/peer-msgs.c @ 3777

Last change on this file since 3777 was 3777, checked in by charles, 13 years ago
  • fix a bug that could corrupt peer connections, causing slower speeds and/or fewer connections
  • add more assertions to smoke out any remaining BT protocol bugs
  • Property svn:keywords set to Date Rev Author Id
File size: 49.5 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3777 2007-11-10 04:56:27Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    OUR_LTEP_PEX            = 1,
64
65    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 90,
69
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
72    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
73
74    /* number of pieces generated for allow-fast,
75      threshold for fast-allowing others */
76    MAX_ALLOWED_SET_COUNT   = 10
77};
78
79enum
80{
81    AWAITING_BT_LENGTH,
82    AWAITING_BT_MESSAGE
83};
84
85struct peer_request
86{
87    uint32_t index;
88    uint32_t offset;
89    uint32_t length;
90    time_t time_requested;
91};
92
93static int
94compareRequest( const void * va, const void * vb )
95{
96    int i;
97    const struct peer_request * a = va;
98    const struct peer_request * b = vb;
99    if(( i = tr_compareUint32( a->index, b->index ))) return i;
100    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
101    if(( i = tr_compareUint32( a->length, b->length ))) return i;
102    return 0;
103}
104
105struct tr_peermsgs
106{
107    tr_peer * info;
108
109    tr_handle * handle;
110    tr_torrent * torrent;
111    tr_peerIo * io;
112
113    tr_publisher_t * publisher;
114
115    struct evbuffer * outBlock;    /* buffer of all the current piece message */
116    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
117    tr_list * peerAskedFor;
118    tr_list * peerAskedForFast;
119    tr_list * clientAskedFor;
120    tr_list * clientWillAskFor;
121
122    tr_timer * rateTimer;
123    tr_timer * pulseTimer;
124    tr_timer * pexTimer;
125
126    time_t lastReqAddedAt;
127    time_t clientSentPexAt;
128    time_t clientSentAnythingAt;
129
130    unsigned int notListening             : 1;
131    unsigned int peerSupportsPex          : 1;
132    unsigned int clientSentLtepHandshake  : 1;
133    unsigned int peerSentLtepHandshake    : 1;
134   
135    tr_bitfield * clientAllowedPieces;
136    tr_bitfield * peerAllowedPieces;
137   
138    uint8_t state;
139    uint8_t ut_pex_id;
140    uint16_t pexCount;
141    uint32_t incomingMessageLength;
142    uint32_t maxActiveRequests;
143    uint32_t minActiveRequests;
144
145    tr_pex * pex;
146};
147
148/**
149***
150**/
151
152static void
153myDebug( const char * file, int line,
154         const struct tr_peermsgs * msgs,
155         const char * fmt, ... )
156{
157    FILE * fp = tr_getLog( );
158    if( fp != NULL )
159    {
160        va_list args;
161        char timestr[64];
162        struct evbuffer * buf = evbuffer_new( );
163        char * myfile = tr_strdup( file );
164
165        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
166                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
167                             msgs->torrent->info.name,
168                             tr_peerIoGetAddrStr( msgs->io ),
169                             msgs->info->client );
170        va_start( args, fmt );
171        evbuffer_add_vprintf( buf, fmt, args );
172        va_end( args );
173        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
174        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
175
176        tr_free( myfile );
177        evbuffer_free( buf );
178    }
179}
180
181#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
182
183/**
184***
185**/
186
187static void
188protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
189{
190    tr_peerIo * io = msgs->io;
191    struct evbuffer * out = msgs->outMessages;
192
193    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
194    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
195    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
196    tr_peerIoWriteUint32( io, out, req->index );
197    tr_peerIoWriteUint32( io, out, req->offset );
198    tr_peerIoWriteUint32( io, out, req->length );
199}
200
201static void
202protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
203{
204    tr_peerIo * io = msgs->io;
205    struct evbuffer * out = msgs->outMessages;
206
207    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
208    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
209    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
210    tr_peerIoWriteUint32( io, out, req->index );
211    tr_peerIoWriteUint32( io, out, req->offset );
212    tr_peerIoWriteUint32( io, out, req->length );
213}
214
215static void
216protocolSendHave( tr_peermsgs * msgs, uint32_t index )
217{
218    tr_peerIo * io = msgs->io;
219    struct evbuffer * out = msgs->outMessages;
220
221    dbgmsg( msgs, "sending Have %u", index );
222    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
223    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
224    tr_peerIoWriteUint32( io, out, index );
225}
226
227static void
228protocolSendChoke( tr_peermsgs * msgs, int choke )
229{
230    tr_peerIo * io = msgs->io;
231    struct evbuffer * out = msgs->outMessages;
232
233    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
234    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
235    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
236}
237
238/**
239***  EVENTS
240**/
241
242static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
243
244static void
245publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
246{
247    tr_publisherPublish( msgs->publisher, msgs->info, e );
248}
249
250static void
251fireGotError( tr_peermsgs * msgs )
252{
253    tr_peermsgs_event e = blankEvent;
254    e.eventType = TR_PEERMSG_GOT_ERROR;
255    publish( msgs, &e );
256}
257
258static void
259fireNeedReq( tr_peermsgs * msgs )
260{
261    tr_peermsgs_event e = blankEvent;
262    e.eventType = TR_PEERMSG_NEED_REQ;
263    publish( msgs, &e );
264}
265
266static void
267firePeerProgress( tr_peermsgs * msgs )
268{
269    tr_peermsgs_event e = blankEvent;
270    e.eventType = TR_PEERMSG_PEER_PROGRESS;
271    e.progress = msgs->info->progress;
272    publish( msgs, &e );
273}
274
275static void
276fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
277{
278    tr_peermsgs_event e = blankEvent;
279    e.eventType = TR_PEERMSG_CLIENT_HAVE;
280    e.pieceIndex = pieceIndex;
281    publish( msgs, &e );
282}
283
284static void
285fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
286{
287    tr_peermsgs_event e = blankEvent;
288    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
289    e.pieceIndex = req->index;
290    e.offset = req->offset;
291    e.length = req->length;
292    publish( msgs, &e );
293}
294
295static void
296fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
297{
298    tr_peermsgs_event e = blankEvent;
299    e.eventType = TR_PEERMSG_CANCEL;
300    e.pieceIndex = req->index;
301    e.offset = req->offset;
302    e.length = req->length;
303    publish( msgs, &e );
304}
305
306/**
307***  INTEREST
308**/
309
310static int
311isPieceInteresting( const tr_peermsgs   * peer,
312                    int                   piece )
313{
314    const tr_torrent * torrent = peer->torrent;
315    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
316        return FALSE;
317    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
318        return FALSE;
319    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
320        return FALSE;
321    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
322        return FALSE;
323    return TRUE;
324}
325
326/* "interested" means we'll ask for piece data if they unchoke us */
327static int
328isPeerInteresting( const tr_peermsgs * msgs )
329{
330    int i;
331    const tr_torrent * torrent;
332    const tr_bitfield * bitfield;
333    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
334
335    if( clientIsSeed )
336        return FALSE;
337
338    torrent = msgs->torrent;
339    bitfield = tr_cpPieceBitfield( torrent->completion );
340
341    if( !msgs->info->have )
342        return TRUE;
343
344    assert( bitfield->len == msgs->info->have->len );
345    for( i=0; i<torrent->info.pieceCount; ++i )
346        if( isPieceInteresting( msgs, i ) )
347            return TRUE;
348
349    return FALSE;
350}
351
352static void
353sendInterest( tr_peermsgs * msgs, int weAreInterested )
354{
355    assert( msgs != NULL );
356    assert( weAreInterested==0 || weAreInterested==1 );
357
358    msgs->info->clientIsInterested = weAreInterested;
359    dbgmsg( msgs, "Sending %s",
360            weAreInterested ? "Interested" : "Not Interested");
361
362    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
363    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
364                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
365}
366
367static void
368updateInterest( tr_peermsgs * msgs )
369{
370    const int i = isPeerInteresting( msgs );
371    if( i != msgs->info->clientIsInterested )
372        sendInterest( msgs, i );
373    if( i )
374        fireNeedReq( msgs );
375}
376
377#define MIN_CHOKE_PERIOD_SEC 10
378
379static void
380cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
381{
382    tr_list_free( &msgs->peerAskedFor, tr_free );
383}
384
385void
386tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
387{
388    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
389
390    assert( msgs != NULL );
391    assert( msgs->info != NULL );
392    assert( choke==0 || choke==1 );
393
394    if( msgs->info->chokeChangedAt > fibrillationTime )
395    {
396        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
397    }
398    else if( msgs->info->peerIsChoked != choke )
399    {
400        msgs->info->peerIsChoked = choke;
401        if( choke )
402            cancelAllRequestsToClientExceptFast( msgs );
403        protocolSendChoke( msgs, choke );
404        msgs->info->chokeChangedAt = time( NULL );
405    }
406}
407
408/**
409***
410**/
411
412void
413tr_peerMsgsHave( tr_peermsgs * msgs,
414                 uint32_t      index )
415{
416    protocolSendHave( msgs, index );
417
418    /* since we have more pieces now, we might not be interested in this peer */
419    updateInterest( msgs );
420}
421#if 0
422static void
423sendFastSuggest( tr_peermsgs * msgs,
424                 uint32_t      pieceIndex )
425{
426    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
427    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
428    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
429    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
430   
431    updateInterest( msgs );
432}
433#endif
434static void
435sendFastHave( tr_peermsgs * msgs, int all )
436{
437    dbgmsg( msgs, "w00t telling them we have %s pieces", (all ? "ALL" : "NONE" ) );
438    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
439    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
440                                                            : BT_HAVE_NONE ) );
441    updateInterest( msgs );
442}
443
444static void
445sendFastReject( tr_peermsgs * msgs,
446                uint32_t      pieceIndex,
447                uint32_t      offset,
448                uint32_t      length )
449{
450    assert( msgs != NULL );
451
452    if( tr_peerIoSupportsFEXT( msgs->io ) )
453    {
454        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
455        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
456        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
457        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
458        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
459        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
460    }
461}
462
463static void
464sendFastAllowed( tr_peermsgs * msgs,
465                 uint32_t      pieceIndex)
466{
467    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
468    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
469    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
470    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
471}
472
473
474
475static void
476sendFastAllowedSet( tr_peermsgs * msgs )
477{
478    int i = 0;
479    while (i <= msgs->torrent->info.pieceCount )
480    {
481        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
482            sendFastAllowed( msgs, i );
483        i++;
484    }
485}
486
487
488/**
489***
490**/
491
492static int
493reqIsValid( const tr_peermsgs   * msgs,
494            uint32_t              index,
495            uint32_t              offset,
496            uint32_t              length )
497{
498    const tr_torrent * tor = msgs->torrent;
499
500    if( index >= (uint32_t) tor->info.pieceCount )
501        return FALSE;
502    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
503        return FALSE;
504    if( length > MAX_REQUEST_BYTE_COUNT )
505        return FALSE;
506    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
507        return FALSE;
508
509    return TRUE;
510}
511
512static int
513requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
514{
515    return reqIsValid( msgs, req->index, req->offset, req->length );
516}
517
518static void
519pumpRequestQueue( tr_peermsgs * msgs )
520{
521    const int max = msgs->maxActiveRequests;
522    const int min = msgs->minActiveRequests;
523    int count = tr_list_size( msgs->clientAskedFor );
524    int sent = 0;
525
526    if( count > min )
527        return;
528    if( msgs->info->clientIsChoked )
529        return;
530
531    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
532    {
533        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
534        protocolSendRequest( msgs, r );
535        r->time_requested = msgs->lastReqAddedAt = time( NULL );
536        tr_list_append( &msgs->clientAskedFor, r );
537        ++count;
538        ++sent;
539    }
540
541    if( sent )
542        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
543                sent,
544                tr_list_size(msgs->clientAskedFor),
545                tr_list_size(msgs->clientWillAskFor) );
546
547    if( count < max )
548        fireNeedReq( msgs );
549}
550
551int
552tr_peerMsgsAddRequest( tr_peermsgs * msgs,
553                       uint32_t      index, 
554                       uint32_t      offset, 
555                       uint32_t      length )
556{
557    const int req_max = msgs->maxActiveRequests;
558    struct peer_request tmp, *req;
559
560    assert( msgs != NULL );
561    assert( msgs->torrent != NULL );
562    assert( reqIsValid( msgs, index, offset, length ) );
563
564    /**
565    ***  Reasons to decline the request
566    **/
567
568    /* don't send requests to choked clients */
569    if( msgs->info->clientIsChoked ) {
570        dbgmsg( msgs, "declining request because they're choking us" );
571        return TR_ADDREQ_CLIENT_CHOKED;
572    }
573
574    /* peer doesn't have this piece */
575    if( !tr_bitfieldHas( msgs->info->have, index ) )
576        return TR_ADDREQ_MISSING;
577
578    /* peer's queue is full */
579    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
580        dbgmsg( msgs, "declining request because we're full" );
581        return TR_ADDREQ_FULL;
582    }
583
584    /* have we already asked for this piece? */
585    tmp.index = index;
586    tmp.offset = offset;
587    tmp.length = length;
588    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
589        dbgmsg( msgs, "declining because it's a duplicate" );
590        return TR_ADDREQ_DUPLICATE;
591    }
592    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
593        dbgmsg( msgs, "declining because it's a duplicate" );
594        return TR_ADDREQ_DUPLICATE;
595    }
596
597    /**
598    ***  Accept this request
599    **/
600
601    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
602    req = tr_new0( struct peer_request, 1 );
603    *req = tmp;
604    tr_list_append( &msgs->clientWillAskFor, req );
605    return TR_ADDREQ_OK;
606}
607
608static void
609cancelAllRequestsToPeer( tr_peermsgs * msgs )
610{
611    struct peer_request * req;
612
613    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
614    {
615        fireCancelledReq( msgs, req );
616        tr_free( req );
617    }
618
619    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
620    {
621        fireCancelledReq( msgs, req );
622        protocolSendCancel( msgs, req );
623        tr_free( req );
624    }
625}
626
627void
628tr_peerMsgsCancel( tr_peermsgs * msgs,
629                   uint32_t      pieceIndex,
630                   uint32_t      offset,
631                   uint32_t      length )
632{
633    struct peer_request *req, tmp;
634
635    assert( msgs != NULL );
636    assert( length > 0 );
637
638    /* have we asked the peer for this piece? */
639    tmp.index = pieceIndex;
640    tmp.offset = offset;
641    tmp.length = length;
642
643    /* if it's only in the queue and hasn't been sent yet, free it */
644    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
645    {
646        fireCancelledReq( msgs, req );
647        tr_free( req );
648    }
649
650    /* if it's already been sent, send a cancel message too */
651    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
652    {
653        protocolSendCancel( msgs, req );
654        fireCancelledReq( msgs, req );
655        tr_free( req );
656    }
657}
658
659/**
660***
661**/
662
663static void
664sendLtepHandshake( tr_peermsgs * msgs )
665{
666    benc_val_t val, *m;
667    char * buf;
668    int len;
669    int pex;
670    const char * v = TR_NAME " " USERAGENT_PREFIX;
671    const int port = tr_getPublicPort( msgs->handle );
672    struct evbuffer * outbuf;
673
674    if( msgs->clientSentLtepHandshake )
675        return;
676
677    outbuf = evbuffer_new( );
678    dbgmsg( msgs, "sending an ltep handshake" );
679    msgs->clientSentLtepHandshake = 1;
680
681    /* decide if we want to advertise pex support */
682    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
683        pex = 0;
684    else if( msgs->peerSentLtepHandshake )
685        pex = msgs->peerSupportsPex ? 1 : 0;
686    else
687        pex = 1;
688
689    tr_bencInit( &val, TYPE_DICT );
690    tr_bencDictReserve( &val, 4 );
691    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
692    m  = tr_bencDictAdd( &val, "m" );
693    tr_bencInit( m, TYPE_DICT );
694    if( pex ) {
695        tr_bencDictReserve( m, 1 );
696        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
697    }
698    if( port > 0 )
699        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
700    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
701    buf = tr_bencSave( &val, &len );
702
703    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
704    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
705    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
706    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
707
708    tr_peerIoWriteBuf( msgs->io, outbuf );
709
710#if 0
711    dbgmsg( msgs, "here is the ltep handshake we sent:" );
712    tr_bencPrint( &val );
713#endif
714
715    /* cleanup */
716    tr_bencFree( &val );
717    tr_free( buf );
718    evbuffer_free( outbuf );
719}
720
721static void
722parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
723{
724    benc_val_t val, * sub;
725    uint8_t * tmp = tr_new( uint8_t, len );
726
727    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
728    msgs->peerSentLtepHandshake = 1;
729
730    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
731        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
732        tr_free( tmp );
733        return;
734    }
735
736#if 0
737    dbgmsg( msgs, "here is the ltep handshake we read:" );
738    tr_bencPrint( &val );
739#endif
740
741    /* does the peer prefer encrypted connections? */
742    sub = tr_bencDictFind( &val, "e" );
743    if( tr_bencIsInt( sub ) )
744        msgs->info->encryption_preference = sub->val.i
745                                      ? ENCRYPTION_PREFERENCE_YES
746                                      : ENCRYPTION_PREFERENCE_NO;
747
748    /* check supported messages for utorrent pex */
749    sub = tr_bencDictFind( &val, "m" );
750    if( tr_bencIsDict( sub ) ) {
751        sub = tr_bencDictFind( sub, "ut_pex" );
752        if( tr_bencIsInt( sub ) ) {
753            msgs->peerSupportsPex = 1;
754            msgs->ut_pex_id = (uint8_t) sub->val.i;
755            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
756        }
757    }
758
759    /* get peer's listening port */
760    sub = tr_bencDictFind( &val, "p" );
761    if( tr_bencIsInt( sub ) ) {
762        msgs->info->port = htons( (uint16_t)sub->val.i );
763        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
764    }
765
766    tr_bencFree( &val );
767    tr_free( tmp );
768}
769
770static void
771parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
772{
773    benc_val_t val, * sub;
774    uint8_t * tmp;
775
776    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
777        return;
778
779    tmp = tr_new( uint8_t, msglen );
780    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
781
782    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
783        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
784        tr_free( tmp );
785        return;
786    }
787
788    sub = tr_bencDictFind( &val, "added" );
789    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
790        const int n = sub->val.s.i / 6 ;
791        dbgmsg( msgs, "got %d peers from uT pex", n );
792        tr_peerMgrAddPeers( msgs->handle->peerMgr,
793                            msgs->torrent->info.hash,
794                            TR_PEER_FROM_PEX,
795                            (uint8_t*)sub->val.s.s, n );
796    }
797
798    tr_bencFree( &val );
799    tr_free( tmp );
800}
801
802static void
803sendPex( tr_peermsgs * msgs );
804
805static void
806parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
807{
808    uint8_t ltep_msgid;
809
810    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
811    msglen--;
812
813    if( ltep_msgid == LTEP_HANDSHAKE )
814    {
815        dbgmsg( msgs, "got ltep handshake" );
816        parseLtepHandshake( msgs, msglen, inbuf );
817        sendLtepHandshake( msgs );
818        sendPex( msgs );
819    }
820    else if( ltep_msgid == msgs->ut_pex_id )
821    {
822        dbgmsg( msgs, "got ut pex" );
823        msgs->peerSupportsPex = 1;
824        parseUtPex( msgs, msglen, inbuf );
825    }
826    else
827    {
828        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
829        evbuffer_drain( inbuf, msglen );
830    }
831}
832
833static int
834readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
835{
836    uint32_t len;
837    const size_t needlen = sizeof(uint32_t);
838
839    if( EVBUFFER_LENGTH(inbuf) < needlen )
840        return READ_MORE;
841
842    tr_peerIoReadUint32( msgs->io, inbuf, &len );
843
844    if( len == 0 ) /* peer sent us a keepalive message */
845        dbgmsg( msgs, "got KeepAlive" );
846    else {
847        msgs->incomingMessageLength = len;
848        msgs->state = AWAITING_BT_MESSAGE;
849    }
850
851    return READ_AGAIN;
852}
853
854static void
855updatePeerProgress( tr_peermsgs * msgs )
856{
857    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
858    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
859    updateInterest( msgs );
860    firePeerProgress( msgs );
861}
862
863static int
864clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
865{
866    /* FIXME(tiennou): base this on how many blocks we've already sent this
867     * peer, or maybe how many fast blocks per minute we've sent overall,
868     * or maybe how much bandwidth we're already using up w/o fast peers.
869     * I don't know what the Right Thing here is, but
870     * the previous measurement of how many pieces we have is not enough. */
871    return FALSE;
872}
873
874static void
875peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
876{
877    const int reqIsValid = requestIsValid( msgs, req );
878    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
879    const int peerIsChoked = msgs->info->peerIsChoked;
880    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
881    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
882    const int canSendFast = clientCanSendFastBlock( msgs );
883
884    if( !reqIsValid ) /* bad request */
885    {
886        dbgmsg( msgs, "rejecting an invalid request." );
887        sendFastReject( msgs, req->index, req->offset, req->length );
888    }
889    else if( !clientHasPiece ) /* we don't have it */
890    {
891        dbgmsg( msgs, "rejecting request for a piece we don't have." );
892        sendFastReject( msgs, req->index, req->offset, req->length );
893    }
894    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
895    {
896        tr_peerMsgsSetChoke( msgs, 1 );
897        sendFastReject( msgs, req->index, req->offset, req->length );
898    }
899    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
900    {
901        sendFastReject( msgs, req->index, req->offset, req->length );
902    }
903    else /* YAY */
904    {
905        struct peer_request * tmp = tr_new( struct peer_request, 1 );
906        *tmp = *req;
907        if( peerIsFast && pieceIsFast )
908            tr_list_append( &msgs->peerAskedForFast, tmp );
909        else
910            tr_list_append( &msgs->peerAskedFor, tmp );
911    }
912}
913
914static int
915messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
916{
917    switch( id )
918    {
919        case BT_CHOKE:
920        case BT_UNCHOKE:
921        case BT_INTERESTED:
922        case BT_NOT_INTERESTED:
923        case BT_HAVE_ALL:
924        case BT_HAVE_NONE:
925            return len==1;
926
927        case BT_HAVE:
928        case BT_SUGGEST:
929        case BT_ALLOWED_FAST:
930            return len==5;
931
932        case BT_BITFIELD:
933            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
934       
935        case BT_REQUEST:
936        case BT_CANCEL:
937        case BT_REJECT:
938            return len==13;
939
940        case BT_PIECE:
941            return len>9 && len<=16393;
942
943        case BT_PORT:
944            return len==3;
945
946        case BT_LTEP:
947            return len >= 2;
948
949        default:
950            return FALSE;
951    }
952}
953
954static void
955clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
956
957static int
958readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
959{
960    uint8_t id;
961    uint32_t ui32;
962    uint32_t msglen = msgs->incomingMessageLength;
963    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
964
965    if( EVBUFFER_LENGTH(inbuf) < msglen )
966        return READ_MORE;
967
968    tr_peerIoReadUint8( msgs->io, inbuf, &id );
969    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)EVBUFFER_LENGTH(inbuf) );
970
971assert( messageLengthIsCorrect( msgs, id, msglen ) );
972
973    if( !messageLengthIsCorrect( msgs, id, msglen ) )
974    {
975        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
976        fireGotError( msgs );
977        return READ_DONE;
978    }
979
980    --msglen;
981
982    switch( id )
983    {
984        case BT_CHOKE:
985            dbgmsg( msgs, "got Choke" );
986            msgs->info->clientIsChoked = 1;
987            cancelAllRequestsToPeer( msgs );
988            cancelAllRequestsToClientExceptFast( msgs );
989            break;
990
991        case BT_UNCHOKE:
992            dbgmsg( msgs, "got Unchoke" );
993            msgs->info->clientIsChoked = 0;
994            fireNeedReq( msgs );
995            break;
996
997        case BT_INTERESTED:
998            dbgmsg( msgs, "got Interested" );
999            msgs->info->peerIsInterested = 1;
1000            tr_peerMsgsSetChoke( msgs, 0 );
1001            break;
1002
1003        case BT_NOT_INTERESTED:
1004            dbgmsg( msgs, "got Not Interested" );
1005            msgs->info->peerIsInterested = 0;
1006            break;
1007
1008        case BT_HAVE:
1009            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1010            dbgmsg( msgs, "got Have: %u", ui32 );
1011            tr_bitfieldAdd( msgs->info->have, ui32 );
1012            updatePeerProgress( msgs );
1013            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1014            break;
1015
1016        case BT_BITFIELD: {
1017            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1018            dbgmsg( msgs, "got a bitfield" );
1019            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1020            updatePeerProgress( msgs );
1021            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1022            fireNeedReq( msgs );
1023            break;
1024        }
1025
1026        case BT_REQUEST: {
1027            struct peer_request req;
1028            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1029            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1030            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1031            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1032            peerMadeRequest( msgs, &req );
1033            break;
1034        }
1035
1036        case BT_CANCEL: {
1037            struct peer_request req;
1038            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1039            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1040            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1041            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1042            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1043            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1044            break;
1045        }
1046
1047        case BT_PIECE: {
1048            uint8_t * block;
1049            struct peer_request req;
1050            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1051            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1052            req.length = msglen - 8;
1053            block = tr_new( uint8_t, req.length );
1054            tr_peerIoReadBytes( msgs->io, inbuf, block, req.length );
1055            dbgmsg( msgs, "got a Block %u:%u->%u", req.index, req.offset, req.length );
1056            clientGotBlock( msgs, block, &req );
1057            tr_free( block );
1058            break;
1059        }
1060
1061        case BT_PORT:
1062            dbgmsg( msgs, "Got a BT_PORT" );
1063            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1064            break;
1065
1066#if 0 
1067        case BT_SUGGEST: {
1068            /* FIXME(tiennou) */
1069            uint32_t index;
1070            tr_peerIoReadUint32( msgs->io, inbuf, &index );
1071            break;
1072        }
1073
1074        case BT_HAVE_ALL:
1075            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1076            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1077            updatePeerProgress( msgs );
1078            break;
1079
1080        case BT_HAVE_NONE:
1081            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1082            tr_bitfieldClear( msgs->info->have );
1083            updatePeerProgress( msgs );
1084            break;
1085
1086        case BT_REJECT: {
1087            struct peer_request req;
1088            dbgmsg( msgs, "Got a BT_REJECT" );
1089            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1090            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1091            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1092            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1093            break;
1094        }
1095
1096        case BT_ALLOWED_FAST: {
1097            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1098            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1099            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1100            break;
1101        }
1102#endif
1103
1104        case BT_LTEP:
1105            dbgmsg( msgs, "Got a BT_LTEP" );
1106            parseLtep( msgs, msglen, inbuf );
1107            break;
1108
1109        default:
1110            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1111            tr_peerIoDrain( msgs->io, inbuf, msglen );
1112assert( 0 );
1113            break;
1114    }
1115
1116
1117    dbgmsg( msgs, "startBufLen was %d, msglen was %d, current inbuf len is %d", (int)startBufLen, (int)(msglen+1), (int)EVBUFFER_LENGTH(inbuf) );
1118assert( msglen + 1 == msgs->incomingMessageLength );
1119assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msgs->incomingMessageLength );
1120
1121    msgs->incomingMessageLength = -1;
1122    msgs->state = AWAITING_BT_LENGTH;
1123    return READ_AGAIN;
1124}
1125
1126static void
1127clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1128{
1129    tr_torrent * tor = msgs->torrent;
1130    tor->activityDate = tr_date( );
1131    tor->downloadedCur += byteCount;
1132    msgs->info->pieceDataActivityDate = time( NULL );
1133    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1134    tr_rcTransferred( tor->download, byteCount );
1135    tr_rcTransferred( tor->handle->download, byteCount );
1136}
1137
1138static void
1139peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1140{
1141    tr_torrent * tor = msgs->torrent;
1142    tor->activityDate = tr_date( );
1143    tor->uploadedCur += byteCount;
1144    msgs->info->pieceDataActivityDate = time( NULL );
1145    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1146    tr_rcTransferred( tor->upload, byteCount );
1147    tr_rcTransferred( tor->handle->upload, byteCount );
1148}
1149
1150static int
1151canDownload( const tr_peermsgs * msgs )
1152{
1153    tr_torrent * tor = msgs->torrent;
1154
1155    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1156        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1157
1158    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1159        return tr_rcCanTransfer( tor->download );
1160
1161    return TRUE;
1162}
1163
1164static void
1165reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1166{
1167    tr_torrent * tor = msgs->torrent;
1168
1169    /* increment the `corrupt' field */
1170    tor->corruptCur += byteCount;
1171
1172    /* decrement the `downloaded' field */
1173    if( tor->downloadedCur >= byteCount )
1174        tor->downloadedCur -= byteCount;
1175    else
1176        tor->downloadedCur = 0;
1177}
1178
1179
1180static void
1181gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1182{
1183    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1184    reassignBytesToCorrupt( msgs, byteCount );
1185}
1186
1187static void
1188clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1189{
1190    reassignBytesToCorrupt( msgs, req->length );
1191}
1192
1193static void
1194addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1195{
1196    if( !msgs->info->blame )
1197         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1198    tr_bitfieldAdd( msgs->info->blame, index );
1199}
1200
1201static void
1202clientGotBlock( tr_peermsgs * msgs, const uint8_t * data, const struct peer_request * req )
1203{
1204    int i;
1205    tr_torrent * tor = msgs->torrent;
1206    const int block = _tr_block( tor, req->index, req->offset );
1207    struct peer_request *myreq;
1208
1209    assert( msgs != NULL );
1210    assert( req != NULL );
1211    assert( req->length > 0 );
1212    assert( req->length == (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) );
1213
1214    /* save the block */
1215    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1216
1217    /**
1218    *** Remove the block from our `we asked for this' list
1219    **/
1220
1221    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1222    if( myreq == NULL ) {
1223        clientGotUnwantedBlock( msgs, req );
1224        dbgmsg( msgs, "we didn't ask for this message..." );
1225        return;
1226    }
1227
1228    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1229                  myreq->index, myreq->offset, myreq->length,
1230                  (int)(time(NULL) - myreq->time_requested) );
1231    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1232                  tr_list_size(msgs->clientAskedFor));
1233
1234    /**
1235    *** Error checks
1236    **/
1237
1238    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1239        dbgmsg( msgs, "we have this block already..." );
1240        clientGotUnwantedBlock( msgs, req );
1241        return;
1242    }
1243
1244    /**
1245    ***  Save the block
1246    **/
1247
1248    msgs->info->peerSentPieceDataAt = time( NULL );
1249    clientGotBytes( msgs, req->length );
1250    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1251    if( i )
1252        return;
1253
1254#warning this sanity check is here to help track down the excess corrupt data bug, but is expensive and should be removed before the next release
1255{
1256    uint8_t * check = tr_new( uint8_t, req->length );
1257    const int val = tr_ioRead( tor, req->index, req->offset, req->length, check );
1258    assert( !val );
1259    assert( !memcmp( check, data, req->length ) );
1260    tr_free( check );
1261}
1262
1263    tr_cpBlockAdd( tor->completion, block );
1264
1265    addPeerToBlamefield( msgs, req->index );
1266
1267    fireGotBlock( msgs, req );
1268
1269    /**
1270    ***  Handle if this was the last block in the piece
1271    **/
1272
1273    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1274    {
1275        if( tr_ioHash( tor, req->index ) )
1276        {
1277            gotBadPiece( msgs, req->index );
1278            return;
1279        }
1280
1281        fireClientHave( msgs, req->index );
1282    }
1283}
1284
1285static ReadState
1286canRead( struct bufferevent * evin, void * vmsgs )
1287{
1288    ReadState ret;
1289    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1290    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1291
1292    if( !canDownload( msgs ) )
1293    {
1294        msgs->notListening = 1;
1295        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1296        ret = READ_DONE;
1297    }
1298    else switch( msgs->state )
1299    {
1300        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1301        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1302        default: assert( 0 );
1303    }
1304
1305    return ret;
1306}
1307
1308static void
1309sendKeepalive( tr_peermsgs * msgs )
1310{
1311    dbgmsg( msgs, "sending a keepalive message" );
1312    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1313}
1314
1315/**
1316***
1317**/
1318
1319static int
1320canWrite( const tr_peermsgs * msgs )
1321{
1322    /* don't let our outbuffer get too large */
1323    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 4096 )
1324        return FALSE;
1325
1326    return TRUE;
1327}
1328
1329static size_t
1330getUploadMax( const tr_peermsgs * msgs )
1331{
1332    static const size_t maxval = ~0;
1333    const tr_torrent * tor = msgs->torrent;
1334
1335    if( !canWrite( msgs ) )
1336        return 0;
1337
1338    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1339        return tor->handle->useUploadLimit
1340            ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1341
1342    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1343        return tr_rcBytesLeft( tor->upload );
1344
1345    return maxval;
1346}
1347
1348static int
1349ratePulse( void * vmsgs )
1350{
1351    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1352    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1353    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1354    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1355    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1356    return TRUE;
1357}
1358
1359static struct peer_request*
1360popNextRequest( tr_peermsgs * msgs )
1361{
1362    struct peer_request * ret;
1363    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1364    if( !ret )
1365        ret = tr_list_pop_front( &msgs->peerAskedFor);
1366    return ret;
1367}
1368
1369static int
1370pulse( void * vmsgs )
1371{
1372    const time_t now = time( NULL );
1373    tr_peermsgs * msgs = vmsgs;
1374    struct peer_request * r;
1375    size_t len;
1376
1377    /* if we froze out a downloaded block because of speed limits,
1378       start listening to the peer again */
1379    if( msgs->notListening && canDownload( msgs ) )
1380    {
1381        msgs->notListening = 0;
1382        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1383    }
1384
1385    pumpRequestQueue( msgs );
1386
1387    if( !canWrite( msgs ) )
1388    {
1389    }
1390    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1391    {
1392        const size_t uploadMax = getUploadMax( msgs );
1393        const size_t outlen = MIN( len, uploadMax );
1394        tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1395        evbuffer_drain( msgs->outBlock, outlen );
1396        msgs->clientSentAnythingAt = now;
1397        peerGotBytes( msgs, outlen );
1398        len -= outlen;
1399        dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1400        fflush( stdout );
1401    }
1402    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1403    {
1404        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1405        msgs->clientSentAnythingAt = now;
1406    }
1407    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1408    {
1409        sendKeepalive( msgs );
1410    }
1411
1412    if( !EVBUFFER_LENGTH( msgs->outBlock )
1413        && (( r = popNextRequest( msgs )))
1414        && requestIsValid( msgs, r )
1415        && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1416    {
1417        uint8_t * buf = tr_new( uint8_t, r->length );
1418
1419        if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1420        {
1421            tr_peerIo * io = msgs->io;
1422            struct evbuffer * out = msgs->outBlock;
1423
1424            dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1425            tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1426            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1427            tr_peerIoWriteUint32( io, out, r->index );
1428            tr_peerIoWriteUint32( io, out, r->offset );
1429            tr_peerIoWriteBytes ( io, out, buf, r->length );
1430        }
1431
1432        tr_free( buf );
1433        tr_free( r );
1434
1435        pulse( msgs ); /* start sending it right away */
1436    }
1437
1438    return TRUE; /* loop forever */
1439}
1440
1441static void
1442didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1443{
1444    pulse( vmsgs );
1445}
1446
1447static void
1448gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1449{
1450    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1451            (int)what, errno, strerror(errno) );
1452    fireGotError( vmsgs );
1453}
1454
1455static void
1456sendBitfield( tr_peermsgs * msgs )
1457{
1458    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1459    struct evbuffer * out = msgs->outMessages;
1460
1461    dbgmsg( msgs, "sending peer a bitfield message" );
1462    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1463    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1464    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1465}
1466
1467/**
1468***
1469**/
1470
1471/* some peers give us error messages if we send
1472   more than this many peers in a single pex message */
1473#define MAX_PEX_DIFFS 200
1474
1475typedef struct
1476{
1477    tr_pex * added;
1478    tr_pex * dropped;
1479    tr_pex * elements;
1480    int addedCount;
1481    int droppedCount;
1482    int elementCount;
1483    int diffCount;
1484}
1485PexDiffs;
1486
1487static void
1488pexAddedCb( void * vpex, void * userData )
1489{
1490    PexDiffs * diffs = (PexDiffs *) userData;
1491    tr_pex * pex = (tr_pex *) vpex;
1492    if( diffs->diffCount < MAX_PEX_DIFFS )
1493    {
1494        diffs->diffCount++;
1495        diffs->added[diffs->addedCount++] = *pex;
1496        diffs->elements[diffs->elementCount++] = *pex;
1497    }
1498}
1499
1500static void
1501pexRemovedCb( void * vpex, void * userData )
1502{
1503    PexDiffs * diffs = (PexDiffs *) userData;
1504    tr_pex * pex = (tr_pex *) vpex;
1505    if( diffs->diffCount < MAX_PEX_DIFFS )
1506    {
1507        diffs->diffCount++;
1508        diffs->dropped[diffs->droppedCount++] = *pex;
1509    }
1510}
1511
1512static void
1513pexElementCb( void * vpex, void * userData )
1514{
1515    PexDiffs * diffs = (PexDiffs *) userData;
1516    tr_pex * pex = (tr_pex *) vpex;
1517    if( diffs->diffCount < MAX_PEX_DIFFS )
1518    {
1519        diffs->diffCount++;
1520        diffs->elements[diffs->elementCount++] = *pex;
1521    }
1522}
1523
1524static void
1525sendPex( tr_peermsgs * msgs )
1526{
1527    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1528    {
1529        int i;
1530        tr_pex * newPex = NULL;
1531        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1532        PexDiffs diffs;
1533        benc_val_t val, *added, *dropped, *flags;
1534        uint8_t *tmp, *walk;
1535        char * benc;
1536        int bencLen;
1537
1538        /* build the diffs */
1539        diffs.added = tr_new( tr_pex, newCount );
1540        diffs.addedCount = 0;
1541        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1542        diffs.droppedCount = 0;
1543        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1544        diffs.elementCount = 0;
1545        diffs.diffCount = 0;
1546        tr_set_compare( msgs->pex, msgs->pexCount,
1547                        newPex, newCount,
1548                        tr_pexCompare, sizeof(tr_pex),
1549                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1550        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1551
1552        /* update peer */
1553        tr_free( msgs->pex );
1554        msgs->pex = diffs.elements;
1555        msgs->pexCount = diffs.elementCount;
1556
1557        /* build the pex payload */
1558        tr_bencInit( &val, TYPE_DICT );
1559        tr_bencDictReserve( &val, 3 );
1560
1561        /* "added" */
1562        added = tr_bencDictAdd( &val, "added" );
1563        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1564        for( i=0; i<diffs.addedCount; ++i ) {
1565            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1566            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1567        }
1568        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1569        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1570
1571        /* "added.f" */
1572        flags = tr_bencDictAdd( &val, "added.f" );
1573        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1574        for( i=0; i<diffs.addedCount; ++i )
1575            *walk++ = diffs.added[i].flags;
1576        assert( ( walk - tmp ) == diffs.addedCount );
1577        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1578
1579        /* "dropped" */
1580        dropped = tr_bencDictAdd( &val, "dropped" );
1581        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1582        for( i=0; i<diffs.droppedCount; ++i ) {
1583            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1584            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1585        }
1586        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1587        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1588
1589        /* write the pex message */
1590        benc = tr_bencSave( &val, &bencLen );
1591        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1592        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1593        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1594        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1595
1596        /* cleanup */
1597        tr_free( benc );
1598        tr_bencFree( &val );
1599        tr_free( diffs.added );
1600        tr_free( diffs.dropped );
1601        tr_free( newPex );
1602
1603        msgs->clientSentPexAt = time( NULL );
1604    }
1605}
1606
1607static int
1608pexPulse( void * vpeer )
1609{
1610    sendPex( vpeer );
1611    return TRUE;
1612}
1613
1614/**
1615***
1616**/
1617
1618tr_peermsgs*
1619tr_peerMsgsNew( struct tr_torrent * torrent,
1620                struct tr_peer    * info,
1621                tr_delivery_func    func,
1622                void              * userData,
1623                tr_publisher_tag  * setme )
1624{
1625    tr_peermsgs * m;
1626
1627    assert( info != NULL );
1628    assert( info->io != NULL );
1629
1630    m = tr_new0( tr_peermsgs, 1 );
1631    m->publisher = tr_publisherNew( );
1632    m->info = info;
1633    m->handle = torrent->handle;
1634    m->torrent = torrent;
1635    m->io = info->io;
1636    m->info->clientIsChoked = 1;
1637    m->info->peerIsChoked = 1;
1638    m->info->clientIsInterested = 0;
1639    m->info->peerIsInterested = 0;
1640    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1641    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1642    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1643    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1644    m->outMessages = evbuffer_new( );
1645    m->outBlock = evbuffer_new( );
1646    m->peerAllowedPieces = NULL;
1647    m->clientAllowedPieces = NULL;
1648    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1649   
1650    if ( tr_peerIoSupportsFEXT( m->io ) )
1651    {
1652        /* This peer is fastpeer-enabled, generate its allowed set
1653         * (before registering our callbacks) */
1654        if ( !m->peerAllowedPieces ) {
1655            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1656           
1657            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1658                                                                 m->torrent->info.pieceCount,
1659                                                                 m->torrent->info.hash,
1660                                                                 peerAddr );
1661        }
1662        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1663    }
1664   
1665    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1666    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1667    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1668    ratePulse( m );
1669
1670    if ( tr_peerIoSupportsLTEP( m->io ) )
1671        sendLtepHandshake( m );
1672
1673    if ( !tr_peerIoSupportsFEXT( m->io ) )
1674        sendBitfield( m );
1675    else {
1676        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1677        uint32_t peerProgress;
1678        float completion = tr_cpPercentComplete( m->torrent->completion );
1679        if ( completion == 0.0f ) {
1680            sendFastHave( m, 0 );
1681        } else if ( completion == 1.0f ) {
1682            sendFastHave( m, 1 );
1683        } else {
1684            sendBitfield( m );
1685        }
1686        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1687       
1688        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1689            sendFastAllowedSet( m );
1690    }
1691
1692    return m;
1693}
1694
1695void
1696tr_peerMsgsFree( tr_peermsgs* msgs )
1697{
1698    if( msgs != NULL )
1699    {
1700        tr_timerFree( &msgs->pulseTimer );
1701        tr_timerFree( &msgs->rateTimer );
1702        tr_timerFree( &msgs->pexTimer );
1703        tr_publisherFree( &msgs->publisher );
1704        tr_list_free( &msgs->clientWillAskFor, tr_free );
1705        tr_list_free( &msgs->clientAskedFor, tr_free );
1706        tr_list_free( &msgs->peerAskedForFast, tr_free );
1707        tr_list_free( &msgs->peerAskedFor, tr_free );
1708        evbuffer_free( msgs->outMessages );
1709        evbuffer_free( msgs->outBlock );
1710        tr_free( msgs->pex );
1711        msgs->pexCount = 0;
1712        tr_free( msgs );
1713    }
1714}
1715
1716tr_publisher_tag
1717tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1718                      tr_delivery_func    func,
1719                      void              * userData )
1720{
1721    return tr_publisherSubscribe( peer->publisher, func, userData );
1722}
1723
1724void
1725tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1726                        tr_publisher_tag    tag )
1727{
1728    tr_publisherUnsubscribe( peer->publisher, tag );
1729}
1730
1731int
1732tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
1733                              uint32_t            index )
1734{
1735    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1736}
Note: See TracBrowser for help on using the repository browser.