source: trunk/libtransmission/peer-msgs.c @ 5498

Last change on this file since 5498 was 5498, checked in by charles, 14 years ago

hack on IPC a bit because it's been too long since I broke it

  • Property svn:keywords set to Date Rev Author Id
File size: 56.3 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 5498 2008-04-03 21:38:32Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18#include <libgen.h> /* basename */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "inout.h"
26#include "peer-io.h"
27#include "peer-mgr.h"
28#include "peer-mgr-private.h"
29#include "peer-msgs.h"
30#include "ratecontrol.h"
31#include "stats.h"
32#include "torrent.h"
33#include "trevent.h"
34#include "utils.h"
35
36/**
37***
38**/
39
40enum
41{
42    BT_CHOKE                = 0,
43    BT_UNCHOKE              = 1,
44    BT_INTERESTED           = 2,
45    BT_NOT_INTERESTED       = 3,
46    BT_HAVE                 = 4,
47    BT_BITFIELD             = 5,
48    BT_REQUEST              = 6,
49    BT_PIECE                = 7,
50    BT_CANCEL               = 8,
51    BT_PORT                 = 9,
52    BT_SUGGEST              = 13,
53    BT_HAVE_ALL             = 14,
54    BT_HAVE_NONE            = 15,
55    BT_REJECT               = 16,
56    BT_ALLOWED_FAST         = 17,
57    BT_LTEP                 = 20,
58
59    LTEP_HANDSHAKE          = 0,
60
61    TR_LTEP_PEX             = 1,
62
63    MIN_CHOKE_PERIOD_SEC    = (10),
64
65    /* idle seconds before we send a keepalive */
66    KEEPALIVE_INTERVAL_SECS = 90,
67
68    PEX_INTERVAL            = (60 * 1000), /* msec between sendPex() calls */
69    PEER_PULSE_INTERVAL     = (100),       /* msec between pulse() calls */
70    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
71
72    MAX_QUEUE_SIZE          = (100),
73    MAX_OUTBUF_SIZE         = (1024),
74     
75    /* (fast peers) max number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_COUNT   = 10,
77
78    /* (fast peers) max threshold for allowing fast-pieces requests */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,
80
81    /* how long an unsent request can stay queued before it's returned
82       back to the peer-mgr's pool of requests */
83    QUEUED_REQUEST_TTL_SECS = 20,
84
85    /* how long a sent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    SENT_REQUEST_TTL_SECS = 120
88};
89
90/**
91***  REQUEST MANAGEMENT
92**/
93
94enum
95{
96    AWAITING_BT_LENGTH,
97    AWAITING_BT_ID,
98    AWAITING_BT_MESSAGE,
99    AWAITING_BT_PIECE
100};
101
102struct peer_request
103{
104    uint32_t index;
105    uint32_t offset;
106    uint32_t length;
107    time_t time_requested;
108};
109
110static int
111compareRequest( const void * va, const void * vb )
112{
113    int i;
114    const struct peer_request * a = va;
115    const struct peer_request * b = vb;
116    if(( i = tr_compareUint32( a->index, b->index ))) return i;
117    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
118    if(( i = tr_compareUint32( a->length, b->length ))) return i;
119    return 0;
120}
121
122struct request_list
123{
124    uint16_t count;
125    uint16_t max;
126    struct peer_request * requests;
127};
128
129static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
130
131static void
132reqListReserve( struct request_list * list, uint16_t max )
133{
134    if( list->max < max )
135    {
136        list->max = max;
137        list->requests = tr_renew( struct peer_request,
138                                   list->requests,
139                                   list->max );
140    }
141}
142
143static void
144reqListClear( struct request_list * list )
145{
146    tr_free( list->requests );
147    *list = REQUEST_LIST_INIT;
148}
149
150static void
151reqListCopy( struct request_list * dest, const struct request_list * src )
152{
153    dest->count = src->count;
154    dest->max = src->max;
155    dest->requests = tr_new( struct peer_request, dest->max );
156    memcpy( dest->requests, src->requests, sizeof( struct peer_request ) * dest->count );
157}
158
159static void
160reqListRemoveOne( struct request_list * list, int i )
161{
162    assert( 0<=i && i<list->count );
163
164    memmove( &list->requests[i],
165             &list->requests[i+1],
166             sizeof( struct peer_request ) * ( --list->count - i ) );
167}
168
169static void
170reqListAppend( struct request_list * list, const struct peer_request * req )
171{
172    if( ++list->count >= list->max )
173        reqListReserve( list, list->max + 8 );
174
175    list->requests[list->count-1] = *req;
176}
177
178static tr_errno
179reqListPop( struct request_list * list, struct peer_request * setme )
180{
181    tr_errno err;
182
183    if( !list->count )
184        err = TR_ERROR;
185    else {
186        *setme = list->requests[0];
187        reqListRemoveOne( list, 0 );
188        err = TR_OK;
189    }
190
191    return err;
192}
193
194static int
195reqListFind( struct request_list * list, const struct peer_request * key )
196{
197    uint16_t i;
198    for( i=0; i<list->count; ++i )
199        if( !compareRequest( key, list->requests+i ) )
200            return i;
201    return -1;
202}
203
204static tr_errno
205reqListRemove( struct request_list * list, const struct peer_request * key )
206{
207    tr_errno err;
208    const int i = reqListFind( list, key );
209
210    if( i < 0 )
211        err = TR_ERROR;
212    else {
213        err = TR_OK;
214        reqListRemoveOne( list, i );
215    }
216
217    return err;
218}
219
220/**
221***
222**/
223
224/* this is raw, unchanged data from the peer regarding
225 * the current message that it's sending us. */
226struct tr_incoming
227{
228    uint8_t id;
229    uint32_t length; /* includes the +1 for id length */
230    struct peer_request blockReq; /* metadata for incoming blocks */
231    struct evbuffer * block; /* piece data for incoming blocks */
232};
233
234struct tr_peermsgs
235{
236    unsigned int peerSentBitfield         : 1;
237    unsigned int peerSupportsPex          : 1;
238    unsigned int clientSentLtepHandshake  : 1;
239    unsigned int peerSentLtepHandshake    : 1;
240    unsigned int sendingBlock             : 1;
241   
242    uint8_t state;
243    uint8_t ut_pex_id;
244    uint16_t pexCount;
245    uint16_t maxActiveRequests;
246    uint16_t minActiveRequests;
247
248    tr_peer * info;
249
250    tr_handle * handle;
251    tr_torrent * torrent;
252    tr_peerIo * io;
253
254    tr_publisher_t * publisher;
255
256    struct evbuffer * outBlock;    /* buffer of the current piece message */
257    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
258
259    struct request_list peerAskedFor;
260    struct request_list peerAskedForFast;
261    struct request_list clientAskedFor;
262    struct request_list clientWillAskFor;
263
264    tr_timer * rateTimer;
265    tr_timer * pulseTimer;
266    tr_timer * pexTimer;
267
268    time_t clientSentPexAt;
269    time_t clientSentAnythingAt;
270   
271    tr_bitfield * peerAllowedPieces;
272
273    struct tr_incoming incoming; 
274
275    tr_pex * pex;
276};
277
278/**
279***
280**/
281
282static void
283myDebug( const char * file, int line,
284         const struct tr_peermsgs * msgs,
285         const char * fmt, ... )
286{
287    FILE * fp = tr_getLog( );
288    if( fp != NULL )
289    {
290        va_list args;
291        char timestr[64];
292        struct evbuffer * buf = evbuffer_new( );
293        char * myfile = tr_strdup( file );
294
295        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
296                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
297                             msgs->torrent->info.name,
298                             tr_peerIoGetAddrStr( msgs->io ),
299                             msgs->info->client );
300        va_start( args, fmt );
301        evbuffer_add_vprintf( buf, fmt, args );
302        va_end( args );
303        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
304        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
305
306        tr_free( myfile );
307        evbuffer_free( buf );
308    }
309}
310
311#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
312
313/**
314***
315**/
316
317static void
318protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
319{
320    tr_peerIo * io = msgs->io;
321    struct evbuffer * out = msgs->outMessages;
322
323    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
324    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
325    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
326    tr_peerIoWriteUint32( io, out, req->index );
327    tr_peerIoWriteUint32( io, out, req->offset );
328    tr_peerIoWriteUint32( io, out, req->length );
329}
330
331static void
332protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
333{
334    tr_peerIo * io = msgs->io;
335    struct evbuffer * out = msgs->outMessages;
336
337    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
338    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
339    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
340    tr_peerIoWriteUint32( io, out, req->index );
341    tr_peerIoWriteUint32( io, out, req->offset );
342    tr_peerIoWriteUint32( io, out, req->length );
343}
344
345static void
346protocolSendHave( tr_peermsgs * msgs, uint32_t index )
347{
348    tr_peerIo * io = msgs->io;
349    struct evbuffer * out = msgs->outMessages;
350
351    dbgmsg( msgs, "sending Have %u", index );
352    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
353    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
354    tr_peerIoWriteUint32( io, out, index );
355}
356
357static void
358protocolSendChoke( tr_peermsgs * msgs, int choke )
359{
360    tr_peerIo * io = msgs->io;
361    struct evbuffer * out = msgs->outMessages;
362
363    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
364    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
365    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
366}
367
368/**
369***  EVENTS
370**/
371
372static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
373
374static void
375publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
376{
377    tr_publisherPublish( msgs->publisher, msgs->info, e );
378}
379
380static void
381fireError( tr_peermsgs * msgs, tr_errno err )
382{
383    tr_peermsgs_event e = blankEvent;
384    e.eventType = TR_PEERMSG_ERROR;
385    e.err = err;
386    publish( msgs, &e );
387}
388
389static void
390fireNeedReq( tr_peermsgs * msgs )
391{
392    tr_peermsgs_event e = blankEvent;
393    e.eventType = TR_PEERMSG_NEED_REQ;
394    publish( msgs, &e );
395}
396
397static void
398firePeerProgress( tr_peermsgs * msgs )
399{
400    tr_peermsgs_event e = blankEvent;
401    e.eventType = TR_PEERMSG_PEER_PROGRESS;
402    e.progress = msgs->info->progress;
403    publish( msgs, &e );
404}
405
406static void
407fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
408{
409    tr_peermsgs_event e = blankEvent;
410    e.eventType = TR_PEERMSG_CLIENT_HAVE;
411    e.pieceIndex = pieceIndex;
412    publish( msgs, &e );
413}
414
415static void
416fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
417{
418    tr_peermsgs_event e = blankEvent;
419    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
420    e.pieceIndex = req->index;
421    e.offset = req->offset;
422    e.length = req->length;
423    publish( msgs, &e );
424}
425
426static void
427firePieceData( tr_peermsgs * msgs )
428{
429    tr_peermsgs_event e = blankEvent;
430    e.eventType = TR_PEERMSG_PIECE_DATA;
431    publish( msgs, &e );
432}
433
434static void
435fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
436{
437    tr_peermsgs_event e = blankEvent;
438    e.eventType = TR_PEERMSG_CANCEL;
439    e.pieceIndex = req->index;
440    e.offset = req->offset;
441    e.length = req->length;
442    publish( msgs, &e );
443}
444
445/**
446***  INTEREST
447**/
448
449static int
450isPieceInteresting( const tr_peermsgs   * peer,
451                    int                   piece )
452{
453    const tr_torrent * torrent = peer->torrent;
454
455    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
456          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
457          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
458}
459
460/* "interested" means we'll ask for piece data if they unchoke us */
461static int
462isPeerInteresting( const tr_peermsgs * msgs )
463{
464    tr_piece_index_t i;
465    const tr_torrent * torrent;
466    const tr_bitfield * bitfield;
467    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
468
469    if( clientIsSeed )
470        return FALSE;
471
472    torrent = msgs->torrent;
473    bitfield = tr_cpPieceBitfield( torrent->completion );
474
475    if( !msgs->info->have )
476        return TRUE;
477
478    assert( bitfield->len == msgs->info->have->len );
479    for( i=0; i<torrent->info.pieceCount; ++i )
480        if( isPieceInteresting( msgs, i ) )
481            return TRUE;
482
483    return FALSE;
484}
485
486static void
487sendInterest( tr_peermsgs * msgs, int weAreInterested )
488{
489    assert( msgs != NULL );
490    assert( weAreInterested==0 || weAreInterested==1 );
491
492    msgs->info->clientIsInterested = weAreInterested;
493    dbgmsg( msgs, "Sending %s",
494            weAreInterested ? "Interested" : "Not Interested");
495
496    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
497    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
498                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
499}
500
501static void
502updateInterest( tr_peermsgs * msgs )
503{
504    const int i = isPeerInteresting( msgs );
505    if( i != msgs->info->clientIsInterested )
506        sendInterest( msgs, i );
507    if( i )
508        fireNeedReq( msgs );
509}
510
511static void
512cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
513{
514    reqListClear( &msgs->peerAskedFor );
515}
516
517void
518tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
519{
520    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
521
522    assert( msgs != NULL );
523    assert( msgs->info != NULL );
524    assert( choke==0 || choke==1 );
525
526    if( msgs->info->chokeChangedAt > fibrillationTime )
527    {
528        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
529    }
530    else if( msgs->info->peerIsChoked != choke )
531    {
532        msgs->info->peerIsChoked = choke;
533        if( choke )
534            cancelAllRequestsToClientExceptFast( msgs );
535        protocolSendChoke( msgs, choke );
536        msgs->info->chokeChangedAt = time( NULL );
537    }
538}
539
540/**
541***
542**/
543
544void
545tr_peerMsgsHave( tr_peermsgs * msgs,
546                 uint32_t      index )
547{
548    protocolSendHave( msgs, index );
549
550    /* since we have more pieces now, we might not be interested in this peer */
551    updateInterest( msgs );
552}
553#if 0
554static void
555sendFastSuggest( tr_peermsgs * msgs,
556                 uint32_t      pieceIndex )
557{
558    assert( msgs != NULL );
559   
560    if( tr_peerIoSupportsFEXT( msgs->io ) )
561    {
562        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
563        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
564        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
565    }
566}
567#endif
568static void
569sendFastHave( tr_peermsgs * msgs, int all )
570{
571    assert( msgs != NULL );
572   
573    if( tr_peerIoSupportsFEXT( msgs->io ) )
574    {
575        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
576        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
577                                                                : BT_HAVE_NONE ) );
578        updateInterest( msgs );
579    }
580}
581
582static void
583sendFastReject( tr_peermsgs * msgs,
584                uint32_t      pieceIndex,
585                uint32_t      offset,
586                uint32_t      length )
587{
588    assert( msgs != NULL );
589
590    if( tr_peerIoSupportsFEXT( msgs->io ) )
591    {
592        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
593        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
594        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
595        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
596        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
597        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
598    }
599}
600
601static tr_bitfield*
602getPeerAllowedPieces( tr_peermsgs * msgs )
603{
604    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
605    {
606        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
607            MAX_FAST_ALLOWED_COUNT,
608            msgs->torrent->info.pieceCount,
609            msgs->torrent->info.hash,
610            tr_peerIoGetAddress( msgs->io, NULL ) );
611    }
612
613    return msgs->peerAllowedPieces;
614}
615
616static void
617sendFastAllowed( tr_peermsgs * msgs,
618                 uint32_t      pieceIndex)
619{
620    assert( msgs != NULL );
621   
622    if( tr_peerIoSupportsFEXT( msgs->io ) )
623    {
624        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
625        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
626        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
627    }
628}
629
630static void
631sendFastAllowedSet( tr_peermsgs * msgs )
632{
633    tr_piece_index_t i = 0;
634
635    while (i <= msgs->torrent->info.pieceCount )
636    {
637        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
638            sendFastAllowed( msgs, i );
639        i++;
640    }
641}
642
643static void
644maybeSendFastAllowedSet( tr_peermsgs * msgs )
645{
646    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
647        sendFastAllowedSet( msgs );
648}
649
650
651/**
652***
653**/
654
655static int
656reqIsValid( const tr_peermsgs   * msgs,
657            uint32_t              index,
658            uint32_t              offset,
659            uint32_t              length )
660{
661    return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
662}
663
664static int
665requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
666{
667    return reqIsValid( msgs, req->index, req->offset, req->length );
668}
669
670static void
671expireOldRequests( tr_peermsgs * msgs )
672{
673    int i;
674    time_t oldestAllowed;
675    struct request_list tmp = REQUEST_LIST_INIT;
676
677    /* cancel requests that have been queued for too long */
678    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
679    reqListCopy( &tmp, &msgs->clientWillAskFor );
680    for( i=0; i<tmp.count; ++i ) {
681        const struct peer_request * req = &tmp.requests[i];
682        if( req->time_requested < oldestAllowed )
683            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
684    }
685    reqListClear( &tmp );
686
687    /* cancel requests that were sent too long ago */
688    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
689    reqListCopy( &tmp, &msgs->clientAskedFor );
690    for( i=0; i<tmp.count; ++i ) {
691        const struct peer_request * req = &tmp.requests[i];
692        if( req->time_requested < oldestAllowed )
693            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
694    }
695    reqListClear( &tmp );
696}
697
698static void
699pumpRequestQueue( tr_peermsgs * msgs )
700{
701    const int max = msgs->maxActiveRequests;
702    const int min = msgs->minActiveRequests;
703    const time_t now = time( NULL );
704    int sent = 0;
705    int count = msgs->clientAskedFor.count;
706    struct peer_request req;
707
708    if( count > min )
709        return;
710    if( msgs->info->clientIsChoked )
711        return;
712
713    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
714    {
715        assert( requestIsValid( msgs, &req ) );
716        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
717
718        protocolSendRequest( msgs, &req );
719        req.time_requested = now;
720        reqListAppend( &msgs->clientAskedFor, &req );
721
722        ++count;
723        ++sent;
724    }
725
726    if( sent )
727        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
728                sent,
729                msgs->clientAskedFor.count,
730                msgs->clientWillAskFor.count );
731
732    if( count < max )
733        fireNeedReq( msgs );
734}
735
736static int
737pulse( void * vmsgs );
738
739int
740tr_peerMsgsAddRequest( tr_peermsgs * msgs,
741                       uint32_t      index, 
742                       uint32_t      offset, 
743                       uint32_t      length )
744{
745    const int req_max = msgs->maxActiveRequests;
746    struct peer_request req;
747
748    assert( msgs != NULL );
749    assert( msgs->torrent != NULL );
750    assert( reqIsValid( msgs, index, offset, length ) );
751
752    /**
753    ***  Reasons to decline the request
754    **/
755
756    /* don't send requests to choked clients */
757    if( msgs->info->clientIsChoked ) {
758        dbgmsg( msgs, "declining request because they're choking us" );
759        return TR_ADDREQ_CLIENT_CHOKED;
760    }
761
762    /* peer doesn't have this piece */
763    if( !tr_bitfieldHas( msgs->info->have, index ) )
764        return TR_ADDREQ_MISSING;
765
766    /* peer's queue is full */
767    if( msgs->clientWillAskFor.count >= req_max ) {
768        dbgmsg( msgs, "declining request because we're full" );
769        return TR_ADDREQ_FULL;
770    }
771
772    /* have we already asked for this piece? */
773    req.index = index;
774    req.offset = offset;
775    req.length = length;
776    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
777        dbgmsg( msgs, "declining because it's a duplicate" );
778        return TR_ADDREQ_DUPLICATE;
779    }
780    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
781        dbgmsg( msgs, "declining because it's a duplicate" );
782        return TR_ADDREQ_DUPLICATE;
783    }
784
785    /**
786    ***  Accept this request
787    **/
788
789    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
790    req.time_requested = time( NULL );
791    reqListAppend( &msgs->clientWillAskFor, &req );
792    return TR_ADDREQ_OK;
793}
794
795static void
796cancelAllRequestsToPeer( tr_peermsgs * msgs )
797{
798    int i;
799    struct request_list a = msgs->clientWillAskFor;
800    struct request_list b = msgs->clientAskedFor;
801
802    msgs->clientAskedFor = REQUEST_LIST_INIT;
803    msgs->clientWillAskFor = REQUEST_LIST_INIT;
804
805    for( i=0; i<a.count; ++i )
806        fireCancelledReq( msgs, &a.requests[i] );
807
808    for( i=0; i<b.count; ++i ) {
809        fireCancelledReq( msgs, &b.requests[i] );
810        protocolSendCancel( msgs, &b.requests[i] );
811    }
812
813    reqListClear( &a );
814    reqListClear( &b );
815}
816
817void
818tr_peerMsgsCancel( tr_peermsgs * msgs,
819                   uint32_t      pieceIndex,
820                   uint32_t      offset,
821                   uint32_t      length )
822{
823    struct peer_request req;
824
825    assert( msgs != NULL );
826    assert( length > 0 );
827
828    /* have we asked the peer for this piece? */
829    req.index = pieceIndex;
830    req.offset = offset;
831    req.length = length;
832
833    /* if it's only in the queue and hasn't been sent yet, free it */
834    if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
835        fireCancelledReq( msgs, &req );
836
837    /* if it's already been sent, send a cancel message too */
838    if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
839        protocolSendCancel( msgs, &req );
840        fireCancelledReq( msgs, &req );
841    }
842}
843
844/**
845***
846**/
847
848static void
849sendLtepHandshake( tr_peermsgs * msgs )
850{
851    tr_benc val, *m;
852    char * buf;
853    int len;
854    int pex;
855    const char * v = TR_NAME " " USERAGENT_PREFIX;
856    const int port = tr_getPublicPort( msgs->handle );
857    struct evbuffer * outbuf;
858
859    if( msgs->clientSentLtepHandshake )
860        return;
861
862    outbuf = evbuffer_new( );
863    dbgmsg( msgs, "sending an ltep handshake" );
864    msgs->clientSentLtepHandshake = 1;
865
866    /* decide if we want to advertise pex support */
867    if( !tr_torrentAllowsPex( msgs->torrent ) )
868        pex = 0;
869    else if( msgs->peerSentLtepHandshake )
870        pex = msgs->peerSupportsPex ? 1 : 0;
871    else
872        pex = 1;
873
874    tr_bencInitDict( &val, 4 );
875    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
876    m  = tr_bencDictAdd( &val, "m" );
877    tr_bencInit( m, TYPE_DICT );
878    if( pex ) {
879        tr_bencDictReserve( m, 1 );
880        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), TR_LTEP_PEX );
881    }
882    if( port > 0 )
883        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
884    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
885    buf = tr_bencSave( &val, &len );
886
887    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
888    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
889    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
890    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
891
892    tr_peerIoWriteBuf( msgs->io, outbuf );
893
894#if 0
895    dbgmsg( msgs, "here is the ltep handshake we sent:" );
896    tr_bencPrint( &val );
897    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
898#endif
899
900    /* cleanup */
901    tr_bencFree( &val );
902    tr_free( buf );
903    evbuffer_free( outbuf );
904}
905
906static void
907parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
908{
909    tr_benc val, * sub;
910    uint8_t * tmp = tr_new( uint8_t, len );
911
912    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
913    msgs->peerSentLtepHandshake = 1;
914
915    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
916        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
917        tr_free( tmp );
918        return;
919    }
920
921#if 0
922    dbgmsg( msgs, "here is the ltep handshake we read:" );
923    tr_bencPrint( &val );
924    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
925#endif
926
927    /* does the peer prefer encrypted connections? */
928    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
929        msgs->info->encryption_preference = sub->val.i
930                                      ? ENCRYPTION_PREFERENCE_YES
931                                      : ENCRYPTION_PREFERENCE_NO;
932
933    /* check supported messages for utorrent pex */
934    msgs->peerSupportsPex = 0;
935    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
936        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
937            msgs->ut_pex_id = (uint8_t) sub->val.i;
938            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
939            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
940        }
941    }
942
943    /* get peer's listening port */
944    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
945        msgs->info->port = htons( (uint16_t)sub->val.i );
946        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
947    }
948
949    tr_bencFree( &val );
950    tr_free( tmp );
951}
952
953static void
954parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
955{
956    int loaded = 0;
957    uint8_t * tmp = tr_new( uint8_t, msglen );
958    tr_benc val, *sub;
959    const tr_torrent * tor = msgs->torrent;
960    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
961
962    if( tr_torrentAllowsPex( tor )
963        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
964        && (( sub = tr_bencDictFindType( &val, "added", TYPE_STR ))))
965    {
966        const int n = sub->val.s.i / 6 ;
967        if( n )
968            tr_tordbg( tor, _( "Got %d peers from peer exchange" ), n );
969        tr_peerMgrAddPeers( msgs->handle->peerMgr,
970                            tor->info.hash,
971                            TR_PEER_FROM_PEX,
972                            (uint8_t*)sub->val.s.s, n );
973    }
974
975    if( loaded )
976        tr_bencFree( &val );
977    tr_free( tmp );
978}
979
980static void
981sendPex( tr_peermsgs * msgs );
982
983static void
984parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
985{
986    uint8_t ltep_msgid;
987
988    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
989    msglen--;
990
991    if( ltep_msgid == LTEP_HANDSHAKE )
992    {
993        dbgmsg( msgs, "got ltep handshake" );
994        parseLtepHandshake( msgs, msglen, inbuf );
995        if( tr_peerIoSupportsLTEP( msgs->io ) )
996        {
997            sendLtepHandshake( msgs );
998            sendPex( msgs );
999        }
1000    }
1001    else if( ltep_msgid == TR_LTEP_PEX )
1002    {
1003        dbgmsg( msgs, "got ut pex" );
1004        msgs->peerSupportsPex = 1;
1005        parseUtPex( msgs, msglen, inbuf );
1006    }
1007    else
1008    {
1009        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1010        evbuffer_drain( inbuf, msglen );
1011    }
1012}
1013
1014static int
1015readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1016{
1017    uint32_t len;
1018
1019    if( inlen < sizeof(len) )
1020        return READ_MORE;
1021
1022    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1023
1024    if( len == 0 ) /* peer sent us a keepalive message */
1025        dbgmsg( msgs, "got KeepAlive" );
1026    else {
1027        msgs->incoming.length = len;
1028        msgs->state = AWAITING_BT_ID;
1029    }
1030
1031    return READ_AGAIN;
1032}
1033
1034static int
1035readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1036
1037static int
1038readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1039{
1040    uint8_t id;
1041
1042    if( inlen < sizeof(uint8_t) )
1043        return READ_MORE;
1044
1045    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1046    msgs->incoming.id = id;
1047
1048    if( id==BT_PIECE )
1049    {
1050        msgs->state = AWAITING_BT_PIECE;
1051        return READ_AGAIN;
1052    }
1053    else if( msgs->incoming.length != 1 )
1054    {
1055        msgs->state = AWAITING_BT_MESSAGE;
1056        return READ_AGAIN;
1057    }
1058    else return readBtMessage( msgs, inbuf, inlen-1 );
1059}
1060
1061static void
1062updatePeerProgress( tr_peermsgs * msgs )
1063{
1064    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1065                         / (float)msgs->torrent->info.pieceCount;
1066    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1067    updateInterest( msgs );
1068    firePeerProgress( msgs );
1069}
1070
1071static int
1072clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1073{
1074    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1075    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1076        return FALSE;
1077   
1078    /* ...or if we don't have ourself enough pieces */
1079    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1080        return FALSE;
1081
1082    /* Maybe a bandwidth limit ? */
1083    return TRUE;
1084}
1085
1086static void
1087peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1088{
1089    const int reqIsValid = requestIsValid( msgs, req );
1090    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1091    const int peerIsChoked = msgs->info->peerIsChoked;
1092    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1093    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1094    const int canSendFast = clientCanSendFastBlock( msgs );
1095
1096    if( !reqIsValid ) /* bad request */
1097    {
1098        dbgmsg( msgs, "rejecting an invalid request." );
1099        sendFastReject( msgs, req->index, req->offset, req->length );
1100    }
1101    else if( !clientHasPiece ) /* we don't have it */
1102    {
1103        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1104        sendFastReject( msgs, req->index, req->offset, req->length );
1105    }
1106    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1107    {
1108        tr_peerMsgsSetChoke( msgs, 1 );
1109        sendFastReject( msgs, req->index, req->offset, req->length );
1110    }
1111    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1112    {
1113        sendFastReject( msgs, req->index, req->offset, req->length );
1114    }
1115    else /* YAY */
1116    {
1117        if( peerIsFast && pieceIsFast )
1118            reqListAppend( &msgs->peerAskedForFast, req );
1119        else
1120            reqListAppend( &msgs->peerAskedFor, req );
1121    }
1122}
1123
1124static int
1125messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1126{
1127    switch( id )
1128    {
1129        case BT_CHOKE:
1130        case BT_UNCHOKE:
1131        case BT_INTERESTED:
1132        case BT_NOT_INTERESTED:
1133        case BT_HAVE_ALL:
1134        case BT_HAVE_NONE:
1135            return len==1;
1136
1137        case BT_HAVE:
1138        case BT_SUGGEST:
1139        case BT_ALLOWED_FAST:
1140            return len==5;
1141
1142        case BT_BITFIELD:
1143            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1144       
1145        case BT_REQUEST:
1146        case BT_CANCEL:
1147        case BT_REJECT:
1148            return len==13;
1149
1150        case BT_PIECE:
1151            return len>9 && len<=16393;
1152
1153        case BT_PORT:
1154            return len==3;
1155
1156        case BT_LTEP:
1157            return len >= 2;
1158
1159        default:
1160            return FALSE;
1161    }
1162}
1163
1164static int
1165clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1166
1167static void
1168clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1169{
1170    const time_t now = time( NULL );
1171    tr_torrent * tor = msgs->torrent;
1172    tor->activityDate = tr_date( );
1173    tor->downloadedCur += byteCount;
1174    msgs->info->pieceDataActivityDate = now;
1175    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1176    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1177    tr_rcTransferred( tor->download, byteCount );
1178    tr_rcTransferred( tor->handle->download, byteCount );
1179    tr_statsAddDownloaded( msgs->handle, byteCount );
1180    firePieceData( msgs );
1181}
1182
1183static int
1184readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1185{
1186    struct peer_request * req = &msgs->incoming.blockReq;
1187    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1188    dbgmsg( msgs, "In readBtPiece" );
1189
1190    if( !req->length )
1191    {
1192        if( inlen < 8 )
1193            return READ_MORE;
1194
1195        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1196        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1197        req->length = msgs->incoming.length - 9;
1198        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1199        return READ_AGAIN;
1200    }
1201    else
1202    {
1203        int err;
1204
1205        /* read in another chunk of data */
1206        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1207        size_t n = MIN( nLeft, inlen );
1208        uint8_t * buf = tr_new( uint8_t, n );
1209        assert( EVBUFFER_LENGTH(inbuf) >= n );
1210        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1211        evbuffer_add( msgs->incoming.block, buf, n );
1212        clientGotBytes( msgs, n );
1213        tr_free( buf );
1214        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1215               (int)n, req->index, req->offset, req->length,
1216               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1217        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1218            return READ_MORE;
1219
1220        /* we've got the whole block ... process it */
1221        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1222
1223        /* cleanup */
1224        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1225        req->length = 0;
1226        msgs->state = AWAITING_BT_LENGTH;
1227        if( !err )
1228            return READ_AGAIN;
1229        else {
1230            fireError( msgs, err );
1231            return READ_DONE;
1232        }
1233    }
1234}
1235
1236static int
1237readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1238{
1239    uint32_t ui32;
1240    uint32_t msglen = msgs->incoming.length;
1241    const uint8_t id = msgs->incoming.id;
1242    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1243
1244    --msglen; /* id length */
1245
1246    if( inlen < msglen )
1247        return READ_MORE;
1248
1249    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1250
1251    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1252    {
1253        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1254        fireError( msgs, TR_ERROR );
1255        return READ_DONE;
1256    }
1257
1258    switch( id )
1259    {
1260        case BT_CHOKE:
1261            dbgmsg( msgs, "got Choke" );
1262            msgs->info->clientIsChoked = 1;
1263            cancelAllRequestsToPeer( msgs );
1264            cancelAllRequestsToClientExceptFast( msgs );
1265            break;
1266
1267        case BT_UNCHOKE:
1268            dbgmsg( msgs, "got Unchoke" );
1269            msgs->info->clientIsChoked = 0;
1270            fireNeedReq( msgs );
1271            break;
1272
1273        case BT_INTERESTED:
1274            dbgmsg( msgs, "got Interested" );
1275            msgs->info->peerIsInterested = 1;
1276            tr_peerMsgsSetChoke( msgs, 0 );
1277            break;
1278
1279        case BT_NOT_INTERESTED:
1280            dbgmsg( msgs, "got Not Interested" );
1281            msgs->info->peerIsInterested = 0;
1282            break;
1283           
1284        case BT_HAVE:
1285            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1286            dbgmsg( msgs, "got Have: %u", ui32 );
1287            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1288                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1289            updatePeerProgress( msgs );
1290            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1291            break;
1292
1293        case BT_BITFIELD: {
1294            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1295            dbgmsg( msgs, "got a bitfield" );
1296            msgs->peerSentBitfield = 1;
1297            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1298            updatePeerProgress( msgs );
1299            maybeSendFastAllowedSet( msgs );
1300            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1301            fireNeedReq( msgs );
1302            break;
1303        }
1304
1305        case BT_REQUEST: {
1306            struct peer_request r;
1307            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1308            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1309            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1310            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1311            peerMadeRequest( msgs, &r );
1312            break;
1313        }
1314
1315        case BT_CANCEL: {
1316            struct peer_request r;
1317            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1318            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1319            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1320            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1321            reqListRemove( &msgs->peerAskedForFast, &r );
1322            reqListRemove( &msgs->peerAskedFor, &r );
1323            break;
1324        }
1325
1326        case BT_PIECE:
1327            assert( 0 ); /* handled elsewhere! */
1328            break;
1329       
1330        case BT_PORT:
1331            dbgmsg( msgs, "Got a BT_PORT" );
1332            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1333            break;
1334           
1335        case BT_SUGGEST: {
1336            dbgmsg( msgs, "Got a BT_SUGGEST" );
1337            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1338            /* we don't do anything with this yet */
1339            break;
1340        }
1341           
1342        case BT_HAVE_ALL:
1343            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1344            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1345            updatePeerProgress( msgs );
1346            maybeSendFastAllowedSet( msgs );
1347            break;
1348       
1349       
1350        case BT_HAVE_NONE:
1351            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1352            tr_bitfieldClear( msgs->info->have );
1353            updatePeerProgress( msgs );
1354            maybeSendFastAllowedSet( msgs );
1355            break;
1356       
1357        case BT_REJECT: {
1358            struct peer_request r;
1359            dbgmsg( msgs, "Got a BT_REJECT" );
1360            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1361            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1362            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1363            reqListRemove( &msgs->clientAskedFor, &r );
1364            break;
1365        }
1366
1367        case BT_ALLOWED_FAST: {
1368            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1369            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1370            /* we don't do anything with this yet */
1371            break;
1372        }
1373
1374        case BT_LTEP:
1375            dbgmsg( msgs, "Got a BT_LTEP" );
1376            parseLtep( msgs, msglen, inbuf );
1377            break;
1378
1379        default:
1380            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1381            tr_peerIoDrain( msgs->io, inbuf, msglen );
1382            break;
1383    }
1384
1385    assert( msglen + 1 == msgs->incoming.length );
1386    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1387
1388    msgs->state = AWAITING_BT_LENGTH;
1389    return READ_AGAIN;
1390}
1391
1392static void
1393peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1394{
1395    const time_t now = time( NULL );
1396    tr_torrent * tor = msgs->torrent;
1397    tor->activityDate = tr_date( );
1398    tor->uploadedCur += byteCount;
1399    msgs->info->pieceDataActivityDate = now;
1400    msgs->info->credit -= byteCount;
1401    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1402    tr_rcTransferred( tor->upload, byteCount );
1403    tr_rcTransferred( tor->handle->upload, byteCount );
1404    tr_statsAddUploaded( msgs->handle, byteCount );
1405    firePieceData( msgs );
1406}
1407
1408static size_t
1409getDownloadMax( const tr_peermsgs * msgs )
1410{
1411    static const size_t maxval = ~0;
1412    const tr_torrent * tor = msgs->torrent;
1413
1414    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1415        return tor->handle->useDownloadLimit
1416            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1417
1418    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1419        return tr_rcBytesLeft( tor->download );
1420
1421    return maxval;
1422}
1423
1424static void
1425decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1426{
1427    tr_torrent * tor = msgs->torrent;
1428    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1429}
1430
1431static void
1432reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1433{
1434    tr_torrent * tor = msgs->torrent;
1435
1436    tor->corruptCur += byteCount;
1437
1438    decrementDownloadedCount( msgs, byteCount );
1439}
1440
1441static void
1442gotBadPiece( tr_peermsgs * msgs, tr_piece_index_t pieceIndex )
1443{
1444    const uint32_t byteCount =
1445        tr_torPieceCountBytes( msgs->torrent, pieceIndex );
1446    reassignBytesToCorrupt( msgs, byteCount );
1447}
1448
1449static void
1450clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1451{
1452    decrementDownloadedCount( msgs, req->length );
1453}
1454
1455static void
1456addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1457{
1458    if( !msgs->info->blame )
1459         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1460    tr_bitfieldAdd( msgs->info->blame, index );
1461}
1462
1463static tr_errno
1464clientGotBlock( tr_peermsgs                * msgs,
1465                const uint8_t              * data,
1466                const struct peer_request  * req )
1467{
1468    int err;
1469    tr_torrent * tor = msgs->torrent;
1470    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1471
1472    assert( msgs != NULL );
1473    assert( req != NULL );
1474
1475    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1476    {
1477        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1478                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1479        return TR_ERROR;
1480    }
1481
1482    /* save the block */
1483    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1484
1485    /**
1486    *** Remove the block from our `we asked for this' list
1487    **/
1488
1489    if( reqListRemove( &msgs->clientAskedFor, req ) )
1490    {
1491        clientGotUnwantedBlock( msgs, req );
1492        dbgmsg( msgs, "we didn't ask for this message..." );
1493        return 0;
1494    }
1495
1496    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1497                  msgs->clientAskedFor.count );
1498
1499    /**
1500    *** Error checks
1501    **/
1502
1503    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1504        dbgmsg( msgs, "we have this block already..." );
1505        clientGotUnwantedBlock( msgs, req );
1506        return 0;
1507    }
1508
1509    /**
1510    ***  Save the block
1511    **/
1512
1513    msgs->info->peerSentPieceDataAt = time( NULL );
1514    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1515        return err;
1516
1517    tr_cpBlockAdd( tor->completion, block );
1518
1519    addPeerToBlamefield( msgs, req->index );
1520
1521    fireGotBlock( msgs, req );
1522
1523    /**
1524    ***  Handle if this was the last block in the piece
1525    **/
1526
1527    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1528    {
1529        const tr_errno err = tr_ioTestPiece( tor, req->index );
1530
1531        if( err )
1532            tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
1533                       (unsigned long)req->index,
1534                       tr_errorString( err ) );
1535
1536        tr_torrentSetHasPiece( tor, req->index, !err );
1537        tr_torrentSetPieceChecked( tor, req->index, TRUE );
1538        tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, req->index, !err );
1539
1540        if( !err )
1541            fireClientHave( msgs, req->index );
1542        else
1543            gotBadPiece( msgs, req->index );
1544    }
1545
1546    return 0;
1547}
1548
1549static void
1550didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1551{
1552    pulse( vmsgs );
1553}
1554
1555static ReadState
1556canRead( struct bufferevent * evin, void * vmsgs )
1557{
1558    ReadState ret;
1559    tr_peermsgs * msgs = vmsgs;
1560    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1561    const size_t inlen = EVBUFFER_LENGTH( in );
1562
1563    if( !inlen )
1564    {
1565        ret = READ_DONE;
1566    }
1567    else if( msgs->state == AWAITING_BT_PIECE )
1568    {
1569        const size_t downloadMax = getDownloadMax( msgs );
1570        const size_t n = MIN( inlen, downloadMax );
1571        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1572    }
1573    else switch( msgs->state )
1574    {
1575        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1576        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1577        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1578        default:                  assert( 0 );
1579    }
1580
1581    return ret;
1582}
1583
1584static void
1585sendKeepalive( tr_peermsgs * msgs )
1586{
1587    dbgmsg( msgs, "sending a keepalive message" );
1588    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1589}
1590
1591/**
1592***
1593**/
1594
1595static int
1596isSwiftEnabled( const tr_peermsgs * msgs )
1597{
1598    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1599     * private trackers have ratios where you _want_ to feed deadbeats
1600     * as much as possible.  So we disable SWIFT on private torrents */
1601    return SWIFT_ENABLED
1602        && !tr_torrentIsSeed( msgs->torrent )
1603        && !tr_torrentIsPrivate( msgs->torrent );
1604}
1605
1606static size_t
1607getUploadMax( const tr_peermsgs * msgs )
1608{
1609    static const size_t maxval = ~0;
1610    const tr_torrent * tor = msgs->torrent;
1611    const int useSwift = isSwiftEnabled( msgs );
1612    const size_t swiftLeft = msgs->info->credit;
1613    size_t speedLeft;
1614    size_t bufLeft;
1615    size_t ret;
1616
1617    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1618        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1619    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1620        speedLeft = tr_rcBytesLeft( tor->upload );
1621    else
1622        speedLeft = ~0;
1623
1624    bufLeft = MAX_OUTBUF_SIZE - tr_peerIoWriteBytesWaiting( msgs->io );
1625    ret = MIN( speedLeft, bufLeft );
1626    if( useSwift)
1627        ret = MIN( ret, swiftLeft );
1628    return ret;
1629}
1630
1631static int
1632ratePulse( void * vmsgs )
1633{
1634    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1635    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1636    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1637    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
1638    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1639    return TRUE;
1640}
1641
1642static tr_errno
1643popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1644{
1645    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1646        return 0;
1647    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1648        return 0;
1649
1650    return TR_ERROR;
1651}
1652
1653static int
1654pulse( void * vmsgs )
1655{
1656    const time_t now = time( NULL );
1657    tr_peermsgs * msgs = vmsgs;
1658
1659    tr_peerIoTryRead( msgs->io );
1660    pumpRequestQueue( msgs );
1661    expireOldRequests( msgs );
1662
1663    if( msgs->sendingBlock )
1664    {
1665        const size_t uploadMax = getUploadMax( msgs );
1666        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1667        const size_t outlen = MIN( len, uploadMax );
1668
1669        assert( len );
1670
1671        if( outlen )
1672        {
1673            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1674            evbuffer_drain( msgs->outBlock, outlen );
1675            peerGotBytes( msgs, outlen );
1676
1677            len -= outlen;
1678            msgs->clientSentAnythingAt = now;
1679            msgs->sendingBlock = len!=0;
1680
1681            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1682        }
1683        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1684    }
1685
1686    if( !msgs->sendingBlock )
1687    {
1688        struct peer_request req;
1689
1690        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1691        {
1692            dbgmsg( msgs, "flushing outMessages..." );
1693            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1694            msgs->clientSentAnythingAt = now;
1695        }
1696        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1697            && !popNextRequest( msgs, &req )
1698            && requestIsValid( msgs, &req )
1699            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1700        {
1701            uint8_t * buf = tr_new( uint8_t, req.length );
1702
1703            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1704            {
1705                tr_peerIo * io = msgs->io;
1706                struct evbuffer * out = msgs->outBlock;
1707
1708                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1709                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1710                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1711                tr_peerIoWriteUint32( io, out, req.index );
1712                tr_peerIoWriteUint32( io, out, req.offset );
1713                tr_peerIoWriteBytes ( io, out, buf, req.length );
1714                msgs->sendingBlock = 1;
1715            }
1716
1717            tr_free( buf );
1718        }
1719        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1720        {
1721            sendKeepalive( msgs );
1722        }
1723    }
1724
1725    return TRUE; /* loop forever */
1726}
1727
1728static void
1729gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1730{
1731    if( what & EVBUFFER_TIMEOUT )
1732        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1733    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1734        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1735                what, errno, tr_strerror(errno) );
1736    fireError( vmsgs, TR_ERROR );
1737}
1738
1739static void
1740sendBitfield( tr_peermsgs * msgs )
1741{
1742    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1743    struct evbuffer * out = msgs->outMessages;
1744
1745    dbgmsg( msgs, "sending peer a bitfield message" );
1746    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1747    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1748    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1749}
1750
1751/**
1752***
1753**/
1754
1755/* some peers give us error messages if we send
1756   more than this many peers in a single pex message */
1757#define MAX_PEX_DIFFS 200
1758
1759typedef struct
1760{
1761    tr_pex * added;
1762    tr_pex * dropped;
1763    tr_pex * elements;
1764    int addedCount;
1765    int droppedCount;
1766    int elementCount;
1767    int diffCount;
1768}
1769PexDiffs;
1770
1771static void
1772pexAddedCb( void * vpex, void * userData )
1773{
1774    PexDiffs * diffs = (PexDiffs *) userData;
1775    tr_pex * pex = (tr_pex *) vpex;
1776    if( diffs->diffCount < MAX_PEX_DIFFS )
1777    {
1778        diffs->diffCount++;
1779        diffs->added[diffs->addedCount++] = *pex;
1780        diffs->elements[diffs->elementCount++] = *pex;
1781    }
1782}
1783
1784static void
1785pexRemovedCb( void * vpex, void * userData )
1786{
1787    PexDiffs * diffs = (PexDiffs *) userData;
1788    tr_pex * pex = (tr_pex *) vpex;
1789    if( diffs->diffCount < MAX_PEX_DIFFS )
1790    {
1791        diffs->diffCount++;
1792        diffs->dropped[diffs->droppedCount++] = *pex;
1793    }
1794}
1795
1796static void
1797pexElementCb( void * vpex, void * userData )
1798{
1799    PexDiffs * diffs = (PexDiffs *) userData;
1800    tr_pex * pex = (tr_pex *) vpex;
1801    if( diffs->diffCount < MAX_PEX_DIFFS )
1802    {
1803        diffs->diffCount++;
1804        diffs->elements[diffs->elementCount++] = *pex;
1805    }
1806}
1807
1808static void
1809sendPex( tr_peermsgs * msgs )
1810{
1811    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1812    {
1813        int i;
1814        tr_pex * newPex = NULL;
1815        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1816        PexDiffs diffs;
1817        tr_benc val, *added, *dropped, *flags;
1818        uint8_t *tmp, *walk;
1819        char * benc;
1820        int bencLen;
1821
1822        /* build the diffs */
1823        diffs.added = tr_new( tr_pex, newCount );
1824        diffs.addedCount = 0;
1825        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1826        diffs.droppedCount = 0;
1827        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1828        diffs.elementCount = 0;
1829        diffs.diffCount = 0;
1830        tr_set_compare( msgs->pex, msgs->pexCount,
1831                        newPex, newCount,
1832                        tr_pexCompare, sizeof(tr_pex),
1833                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1834        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1835
1836        /* update peer */
1837        tr_free( msgs->pex );
1838        msgs->pex = diffs.elements;
1839        msgs->pexCount = diffs.elementCount;
1840
1841        /* build the pex payload */
1842        tr_bencInitDict( &val, 3 );
1843
1844        /* "added" */
1845        added = tr_bencDictAdd( &val, "added" );
1846        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1847        for( i=0; i<diffs.addedCount; ++i ) {
1848            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1849            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1850        }
1851        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1852        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1853
1854        /* "added.f" */
1855        flags = tr_bencDictAdd( &val, "added.f" );
1856        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1857        for( i=0; i<diffs.addedCount; ++i )
1858            *walk++ = diffs.added[i].flags;
1859        assert( ( walk - tmp ) == diffs.addedCount );
1860        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1861
1862        /* "dropped" */
1863        dropped = tr_bencDictAdd( &val, "dropped" );
1864        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1865        for( i=0; i<diffs.droppedCount; ++i ) {
1866            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1867            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1868        }
1869        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1870        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1871
1872        /* write the pex message */
1873        benc = tr_bencSave( &val, &bencLen );
1874        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1875        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1876        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1877        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1878
1879        /* cleanup */
1880        tr_free( benc );
1881        tr_bencFree( &val );
1882        tr_free( diffs.added );
1883        tr_free( diffs.dropped );
1884        tr_free( newPex );
1885
1886        msgs->clientSentPexAt = time( NULL );
1887    }
1888}
1889
1890static int
1891pexPulse( void * vpeer )
1892{
1893    sendPex( vpeer );
1894    return TRUE;
1895}
1896
1897/**
1898***
1899**/
1900
1901tr_peermsgs*
1902tr_peerMsgsNew( struct tr_torrent * torrent,
1903                struct tr_peer    * info,
1904                tr_delivery_func    func,
1905                void              * userData,
1906                tr_publisher_tag  * setme )
1907{
1908    tr_peermsgs * m;
1909
1910    assert( info != NULL );
1911    assert( info->io != NULL );
1912
1913    m = tr_new0( tr_peermsgs, 1 );
1914    m->publisher = tr_publisherNew( );
1915    m->info = info;
1916    m->handle = torrent->handle;
1917    m->torrent = torrent;
1918    m->io = info->io;
1919    m->info->clientIsChoked = 1;
1920    m->info->peerIsChoked = 1;
1921    m->info->clientIsInterested = 0;
1922    m->info->peerIsInterested = 0;
1923    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1924    m->state = AWAITING_BT_LENGTH;
1925    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1926    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1927    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1928    m->outMessages = evbuffer_new( );
1929    m->incoming.block = evbuffer_new( );
1930    m->outBlock = evbuffer_new( );
1931    m->peerAllowedPieces = NULL;
1932    m->peerAskedFor = REQUEST_LIST_INIT;
1933    m->peerAskedForFast = REQUEST_LIST_INIT;
1934    m->clientAskedFor = REQUEST_LIST_INIT;
1935    m->clientWillAskFor = REQUEST_LIST_INIT;
1936    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1937
1938    if ( tr_peerIoSupportsLTEP( m->io ) )
1939        sendLtepHandshake( m );
1940
1941    /* bitfield/have-all/have-none must preceed other non-handshake messages... */
1942    if ( !tr_peerIoSupportsFEXT( m->io ) )
1943        sendBitfield( m );
1944    else {
1945        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1946        float completion = tr_cpPercentComplete( m->torrent->completion );
1947        if ( completion == 0.0f ) {
1948            sendFastHave( m, 0 );
1949        } else if ( completion == 1.0f ) {
1950            sendFastHave( m, 1 );
1951        } else {
1952            sendBitfield( m );
1953        }
1954    }
1955   
1956    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1957    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1958    ratePulse( m );
1959
1960    return m;
1961}
1962
1963void
1964tr_peerMsgsFree( tr_peermsgs* msgs )
1965{
1966    if( msgs != NULL )
1967    {
1968        tr_timerFree( &msgs->pulseTimer );
1969        tr_timerFree( &msgs->rateTimer );
1970        tr_timerFree( &msgs->pexTimer );
1971        tr_publisherFree( &msgs->publisher );
1972        reqListClear( &msgs->clientWillAskFor );
1973        reqListClear( &msgs->clientAskedFor );
1974        reqListClear( &msgs->peerAskedForFast );
1975        reqListClear( &msgs->peerAskedFor );
1976        tr_bitfieldFree( msgs->peerAllowedPieces );
1977        evbuffer_free( msgs->incoming.block );
1978        evbuffer_free( msgs->outMessages );
1979        evbuffer_free( msgs->outBlock );
1980        tr_free( msgs->pex );
1981
1982        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1983        tr_free( msgs );
1984    }
1985}
1986
1987tr_publisher_tag
1988tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1989                      tr_delivery_func    func,
1990                      void              * userData )
1991{
1992    return tr_publisherSubscribe( peer->publisher, func, userData );
1993}
1994
1995void
1996tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1997                        tr_publisher_tag    tag )
1998{
1999    tr_publisherUnsubscribe( peer->publisher, tag );
2000}
Note: See TracBrowser for help on using the repository browser.