source: trunk/libtransmission/peer-msgs.c @ 5277

Last change on this file since 5277 was 5277, checked in by charles, 14 years ago

#781: try to get a little further on this `too much corrupt' ticket by adding a more helpful log message when a downloaded piece fails its checksum test.

  • Property svn:keywords set to Date Rev Author Id
File size: 56.2 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 5277 2008-03-18 01:39:06Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18#include <libgen.h> /* basename */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "inout.h"
26#include "peer-io.h"
27#include "peer-mgr.h"
28#include "peer-mgr-private.h"
29#include "peer-msgs.h"
30#include "ratecontrol.h"
31#include "stats.h"
32#include "torrent.h"
33#include "trevent.h"
34#include "utils.h"
35
36/**
37***
38**/
39
40enum
41{
42    BT_CHOKE                = 0,
43    BT_UNCHOKE              = 1,
44    BT_INTERESTED           = 2,
45    BT_NOT_INTERESTED       = 3,
46    BT_HAVE                 = 4,
47    BT_BITFIELD             = 5,
48    BT_REQUEST              = 6,
49    BT_PIECE                = 7,
50    BT_CANCEL               = 8,
51    BT_PORT                 = 9,
52    BT_SUGGEST              = 13,
53    BT_HAVE_ALL             = 14,
54    BT_HAVE_NONE            = 15,
55    BT_REJECT               = 16,
56    BT_ALLOWED_FAST         = 17,
57    BT_LTEP                 = 20,
58
59    LTEP_HANDSHAKE          = 0,
60
61    TR_LTEP_PEX             = 1,
62
63    MIN_CHOKE_PERIOD_SEC    = (10),
64
65    /* idle seconds before we send a keepalive */
66    KEEPALIVE_INTERVAL_SECS = 90,
67
68    PEX_INTERVAL            = (60 * 1000), /* msec between sendPex() calls */
69    PEER_PULSE_INTERVAL     = (100),       /* msec between pulse() calls */
70    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
71
72    MAX_QUEUE_SIZE          = (100),
73    MAX_OUTBUF_SIZE         = (1024),
74     
75    /* (fast peers) max number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_COUNT   = 10,
77
78    /* (fast peers) max threshold for allowing fast-pieces requests */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,
80
81    QUEUED_REQUEST_TTL_SECS = 20,
82
83    SENT_REQUEST_TTL_SECS = 90
84};
85
86/**
87***  REQUEST MANAGEMENT
88**/
89
90enum
91{
92    AWAITING_BT_LENGTH,
93    AWAITING_BT_ID,
94    AWAITING_BT_MESSAGE,
95    AWAITING_BT_PIECE
96};
97
98struct peer_request
99{
100    uint32_t index;
101    uint32_t offset;
102    uint32_t length;
103    time_t time_requested;
104};
105
106static int
107compareRequest( const void * va, const void * vb )
108{
109    int i;
110    const struct peer_request * a = va;
111    const struct peer_request * b = vb;
112    if(( i = tr_compareUint32( a->index, b->index ))) return i;
113    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
114    if(( i = tr_compareUint32( a->length, b->length ))) return i;
115    return 0;
116}
117
118struct request_list
119{
120    uint16_t count;
121    uint16_t max;
122    struct peer_request * requests;
123};
124
125static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
126
127static void
128reqListReserve( struct request_list * list, uint16_t max )
129{
130    if( list->max < max )
131    {
132        list->max = max;
133        list->requests = tr_renew( struct peer_request,
134                                   list->requests,
135                                   list->max );
136    }
137}
138
139static void
140reqListClear( struct request_list * list )
141{
142    tr_free( list->requests );
143    *list = REQUEST_LIST_INIT;
144}
145
146static void
147reqListRemoveOne( struct request_list * list, int i )
148{
149    assert( 0<=i && i<list->count );
150
151    memmove( &list->requests[i],
152             &list->requests[i+1],
153             sizeof( struct peer_request ) * ( --list->count - i ) );
154}
155
156static void
157reqListAppend( struct request_list * list, const struct peer_request * req )
158{
159    if( ++list->count >= list->max )
160        reqListReserve( list, list->max + 8 );
161
162    list->requests[list->count-1] = *req;
163}
164
165static tr_errno
166reqListPop( struct request_list * list, struct peer_request * setme )
167{
168    tr_errno err;
169
170    if( !list->count )
171        err = TR_ERROR;
172    else {
173        *setme = list->requests[0];
174        reqListRemoveOne( list, 0 );
175        err = TR_OK;
176    }
177
178    return err;
179}
180
181static int
182reqListFind( struct request_list * list, const struct peer_request * key )
183{
184    uint16_t i;
185    for( i=0; i<list->count; ++i )
186        if( !compareRequest( key, list->requests+i ) )
187            return i;
188    return -1;
189}
190
191static tr_errno
192reqListRemove( struct request_list * list, const struct peer_request * key )
193{
194    tr_errno err;
195    const int i = reqListFind( list, key );
196
197    if( i < 0 )
198        err = TR_ERROR;
199    else {
200        err = TR_OK;
201        reqListRemoveOne( list, i );
202    }
203
204    return err;
205}
206
207static void
208reqListPrune( struct request_list * list,
209              struct request_list * pruned,
210              time_t                cutoff )
211{
212    int i, k=0, p=0;
213    struct peer_request keep[MAX_QUEUE_SIZE];
214    struct peer_request prune[MAX_QUEUE_SIZE];
215
216    for( i=0; i<list->count; ++i ) {
217        const struct peer_request * req = list->requests + i;
218        if( req->time_requested > cutoff )
219            keep[k++] = *req;
220        else
221            prune[p++] = *req;
222    }
223
224    memcpy( list->requests, keep, sizeof(struct peer_request) * k );
225    list->count = k;
226
227    reqListReserve( pruned, pruned->count + p );
228    memcpy( pruned->requests + pruned->count,
229            prune,
230            sizeof(struct peer_request) * p );
231    pruned->count += p;
232}
233
234/**
235***
236**/
237
238/* this is raw, unchanged data from the peer regarding
239 * the current message that it's sending us. */
240struct tr_incoming
241{
242    uint8_t id;
243    uint32_t length; /* includes the +1 for id length */
244    struct peer_request blockReq; /* metadata for incoming blocks */
245    struct evbuffer * block; /* piece data for incoming blocks */
246};
247
248struct tr_peermsgs
249{
250    unsigned int peerSentBitfield         : 1;
251    unsigned int peerSupportsPex          : 1;
252    unsigned int clientSentLtepHandshake  : 1;
253    unsigned int peerSentLtepHandshake    : 1;
254    unsigned int sendingBlock             : 1;
255   
256    uint8_t state;
257    uint8_t ut_pex_id;
258    uint16_t pexCount;
259    uint16_t maxActiveRequests;
260    uint16_t minActiveRequests;
261
262    tr_peer * info;
263
264    tr_handle * handle;
265    tr_torrent * torrent;
266    tr_peerIo * io;
267
268    tr_publisher_t * publisher;
269
270    struct evbuffer * outBlock;    /* buffer of the current piece message */
271    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
272
273    struct request_list peerAskedFor;
274    struct request_list peerAskedForFast;
275    struct request_list clientAskedFor;
276    struct request_list clientWillAskFor;
277
278    tr_timer * rateTimer;
279    tr_timer * pulseTimer;
280    tr_timer * pexTimer;
281
282    time_t clientSentPexAt;
283    time_t clientSentAnythingAt;
284   
285    tr_bitfield * peerAllowedPieces;
286
287    struct tr_incoming incoming; 
288
289    tr_pex * pex;
290};
291
292/**
293***
294**/
295
296static void
297myDebug( const char * file, int line,
298         const struct tr_peermsgs * msgs,
299         const char * fmt, ... )
300{
301    FILE * fp = tr_getLog( );
302    if( fp != NULL )
303    {
304        va_list args;
305        char timestr[64];
306        struct evbuffer * buf = evbuffer_new( );
307        char * myfile = tr_strdup( file );
308
309        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
310                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
311                             msgs->torrent->info.name,
312                             tr_peerIoGetAddrStr( msgs->io ),
313                             msgs->info->client );
314        va_start( args, fmt );
315        evbuffer_add_vprintf( buf, fmt, args );
316        va_end( args );
317        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
318        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
319
320        tr_free( myfile );
321        evbuffer_free( buf );
322    }
323}
324
325#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
326
327/**
328***
329**/
330
331static void
332protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
333{
334    tr_peerIo * io = msgs->io;
335    struct evbuffer * out = msgs->outMessages;
336
337    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
338    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
339    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
340    tr_peerIoWriteUint32( io, out, req->index );
341    tr_peerIoWriteUint32( io, out, req->offset );
342    tr_peerIoWriteUint32( io, out, req->length );
343}
344
345static void
346protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
347{
348    tr_peerIo * io = msgs->io;
349    struct evbuffer * out = msgs->outMessages;
350
351    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
352    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
353    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
354    tr_peerIoWriteUint32( io, out, req->index );
355    tr_peerIoWriteUint32( io, out, req->offset );
356    tr_peerIoWriteUint32( io, out, req->length );
357}
358
359static void
360protocolSendHave( tr_peermsgs * msgs, uint32_t index )
361{
362    tr_peerIo * io = msgs->io;
363    struct evbuffer * out = msgs->outMessages;
364
365    dbgmsg( msgs, "sending Have %u", index );
366    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
367    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
368    tr_peerIoWriteUint32( io, out, index );
369}
370
371static void
372protocolSendChoke( tr_peermsgs * msgs, int choke )
373{
374    tr_peerIo * io = msgs->io;
375    struct evbuffer * out = msgs->outMessages;
376
377    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
378    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
379    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
380}
381
382/**
383***  EVENTS
384**/
385
386static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
387
388static void
389publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
390{
391    tr_publisherPublish( msgs->publisher, msgs->info, e );
392}
393
394static void
395fireError( tr_peermsgs * msgs, tr_errno err )
396{
397    tr_peermsgs_event e = blankEvent;
398    e.eventType = TR_PEERMSG_ERROR;
399    e.err = err;
400    publish( msgs, &e );
401}
402
403static void
404fireNeedReq( tr_peermsgs * msgs )
405{
406    tr_peermsgs_event e = blankEvent;
407    e.eventType = TR_PEERMSG_NEED_REQ;
408    publish( msgs, &e );
409}
410
411static void
412firePeerProgress( tr_peermsgs * msgs )
413{
414    tr_peermsgs_event e = blankEvent;
415    e.eventType = TR_PEERMSG_PEER_PROGRESS;
416    e.progress = msgs->info->progress;
417    publish( msgs, &e );
418}
419
420static void
421fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
422{
423    tr_peermsgs_event e = blankEvent;
424    e.eventType = TR_PEERMSG_CLIENT_HAVE;
425    e.pieceIndex = pieceIndex;
426    publish( msgs, &e );
427}
428
429static void
430fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
431{
432    tr_peermsgs_event e = blankEvent;
433    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
434    e.pieceIndex = req->index;
435    e.offset = req->offset;
436    e.length = req->length;
437    publish( msgs, &e );
438}
439
440static void
441firePieceData( tr_peermsgs * msgs )
442{
443    tr_peermsgs_event e = blankEvent;
444    e.eventType = TR_PEERMSG_PIECE_DATA;
445    publish( msgs, &e );
446}
447
448static void
449fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
450{
451    tr_peermsgs_event e = blankEvent;
452    e.eventType = TR_PEERMSG_CANCEL;
453    e.pieceIndex = req->index;
454    e.offset = req->offset;
455    e.length = req->length;
456    publish( msgs, &e );
457}
458
459/**
460***  INTEREST
461**/
462
463static int
464isPieceInteresting( const tr_peermsgs   * peer,
465                    int                   piece )
466{
467    const tr_torrent * torrent = peer->torrent;
468
469    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
470          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
471          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
472}
473
474/* "interested" means we'll ask for piece data if they unchoke us */
475static int
476isPeerInteresting( const tr_peermsgs * msgs )
477{
478    int i;
479    const tr_torrent * torrent;
480    const tr_bitfield * bitfield;
481    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
482
483    if( clientIsSeed )
484        return FALSE;
485
486    torrent = msgs->torrent;
487    bitfield = tr_cpPieceBitfield( torrent->completion );
488
489    if( !msgs->info->have )
490        return TRUE;
491
492    assert( bitfield->len == msgs->info->have->len );
493    for( i=0; i<torrent->info.pieceCount; ++i )
494        if( isPieceInteresting( msgs, i ) )
495            return TRUE;
496
497    return FALSE;
498}
499
500static void
501sendInterest( tr_peermsgs * msgs, int weAreInterested )
502{
503    assert( msgs != NULL );
504    assert( weAreInterested==0 || weAreInterested==1 );
505
506    msgs->info->clientIsInterested = weAreInterested;
507    dbgmsg( msgs, "Sending %s",
508            weAreInterested ? "Interested" : "Not Interested");
509
510    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
511    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
512                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
513}
514
515static void
516updateInterest( tr_peermsgs * msgs )
517{
518    const int i = isPeerInteresting( msgs );
519    if( i != msgs->info->clientIsInterested )
520        sendInterest( msgs, i );
521    if( i )
522        fireNeedReq( msgs );
523}
524
525static void
526cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
527{
528    reqListClear( &msgs->peerAskedFor );
529}
530
531void
532tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
533{
534    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
535
536    assert( msgs != NULL );
537    assert( msgs->info != NULL );
538    assert( choke==0 || choke==1 );
539
540    if( msgs->info->chokeChangedAt > fibrillationTime )
541    {
542        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
543    }
544    else if( msgs->info->peerIsChoked != choke )
545    {
546        msgs->info->peerIsChoked = choke;
547        if( choke )
548            cancelAllRequestsToClientExceptFast( msgs );
549        protocolSendChoke( msgs, choke );
550        msgs->info->chokeChangedAt = time( NULL );
551    }
552}
553
554/**
555***
556**/
557
558void
559tr_peerMsgsHave( tr_peermsgs * msgs,
560                 uint32_t      index )
561{
562    protocolSendHave( msgs, index );
563
564    /* since we have more pieces now, we might not be interested in this peer */
565    updateInterest( msgs );
566}
567#if 0
568static void
569sendFastSuggest( tr_peermsgs * msgs,
570                 uint32_t      pieceIndex )
571{
572    assert( msgs != NULL );
573   
574    if( tr_peerIoSupportsFEXT( msgs->io ) )
575    {
576        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
577        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
578        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
579    }
580}
581#endif
582static void
583sendFastHave( tr_peermsgs * msgs, int all )
584{
585    assert( msgs != NULL );
586   
587    if( tr_peerIoSupportsFEXT( msgs->io ) )
588    {
589        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
590        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
591                                                                : BT_HAVE_NONE ) );
592        updateInterest( msgs );
593    }
594}
595
596static void
597sendFastReject( tr_peermsgs * msgs,
598                uint32_t      pieceIndex,
599                uint32_t      offset,
600                uint32_t      length )
601{
602    assert( msgs != NULL );
603
604    if( tr_peerIoSupportsFEXT( msgs->io ) )
605    {
606        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
607        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
608        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
609        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
610        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
611        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
612    }
613}
614
615static tr_bitfield*
616getPeerAllowedPieces( tr_peermsgs * msgs )
617{
618    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
619    {
620        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
621            MAX_FAST_ALLOWED_COUNT,
622            msgs->torrent->info.pieceCount,
623            msgs->torrent->info.hash,
624            tr_peerIoGetAddress( msgs->io, NULL ) );
625    }
626
627    return msgs->peerAllowedPieces;
628}
629
630static void
631sendFastAllowed( tr_peermsgs * msgs,
632                 uint32_t      pieceIndex)
633{
634    assert( msgs != NULL );
635   
636    if( tr_peerIoSupportsFEXT( msgs->io ) )
637    {
638        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
639        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
640        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
641    }
642}
643
644static void
645sendFastAllowedSet( tr_peermsgs * msgs )
646{
647    int i = 0;
648
649    while (i <= msgs->torrent->info.pieceCount )
650    {
651        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
652            sendFastAllowed( msgs, i );
653        i++;
654    }
655}
656
657static void
658maybeSendFastAllowedSet( tr_peermsgs * msgs )
659{
660    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
661        sendFastAllowedSet( msgs );
662}
663
664
665/**
666***
667**/
668
669static int
670reqIsValid( const tr_peermsgs   * msgs,
671            uint32_t              index,
672            uint32_t              offset,
673            uint32_t              length )
674{
675    return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
676}
677
678static int
679requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
680{
681    return reqIsValid( msgs, req->index, req->offset, req->length );
682}
683
684static void
685expireOldRequests( tr_peermsgs * msgs )
686{
687    int i;
688    const time_t now = time( NULL );
689    const time_t queued_cutoff = now - QUEUED_REQUEST_TTL_SECS;
690    const time_t sent_cutoff   = now - SENT_REQUEST_TTL_SECS;
691    struct request_list pruned = REQUEST_LIST_INIT;
692
693    reqListPrune( &msgs->clientWillAskFor, &pruned, queued_cutoff );
694    reqListPrune( &msgs->clientAskedFor, &pruned, sent_cutoff );
695
696    /* expire the old requests */
697    for( i=0; i<pruned.count; ++i ) {
698        const struct peer_request * req = &pruned.requests[i];
699        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
700    }
701
702    /* cleanup */
703    reqListClear( &pruned );
704}
705
706static void
707pumpRequestQueue( tr_peermsgs * msgs )
708{
709    const int max = msgs->maxActiveRequests;
710    const int min = msgs->minActiveRequests;
711    const time_t now = time( NULL );
712    int sent = 0;
713    int count = msgs->clientAskedFor.count;
714    struct peer_request req;
715
716    if( count > min )
717        return;
718    if( msgs->info->clientIsChoked )
719        return;
720
721    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
722    {
723        assert( requestIsValid( msgs, &req ) );
724        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
725
726        protocolSendRequest( msgs, &req );
727        req.time_requested = now;
728        reqListAppend( &msgs->clientAskedFor, &req );
729
730        ++count;
731        ++sent;
732    }
733
734    if( sent )
735        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
736                sent,
737                msgs->clientAskedFor.count,
738                msgs->clientWillAskFor.count );
739
740    if( count < max )
741        fireNeedReq( msgs );
742}
743
744static int
745pulse( void * vmsgs );
746
747int
748tr_peerMsgsAddRequest( tr_peermsgs * msgs,
749                       uint32_t      index, 
750                       uint32_t      offset, 
751                       uint32_t      length )
752{
753    const int req_max = msgs->maxActiveRequests;
754    struct peer_request req;
755
756    assert( msgs != NULL );
757    assert( msgs->torrent != NULL );
758    assert( reqIsValid( msgs, index, offset, length ) );
759
760    /**
761    ***  Reasons to decline the request
762    **/
763
764    /* don't send requests to choked clients */
765    if( msgs->info->clientIsChoked ) {
766        dbgmsg( msgs, "declining request because they're choking us" );
767        return TR_ADDREQ_CLIENT_CHOKED;
768    }
769
770    /* peer doesn't have this piece */
771    if( !tr_bitfieldHas( msgs->info->have, index ) )
772        return TR_ADDREQ_MISSING;
773
774    /* peer's queue is full */
775    if( msgs->clientWillAskFor.count >= req_max ) {
776        dbgmsg( msgs, "declining request because we're full" );
777        return TR_ADDREQ_FULL;
778    }
779
780    /* have we already asked for this piece? */
781    req.index = index;
782    req.offset = offset;
783    req.length = length;
784    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
785        dbgmsg( msgs, "declining because it's a duplicate" );
786        return TR_ADDREQ_DUPLICATE;
787    }
788    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
789        dbgmsg( msgs, "declining because it's a duplicate" );
790        return TR_ADDREQ_DUPLICATE;
791    }
792
793    /**
794    ***  Accept this request
795    **/
796
797    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
798    req.time_requested = time( NULL );
799    reqListAppend( &msgs->clientWillAskFor, &req );
800    return TR_ADDREQ_OK;
801}
802
803static void
804cancelAllRequestsToPeer( tr_peermsgs * msgs )
805{
806    int i;
807    struct request_list a = msgs->clientWillAskFor;
808    struct request_list b = msgs->clientAskedFor;
809
810    msgs->clientAskedFor = REQUEST_LIST_INIT;
811    msgs->clientWillAskFor = REQUEST_LIST_INIT;
812
813    for( i=0; i<a.count; ++i )
814        fireCancelledReq( msgs, &a.requests[i] );
815
816    for( i=0; i<b.count; ++i ) {
817        fireCancelledReq( msgs, &b.requests[i] );
818        protocolSendCancel( msgs, &b.requests[i] );
819    }
820
821    reqListClear( &a );
822    reqListClear( &b );
823}
824
825void
826tr_peerMsgsCancel( tr_peermsgs * msgs,
827                   uint32_t      pieceIndex,
828                   uint32_t      offset,
829                   uint32_t      length )
830{
831    struct peer_request req;
832
833    assert( msgs != NULL );
834    assert( length > 0 );
835
836    /* have we asked the peer for this piece? */
837    req.index = pieceIndex;
838    req.offset = offset;
839    req.length = length;
840
841    /* if it's only in the queue and hasn't been sent yet, free it */
842    if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
843        fireCancelledReq( msgs, &req );
844
845    /* if it's already been sent, send a cancel message too */
846    if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
847        protocolSendCancel( msgs, &req );
848        fireCancelledReq( msgs, &req );
849    }
850}
851
852/**
853***
854**/
855
856static void
857sendLtepHandshake( tr_peermsgs * msgs )
858{
859    tr_benc val, *m;
860    char * buf;
861    int len;
862    int pex;
863    const char * v = TR_NAME " " USERAGENT_PREFIX;
864    const int port = tr_getPublicPort( msgs->handle );
865    struct evbuffer * outbuf;
866
867    if( msgs->clientSentLtepHandshake )
868        return;
869
870    outbuf = evbuffer_new( );
871    dbgmsg( msgs, "sending an ltep handshake" );
872    msgs->clientSentLtepHandshake = 1;
873
874    /* decide if we want to advertise pex support */
875    if( !tr_torrentAllowsPex( msgs->torrent ) )
876        pex = 0;
877    else if( msgs->peerSentLtepHandshake )
878        pex = msgs->peerSupportsPex ? 1 : 0;
879    else
880        pex = 1;
881
882    tr_bencInit( &val, TYPE_DICT );
883    tr_bencDictReserve( &val, 4 );
884    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
885    m  = tr_bencDictAdd( &val, "m" );
886    tr_bencInit( m, TYPE_DICT );
887    if( pex ) {
888        tr_bencDictReserve( m, 1 );
889        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), TR_LTEP_PEX );
890    }
891    if( port > 0 )
892        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
893    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
894    buf = tr_bencSave( &val, &len );
895
896    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
897    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
898    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
899    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
900
901    tr_peerIoWriteBuf( msgs->io, outbuf );
902
903#if 0
904    dbgmsg( msgs, "here is the ltep handshake we sent:" );
905    tr_bencPrint( &val );
906    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
907#endif
908
909    /* cleanup */
910    tr_bencFree( &val );
911    tr_free( buf );
912    evbuffer_free( outbuf );
913}
914
915static void
916parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
917{
918    tr_benc val, * sub;
919    uint8_t * tmp = tr_new( uint8_t, len );
920
921    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
922    msgs->peerSentLtepHandshake = 1;
923
924    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
925        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
926        tr_free( tmp );
927        return;
928    }
929
930#if 0
931    dbgmsg( msgs, "here is the ltep handshake we read:" );
932    tr_bencPrint( &val );
933    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
934#endif
935
936    /* does the peer prefer encrypted connections? */
937    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
938        msgs->info->encryption_preference = sub->val.i
939                                      ? ENCRYPTION_PREFERENCE_YES
940                                      : ENCRYPTION_PREFERENCE_NO;
941
942    /* check supported messages for utorrent pex */
943    msgs->peerSupportsPex = 0;
944    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
945        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
946            msgs->ut_pex_id = (uint8_t) sub->val.i;
947            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
948            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
949        }
950    }
951
952    /* get peer's listening port */
953    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
954        msgs->info->port = htons( (uint16_t)sub->val.i );
955        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
956    }
957
958    tr_bencFree( &val );
959    tr_free( tmp );
960}
961
962static void
963parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
964{
965    int loaded = 0;
966    uint8_t * tmp = tr_new( uint8_t, msglen );
967    tr_benc val, *sub;
968    const tr_torrent * tor = msgs->torrent;
969    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
970
971    if( tr_torrentAllowsPex( tor )
972        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
973        && (( sub = tr_bencDictFindType( &val, "added", TYPE_STR ))))
974    {
975        const int n = sub->val.s.i / 6 ;
976        if( n )
977            tr_torinf( tor, _( "Got %d peers from peer exchange" ), n );
978        tr_peerMgrAddPeers( msgs->handle->peerMgr,
979                            tor->info.hash,
980                            TR_PEER_FROM_PEX,
981                            (uint8_t*)sub->val.s.s, n );
982    }
983
984    if( loaded )
985        tr_bencFree( &val );
986    tr_free( tmp );
987}
988
989static void
990sendPex( tr_peermsgs * msgs );
991
992static void
993parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
994{
995    uint8_t ltep_msgid;
996
997    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
998    msglen--;
999
1000    if( ltep_msgid == LTEP_HANDSHAKE )
1001    {
1002        dbgmsg( msgs, "got ltep handshake" );
1003        parseLtepHandshake( msgs, msglen, inbuf );
1004        if( tr_peerIoSupportsLTEP( msgs->io ) )
1005        {
1006            sendLtepHandshake( msgs );
1007            sendPex( msgs );
1008        }
1009    }
1010    else if( ltep_msgid == TR_LTEP_PEX )
1011    {
1012        dbgmsg( msgs, "got ut pex" );
1013        msgs->peerSupportsPex = 1;
1014        parseUtPex( msgs, msglen, inbuf );
1015    }
1016    else
1017    {
1018        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1019        evbuffer_drain( inbuf, msglen );
1020    }
1021}
1022
1023static int
1024readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1025{
1026    uint32_t len;
1027
1028    if( inlen < sizeof(len) )
1029        return READ_MORE;
1030
1031    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1032
1033    if( len == 0 ) /* peer sent us a keepalive message */
1034        dbgmsg( msgs, "got KeepAlive" );
1035    else {
1036        msgs->incoming.length = len;
1037        msgs->state = AWAITING_BT_ID;
1038    }
1039
1040    return READ_AGAIN;
1041}
1042
1043static int
1044readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1045
1046static int
1047readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1048{
1049    uint8_t id;
1050
1051    if( inlen < sizeof(uint8_t) )
1052        return READ_MORE;
1053
1054    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1055    msgs->incoming.id = id;
1056
1057    if( id==BT_PIECE )
1058    {
1059        msgs->state = AWAITING_BT_PIECE;
1060        return READ_AGAIN;
1061    }
1062    else if( msgs->incoming.length != 1 )
1063    {
1064        msgs->state = AWAITING_BT_MESSAGE;
1065        return READ_AGAIN;
1066    }
1067    else return readBtMessage( msgs, inbuf, inlen-1 );
1068}
1069
1070static void
1071updatePeerProgress( tr_peermsgs * msgs )
1072{
1073    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1074                         / (float)msgs->torrent->info.pieceCount;
1075    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1076    updateInterest( msgs );
1077    firePeerProgress( msgs );
1078}
1079
1080static int
1081clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1082{
1083    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1084    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1085        return FALSE;
1086   
1087    /* ...or if we don't have ourself enough pieces */
1088    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1089        return FALSE;
1090
1091    /* Maybe a bandwidth limit ? */
1092    return TRUE;
1093}
1094
1095static void
1096peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1097{
1098    const int reqIsValid = requestIsValid( msgs, req );
1099    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1100    const int peerIsChoked = msgs->info->peerIsChoked;
1101    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1102    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1103    const int canSendFast = clientCanSendFastBlock( msgs );
1104
1105    if( !reqIsValid ) /* bad request */
1106    {
1107        dbgmsg( msgs, "rejecting an invalid request." );
1108        sendFastReject( msgs, req->index, req->offset, req->length );
1109    }
1110    else if( !clientHasPiece ) /* we don't have it */
1111    {
1112        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1113        sendFastReject( msgs, req->index, req->offset, req->length );
1114    }
1115    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1116    {
1117        tr_peerMsgsSetChoke( msgs, 1 );
1118        sendFastReject( msgs, req->index, req->offset, req->length );
1119    }
1120    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1121    {
1122        sendFastReject( msgs, req->index, req->offset, req->length );
1123    }
1124    else /* YAY */
1125    {
1126        if( peerIsFast && pieceIsFast )
1127            reqListAppend( &msgs->peerAskedForFast, req );
1128        else
1129            reqListAppend( &msgs->peerAskedFor, req );
1130    }
1131}
1132
1133static int
1134messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1135{
1136    switch( id )
1137    {
1138        case BT_CHOKE:
1139        case BT_UNCHOKE:
1140        case BT_INTERESTED:
1141        case BT_NOT_INTERESTED:
1142        case BT_HAVE_ALL:
1143        case BT_HAVE_NONE:
1144            return len==1;
1145
1146        case BT_HAVE:
1147        case BT_SUGGEST:
1148        case BT_ALLOWED_FAST:
1149            return len==5;
1150
1151        case BT_BITFIELD:
1152            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1153       
1154        case BT_REQUEST:
1155        case BT_CANCEL:
1156        case BT_REJECT:
1157            return len==13;
1158
1159        case BT_PIECE:
1160            return len>9 && len<=16393;
1161
1162        case BT_PORT:
1163            return len==3;
1164
1165        case BT_LTEP:
1166            return len >= 2;
1167
1168        default:
1169            return FALSE;
1170    }
1171}
1172
1173static int
1174clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1175
1176static void
1177clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1178{
1179    const time_t now = time( NULL );
1180    tr_torrent * tor = msgs->torrent;
1181    tor->activityDate = tr_date( );
1182    tor->downloadedCur += byteCount;
1183    msgs->info->pieceDataActivityDate = now;
1184    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1185    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1186    tr_rcTransferred( tor->download, byteCount );
1187    tr_rcTransferred( tor->handle->download, byteCount );
1188    tr_statsAddDownloaded( msgs->handle, byteCount );
1189    firePieceData( msgs );
1190}
1191
1192static int
1193readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1194{
1195    struct peer_request * req = &msgs->incoming.blockReq;
1196    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1197    dbgmsg( msgs, "In readBtPiece" );
1198
1199    if( !req->length )
1200    {
1201        if( inlen < 8 )
1202            return READ_MORE;
1203
1204        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1205        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1206        req->length = msgs->incoming.length - 9;
1207        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1208        return READ_AGAIN;
1209    }
1210    else
1211    {
1212        int err;
1213
1214        /* read in another chunk of data */
1215        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1216        size_t n = MIN( nLeft, inlen );
1217        uint8_t * buf = tr_new( uint8_t, n );
1218        assert( EVBUFFER_LENGTH(inbuf) >= n );
1219        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1220        evbuffer_add( msgs->incoming.block, buf, n );
1221        clientGotBytes( msgs, n );
1222        tr_free( buf );
1223        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1224               (int)n, req->index, req->offset, req->length,
1225               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1226        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1227            return READ_MORE;
1228
1229        /* we've got the whole block ... process it */
1230        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1231
1232        /* cleanup */
1233        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1234        req->length = 0;
1235        msgs->state = AWAITING_BT_LENGTH;
1236        if( !err )
1237            return READ_AGAIN;
1238        else {
1239            fireError( msgs, err );
1240            return READ_DONE;
1241        }
1242    }
1243}
1244
1245static int
1246readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1247{
1248    uint32_t ui32;
1249    uint32_t msglen = msgs->incoming.length;
1250    const uint8_t id = msgs->incoming.id;
1251    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1252
1253    --msglen; /* id length */
1254
1255    if( inlen < msglen )
1256        return READ_MORE;
1257
1258    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1259
1260    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1261    {
1262        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1263        fireError( msgs, TR_ERROR );
1264        return READ_DONE;
1265    }
1266
1267    switch( id )
1268    {
1269        case BT_CHOKE:
1270            dbgmsg( msgs, "got Choke" );
1271            msgs->info->clientIsChoked = 1;
1272            cancelAllRequestsToPeer( msgs );
1273            cancelAllRequestsToClientExceptFast( msgs );
1274            break;
1275
1276        case BT_UNCHOKE:
1277            dbgmsg( msgs, "got Unchoke" );
1278            msgs->info->clientIsChoked = 0;
1279            fireNeedReq( msgs );
1280            break;
1281
1282        case BT_INTERESTED:
1283            dbgmsg( msgs, "got Interested" );
1284            msgs->info->peerIsInterested = 1;
1285            tr_peerMsgsSetChoke( msgs, 0 );
1286            break;
1287
1288        case BT_NOT_INTERESTED:
1289            dbgmsg( msgs, "got Not Interested" );
1290            msgs->info->peerIsInterested = 0;
1291            break;
1292           
1293        case BT_HAVE:
1294            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1295            dbgmsg( msgs, "got Have: %u", ui32 );
1296            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1297                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1298            updatePeerProgress( msgs );
1299            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1300            break;
1301
1302        case BT_BITFIELD: {
1303            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1304            dbgmsg( msgs, "got a bitfield" );
1305            msgs->peerSentBitfield = 1;
1306            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1307            updatePeerProgress( msgs );
1308            maybeSendFastAllowedSet( msgs );
1309            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1310            fireNeedReq( msgs );
1311            break;
1312        }
1313
1314        case BT_REQUEST: {
1315            struct peer_request r;
1316            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1317            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1318            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1319            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1320            peerMadeRequest( msgs, &r );
1321            break;
1322        }
1323
1324        case BT_CANCEL: {
1325            struct peer_request r;
1326            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1327            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1328            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1329            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1330            reqListRemove( &msgs->peerAskedForFast, &r );
1331            reqListRemove( &msgs->peerAskedFor, &r );
1332            break;
1333        }
1334
1335        case BT_PIECE:
1336            assert( 0 ); /* handled elsewhere! */
1337            break;
1338       
1339        case BT_PORT:
1340            dbgmsg( msgs, "Got a BT_PORT" );
1341            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1342            break;
1343           
1344        case BT_SUGGEST: {
1345            dbgmsg( msgs, "Got a BT_SUGGEST" );
1346            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1347            /* we don't do anything with this yet */
1348            break;
1349        }
1350           
1351        case BT_HAVE_ALL:
1352            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1353            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1354            updatePeerProgress( msgs );
1355            maybeSendFastAllowedSet( msgs );
1356            break;
1357       
1358       
1359        case BT_HAVE_NONE:
1360            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1361            tr_bitfieldClear( msgs->info->have );
1362            updatePeerProgress( msgs );
1363            maybeSendFastAllowedSet( msgs );
1364            break;
1365       
1366        case BT_REJECT: {
1367            struct peer_request r;
1368            dbgmsg( msgs, "Got a BT_REJECT" );
1369            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1370            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1371            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1372            reqListRemove( &msgs->clientAskedFor, &r );
1373            break;
1374        }
1375
1376        case BT_ALLOWED_FAST: {
1377            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1378            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1379            /* we don't do anything with this yet */
1380            break;
1381        }
1382
1383        case BT_LTEP:
1384            dbgmsg( msgs, "Got a BT_LTEP" );
1385            parseLtep( msgs, msglen, inbuf );
1386            break;
1387
1388        default:
1389            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1390            tr_peerIoDrain( msgs->io, inbuf, msglen );
1391            break;
1392    }
1393
1394    assert( msglen + 1 == msgs->incoming.length );
1395    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1396
1397    msgs->state = AWAITING_BT_LENGTH;
1398    return READ_AGAIN;
1399}
1400
1401static void
1402peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1403{
1404    const time_t now = time( NULL );
1405    tr_torrent * tor = msgs->torrent;
1406    tor->activityDate = tr_date( );
1407    tor->uploadedCur += byteCount;
1408    msgs->info->pieceDataActivityDate = now;
1409    msgs->info->credit -= byteCount;
1410    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1411    tr_rcTransferred( tor->upload, byteCount );
1412    tr_rcTransferred( tor->handle->upload, byteCount );
1413    tr_statsAddUploaded( msgs->handle, byteCount );
1414    firePieceData( msgs );
1415}
1416
1417static size_t
1418getDownloadMax( const tr_peermsgs * msgs )
1419{
1420    static const size_t maxval = ~0;
1421    const tr_torrent * tor = msgs->torrent;
1422
1423    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1424        return tor->handle->useDownloadLimit
1425            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1426
1427    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1428        return tr_rcBytesLeft( tor->download );
1429
1430    return maxval;
1431}
1432
1433static void
1434reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1435{
1436    tr_torrent * tor = msgs->torrent;
1437
1438    /* increment the `corrupt' field */
1439    tor->corruptCur += byteCount;
1440
1441    /* decrement the `downloaded' field */
1442    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1443}
1444
1445
1446static void
1447gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1448{
1449    const uint32_t byteCount =
1450        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1451    reassignBytesToCorrupt( msgs, byteCount );
1452}
1453
1454static void
1455clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1456{
1457    reassignBytesToCorrupt( msgs, req->length );
1458}
1459
1460static void
1461addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1462{
1463    if( !msgs->info->blame )
1464         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1465    tr_bitfieldAdd( msgs->info->blame, index );
1466}
1467
1468static tr_errno
1469clientGotBlock( tr_peermsgs                * msgs,
1470                const uint8_t              * data,
1471                const struct peer_request  * req )
1472{
1473    int err;
1474    tr_torrent * tor = msgs->torrent;
1475    const int block = _tr_block( tor, req->index, req->offset );
1476
1477    assert( msgs != NULL );
1478    assert( req != NULL );
1479
1480    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1481    {
1482        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1483                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1484        return TR_ERROR;
1485    }
1486
1487    /* save the block */
1488    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1489
1490    /**
1491    *** Remove the block from our `we asked for this' list
1492    **/
1493
1494    if( reqListRemove( &msgs->clientAskedFor, req ) )
1495    {
1496        clientGotUnwantedBlock( msgs, req );
1497        dbgmsg( msgs, "we didn't ask for this message..." );
1498        return 0;
1499    }
1500
1501    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1502                  msgs->clientAskedFor.count );
1503
1504    /**
1505    *** Error checks
1506    **/
1507
1508    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1509        dbgmsg( msgs, "we have this block already..." );
1510        clientGotUnwantedBlock( msgs, req );
1511        return 0;
1512    }
1513
1514    /**
1515    ***  Save the block
1516    **/
1517
1518    msgs->info->peerSentPieceDataAt = time( NULL );
1519    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1520        return err;
1521
1522    tr_cpBlockAdd( tor->completion, block );
1523
1524    addPeerToBlamefield( msgs, req->index );
1525
1526    fireGotBlock( msgs, req );
1527
1528    /**
1529    ***  Handle if this was the last block in the piece
1530    **/
1531
1532    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1533    {
1534        const tr_errno err = tr_ioTestPiece( tor, req->index );
1535
1536        if( err )
1537            tr_torerr( tor, _( "Downloaded piece %lu failed its checksum test: %s" ),
1538                       (unsigned long)req->index,
1539                       tr_errorString( err ) );
1540
1541        tr_torrentSetHasPiece( tor, req->index, !err );
1542        tr_torrentSetPieceChecked( tor, req->index, TRUE );
1543        tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, req->index, !err );
1544
1545        if( !err )
1546            fireClientHave( msgs, req->index );
1547        else
1548            gotBadPiece( msgs, req->index );
1549    }
1550
1551    return 0;
1552}
1553
1554static void
1555didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1556{
1557    pulse( vmsgs );
1558}
1559
1560static ReadState
1561canRead( struct bufferevent * evin, void * vmsgs )
1562{
1563    ReadState ret;
1564    tr_peermsgs * msgs = vmsgs;
1565    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1566    const size_t inlen = EVBUFFER_LENGTH( in );
1567
1568    if( !inlen )
1569    {
1570        ret = READ_DONE;
1571    }
1572    else if( msgs->state == AWAITING_BT_PIECE )
1573    {
1574        const size_t downloadMax = getDownloadMax( msgs );
1575        const size_t n = MIN( inlen, downloadMax );
1576        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1577    }
1578    else switch( msgs->state )
1579    {
1580        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1581        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1582        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1583        default:                  assert( 0 );
1584    }
1585
1586    return ret;
1587}
1588
1589static void
1590sendKeepalive( tr_peermsgs * msgs )
1591{
1592    dbgmsg( msgs, "sending a keepalive message" );
1593    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1594}
1595
1596/**
1597***
1598**/
1599
1600static int
1601isSwiftEnabled( const tr_peermsgs * msgs )
1602{
1603    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1604     * private trackers have ratios where you _want_ to feed deadbeats
1605     * as much as possible.  So we disable SWIFT on private torrents */
1606    return SWIFT_ENABLED
1607        && !tr_torrentIsSeed( msgs->torrent )
1608        && !tr_torrentIsPrivate( msgs->torrent );
1609}
1610
1611static size_t
1612getUploadMax( const tr_peermsgs * msgs )
1613{
1614    static const size_t maxval = ~0;
1615    const tr_torrent * tor = msgs->torrent;
1616    const int useSwift = isSwiftEnabled( msgs );
1617    const size_t swiftLeft = msgs->info->credit;
1618    size_t speedLeft;
1619    size_t bufLeft;
1620    size_t ret;
1621
1622    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1623        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1624    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1625        speedLeft = tr_rcBytesLeft( tor->upload );
1626    else
1627        speedLeft = ~0;
1628
1629    bufLeft = MAX_OUTBUF_SIZE - tr_peerIoWriteBytesWaiting( msgs->io );
1630    ret = MIN( speedLeft, bufLeft );
1631    if( useSwift)
1632        ret = MIN( ret, swiftLeft );
1633    return ret;
1634}
1635
1636static int
1637ratePulse( void * vmsgs )
1638{
1639    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1640    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1641    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1642    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
1643    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1644    return TRUE;
1645}
1646
1647static tr_errno
1648popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1649{
1650    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1651        return 0;
1652    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1653        return 0;
1654
1655    return TR_ERROR;
1656}
1657
1658static int
1659pulse( void * vmsgs )
1660{
1661    const time_t now = time( NULL );
1662    tr_peermsgs * msgs = vmsgs;
1663
1664    tr_peerIoTryRead( msgs->io );
1665    pumpRequestQueue( msgs );
1666    expireOldRequests( msgs );
1667
1668    if( msgs->sendingBlock )
1669    {
1670        const size_t uploadMax = getUploadMax( msgs );
1671        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1672        const size_t outlen = MIN( len, uploadMax );
1673
1674        assert( len );
1675
1676        if( outlen )
1677        {
1678            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1679            evbuffer_drain( msgs->outBlock, outlen );
1680            peerGotBytes( msgs, outlen );
1681
1682            len -= outlen;
1683            msgs->clientSentAnythingAt = now;
1684            msgs->sendingBlock = len!=0;
1685
1686            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1687        }
1688        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1689    }
1690
1691    if( !msgs->sendingBlock )
1692    {
1693        struct peer_request req;
1694
1695        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1696        {
1697            dbgmsg( msgs, "flushing outMessages..." );
1698            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1699            msgs->clientSentAnythingAt = now;
1700        }
1701        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1702            && !popNextRequest( msgs, &req )
1703            && requestIsValid( msgs, &req )
1704            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1705        {
1706            uint8_t * buf = tr_new( uint8_t, req.length );
1707
1708            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1709            {
1710                tr_peerIo * io = msgs->io;
1711                struct evbuffer * out = msgs->outBlock;
1712
1713                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1714                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1715                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1716                tr_peerIoWriteUint32( io, out, req.index );
1717                tr_peerIoWriteUint32( io, out, req.offset );
1718                tr_peerIoWriteBytes ( io, out, buf, req.length );
1719                msgs->sendingBlock = 1;
1720            }
1721
1722            tr_free( buf );
1723        }
1724        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1725        {
1726            sendKeepalive( msgs );
1727        }
1728    }
1729
1730    return TRUE; /* loop forever */
1731}
1732
1733static void
1734gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1735{
1736    if( what & EVBUFFER_TIMEOUT )
1737        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1738    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1739        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1740                what, errno, tr_strerror(errno) );
1741    fireError( vmsgs, TR_ERROR );
1742}
1743
1744static void
1745sendBitfield( tr_peermsgs * msgs )
1746{
1747    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1748    struct evbuffer * out = msgs->outMessages;
1749
1750    dbgmsg( msgs, "sending peer a bitfield message" );
1751    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1752    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1753    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1754}
1755
1756/**
1757***
1758**/
1759
1760/* some peers give us error messages if we send
1761   more than this many peers in a single pex message */
1762#define MAX_PEX_DIFFS 200
1763
1764typedef struct
1765{
1766    tr_pex * added;
1767    tr_pex * dropped;
1768    tr_pex * elements;
1769    int addedCount;
1770    int droppedCount;
1771    int elementCount;
1772    int diffCount;
1773}
1774PexDiffs;
1775
1776static void
1777pexAddedCb( void * vpex, void * userData )
1778{
1779    PexDiffs * diffs = (PexDiffs *) userData;
1780    tr_pex * pex = (tr_pex *) vpex;
1781    if( diffs->diffCount < MAX_PEX_DIFFS )
1782    {
1783        diffs->diffCount++;
1784        diffs->added[diffs->addedCount++] = *pex;
1785        diffs->elements[diffs->elementCount++] = *pex;
1786    }
1787}
1788
1789static void
1790pexRemovedCb( void * vpex, void * userData )
1791{
1792    PexDiffs * diffs = (PexDiffs *) userData;
1793    tr_pex * pex = (tr_pex *) vpex;
1794    if( diffs->diffCount < MAX_PEX_DIFFS )
1795    {
1796        diffs->diffCount++;
1797        diffs->dropped[diffs->droppedCount++] = *pex;
1798    }
1799}
1800
1801static void
1802pexElementCb( void * vpex, void * userData )
1803{
1804    PexDiffs * diffs = (PexDiffs *) userData;
1805    tr_pex * pex = (tr_pex *) vpex;
1806    if( diffs->diffCount < MAX_PEX_DIFFS )
1807    {
1808        diffs->diffCount++;
1809        diffs->elements[diffs->elementCount++] = *pex;
1810    }
1811}
1812
1813static void
1814sendPex( tr_peermsgs * msgs )
1815{
1816    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1817    {
1818        int i;
1819        tr_pex * newPex = NULL;
1820        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1821        PexDiffs diffs;
1822        tr_benc val, *added, *dropped, *flags;
1823        uint8_t *tmp, *walk;
1824        char * benc;
1825        int bencLen;
1826
1827        /* build the diffs */
1828        diffs.added = tr_new( tr_pex, newCount );
1829        diffs.addedCount = 0;
1830        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1831        diffs.droppedCount = 0;
1832        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1833        diffs.elementCount = 0;
1834        diffs.diffCount = 0;
1835        tr_set_compare( msgs->pex, msgs->pexCount,
1836                        newPex, newCount,
1837                        tr_pexCompare, sizeof(tr_pex),
1838                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1839        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1840
1841        /* update peer */
1842        tr_free( msgs->pex );
1843        msgs->pex = diffs.elements;
1844        msgs->pexCount = diffs.elementCount;
1845
1846        /* build the pex payload */
1847        tr_bencInit( &val, TYPE_DICT );
1848        tr_bencDictReserve( &val, 3 );
1849
1850        /* "added" */
1851        added = tr_bencDictAdd( &val, "added" );
1852        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1853        for( i=0; i<diffs.addedCount; ++i ) {
1854            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1855            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1856        }
1857        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1858        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1859
1860        /* "added.f" */
1861        flags = tr_bencDictAdd( &val, "added.f" );
1862        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1863        for( i=0; i<diffs.addedCount; ++i )
1864            *walk++ = diffs.added[i].flags;
1865        assert( ( walk - tmp ) == diffs.addedCount );
1866        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1867
1868        /* "dropped" */
1869        dropped = tr_bencDictAdd( &val, "dropped" );
1870        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1871        for( i=0; i<diffs.droppedCount; ++i ) {
1872            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1873            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1874        }
1875        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1876        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1877
1878        /* write the pex message */
1879        benc = tr_bencSave( &val, &bencLen );
1880        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1881        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1882        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1883        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1884
1885        /* cleanup */
1886        tr_free( benc );
1887        tr_bencFree( &val );
1888        tr_free( diffs.added );
1889        tr_free( diffs.dropped );
1890        tr_free( newPex );
1891
1892        msgs->clientSentPexAt = time( NULL );
1893    }
1894}
1895
1896static int
1897pexPulse( void * vpeer )
1898{
1899    sendPex( vpeer );
1900    return TRUE;
1901}
1902
1903/**
1904***
1905**/
1906
1907tr_peermsgs*
1908tr_peerMsgsNew( struct tr_torrent * torrent,
1909                struct tr_peer    * info,
1910                tr_delivery_func    func,
1911                void              * userData,
1912                tr_publisher_tag  * setme )
1913{
1914    tr_peermsgs * m;
1915
1916    assert( info != NULL );
1917    assert( info->io != NULL );
1918
1919    m = tr_new0( tr_peermsgs, 1 );
1920    m->publisher = tr_publisherNew( );
1921    m->info = info;
1922    m->handle = torrent->handle;
1923    m->torrent = torrent;
1924    m->io = info->io;
1925    m->info->clientIsChoked = 1;
1926    m->info->peerIsChoked = 1;
1927    m->info->clientIsInterested = 0;
1928    m->info->peerIsInterested = 0;
1929    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1930    m->state = AWAITING_BT_LENGTH;
1931    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1932    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1933    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1934    m->outMessages = evbuffer_new( );
1935    m->incoming.block = evbuffer_new( );
1936    m->outBlock = evbuffer_new( );
1937    m->peerAllowedPieces = NULL;
1938    m->peerAskedFor = REQUEST_LIST_INIT;
1939    m->peerAskedForFast = REQUEST_LIST_INIT;
1940    m->clientAskedFor = REQUEST_LIST_INIT;
1941    m->clientWillAskFor = REQUEST_LIST_INIT;
1942    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1943
1944    if ( tr_peerIoSupportsLTEP( m->io ) )
1945        sendLtepHandshake( m );
1946
1947    /* bitfield/have-all/have-none must preceed other non-handshake messages... */
1948    if ( !tr_peerIoSupportsFEXT( m->io ) )
1949        sendBitfield( m );
1950    else {
1951        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1952        float completion = tr_cpPercentComplete( m->torrent->completion );
1953        if ( completion == 0.0f ) {
1954            sendFastHave( m, 0 );
1955        } else if ( completion == 1.0f ) {
1956            sendFastHave( m, 1 );
1957        } else {
1958            sendBitfield( m );
1959        }
1960    }
1961   
1962    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1963    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1964    ratePulse( m );
1965
1966    return m;
1967}
1968
1969void
1970tr_peerMsgsFree( tr_peermsgs* msgs )
1971{
1972    if( msgs != NULL )
1973    {
1974        tr_timerFree( &msgs->pulseTimer );
1975        tr_timerFree( &msgs->rateTimer );
1976        tr_timerFree( &msgs->pexTimer );
1977        tr_publisherFree( &msgs->publisher );
1978        reqListClear( &msgs->clientWillAskFor );
1979        reqListClear( &msgs->clientAskedFor );
1980        reqListClear( &msgs->peerAskedForFast );
1981        reqListClear( &msgs->peerAskedFor );
1982        tr_bitfieldFree( msgs->peerAllowedPieces );
1983        evbuffer_free( msgs->incoming.block );
1984        evbuffer_free( msgs->outMessages );
1985        evbuffer_free( msgs->outBlock );
1986        tr_free( msgs->pex );
1987
1988        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1989        tr_free( msgs );
1990    }
1991}
1992
1993tr_publisher_tag
1994tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1995                      tr_delivery_func    func,
1996                      void              * userData )
1997{
1998    return tr_publisherSubscribe( peer->publisher, func, userData );
1999}
2000
2001void
2002tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
2003                        tr_publisher_tag    tag )
2004{
2005    tr_publisherUnsubscribe( peer->publisher, tag );
2006}
Note: See TracBrowser for help on using the repository browser.