source: trunk/libtransmission/peer-msgs.c @ 3895

Last change on this file since 3895 was 3895, checked in by charles, 15 years ago

fix the crash reported by pea_, Gimp, and John_Clay [ref: http://pastebin.ca/784834]

  • Property svn:keywords set to Date Rev Author Id
File size: 54.9 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3895 2007-11-19 21:44:38Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    OUR_LTEP_PEX            = 1,
64
65    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 90,
69
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (250),       /* msec between calls to pulse() */
72    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
73     
74    /* Fast Peers Extension constants */
75    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
77
78};
79
80enum
81{
82    AWAITING_BT_LENGTH,
83    AWAITING_BT_ID,
84    AWAITING_BT_MESSAGE,
85    AWAITING_BT_PIECE
86};
87
88struct peer_request
89{
90    uint32_t index;
91    uint32_t offset;
92    uint32_t length;
93    time_t time_requested;
94};
95
96static int
97compareRequest( const void * va, const void * vb )
98{
99    int i;
100    const struct peer_request * a = va;
101    const struct peer_request * b = vb;
102    if(( i = tr_compareUint32( a->index, b->index ))) return i;
103    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
104    if(( i = tr_compareUint32( a->length, b->length ))) return i;
105    return 0;
106}
107
108/* this is raw, unchanged data from the peer regarding
109 * the current message that it's sending us. */
110struct tr_incoming
111{
112    uint32_t length; /* includes the +1 for id length */
113    uint8_t id;
114    struct peer_request blockReq; /* metadata for incoming blocks */
115    struct evbuffer * block; /* piece data for incoming blocks */
116};
117
118struct tr_peermsgs
119{
120    tr_peer * info;
121
122    tr_handle * handle;
123    tr_torrent * torrent;
124    tr_peerIo * io;
125
126    tr_publisher_t * publisher;
127
128    struct evbuffer * outBlock;    /* buffer of all the current piece message */
129    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
130    tr_list * peerAskedFor;
131    tr_list * peerAskedForFast;
132    tr_list * clientAskedFor;
133    tr_list * clientWillAskFor;
134
135    tr_timer * rateTimer;
136    tr_timer * pulseTimer;
137    tr_timer * pexTimer;
138
139    time_t lastReqAddedAt;
140    time_t clientSentPexAt;
141    time_t clientSentAnythingAt;
142
143    unsigned int peerSentBitfield         : 1;
144    unsigned int peerSupportsPex          : 1;
145    unsigned int clientSentLtepHandshake  : 1;
146    unsigned int peerSentLtepHandshake    : 1;
147    unsigned int sendingBlock             : 1;
148   
149    tr_bitfield * clientAllowedPieces;
150    tr_bitfield * peerAllowedPieces;
151   
152    tr_bitfield * clientSuggestedPieces;
153   
154    uint8_t state;
155    uint8_t ut_pex_id;
156    uint16_t pexCount;
157    uint32_t maxActiveRequests;
158    uint32_t minActiveRequests;
159
160    struct tr_incoming incoming; 
161
162    tr_pex * pex;
163};
164
165/**
166***
167**/
168
169static void
170myDebug( const char * file, int line,
171         const struct tr_peermsgs * msgs,
172         const char * fmt, ... )
173{
174    FILE * fp = tr_getLog( );
175    if( fp != NULL )
176    {
177        va_list args;
178        char timestr[64];
179        struct evbuffer * buf = evbuffer_new( );
180        char * myfile = tr_strdup( file );
181
182        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
183                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
184                             msgs->torrent->info.name,
185                             tr_peerIoGetAddrStr( msgs->io ),
186                             msgs->info->client );
187        va_start( args, fmt );
188        evbuffer_add_vprintf( buf, fmt, args );
189        va_end( args );
190        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
191        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
192
193        tr_free( myfile );
194        evbuffer_free( buf );
195    }
196}
197
198#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
199
200/**
201***
202**/
203
204static void
205protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
206{
207    tr_peerIo * io = msgs->io;
208    struct evbuffer * out = msgs->outMessages;
209
210    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
211    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
212    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
213    tr_peerIoWriteUint32( io, out, req->index );
214    tr_peerIoWriteUint32( io, out, req->offset );
215    tr_peerIoWriteUint32( io, out, req->length );
216}
217
218static void
219protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
220{
221    tr_peerIo * io = msgs->io;
222    struct evbuffer * out = msgs->outMessages;
223
224    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
225    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
226    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
227    tr_peerIoWriteUint32( io, out, req->index );
228    tr_peerIoWriteUint32( io, out, req->offset );
229    tr_peerIoWriteUint32( io, out, req->length );
230}
231
232static void
233protocolSendHave( tr_peermsgs * msgs, uint32_t index )
234{
235    tr_peerIo * io = msgs->io;
236    struct evbuffer * out = msgs->outMessages;
237
238    dbgmsg( msgs, "sending Have %u", index );
239    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
240    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
241    tr_peerIoWriteUint32( io, out, index );
242}
243
244static void
245protocolSendChoke( tr_peermsgs * msgs, int choke )
246{
247    tr_peerIo * io = msgs->io;
248    struct evbuffer * out = msgs->outMessages;
249
250    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
251    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
252    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
253}
254
255/**
256***  EVENTS
257**/
258
259static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
260
261static void
262publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
263{
264    tr_publisherPublish( msgs->publisher, msgs->info, e );
265}
266
267static void
268fireGotAssertError( tr_peermsgs * msgs )
269{
270    tr_peermsgs_event e = blankEvent;
271    e.eventType = TR_PEERMSG_GOT_ASSERT_ERROR;
272    publish( msgs, &e );
273}
274
275static void
276fireGotError( tr_peermsgs * msgs )
277{
278    tr_peermsgs_event e = blankEvent;
279    e.eventType = TR_PEERMSG_GOT_ERROR;
280    publish( msgs, &e );
281}
282
283static void
284fireNeedReq( tr_peermsgs * msgs )
285{
286    tr_peermsgs_event e = blankEvent;
287    e.eventType = TR_PEERMSG_NEED_REQ;
288    publish( msgs, &e );
289}
290
291static void
292firePeerProgress( tr_peermsgs * msgs )
293{
294    tr_peermsgs_event e = blankEvent;
295    e.eventType = TR_PEERMSG_PEER_PROGRESS;
296    e.progress = msgs->info->progress;
297    publish( msgs, &e );
298}
299
300static void
301fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
302{
303    tr_peermsgs_event e = blankEvent;
304    e.eventType = TR_PEERMSG_CLIENT_HAVE;
305    e.pieceIndex = pieceIndex;
306    publish( msgs, &e );
307}
308
309static void
310fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
311{
312    tr_peermsgs_event e = blankEvent;
313    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
314    e.pieceIndex = req->index;
315    e.offset = req->offset;
316    e.length = req->length;
317    publish( msgs, &e );
318}
319
320static void
321fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
322{
323    tr_peermsgs_event e = blankEvent;
324    e.eventType = TR_PEERMSG_CANCEL;
325    e.pieceIndex = req->index;
326    e.offset = req->offset;
327    e.length = req->length;
328    publish( msgs, &e );
329}
330
331/**
332***  INTEREST
333**/
334
335static int
336isPieceInteresting( const tr_peermsgs   * peer,
337                    int                   piece )
338{
339    const tr_torrent * torrent = peer->torrent;
340    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
341        return FALSE;
342    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
343        return FALSE;
344    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
345        return FALSE;
346    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
347        return FALSE;
348    return TRUE;
349}
350
351/* "interested" means we'll ask for piece data if they unchoke us */
352static int
353isPeerInteresting( const tr_peermsgs * msgs )
354{
355    int i;
356    const tr_torrent * torrent;
357    const tr_bitfield * bitfield;
358    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
359
360    if( clientIsSeed )
361        return FALSE;
362
363    torrent = msgs->torrent;
364    bitfield = tr_cpPieceBitfield( torrent->completion );
365
366    if( !msgs->info->have )
367        return TRUE;
368
369    assert( bitfield->len == msgs->info->have->len );
370    for( i=0; i<torrent->info.pieceCount; ++i )
371        if( isPieceInteresting( msgs, i ) )
372            return TRUE;
373
374    return FALSE;
375}
376
377static void
378sendInterest( tr_peermsgs * msgs, int weAreInterested )
379{
380    assert( msgs != NULL );
381    assert( weAreInterested==0 || weAreInterested==1 );
382
383    msgs->info->clientIsInterested = weAreInterested;
384    dbgmsg( msgs, "Sending %s",
385            weAreInterested ? "Interested" : "Not Interested");
386
387    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
388    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
389                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
390}
391
392static void
393updateInterest( tr_peermsgs * msgs )
394{
395    const int i = isPeerInteresting( msgs );
396    if( i != msgs->info->clientIsInterested )
397        sendInterest( msgs, i );
398    if( i )
399        fireNeedReq( msgs );
400}
401
402#define MIN_CHOKE_PERIOD_SEC 10
403
404static void
405cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
406{
407    tr_list_free( &msgs->peerAskedFor, tr_free );
408}
409
410void
411tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
412{
413    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
414
415    assert( msgs != NULL );
416    assert( msgs->info != NULL );
417    assert( choke==0 || choke==1 );
418
419    if( msgs->info->chokeChangedAt > fibrillationTime )
420    {
421        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
422    }
423    else if( msgs->info->peerIsChoked != choke )
424    {
425        msgs->info->peerIsChoked = choke;
426        if( choke )
427            cancelAllRequestsToClientExceptFast( msgs );
428        protocolSendChoke( msgs, choke );
429        msgs->info->chokeChangedAt = time( NULL );
430    }
431}
432
433/**
434***
435**/
436
437void
438tr_peerMsgsHave( tr_peermsgs * msgs,
439                 uint32_t      index )
440{
441    protocolSendHave( msgs, index );
442
443    /* since we have more pieces now, we might not be interested in this peer */
444    updateInterest( msgs );
445}
446#if 0
447static void
448sendFastSuggest( tr_peermsgs * msgs,
449                 uint32_t      pieceIndex )
450{
451    assert( msgs != NULL );
452   
453    if( tr_peerIoSupportsFEXT( msgs->io ) )
454    {
455        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
456        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
457        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
458    }
459}
460#endif
461static void
462sendFastHave( tr_peermsgs * msgs, int all )
463{
464    assert( msgs != NULL );
465   
466    if( tr_peerIoSupportsFEXT( msgs->io ) )
467    {
468        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
469        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
470                                                                : BT_HAVE_NONE ) );
471        updateInterest( msgs );
472    }
473}
474
475static void
476sendFastReject( tr_peermsgs * msgs,
477                uint32_t      pieceIndex,
478                uint32_t      offset,
479                uint32_t      length )
480{
481    assert( msgs != NULL );
482
483    if( tr_peerIoSupportsFEXT( msgs->io ) )
484    {
485        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
486        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
487        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
488        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
489        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
490        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
491    }
492}
493
494static void
495sendFastAllowed( tr_peermsgs * msgs,
496                 uint32_t      pieceIndex)
497{
498    assert( msgs != NULL );
499   
500    if( tr_peerIoSupportsFEXT( msgs->io ) )
501    {
502        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
503        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
504        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
505    }
506}
507
508
509
510static void
511sendFastAllowedSet( tr_peermsgs * msgs )
512{
513    int i = 0;
514    while (i <= msgs->torrent->info.pieceCount )
515    {
516        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
517            sendFastAllowed( msgs, i );
518        i++;
519    }
520}
521
522
523/**
524***
525**/
526
527static int
528reqIsValid( const tr_peermsgs   * msgs,
529            uint32_t              index,
530            uint32_t              offset,
531            uint32_t              length )
532{
533    const tr_torrent * tor = msgs->torrent;
534
535    if( index >= (uint32_t) tor->info.pieceCount )
536        return FALSE;
537    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
538        return FALSE;
539    if( length > MAX_REQUEST_BYTE_COUNT )
540        return FALSE;
541    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
542        return FALSE;
543
544    return TRUE;
545}
546
547static int
548requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
549{
550    return reqIsValid( msgs, req->index, req->offset, req->length );
551}
552
553static void
554pumpRequestQueue( tr_peermsgs * msgs )
555{
556    const int max = msgs->maxActiveRequests;
557    const int min = msgs->minActiveRequests;
558    int count = tr_list_size( msgs->clientAskedFor );
559    int sent = 0;
560
561    if( count > min )
562        return;
563    if( msgs->info->clientIsChoked )
564        return;
565
566    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
567    {
568        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
569        assert( requestIsValid( msgs, r ) );
570        assert( tr_bitfieldHas( msgs->info->have, r->index ) );
571        protocolSendRequest( msgs, r );
572        r->time_requested = msgs->lastReqAddedAt = time( NULL );
573        tr_list_append( &msgs->clientAskedFor, r );
574        ++count;
575        ++sent;
576    }
577
578    if( sent )
579        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
580                sent,
581                tr_list_size(msgs->clientAskedFor),
582                tr_list_size(msgs->clientWillAskFor) );
583
584    if( count < max )
585        fireNeedReq( msgs );
586}
587
588static int
589pulse( void * vmsgs );
590
591int
592tr_peerMsgsAddRequest( tr_peermsgs * msgs,
593                       uint32_t      index, 
594                       uint32_t      offset, 
595                       uint32_t      length )
596{
597    const int req_max = msgs->maxActiveRequests;
598    struct peer_request tmp, *req;
599
600    assert( msgs != NULL );
601    assert( msgs->torrent != NULL );
602    assert( reqIsValid( msgs, index, offset, length ) );
603
604    /**
605    ***  Reasons to decline the request
606    **/
607
608    /* don't send requests to choked clients */
609    if( msgs->info->clientIsChoked ) {
610        dbgmsg( msgs, "declining request because they're choking us" );
611        return TR_ADDREQ_CLIENT_CHOKED;
612    }
613
614    /* peer doesn't have this piece */
615    if( !tr_bitfieldHas( msgs->info->have, index ) )
616        return TR_ADDREQ_MISSING;
617
618    /* peer's queue is full */
619    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
620        dbgmsg( msgs, "declining request because we're full" );
621        return TR_ADDREQ_FULL;
622    }
623
624    /* have we already asked for this piece? */
625    tmp.index = index;
626    tmp.offset = offset;
627    tmp.length = length;
628    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
629        dbgmsg( msgs, "declining because it's a duplicate" );
630        return TR_ADDREQ_DUPLICATE;
631    }
632    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
633        dbgmsg( msgs, "declining because it's a duplicate" );
634        return TR_ADDREQ_DUPLICATE;
635    }
636
637    /**
638    ***  Accept this request
639    **/
640
641    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
642    req = tr_new0( struct peer_request, 1 );
643    *req = tmp;
644    tr_list_append( &msgs->clientWillAskFor, req );
645    return TR_ADDREQ_OK;
646}
647
648static void
649cancelAllRequestsToPeer( tr_peermsgs * msgs )
650{
651    struct peer_request * req;
652
653    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
654    {
655        fireCancelledReq( msgs, req );
656        tr_free( req );
657    }
658
659    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
660    {
661        fireCancelledReq( msgs, req );
662        protocolSendCancel( msgs, req );
663        tr_free( req );
664    }
665}
666
667void
668tr_peerMsgsCancel( tr_peermsgs * msgs,
669                   uint32_t      pieceIndex,
670                   uint32_t      offset,
671                   uint32_t      length )
672{
673    struct peer_request *req, tmp;
674
675    assert( msgs != NULL );
676    assert( length > 0 );
677
678    /* have we asked the peer for this piece? */
679    tmp.index = pieceIndex;
680    tmp.offset = offset;
681    tmp.length = length;
682
683    /* if it's only in the queue and hasn't been sent yet, free it */
684    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
685    {
686        fireCancelledReq( msgs, req );
687        tr_free( req );
688    }
689
690    /* if it's already been sent, send a cancel message too */
691    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
692    {
693        protocolSendCancel( msgs, req );
694        fireCancelledReq( msgs, req );
695        tr_free( req );
696    }
697}
698
699/**
700***
701**/
702
703static void
704sendLtepHandshake( tr_peermsgs * msgs )
705{
706    benc_val_t val, *m;
707    char * buf;
708    int len;
709    int pex;
710    const char * v = TR_NAME " " USERAGENT_PREFIX;
711    const int port = tr_getPublicPort( msgs->handle );
712    struct evbuffer * outbuf;
713
714    if( msgs->clientSentLtepHandshake )
715        return;
716
717    outbuf = evbuffer_new( );
718    dbgmsg( msgs, "sending an ltep handshake" );
719    msgs->clientSentLtepHandshake = 1;
720
721    /* decide if we want to advertise pex support */
722    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
723        pex = 0;
724    else if( msgs->peerSentLtepHandshake )
725        pex = msgs->peerSupportsPex ? 1 : 0;
726    else
727        pex = 1;
728
729    tr_bencInit( &val, TYPE_DICT );
730    tr_bencDictReserve( &val, 4 );
731    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
732    m  = tr_bencDictAdd( &val, "m" );
733    tr_bencInit( m, TYPE_DICT );
734    if( pex ) {
735        tr_bencDictReserve( m, 1 );
736        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
737    }
738    if( port > 0 )
739        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
740    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
741    buf = tr_bencSave( &val, &len );
742
743    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
744    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
745    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
746    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
747
748    tr_peerIoWriteBuf( msgs->io, outbuf );
749
750#if 0
751    dbgmsg( msgs, "here is the ltep handshake we sent:" );
752    tr_bencPrint( &val );
753    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
754#endif
755
756    /* cleanup */
757    tr_bencFree( &val );
758    tr_free( buf );
759    evbuffer_free( outbuf );
760}
761
762static void
763parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
764{
765    benc_val_t val, * sub;
766    uint8_t * tmp = tr_new( uint8_t, len );
767
768    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
769    msgs->peerSentLtepHandshake = 1;
770
771    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
772        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
773        tr_free( tmp );
774        return;
775    }
776
777#if 0
778    dbgmsg( msgs, "here is the ltep handshake we read:" );
779    tr_bencPrint( &val );
780    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
781#endif
782
783    /* does the peer prefer encrypted connections? */
784    sub = tr_bencDictFind( &val, "e" );
785    if( tr_bencIsInt( sub ) )
786        msgs->info->encryption_preference = sub->val.i
787                                      ? ENCRYPTION_PREFERENCE_YES
788                                      : ENCRYPTION_PREFERENCE_NO;
789
790    /* check supported messages for utorrent pex */
791    sub = tr_bencDictFind( &val, "m" );
792    if( tr_bencIsDict( sub ) ) {
793        sub = tr_bencDictFind( sub, "ut_pex" );
794        if( tr_bencIsInt( sub ) ) {
795            msgs->peerSupportsPex = 1;
796            msgs->ut_pex_id = (uint8_t) sub->val.i;
797            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
798        }
799    }
800
801    /* get peer's listening port */
802    sub = tr_bencDictFind( &val, "p" );
803    if( tr_bencIsInt( sub ) ) {
804        msgs->info->port = htons( (uint16_t)sub->val.i );
805        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
806    }
807
808    tr_bencFree( &val );
809    tr_free( tmp );
810}
811
812static void
813parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
814{
815    benc_val_t val, * sub;
816    uint8_t * tmp;
817
818    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
819        return;
820
821    tmp = tr_new( uint8_t, msglen );
822    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
823
824    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
825        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
826        tr_free( tmp );
827        return;
828    }
829
830    sub = tr_bencDictFind( &val, "added" );
831    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
832        const int n = sub->val.s.i / 6 ;
833        dbgmsg( msgs, "got %d peers from uT pex", n );
834        tr_peerMgrAddPeers( msgs->handle->peerMgr,
835                            msgs->torrent->info.hash,
836                            TR_PEER_FROM_PEX,
837                            (uint8_t*)sub->val.s.s, n );
838    }
839
840    tr_bencFree( &val );
841    tr_free( tmp );
842}
843
844static void
845sendPex( tr_peermsgs * msgs );
846
847static void
848parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
849{
850    uint8_t ltep_msgid;
851
852    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
853    msglen--;
854
855    if( ltep_msgid == LTEP_HANDSHAKE )
856    {
857        dbgmsg( msgs, "got ltep handshake" );
858        parseLtepHandshake( msgs, msglen, inbuf );
859        if( tr_peerIoSupportsLTEP( msgs->io ) )
860        {
861            sendLtepHandshake( msgs );
862            sendPex( msgs );
863        }
864    }
865    else if( ltep_msgid == msgs->ut_pex_id )
866    {
867        dbgmsg( msgs, "got ut pex" );
868        msgs->peerSupportsPex = 1;
869        parseUtPex( msgs, msglen, inbuf );
870    }
871    else
872    {
873        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
874        evbuffer_drain( inbuf, msglen );
875    }
876}
877
878static int
879readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
880{
881    uint32_t len;
882
883    if( inlen < sizeof(len) )
884        return READ_MORE;
885
886    tr_peerIoReadUint32( msgs->io, inbuf, &len );
887
888    if( len == 0 ) /* peer sent us a keepalive message */
889        dbgmsg( msgs, "got KeepAlive" );
890    else {
891        msgs->incoming.length = len;
892        msgs->state = AWAITING_BT_ID;
893    }
894
895    return READ_AGAIN;
896}
897
898static int
899readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
900
901static int
902readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
903{
904    uint8_t id;
905
906    if( inlen < sizeof(uint8_t) )
907        return READ_MORE;
908
909    tr_peerIoReadUint8( msgs->io, inbuf, &id );
910    msgs->incoming.id = id;
911
912    if( id==BT_PIECE )
913    {
914        msgs->state = AWAITING_BT_PIECE;
915        return READ_AGAIN;
916    }
917    else if( msgs->incoming.length != 1 )
918    {
919        msgs->state = AWAITING_BT_MESSAGE;
920        return READ_AGAIN;
921    }
922    else return readBtMessage( msgs, inbuf, inlen-1 );
923}
924
925static void
926updatePeerProgress( tr_peermsgs * msgs )
927{
928    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
929    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
930    updateInterest( msgs );
931    firePeerProgress( msgs );
932}
933
934static int
935clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
936{
937    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
938    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
939        return FALSE;
940   
941    /* ...or if we don't have ourself enough pieces */
942    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
943        return FALSE;
944
945    /* Maybe a bandwidth limit ? */
946    return TRUE;
947}
948
949static void
950peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
951{
952    const int reqIsValid = requestIsValid( msgs, req );
953    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
954    const int peerIsChoked = msgs->info->peerIsChoked;
955    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
956    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
957    const int canSendFast = clientCanSendFastBlock( msgs );
958
959    if( !reqIsValid ) /* bad request */
960    {
961        dbgmsg( msgs, "rejecting an invalid request." );
962        sendFastReject( msgs, req->index, req->offset, req->length );
963    }
964    else if( !clientHasPiece ) /* we don't have it */
965    {
966        dbgmsg( msgs, "rejecting request for a piece we don't have." );
967        sendFastReject( msgs, req->index, req->offset, req->length );
968    }
969    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
970    {
971        tr_peerMsgsSetChoke( msgs, 1 );
972        sendFastReject( msgs, req->index, req->offset, req->length );
973    }
974    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
975    {
976        sendFastReject( msgs, req->index, req->offset, req->length );
977    }
978    else /* YAY */
979    {
980        struct peer_request * tmp = tr_new( struct peer_request, 1 );
981        *tmp = *req;
982        if( peerIsFast && pieceIsFast )
983            tr_list_append( &msgs->peerAskedForFast, tmp );
984        else
985            tr_list_append( &msgs->peerAskedFor, tmp );
986    }
987}
988
989static int
990messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
991{
992    switch( id )
993    {
994        case BT_CHOKE:
995        case BT_UNCHOKE:
996        case BT_INTERESTED:
997        case BT_NOT_INTERESTED:
998        case BT_HAVE_ALL:
999        case BT_HAVE_NONE:
1000            return len==1;
1001
1002        case BT_HAVE:
1003        case BT_SUGGEST:
1004        case BT_ALLOWED_FAST:
1005            return len==5;
1006
1007        case BT_BITFIELD:
1008            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1009       
1010        case BT_REQUEST:
1011        case BT_CANCEL:
1012        case BT_REJECT:
1013            return len==13;
1014
1015        case BT_PIECE:
1016            return len>9 && len<=16393;
1017
1018        case BT_PORT:
1019            return len==3;
1020
1021        case BT_LTEP:
1022            return len >= 2;
1023
1024        default:
1025            return FALSE;
1026    }
1027}
1028
1029static int
1030clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1031
1032static void
1033clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1034{
1035    tr_torrent * tor = msgs->torrent;
1036    tor->activityDate = tr_date( );
1037    tor->downloadedCur += byteCount;
1038    msgs->info->pieceDataActivityDate = time( NULL );
1039    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1040    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1041    tr_rcTransferred( tor->download, byteCount );
1042    tr_rcTransferred( tor->handle->download, byteCount );
1043}
1044
1045
1046static int
1047readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1048{
1049    struct peer_request * req = &msgs->incoming.blockReq;
1050    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1051    dbgmsg( msgs, "In readBtPiece" );
1052
1053    if( !req->length )
1054    {
1055        if( inlen < 8 )
1056            return READ_MORE;
1057
1058        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1059        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1060        req->length = msgs->incoming.length - 9;
1061        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1062        return READ_AGAIN;
1063    }
1064    else
1065    {
1066        int err;
1067
1068        /* read in another chunk of data */
1069        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1070        size_t n = MIN( nLeft, inlen );
1071        uint8_t * buf = tr_new( uint8_t, n );
1072        assert( EVBUFFER_LENGTH(inbuf) >= n );
1073        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1074        evbuffer_add( msgs->incoming.block, buf, n );
1075        clientGotBytes( msgs, n );
1076        tr_free( buf );
1077        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1078               (int)n, req->index, req->offset, req->length,
1079               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1080        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1081            return READ_MORE;
1082
1083        /* we've got the whole block ... process it */
1084        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1085
1086        /* cleanup */
1087        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1088        req->length = 0;
1089        msgs->state = AWAITING_BT_LENGTH;
1090        if( !err )
1091            return READ_AGAIN;
1092        else {
1093            fireGotAssertError( msgs );
1094            return READ_DONE;
1095        }
1096    }
1097}
1098
1099static int
1100readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1101{
1102    uint32_t ui32;
1103    uint32_t msglen = msgs->incoming.length;
1104    const uint8_t id = msgs->incoming.id;
1105    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1106
1107    --msglen; // id length
1108
1109    if( inlen < msglen )
1110        return READ_MORE;
1111
1112    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1113
1114    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1115    {
1116        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1117        fireGotError( msgs );
1118        return READ_DONE;
1119    }
1120
1121    switch( id )
1122    {
1123        case BT_CHOKE:
1124            dbgmsg( msgs, "got Choke" );
1125            msgs->info->clientIsChoked = 1;
1126            cancelAllRequestsToPeer( msgs );
1127            cancelAllRequestsToClientExceptFast( msgs );
1128            break;
1129
1130        case BT_UNCHOKE:
1131            dbgmsg( msgs, "got Unchoke" );
1132            msgs->info->clientIsChoked = 0;
1133            fireNeedReq( msgs );
1134            break;
1135
1136        case BT_INTERESTED:
1137            dbgmsg( msgs, "got Interested" );
1138            msgs->info->peerIsInterested = 1;
1139            tr_peerMsgsSetChoke( msgs, 0 );
1140            break;
1141
1142        case BT_NOT_INTERESTED:
1143            dbgmsg( msgs, "got Not Interested" );
1144            msgs->info->peerIsInterested = 0;
1145            break;
1146           
1147        case BT_HAVE:
1148            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1149            dbgmsg( msgs, "got Have: %u", ui32 );
1150            tr_bitfieldAdd( msgs->info->have, ui32 );
1151            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1152            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1153                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1154            updatePeerProgress( msgs );
1155            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1156            break;
1157
1158        case BT_BITFIELD: {
1159            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1160            dbgmsg( msgs, "got a bitfield" );
1161            msgs->peerSentBitfield = 1;
1162            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1163            updatePeerProgress( msgs );
1164            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1165            fireNeedReq( msgs );
1166            break;
1167        }
1168
1169        case BT_REQUEST: {
1170            struct peer_request req;
1171            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1172            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1173            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1174            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1175            peerMadeRequest( msgs, &req );
1176            break;
1177        }
1178
1179        case BT_CANCEL: {
1180            struct peer_request req;
1181            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1182            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1183            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1184            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1185            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1186            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1187            break;
1188        }
1189
1190        case BT_PIECE:
1191            assert( 0 ); /* should be handled elsewhere! */
1192            break;
1193       
1194        case BT_PORT:
1195            dbgmsg( msgs, "Got a BT_PORT" );
1196            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1197            break;
1198           
1199        case BT_SUGGEST: {
1200            dbgmsg( msgs, "Got a BT_SUGGEST" );
1201            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1202            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1203            if( tr_bitfieldHas( bt, ui32 ) )
1204                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1205            break;
1206        }
1207           
1208        case BT_HAVE_ALL:
1209            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1210            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1211            updatePeerProgress( msgs );
1212            break;
1213       
1214       
1215        case BT_HAVE_NONE:
1216            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1217            tr_bitfieldClear( msgs->info->have );
1218            updatePeerProgress( msgs );
1219            break;
1220       
1221        case BT_REJECT: {
1222            struct peer_request req;
1223            dbgmsg( msgs, "Got a BT_REJECT" );
1224            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1225            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1226            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1227            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1228            break;
1229        }
1230
1231        case BT_ALLOWED_FAST: {
1232            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1233            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1234            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1235            break;
1236        }
1237
1238        case BT_LTEP:
1239            dbgmsg( msgs, "Got a BT_LTEP" );
1240            parseLtep( msgs, msglen, inbuf );
1241            break;
1242
1243        default:
1244            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1245            tr_peerIoDrain( msgs->io, inbuf, msglen );
1246            break;
1247    }
1248
1249    assert( msglen + 1 == msgs->incoming.length );
1250    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1251
1252    msgs->state = AWAITING_BT_LENGTH;
1253    return READ_AGAIN;
1254}
1255
1256static void
1257peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1258{
1259    tr_torrent * tor = msgs->torrent;
1260    tor->activityDate = tr_date( );
1261    tor->uploadedCur += byteCount;
1262    msgs->info->pieceDataActivityDate = time( NULL );
1263    msgs->info->credit -= byteCount;
1264    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1265    tr_rcTransferred( tor->upload, byteCount );
1266    tr_rcTransferred( tor->handle->upload, byteCount );
1267}
1268
1269static size_t
1270getDownloadMax( const tr_peermsgs * msgs )
1271{
1272    static const size_t maxval = ~0;
1273    const tr_torrent * tor = msgs->torrent;
1274
1275    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1276        return tor->handle->useDownloadLimit
1277            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1278
1279    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1280        return tr_rcBytesLeft( tor->download );
1281
1282    return maxval;
1283}
1284
1285static void
1286reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1287{
1288    tr_torrent * tor = msgs->torrent;
1289
1290    /* increment the `corrupt' field */
1291    tor->corruptCur += byteCount;
1292
1293    /* decrement the `downloaded' field */
1294    if( tor->downloadedCur >= byteCount )
1295        tor->downloadedCur -= byteCount;
1296    else
1297        tor->downloadedCur = 0;
1298}
1299
1300
1301static void
1302gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1303{
1304    const uint32_t byteCount =
1305        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1306    reassignBytesToCorrupt( msgs, byteCount );
1307}
1308
1309static void
1310clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1311{
1312    reassignBytesToCorrupt( msgs, req->length );
1313}
1314
1315static void
1316addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1317{
1318    if( !msgs->info->blame )
1319         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1320    tr_bitfieldAdd( msgs->info->blame, index );
1321}
1322
1323static int
1324clientGotBlock( tr_peermsgs                * msgs,
1325                const uint8_t              * data,
1326                const struct peer_request  * req )
1327{
1328    int i;
1329    tr_torrent * tor = msgs->torrent;
1330    const int block = _tr_block( tor, req->index, req->offset );
1331    struct peer_request *myreq;
1332
1333    assert( msgs != NULL );
1334    assert( req != NULL );
1335
1336    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1337    {
1338        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1339                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1340        return TR_ERROR_ASSERT;
1341    }
1342
1343    /* save the block */
1344    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1345
1346    /**
1347    *** Remove the block from our `we asked for this' list
1348    **/
1349
1350    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1351    if( myreq == NULL ) {
1352        clientGotUnwantedBlock( msgs, req );
1353        dbgmsg( msgs, "we didn't ask for this message..." );
1354        return 0;
1355    }
1356
1357    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1358                  myreq->index, myreq->offset, myreq->length,
1359                  (int)(time(NULL) - myreq->time_requested) );
1360    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1361                  tr_list_size(msgs->clientAskedFor));
1362
1363    tr_free( myreq );
1364    myreq = NULL;
1365
1366    /**
1367    *** Error checks
1368    **/
1369
1370    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1371        dbgmsg( msgs, "we have this block already..." );
1372        clientGotUnwantedBlock( msgs, req );
1373        return 0;
1374    }
1375
1376    /**
1377    ***  Save the block
1378    **/
1379
1380    msgs->info->peerSentPieceDataAt = time( NULL );
1381    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1382    if( i )
1383        return 0;
1384
1385    tr_cpBlockAdd( tor->completion, block );
1386
1387    addPeerToBlamefield( msgs, req->index );
1388
1389    fireGotBlock( msgs, req );
1390
1391    /**
1392    ***  Handle if this was the last block in the piece
1393    **/
1394
1395    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1396    {
1397        if( tr_ioHash( tor, req->index ) )
1398        {
1399            gotBadPiece( msgs, req->index );
1400            return 0;
1401        }
1402
1403        fireClientHave( msgs, req->index );
1404    }
1405
1406    return 0;
1407}
1408
1409static ReadState
1410canRead( struct bufferevent * evin, void * vmsgs )
1411{
1412    ReadState ret;
1413    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1414    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1415    const size_t buflen = EVBUFFER_LENGTH( inbuf );
1416    const size_t downloadMax = getDownloadMax( msgs );
1417    const size_t n = MIN( buflen, downloadMax );
1418
1419    if( !n )
1420    {
1421        ret = READ_DONE;
1422    }
1423    else switch( msgs->state )
1424    {
1425        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, inbuf, n ); break;
1426        case AWAITING_BT_ID:      ret = readBtId     ( msgs, inbuf, n ); break;
1427        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, inbuf, n ); break;
1428        case AWAITING_BT_PIECE:   ret = readBtPiece  ( msgs, inbuf, n ); break;
1429        default: assert( 0 );
1430    }
1431
1432    return ret;
1433}
1434
1435static void
1436sendKeepalive( tr_peermsgs * msgs )
1437{
1438    dbgmsg( msgs, "sending a keepalive message" );
1439    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1440}
1441
1442/**
1443***
1444**/
1445
1446static size_t
1447getUploadMax( const tr_peermsgs * msgs )
1448{
1449    static const size_t maxval = ~0;
1450    const tr_torrent * tor = msgs->torrent;
1451    const int useSwift = SWIFT_ENABLED && !tr_torrentIsSeed( msgs->torrent );
1452    const size_t swiftLeft = msgs->info->credit;
1453    size_t speedLeft;
1454    size_t bufLeft;
1455    size_t ret;
1456
1457    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1458        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1459    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1460        speedLeft = tr_rcBytesLeft( tor->upload );
1461    else
1462        speedLeft = ~0;
1463
1464    bufLeft = 4096 - tr_peerIoWriteBytesWaiting( msgs->io );
1465    ret = MIN( speedLeft, bufLeft );
1466    if( useSwift)
1467        ret = MIN( ret, swiftLeft );
1468    return ret;
1469}
1470
1471static int
1472ratePulse( void * vmsgs )
1473{
1474    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1475    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1476    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1477    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1478    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1479    return TRUE;
1480}
1481
1482static struct peer_request*
1483popNextRequest( tr_peermsgs * msgs )
1484{
1485    struct peer_request * ret;
1486    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1487    if( !ret )
1488        ret = tr_list_pop_front( &msgs->peerAskedFor);
1489    return ret;
1490}
1491
1492static void
1493updatePeerStatus( tr_peermsgs * msgs )
1494{
1495    tr_peer * peer = msgs->info;
1496
1497    if( !msgs->peerSentBitfield )
1498        peer->status = TR_PEER_STATUS_HANDSHAKE;
1499
1500    else if( ( time(NULL) - peer->pieceDataActivityDate ) < 3 )
1501        peer->status = peer->clientIsChoked
1502                       ? TR_PEER_STATUS_ACTIVE_AND_CHOKED
1503                       : TR_PEER_STATUS_ACTIVE;
1504
1505    else if( peer->peerIsChoked )
1506        peer->status = TR_PEER_STATUS_PEER_IS_CHOKED;
1507
1508    else if( peer->clientIsChoked )
1509        peer->status = peer->clientIsInterested
1510                       ? TR_PEER_STATUS_CLIENT_IS_INTERESTED
1511                       : TR_PEER_STATUS_CLIENT_IS_CHOKED;
1512
1513    else if( msgs->clientAskedFor != NULL )
1514        peer->status = TR_PEER_STATUS_REQUEST_SENT;
1515
1516    else
1517        peer->status = TR_PEER_STATUS_READY;
1518}
1519
1520static int
1521pulse( void * vmsgs )
1522{
1523    const time_t now = time( NULL );
1524    tr_peermsgs * msgs = vmsgs;
1525    struct peer_request * r;
1526
1527    tr_peerIoTryRead( msgs->io );
1528    pumpRequestQueue( msgs );
1529    updatePeerStatus( msgs );
1530
1531    if( msgs->sendingBlock )
1532    {
1533        const size_t uploadMax = getUploadMax( msgs );
1534        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1535        const size_t outlen = MIN( len, uploadMax );
1536
1537        assert( len );
1538
1539        if( outlen )
1540        {
1541            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1542            evbuffer_drain( msgs->outBlock, outlen );
1543            peerGotBytes( msgs, outlen );
1544
1545            len -= outlen;
1546            msgs->clientSentAnythingAt = now;
1547            msgs->sendingBlock = len!=0;
1548
1549            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1550        }
1551        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1552    }
1553
1554    if( !msgs->sendingBlock )
1555    {
1556        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1557        {
1558            dbgmsg( msgs, "flushing outMessages..." );
1559            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1560            msgs->clientSentAnythingAt = now;
1561        }
1562        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1563            && (( r = popNextRequest( msgs )))
1564            && requestIsValid( msgs, r )
1565            && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1566        {
1567            uint8_t * buf = tr_new( uint8_t, r->length );
1568
1569            if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1570            {
1571                tr_peerIo * io = msgs->io;
1572                struct evbuffer * out = msgs->outBlock;
1573
1574                dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1575                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1576                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1577                tr_peerIoWriteUint32( io, out, r->index );
1578                tr_peerIoWriteUint32( io, out, r->offset );
1579                tr_peerIoWriteBytes ( io, out, buf, r->length );
1580                msgs->sendingBlock = 1;
1581            }
1582
1583            tr_free( buf );
1584            tr_free( r );
1585        }
1586        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1587        {
1588            sendKeepalive( msgs );
1589        }
1590    }
1591
1592    return TRUE; /* loop forever */
1593}
1594
1595static void
1596gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1597{
1598    if( what & EVBUFFER_TIMEOUT )
1599        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1600
1601    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) {
1602        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1603                what, errno, strerror(errno) );
1604        fireGotError( vmsgs );
1605    }
1606}
1607
1608static void
1609sendBitfield( tr_peermsgs * msgs )
1610{
1611    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1612    struct evbuffer * out = msgs->outMessages;
1613
1614    dbgmsg( msgs, "sending peer a bitfield message" );
1615    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1616    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1617    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1618}
1619
1620/**
1621***
1622**/
1623
1624/* some peers give us error messages if we send
1625   more than this many peers in a single pex message */
1626#define MAX_PEX_DIFFS 200
1627
1628typedef struct
1629{
1630    tr_pex * added;
1631    tr_pex * dropped;
1632    tr_pex * elements;
1633    int addedCount;
1634    int droppedCount;
1635    int elementCount;
1636    int diffCount;
1637}
1638PexDiffs;
1639
1640static void
1641pexAddedCb( void * vpex, void * userData )
1642{
1643    PexDiffs * diffs = (PexDiffs *) userData;
1644    tr_pex * pex = (tr_pex *) vpex;
1645    if( diffs->diffCount < MAX_PEX_DIFFS )
1646    {
1647        diffs->diffCount++;
1648        diffs->added[diffs->addedCount++] = *pex;
1649        diffs->elements[diffs->elementCount++] = *pex;
1650    }
1651}
1652
1653static void
1654pexRemovedCb( void * vpex, void * userData )
1655{
1656    PexDiffs * diffs = (PexDiffs *) userData;
1657    tr_pex * pex = (tr_pex *) vpex;
1658    if( diffs->diffCount < MAX_PEX_DIFFS )
1659    {
1660        diffs->diffCount++;
1661        diffs->dropped[diffs->droppedCount++] = *pex;
1662    }
1663}
1664
1665static void
1666pexElementCb( void * vpex, void * userData )
1667{
1668    PexDiffs * diffs = (PexDiffs *) userData;
1669    tr_pex * pex = (tr_pex *) vpex;
1670    if( diffs->diffCount < MAX_PEX_DIFFS )
1671    {
1672        diffs->diffCount++;
1673        diffs->elements[diffs->elementCount++] = *pex;
1674    }
1675}
1676
1677static void
1678sendPex( tr_peermsgs * msgs )
1679{
1680    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1681    {
1682        int i;
1683        tr_pex * newPex = NULL;
1684        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1685        PexDiffs diffs;
1686        benc_val_t val, *added, *dropped, *flags;
1687        uint8_t *tmp, *walk;
1688        char * benc;
1689        int bencLen;
1690
1691        /* build the diffs */
1692        diffs.added = tr_new( tr_pex, newCount );
1693        diffs.addedCount = 0;
1694        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1695        diffs.droppedCount = 0;
1696        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1697        diffs.elementCount = 0;
1698        diffs.diffCount = 0;
1699        tr_set_compare( msgs->pex, msgs->pexCount,
1700                        newPex, newCount,
1701                        tr_pexCompare, sizeof(tr_pex),
1702                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1703        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1704
1705        /* update peer */
1706        tr_free( msgs->pex );
1707        msgs->pex = diffs.elements;
1708        msgs->pexCount = diffs.elementCount;
1709
1710        /* build the pex payload */
1711        tr_bencInit( &val, TYPE_DICT );
1712        tr_bencDictReserve( &val, 3 );
1713
1714        /* "added" */
1715        added = tr_bencDictAdd( &val, "added" );
1716        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1717        for( i=0; i<diffs.addedCount; ++i ) {
1718            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1719            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1720        }
1721        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1722        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1723
1724        /* "added.f" */
1725        flags = tr_bencDictAdd( &val, "added.f" );
1726        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1727        for( i=0; i<diffs.addedCount; ++i )
1728            *walk++ = diffs.added[i].flags;
1729        assert( ( walk - tmp ) == diffs.addedCount );
1730        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1731
1732        /* "dropped" */
1733        dropped = tr_bencDictAdd( &val, "dropped" );
1734        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1735        for( i=0; i<diffs.droppedCount; ++i ) {
1736            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1737            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1738        }
1739        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1740        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1741
1742        /* write the pex message */
1743        benc = tr_bencSave( &val, &bencLen );
1744        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1745        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1746        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1747        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1748
1749        /* cleanup */
1750        tr_free( benc );
1751        tr_bencFree( &val );
1752        tr_free( diffs.added );
1753        tr_free( diffs.dropped );
1754        tr_free( newPex );
1755
1756        msgs->clientSentPexAt = time( NULL );
1757    }
1758}
1759
1760static int
1761pexPulse( void * vpeer )
1762{
1763    sendPex( vpeer );
1764    return TRUE;
1765}
1766
1767/**
1768***
1769**/
1770
1771tr_peermsgs*
1772tr_peerMsgsNew( struct tr_torrent * torrent,
1773                struct tr_peer    * info,
1774                tr_delivery_func    func,
1775                void              * userData,
1776                tr_publisher_tag  * setme )
1777{
1778    tr_peermsgs * m;
1779
1780    assert( info != NULL );
1781    assert( info->io != NULL );
1782
1783    m = tr_new0( tr_peermsgs, 1 );
1784    m->publisher = tr_publisherNew( );
1785    m->info = info;
1786    m->handle = torrent->handle;
1787    m->torrent = torrent;
1788    m->io = info->io;
1789    m->info->status = TR_PEER_STATUS_HANDSHAKE;
1790    m->info->clientIsChoked = 1;
1791    m->info->peerIsChoked = 1;
1792    m->info->clientIsInterested = 0;
1793    m->info->peerIsInterested = 0;
1794    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1795    m->state = AWAITING_BT_LENGTH;
1796    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1797    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1798    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1799    m->outMessages = evbuffer_new( );
1800    m->incoming.block = evbuffer_new( );
1801    m->outBlock = evbuffer_new( );
1802    m->peerAllowedPieces = NULL;
1803    m->clientAllowedPieces = NULL;
1804    m->clientSuggestedPieces = NULL;
1805    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1806   
1807    if ( tr_peerIoSupportsFEXT( m->io ) )
1808    {
1809        /* This peer is fastpeer-enabled, generate its allowed set
1810         * (before registering our callbacks) */
1811        if ( !m->peerAllowedPieces ) {
1812            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1813           
1814            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_FAST_ALLOWED_COUNT,
1815                                                                 m->torrent->info.pieceCount,
1816                                                                 m->torrent->info.hash,
1817                                                                 peerAddr );
1818        }
1819        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1820       
1821        m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1822    }
1823   
1824    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1825    tr_peerIoSetIOFuncs( m->io, canRead, gotError, m );
1826    ratePulse( m );
1827
1828    if ( tr_peerIoSupportsLTEP( m->io ) )
1829        sendLtepHandshake( m );
1830
1831    if ( !tr_peerIoSupportsFEXT( m->io ) )
1832        sendBitfield( m );
1833    else {
1834        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1835        uint32_t peerProgress;
1836        float completion = tr_cpPercentComplete( m->torrent->completion );
1837        if ( completion == 0.0f ) {
1838            sendFastHave( m, 0 );
1839        } else if ( completion == 1.0f ) {
1840            sendFastHave( m, 1 );
1841        } else {
1842            sendBitfield( m );
1843        }
1844        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1845       
1846        if ( peerProgress < MAX_FAST_ALLOWED_COUNT )
1847            sendFastAllowedSet( m );
1848    }
1849
1850    return m;
1851}
1852
1853void
1854tr_peerMsgsFree( tr_peermsgs* msgs )
1855{
1856    if( msgs != NULL )
1857    {
1858        tr_timerFree( &msgs->pulseTimer );
1859        tr_timerFree( &msgs->rateTimer );
1860        tr_timerFree( &msgs->pexTimer );
1861        tr_publisherFree( &msgs->publisher );
1862        tr_list_free( &msgs->clientWillAskFor, tr_free );
1863        tr_list_free( &msgs->clientAskedFor, tr_free );
1864        tr_list_free( &msgs->peerAskedForFast, tr_free );
1865        tr_list_free( &msgs->peerAskedFor, tr_free );
1866        tr_bitfieldFree( msgs->peerAllowedPieces );
1867        tr_bitfieldFree( msgs->clientAllowedPieces );
1868        tr_bitfieldFree( msgs->clientSuggestedPieces );
1869        evbuffer_free( msgs->incoming.block );
1870        evbuffer_free( msgs->outMessages );
1871        evbuffer_free( msgs->outBlock );
1872        tr_free( msgs->pex );
1873
1874        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1875        tr_free( msgs );
1876    }
1877}
1878
1879tr_publisher_tag
1880tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1881                      tr_delivery_func    func,
1882                      void              * userData )
1883{
1884    return tr_publisherSubscribe( peer->publisher, func, userData );
1885}
1886
1887void
1888tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1889                        tr_publisher_tag    tag )
1890{
1891    tr_publisherUnsubscribe( peer->publisher, tag );
1892}
1893
1894int
1895tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1896                               uint32_t            index )
1897{
1898    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1899}
1900
1901int
1902tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1903                             uint32_t            index )
1904{
1905    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1906}
1907
Note: See TracBrowser for help on using the repository browser.