source: trunk/libtransmission/peer-msgs.c @ 5127

Last change on this file since 5127 was 5127, checked in by charles, 14 years ago

more housekeeping: benc_val_t --> tr_benc

  • Property svn:keywords set to Date Rev Author Id
File size: 55.9 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 5127 2008-02-26 21:58:58Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18#include <libgen.h> /* basename */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "inout.h"
26#include "list.h"
27#include "peer-io.h"
28#include "peer-mgr.h"
29#include "peer-mgr-private.h"
30#include "peer-msgs.h"
31#include "ratecontrol.h"
32#include "stats.h"
33#include "torrent.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41enum
42{
43    BT_CHOKE                = 0,
44    BT_UNCHOKE              = 1,
45    BT_INTERESTED           = 2,
46    BT_NOT_INTERESTED       = 3,
47    BT_HAVE                 = 4,
48    BT_BITFIELD             = 5,
49    BT_REQUEST              = 6,
50    BT_PIECE                = 7,
51    BT_CANCEL               = 8,
52    BT_PORT                 = 9,
53    BT_SUGGEST              = 13,
54    BT_HAVE_ALL             = 14,
55    BT_HAVE_NONE            = 15,
56    BT_REJECT               = 16,
57    BT_ALLOWED_FAST         = 17,
58    BT_LTEP                 = 20,
59
60    LTEP_HANDSHAKE          = 0,
61
62    TR_LTEP_PEX             = 1,
63
64    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
65
66    /* idle seconds before we send a keepalive */
67    KEEPALIVE_INTERVAL_SECS = 90,
68
69    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
70    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
71    RATE_PULSE_INTERVAL     = (250),       /* msec between calls to ratePulse() */
72
73    MAX_OUTBUF_SIZE         = (1024),
74     
75    /* Fast Peers Extension constants */
76    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
77    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
78
79    QUEUED_REQUEST_TTL_SECS = 20,
80
81    SENT_REQUEST_TTL_SECS = 90
82
83};
84
85enum
86{
87    AWAITING_BT_LENGTH,
88    AWAITING_BT_ID,
89    AWAITING_BT_MESSAGE,
90    AWAITING_BT_PIECE
91};
92
93struct peer_request
94{
95    uint32_t index;
96    uint32_t offset;
97    uint32_t length;
98    time_t time_requested;
99};
100
101static int
102compareRequest( const void * va, const void * vb )
103{
104    int i;
105    const struct peer_request * a = va;
106    const struct peer_request * b = vb;
107    if(( i = tr_compareUint32( a->index, b->index ))) return i;
108    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
109    if(( i = tr_compareUint32( a->length, b->length ))) return i;
110    return 0;
111}
112
113/* this is raw, unchanged data from the peer regarding
114 * the current message that it's sending us. */
115struct tr_incoming
116{
117    uint32_t length; /* includes the +1 for id length */
118    uint8_t id;
119    struct peer_request blockReq; /* metadata for incoming blocks */
120    struct evbuffer * block; /* piece data for incoming blocks */
121};
122
123struct tr_peermsgs
124{
125    tr_peer * info;
126
127    tr_handle * handle;
128    tr_torrent * torrent;
129    tr_peerIo * io;
130
131    tr_publisher_t * publisher;
132
133    struct evbuffer * outBlock;    /* buffer of all the current piece message */
134    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
135    tr_list * peerAskedFor;
136    tr_list * peerAskedForFast;
137    tr_list * clientAskedFor;
138    tr_list * clientWillAskFor;
139
140    tr_timer * rateTimer;
141    tr_timer * pulseTimer;
142    tr_timer * pexTimer;
143
144    time_t lastReqAddedAt;
145    time_t clientSentPexAt;
146    time_t clientSentAnythingAt;
147
148    time_t clientSentPieceDataAt;
149    time_t peerSentPieceDataAt;
150
151    unsigned int peerSentBitfield         : 1;
152    unsigned int peerSupportsPex          : 1;
153    unsigned int clientSentLtepHandshake  : 1;
154    unsigned int peerSentLtepHandshake    : 1;
155    unsigned int sendingBlock             : 1;
156   
157    tr_bitfield * clientAllowedPieces;
158    tr_bitfield * peerAllowedPieces;
159   
160    tr_bitfield * clientSuggestedPieces;
161   
162    uint8_t state;
163    uint8_t ut_pex_id;
164    uint16_t pexCount;
165    uint32_t maxActiveRequests;
166    uint32_t minActiveRequests;
167
168    struct tr_incoming incoming; 
169
170    tr_pex * pex;
171};
172
173/**
174***
175**/
176
177static void
178myDebug( const char * file, int line,
179         const struct tr_peermsgs * msgs,
180         const char * fmt, ... )
181{
182    FILE * fp = tr_getLog( );
183    if( fp != NULL )
184    {
185        va_list args;
186        char timestr[64];
187        struct evbuffer * buf = evbuffer_new( );
188        char * myfile = tr_strdup( file );
189
190        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
191                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
192                             msgs->torrent->info.name,
193                             tr_peerIoGetAddrStr( msgs->io ),
194                             msgs->info->client );
195        va_start( args, fmt );
196        evbuffer_add_vprintf( buf, fmt, args );
197        va_end( args );
198        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
199        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
200
201        tr_free( myfile );
202        evbuffer_free( buf );
203    }
204}
205
206#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
207
208/**
209***
210**/
211
212static void
213protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
214{
215    tr_peerIo * io = msgs->io;
216    struct evbuffer * out = msgs->outMessages;
217
218    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
219    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
220    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
221    tr_peerIoWriteUint32( io, out, req->index );
222    tr_peerIoWriteUint32( io, out, req->offset );
223    tr_peerIoWriteUint32( io, out, req->length );
224}
225
226static void
227protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
228{
229    tr_peerIo * io = msgs->io;
230    struct evbuffer * out = msgs->outMessages;
231
232    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
233    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
234    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
235    tr_peerIoWriteUint32( io, out, req->index );
236    tr_peerIoWriteUint32( io, out, req->offset );
237    tr_peerIoWriteUint32( io, out, req->length );
238}
239
240static void
241protocolSendHave( tr_peermsgs * msgs, uint32_t index )
242{
243    tr_peerIo * io = msgs->io;
244    struct evbuffer * out = msgs->outMessages;
245
246    dbgmsg( msgs, "sending Have %u", index );
247    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
248    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
249    tr_peerIoWriteUint32( io, out, index );
250}
251
252static void
253protocolSendChoke( tr_peermsgs * msgs, int choke )
254{
255    tr_peerIo * io = msgs->io;
256    struct evbuffer * out = msgs->outMessages;
257
258    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
259    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
260    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
261}
262
263/**
264***  EVENTS
265**/
266
267static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
268
269static void
270publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
271{
272    tr_publisherPublish( msgs->publisher, msgs->info, e );
273}
274
275static void
276fireError( tr_peermsgs * msgs, tr_errno err )
277{
278    tr_peermsgs_event e = blankEvent;
279    e.eventType = TR_PEERMSG_ERROR;
280    e.err = err;
281    publish( msgs, &e );
282}
283
284static void
285fireNeedReq( tr_peermsgs * msgs )
286{
287    tr_peermsgs_event e = blankEvent;
288    e.eventType = TR_PEERMSG_NEED_REQ;
289    publish( msgs, &e );
290}
291
292static void
293firePeerProgress( tr_peermsgs * msgs )
294{
295    tr_peermsgs_event e = blankEvent;
296    e.eventType = TR_PEERMSG_PEER_PROGRESS;
297    e.progress = msgs->info->progress;
298    publish( msgs, &e );
299}
300
301static void
302fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
303{
304    tr_peermsgs_event e = blankEvent;
305    e.eventType = TR_PEERMSG_CLIENT_HAVE;
306    e.pieceIndex = pieceIndex;
307    publish( msgs, &e );
308}
309
310static void
311fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
312{
313    tr_peermsgs_event e = blankEvent;
314    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
315    e.pieceIndex = req->index;
316    e.offset = req->offset;
317    e.length = req->length;
318    publish( msgs, &e );
319}
320
321static void
322firePieceData( tr_peermsgs * msgs )
323{
324    tr_peermsgs_event e = blankEvent;
325    e.eventType = TR_PEERMSG_PIECE_DATA;
326    publish( msgs, &e );
327}
328
329static void
330fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
331{
332    tr_peermsgs_event e = blankEvent;
333    e.eventType = TR_PEERMSG_CANCEL;
334    e.pieceIndex = req->index;
335    e.offset = req->offset;
336    e.length = req->length;
337    publish( msgs, &e );
338}
339
340/**
341***  INTEREST
342**/
343
344static int
345isPieceInteresting( const tr_peermsgs   * peer,
346                    int                   piece )
347{
348    const tr_torrent * torrent = peer->torrent;
349    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
350        return FALSE;
351    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
352        return FALSE;
353    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
354        return FALSE;
355    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
356        return FALSE;
357    return TRUE;
358}
359
360/* "interested" means we'll ask for piece data if they unchoke us */
361static int
362isPeerInteresting( const tr_peermsgs * msgs )
363{
364    int i;
365    const tr_torrent * torrent;
366    const tr_bitfield * bitfield;
367    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
368
369    if( clientIsSeed )
370        return FALSE;
371
372    torrent = msgs->torrent;
373    bitfield = tr_cpPieceBitfield( torrent->completion );
374
375    if( !msgs->info->have )
376        return TRUE;
377
378    assert( bitfield->len == msgs->info->have->len );
379    for( i=0; i<torrent->info.pieceCount; ++i )
380        if( isPieceInteresting( msgs, i ) )
381            return TRUE;
382
383    return FALSE;
384}
385
386static void
387sendInterest( tr_peermsgs * msgs, int weAreInterested )
388{
389    assert( msgs != NULL );
390    assert( weAreInterested==0 || weAreInterested==1 );
391
392    msgs->info->clientIsInterested = weAreInterested;
393    dbgmsg( msgs, "Sending %s",
394            weAreInterested ? "Interested" : "Not Interested");
395
396    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
397    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
398                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
399}
400
401static void
402updateInterest( tr_peermsgs * msgs )
403{
404    const int i = isPeerInteresting( msgs );
405    if( i != msgs->info->clientIsInterested )
406        sendInterest( msgs, i );
407    if( i )
408        fireNeedReq( msgs );
409}
410
411#define MIN_CHOKE_PERIOD_SEC 10
412
413static void
414cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
415{
416    tr_list_free( &msgs->peerAskedFor, tr_free );
417}
418
419void
420tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
421{
422    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
423
424    assert( msgs != NULL );
425    assert( msgs->info != NULL );
426    assert( choke==0 || choke==1 );
427
428    if( msgs->info->chokeChangedAt > fibrillationTime )
429    {
430        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
431    }
432    else if( msgs->info->peerIsChoked != choke )
433    {
434        msgs->info->peerIsChoked = choke;
435        if( choke )
436            cancelAllRequestsToClientExceptFast( msgs );
437        protocolSendChoke( msgs, choke );
438        msgs->info->chokeChangedAt = time( NULL );
439    }
440}
441
442/**
443***
444**/
445
446void
447tr_peerMsgsHave( tr_peermsgs * msgs,
448                 uint32_t      index )
449{
450    protocolSendHave( msgs, index );
451
452    /* since we have more pieces now, we might not be interested in this peer */
453    updateInterest( msgs );
454}
455#if 0
456static void
457sendFastSuggest( tr_peermsgs * msgs,
458                 uint32_t      pieceIndex )
459{
460    assert( msgs != NULL );
461   
462    if( tr_peerIoSupportsFEXT( msgs->io ) )
463    {
464        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
465        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
466        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
467    }
468}
469#endif
470static void
471sendFastHave( tr_peermsgs * msgs, int all )
472{
473    assert( msgs != NULL );
474   
475    if( tr_peerIoSupportsFEXT( msgs->io ) )
476    {
477        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
478        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
479                                                                : BT_HAVE_NONE ) );
480        updateInterest( msgs );
481    }
482}
483
484static void
485sendFastReject( tr_peermsgs * msgs,
486                uint32_t      pieceIndex,
487                uint32_t      offset,
488                uint32_t      length )
489{
490    assert( msgs != NULL );
491
492    if( tr_peerIoSupportsFEXT( msgs->io ) )
493    {
494        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
495        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
496        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
497        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
498        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
499        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
500    }
501}
502
503static tr_bitfield*
504getPeerAllowedPieces( tr_peermsgs * msgs )
505{
506    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
507    {
508        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
509            MAX_FAST_ALLOWED_COUNT,
510            msgs->torrent->info.pieceCount,
511            msgs->torrent->info.hash,
512            tr_peerIoGetAddress( msgs->io, NULL ) );
513    }
514
515    return msgs->peerAllowedPieces;
516}
517
518static void
519sendFastAllowed( tr_peermsgs * msgs,
520                 uint32_t      pieceIndex)
521{
522    assert( msgs != NULL );
523   
524    if( tr_peerIoSupportsFEXT( msgs->io ) )
525    {
526        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
527        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
528        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
529    }
530}
531
532static void
533sendFastAllowedSet( tr_peermsgs * msgs )
534{
535    int i = 0;
536
537    while (i <= msgs->torrent->info.pieceCount )
538    {
539        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
540            sendFastAllowed( msgs, i );
541        i++;
542    }
543}
544
545static void
546maybeSendFastAllowedSet( tr_peermsgs * msgs )
547{
548    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
549        sendFastAllowedSet( msgs );
550}
551
552
553/**
554***
555**/
556
557static int
558reqIsValid( const tr_peermsgs   * msgs,
559            uint32_t              index,
560            uint32_t              offset,
561            uint32_t              length )
562{
563    const tr_torrent * tor = msgs->torrent;
564
565    if( index >= (uint32_t) tor->info.pieceCount )
566        return FALSE;
567    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
568        return FALSE;
569    if( length > MAX_REQUEST_BYTE_COUNT )
570        return FALSE;
571    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
572        return FALSE;
573
574    return TRUE;
575}
576
577static int
578requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
579{
580    return reqIsValid( msgs, req->index, req->offset, req->length );
581}
582
583static void
584expireOldRequests( tr_peermsgs * msgs )
585{
586    tr_list * l;
587    tr_list * prune = NULL;
588    const time_t now = time( NULL );
589
590    /* find queued requests that are too old
591       "time_requested" here is when the request was queued */
592    for( l=msgs->clientWillAskFor; l!=NULL; l=l->next ) {
593        struct peer_request * req = l->data;
594        if( req->time_requested + QUEUED_REQUEST_TTL_SECS < now )
595            tr_list_prepend( &prune, req );
596    }
597
598    /* find sent requests that are too old
599       "time_requested" here is when the request was sent */
600    for( l=msgs->clientAskedFor; l!=NULL; l=l->next ) {
601        struct peer_request * req = l->data;
602        if( req->time_requested + SENT_REQUEST_TTL_SECS < now )
603            tr_list_prepend( &prune, req );
604    }
605
606    /* expire the old requests */
607    for( l=prune; l!=NULL; l=l->next ) {
608        struct peer_request * req = l->data;
609        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
610    }
611
612    /* cleanup */
613    tr_list_free( &prune, NULL );
614}
615
616static void
617pumpRequestQueue( tr_peermsgs * msgs )
618{
619    const int max = msgs->maxActiveRequests;
620    const int min = msgs->minActiveRequests;
621    int count = tr_list_size( msgs->clientAskedFor );
622    int sent = 0;
623
624    if( count > min )
625        return;
626    if( msgs->info->clientIsChoked )
627        return;
628
629    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
630    {
631        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
632        assert( requestIsValid( msgs, r ) );
633        assert( tr_bitfieldHas( msgs->info->have, r->index ) );
634        protocolSendRequest( msgs, r );
635        r->time_requested = msgs->lastReqAddedAt = time( NULL );
636        tr_list_append( &msgs->clientAskedFor, r );
637        ++count;
638        ++sent;
639    }
640
641    if( sent )
642        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
643                sent,
644                tr_list_size(msgs->clientAskedFor),
645                tr_list_size(msgs->clientWillAskFor) );
646
647    if( count < max )
648        fireNeedReq( msgs );
649}
650
651static int
652pulse( void * vmsgs );
653
654int
655tr_peerMsgsAddRequest( tr_peermsgs * msgs,
656                       uint32_t      index, 
657                       uint32_t      offset, 
658                       uint32_t      length )
659{
660    const int req_max = msgs->maxActiveRequests;
661    struct peer_request tmp, *req;
662
663    assert( msgs != NULL );
664    assert( msgs->torrent != NULL );
665    assert( reqIsValid( msgs, index, offset, length ) );
666
667    /**
668    ***  Reasons to decline the request
669    **/
670
671    /* don't send requests to choked clients */
672    if( msgs->info->clientIsChoked ) {
673        dbgmsg( msgs, "declining request because they're choking us" );
674        return TR_ADDREQ_CLIENT_CHOKED;
675    }
676
677    /* peer doesn't have this piece */
678    if( !tr_bitfieldHas( msgs->info->have, index ) )
679        return TR_ADDREQ_MISSING;
680
681    /* peer's queue is full */
682    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
683        dbgmsg( msgs, "declining request because we're full" );
684        return TR_ADDREQ_FULL;
685    }
686
687    /* have we already asked for this piece? */
688    tmp.index = index;
689    tmp.offset = offset;
690    tmp.length = length;
691    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
692        dbgmsg( msgs, "declining because it's a duplicate" );
693        return TR_ADDREQ_DUPLICATE;
694    }
695    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
696        dbgmsg( msgs, "declining because it's a duplicate" );
697        return TR_ADDREQ_DUPLICATE;
698    }
699
700    /**
701    ***  Accept this request
702    **/
703
704    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
705    req = tr_new0( struct peer_request, 1 );
706    *req = tmp;
707    req->time_requested = time( NULL );
708    tr_list_append( &msgs->clientWillAskFor, req );
709    return TR_ADDREQ_OK;
710}
711
712static void
713cancelAllRequestsToPeer( tr_peermsgs * msgs )
714{
715    struct peer_request * req;
716
717    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
718    {
719        fireCancelledReq( msgs, req );
720        tr_free( req );
721    }
722
723    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
724    {
725        fireCancelledReq( msgs, req );
726        protocolSendCancel( msgs, req );
727        tr_free( req );
728    }
729}
730
731void
732tr_peerMsgsCancel( tr_peermsgs * msgs,
733                   uint32_t      pieceIndex,
734                   uint32_t      offset,
735                   uint32_t      length )
736{
737    struct peer_request *req, tmp;
738
739    assert( msgs != NULL );
740    assert( length > 0 );
741
742    /* have we asked the peer for this piece? */
743    tmp.index = pieceIndex;
744    tmp.offset = offset;
745    tmp.length = length;
746
747    /* if it's only in the queue and hasn't been sent yet, free it */
748    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
749    {
750        fireCancelledReq( msgs, req );
751        tr_free( req );
752    }
753
754    /* if it's already been sent, send a cancel message too */
755    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
756    {
757        protocolSendCancel( msgs, req );
758        fireCancelledReq( msgs, req );
759        tr_free( req );
760    }
761}
762
763/**
764***
765**/
766
767static void
768sendLtepHandshake( tr_peermsgs * msgs )
769{
770    tr_benc val, *m;
771    char * buf;
772    int len;
773    int pex;
774    const char * v = TR_NAME " " USERAGENT_PREFIX;
775    const int port = tr_getPublicPort( msgs->handle );
776    struct evbuffer * outbuf;
777
778    if( msgs->clientSentLtepHandshake )
779        return;
780
781    outbuf = evbuffer_new( );
782    dbgmsg( msgs, "sending an ltep handshake" );
783    msgs->clientSentLtepHandshake = 1;
784
785    /* decide if we want to advertise pex support */
786    if( !tr_torrentAllowsPex( msgs->torrent ) )
787        pex = 0;
788    else if( msgs->peerSentLtepHandshake )
789        pex = msgs->peerSupportsPex ? 1 : 0;
790    else
791        pex = 1;
792
793    tr_bencInit( &val, TYPE_DICT );
794    tr_bencDictReserve( &val, 4 );
795    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
796    m  = tr_bencDictAdd( &val, "m" );
797    tr_bencInit( m, TYPE_DICT );
798    if( pex ) {
799        tr_bencDictReserve( m, 1 );
800        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), TR_LTEP_PEX );
801    }
802    if( port > 0 )
803        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
804    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
805    buf = tr_bencSave( &val, &len );
806
807    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
808    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
809    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
810    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
811
812    tr_peerIoWriteBuf( msgs->io, outbuf );
813
814#if 0
815    dbgmsg( msgs, "here is the ltep handshake we sent:" );
816    tr_bencPrint( &val );
817    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
818#endif
819
820    /* cleanup */
821    tr_bencFree( &val );
822    tr_free( buf );
823    evbuffer_free( outbuf );
824}
825
826static void
827parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
828{
829    tr_benc val, * sub;
830    uint8_t * tmp = tr_new( uint8_t, len );
831
832    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
833    msgs->peerSentLtepHandshake = 1;
834
835    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
836        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
837        tr_free( tmp );
838        return;
839    }
840
841#if 0
842    dbgmsg( msgs, "here is the ltep handshake we read:" );
843    tr_bencPrint( &val );
844    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
845#endif
846
847    /* does the peer prefer encrypted connections? */
848    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
849        msgs->info->encryption_preference = sub->val.i
850                                      ? ENCRYPTION_PREFERENCE_YES
851                                      : ENCRYPTION_PREFERENCE_NO;
852
853    /* check supported messages for utorrent pex */
854    msgs->peerSupportsPex = 0;
855    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
856        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
857            msgs->ut_pex_id = (uint8_t) sub->val.i;
858            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
859            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
860        }
861    }
862
863    /* get peer's listening port */
864    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
865        msgs->info->port = htons( (uint16_t)sub->val.i );
866        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
867    }
868
869    tr_bencFree( &val );
870    tr_free( tmp );
871}
872
873static void
874parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
875{
876    int gotval = 0;
877    uint8_t * tmp = tr_new( uint8_t, msglen );
878    tr_benc val, *sub;
879    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
880
881    if( tr_torrentAllowsPex( msgs->torrent )
882        && (( gotval = !tr_bencLoad( tmp, msglen, &val, NULL )))
883        && (( sub = tr_bencDictFindType( &val, "added", TYPE_STR ))))
884    {
885        const int n = sub->val.s.i / 6 ;
886        tr_inf( "torrent %s got %d peers from uT pex", msgs->torrent->info.name, n );
887        tr_peerMgrAddPeers( msgs->handle->peerMgr,
888                            msgs->torrent->info.hash,
889                            TR_PEER_FROM_PEX,
890                            (uint8_t*)sub->val.s.s, n );
891    }
892
893    if( gotval )
894        tr_bencFree( &val );
895    tr_free( tmp );
896}
897
898static void
899sendPex( tr_peermsgs * msgs );
900
901static void
902parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
903{
904    uint8_t ltep_msgid;
905
906    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
907    msglen--;
908
909    if( ltep_msgid == LTEP_HANDSHAKE )
910    {
911        dbgmsg( msgs, "got ltep handshake" );
912        parseLtepHandshake( msgs, msglen, inbuf );
913        if( tr_peerIoSupportsLTEP( msgs->io ) )
914        {
915            sendLtepHandshake( msgs );
916            sendPex( msgs );
917        }
918    }
919    else if( ltep_msgid == TR_LTEP_PEX )
920    {
921        dbgmsg( msgs, "got ut pex" );
922        msgs->peerSupportsPex = 1;
923        parseUtPex( msgs, msglen, inbuf );
924    }
925    else
926    {
927        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
928        evbuffer_drain( inbuf, msglen );
929    }
930}
931
932static int
933readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
934{
935    uint32_t len;
936
937    if( inlen < sizeof(len) )
938        return READ_MORE;
939
940    tr_peerIoReadUint32( msgs->io, inbuf, &len );
941
942    if( len == 0 ) /* peer sent us a keepalive message */
943        dbgmsg( msgs, "got KeepAlive" );
944    else {
945        msgs->incoming.length = len;
946        msgs->state = AWAITING_BT_ID;
947    }
948
949    return READ_AGAIN;
950}
951
952static int
953readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
954
955static int
956readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
957{
958    uint8_t id;
959
960    if( inlen < sizeof(uint8_t) )
961        return READ_MORE;
962
963    tr_peerIoReadUint8( msgs->io, inbuf, &id );
964    msgs->incoming.id = id;
965
966    if( id==BT_PIECE )
967    {
968        msgs->state = AWAITING_BT_PIECE;
969        return READ_AGAIN;
970    }
971    else if( msgs->incoming.length != 1 )
972    {
973        msgs->state = AWAITING_BT_MESSAGE;
974        return READ_AGAIN;
975    }
976    else return readBtMessage( msgs, inbuf, inlen-1 );
977}
978
979static void
980updatePeerProgress( tr_peermsgs * msgs )
981{
982    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
983    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
984    updateInterest( msgs );
985    firePeerProgress( msgs );
986}
987
988static int
989clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
990{
991    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
992    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
993        return FALSE;
994   
995    /* ...or if we don't have ourself enough pieces */
996    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
997        return FALSE;
998
999    /* Maybe a bandwidth limit ? */
1000    return TRUE;
1001}
1002
1003static void
1004peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1005{
1006    const int reqIsValid = requestIsValid( msgs, req );
1007    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1008    const int peerIsChoked = msgs->info->peerIsChoked;
1009    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1010    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1011    const int canSendFast = clientCanSendFastBlock( msgs );
1012
1013    if( !reqIsValid ) /* bad request */
1014    {
1015        dbgmsg( msgs, "rejecting an invalid request." );
1016        sendFastReject( msgs, req->index, req->offset, req->length );
1017    }
1018    else if( !clientHasPiece ) /* we don't have it */
1019    {
1020        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1021        sendFastReject( msgs, req->index, req->offset, req->length );
1022    }
1023    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1024    {
1025        tr_peerMsgsSetChoke( msgs, 1 );
1026        sendFastReject( msgs, req->index, req->offset, req->length );
1027    }
1028    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1029    {
1030        sendFastReject( msgs, req->index, req->offset, req->length );
1031    }
1032    else /* YAY */
1033    {
1034        struct peer_request * tmp = tr_new( struct peer_request, 1 );
1035        *tmp = *req;
1036        if( peerIsFast && pieceIsFast )
1037            tr_list_append( &msgs->peerAskedForFast, tmp );
1038        else
1039            tr_list_append( &msgs->peerAskedFor, tmp );
1040    }
1041}
1042
1043static int
1044messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1045{
1046    switch( id )
1047    {
1048        case BT_CHOKE:
1049        case BT_UNCHOKE:
1050        case BT_INTERESTED:
1051        case BT_NOT_INTERESTED:
1052        case BT_HAVE_ALL:
1053        case BT_HAVE_NONE:
1054            return len==1;
1055
1056        case BT_HAVE:
1057        case BT_SUGGEST:
1058        case BT_ALLOWED_FAST:
1059            return len==5;
1060
1061        case BT_BITFIELD:
1062            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1063       
1064        case BT_REQUEST:
1065        case BT_CANCEL:
1066        case BT_REJECT:
1067            return len==13;
1068
1069        case BT_PIECE:
1070            return len>9 && len<=16393;
1071
1072        case BT_PORT:
1073            return len==3;
1074
1075        case BT_LTEP:
1076            return len >= 2;
1077
1078        default:
1079            return FALSE;
1080    }
1081}
1082
1083static int
1084clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1085
1086static void
1087clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1088{
1089    tr_torrent * tor = msgs->torrent;
1090    tor->activityDate = tr_date( );
1091    tor->downloadedCur += byteCount;
1092    msgs->peerSentPieceDataAt = time( NULL );
1093    msgs->info->pieceDataActivityDate = time( NULL );
1094    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1095    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1096    tr_rcTransferred( tor->download, byteCount );
1097    tr_rcTransferred( tor->handle->download, byteCount );
1098    tr_statsAddDownloaded( msgs->handle, byteCount );
1099    firePieceData( msgs );
1100}
1101
1102static int
1103readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1104{
1105    struct peer_request * req = &msgs->incoming.blockReq;
1106    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1107    dbgmsg( msgs, "In readBtPiece" );
1108
1109    if( !req->length )
1110    {
1111        if( inlen < 8 )
1112            return READ_MORE;
1113
1114        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1115        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1116        req->length = msgs->incoming.length - 9;
1117        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1118        return READ_AGAIN;
1119    }
1120    else
1121    {
1122        int err;
1123
1124        /* read in another chunk of data */
1125        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1126        size_t n = MIN( nLeft, inlen );
1127        uint8_t * buf = tr_new( uint8_t, n );
1128        assert( EVBUFFER_LENGTH(inbuf) >= n );
1129        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1130        evbuffer_add( msgs->incoming.block, buf, n );
1131        clientGotBytes( msgs, n );
1132        tr_free( buf );
1133        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1134               (int)n, req->index, req->offset, req->length,
1135               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1136        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1137            return READ_MORE;
1138
1139        /* we've got the whole block ... process it */
1140        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1141
1142        /* cleanup */
1143        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1144        req->length = 0;
1145        msgs->state = AWAITING_BT_LENGTH;
1146        if( !err )
1147            return READ_AGAIN;
1148        else {
1149            fireError( msgs, err );
1150            return READ_DONE;
1151        }
1152    }
1153}
1154
1155static int
1156readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1157{
1158    uint32_t ui32;
1159    uint32_t msglen = msgs->incoming.length;
1160    const uint8_t id = msgs->incoming.id;
1161    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1162
1163    --msglen; /* id length */
1164
1165    if( inlen < msglen )
1166        return READ_MORE;
1167
1168    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1169
1170    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1171    {
1172        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1173        fireError( msgs, TR_ERROR );
1174        return READ_DONE;
1175    }
1176
1177    switch( id )
1178    {
1179        case BT_CHOKE:
1180            dbgmsg( msgs, "got Choke" );
1181            msgs->info->clientIsChoked = 1;
1182            cancelAllRequestsToPeer( msgs );
1183            cancelAllRequestsToClientExceptFast( msgs );
1184            break;
1185
1186        case BT_UNCHOKE:
1187            dbgmsg( msgs, "got Unchoke" );
1188            msgs->info->clientIsChoked = 0;
1189            fireNeedReq( msgs );
1190            break;
1191
1192        case BT_INTERESTED:
1193            dbgmsg( msgs, "got Interested" );
1194            msgs->info->peerIsInterested = 1;
1195            tr_peerMsgsSetChoke( msgs, 0 );
1196            break;
1197
1198        case BT_NOT_INTERESTED:
1199            dbgmsg( msgs, "got Not Interested" );
1200            msgs->info->peerIsInterested = 0;
1201            break;
1202           
1203        case BT_HAVE:
1204            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1205            dbgmsg( msgs, "got Have: %u", ui32 );
1206            tr_bitfieldAdd( msgs->info->have, ui32 );
1207            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1208            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1209                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1210            updatePeerProgress( msgs );
1211            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1212            break;
1213
1214        case BT_BITFIELD: {
1215            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1216            dbgmsg( msgs, "got a bitfield" );
1217            msgs->peerSentBitfield = 1;
1218            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1219            updatePeerProgress( msgs );
1220            maybeSendFastAllowedSet( msgs );
1221            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1222            fireNeedReq( msgs );
1223            break;
1224        }
1225
1226        case BT_REQUEST: {
1227            struct peer_request req;
1228            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1229            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1230            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1231            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1232            peerMadeRequest( msgs, &req );
1233            break;
1234        }
1235
1236        case BT_CANCEL: {
1237            struct peer_request req;
1238            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1239            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1240            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1241            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1242            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1243            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1244            break;
1245        }
1246
1247        case BT_PIECE:
1248            assert( 0 ); /* should be handled elsewhere! */
1249            break;
1250       
1251        case BT_PORT:
1252            dbgmsg( msgs, "Got a BT_PORT" );
1253            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1254            break;
1255           
1256        case BT_SUGGEST: {
1257            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1258            dbgmsg( msgs, "Got a BT_SUGGEST" );
1259            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1260            if( tr_bitfieldHas( bt, ui32 ) )
1261                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1262            break;
1263        }
1264           
1265        case BT_HAVE_ALL:
1266            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1267            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1268            updatePeerProgress( msgs );
1269            maybeSendFastAllowedSet( msgs );
1270            break;
1271       
1272       
1273        case BT_HAVE_NONE:
1274            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1275            tr_bitfieldClear( msgs->info->have );
1276            updatePeerProgress( msgs );
1277            maybeSendFastAllowedSet( msgs );
1278            break;
1279       
1280        case BT_REJECT: {
1281            struct peer_request req;
1282            dbgmsg( msgs, "Got a BT_REJECT" );
1283            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1284            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1285            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1286            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1287            break;
1288        }
1289
1290        case BT_ALLOWED_FAST: {
1291            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1292            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1293            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1294            break;
1295        }
1296
1297        case BT_LTEP:
1298            dbgmsg( msgs, "Got a BT_LTEP" );
1299            parseLtep( msgs, msglen, inbuf );
1300            break;
1301
1302        default:
1303            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1304            tr_peerIoDrain( msgs->io, inbuf, msglen );
1305            break;
1306    }
1307
1308    assert( msglen + 1 == msgs->incoming.length );
1309    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1310
1311    msgs->state = AWAITING_BT_LENGTH;
1312    return READ_AGAIN;
1313}
1314
1315static void
1316peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1317{
1318    tr_torrent * tor = msgs->torrent;
1319    tor->activityDate = tr_date( );
1320    tor->uploadedCur += byteCount;
1321    msgs->clientSentPieceDataAt = time( NULL );
1322    msgs->info->pieceDataActivityDate = time( NULL );
1323    msgs->info->credit -= byteCount;
1324    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1325    tr_rcTransferred( tor->upload, byteCount );
1326    tr_rcTransferred( tor->handle->upload, byteCount );
1327    tr_statsAddUploaded( msgs->handle, byteCount );
1328    firePieceData( msgs );
1329}
1330
1331static size_t
1332getDownloadMax( const tr_peermsgs * msgs )
1333{
1334    static const size_t maxval = ~0;
1335    const tr_torrent * tor = msgs->torrent;
1336
1337    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1338        return tor->handle->useDownloadLimit
1339            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1340
1341    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1342        return tr_rcBytesLeft( tor->download );
1343
1344    return maxval;
1345}
1346
1347static void
1348reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1349{
1350    tr_torrent * tor = msgs->torrent;
1351
1352    /* increment the `corrupt' field */
1353    tor->corruptCur += byteCount;
1354
1355    /* decrement the `downloaded' field */
1356    if( tor->downloadedCur >= byteCount )
1357        tor->downloadedCur -= byteCount;
1358    else
1359        tor->downloadedCur = 0;
1360}
1361
1362
1363static void
1364gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1365{
1366    const uint32_t byteCount =
1367        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1368    reassignBytesToCorrupt( msgs, byteCount );
1369}
1370
1371static void
1372clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1373{
1374    reassignBytesToCorrupt( msgs, req->length );
1375}
1376
1377static void
1378addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1379{
1380    if( !msgs->info->blame )
1381         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1382    tr_bitfieldAdd( msgs->info->blame, index );
1383}
1384
1385static tr_errno
1386clientGotBlock( tr_peermsgs                * msgs,
1387                const uint8_t              * data,
1388                const struct peer_request  * req )
1389{
1390    int err;
1391    tr_torrent * tor = msgs->torrent;
1392    const int block = _tr_block( tor, req->index, req->offset );
1393    struct peer_request *myreq;
1394
1395    assert( msgs != NULL );
1396    assert( req != NULL );
1397
1398    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1399    {
1400        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1401                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1402        return TR_ERROR;
1403    }
1404
1405    /* save the block */
1406    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1407
1408    /**
1409    *** Remove the block from our `we asked for this' list
1410    **/
1411
1412    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1413    if( myreq == NULL ) {
1414        clientGotUnwantedBlock( msgs, req );
1415        dbgmsg( msgs, "we didn't ask for this message..." );
1416        return 0;
1417    }
1418
1419    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1420                  myreq->index, myreq->offset, myreq->length,
1421                  (int)(time(NULL) - myreq->time_requested) );
1422    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1423                  tr_list_size(msgs->clientAskedFor));
1424
1425    tr_free( myreq );
1426    myreq = NULL;
1427
1428    /**
1429    *** Error checks
1430    **/
1431
1432    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1433        dbgmsg( msgs, "we have this block already..." );
1434        clientGotUnwantedBlock( msgs, req );
1435        return 0;
1436    }
1437
1438    /**
1439    ***  Save the block
1440    **/
1441
1442    msgs->info->peerSentPieceDataAt = time( NULL );
1443    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1444        return err;
1445
1446    tr_cpBlockAdd( tor->completion, block );
1447
1448    addPeerToBlamefield( msgs, req->index );
1449
1450    fireGotBlock( msgs, req );
1451
1452    /**
1453    ***  Handle if this was the last block in the piece
1454    **/
1455
1456    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1457    {
1458        const tr_errno err = tr_ioTestPiece( tor, req->index );
1459
1460        tr_torrentSetHasPiece( tor, req->index, !err );
1461        tr_torrentSetPieceChecked( tor, req->index, TRUE );
1462        tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, req->index, !err );
1463
1464        if( !err )
1465            fireClientHave( msgs, req->index );
1466        else
1467            gotBadPiece( msgs, req->index );
1468    }
1469
1470    return 0;
1471}
1472
1473static void
1474didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1475{
1476    pulse( vmsgs );
1477}
1478
1479static ReadState
1480canRead( struct bufferevent * evin, void * vmsgs )
1481{
1482    ReadState ret;
1483    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1484    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1485    const size_t buflen = EVBUFFER_LENGTH( inbuf );
1486    const size_t downloadMax = getDownloadMax( msgs );
1487    const size_t n = MIN( buflen, downloadMax );
1488
1489    if( !n )
1490    {
1491        ret = READ_DONE;
1492    }
1493    else switch( msgs->state )
1494    {
1495        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, inbuf, n ); break;
1496        case AWAITING_BT_ID:      ret = readBtId     ( msgs, inbuf, n ); break;
1497        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, inbuf, n ); break;
1498        case AWAITING_BT_PIECE:   ret = readBtPiece  ( msgs, inbuf, n ); break;
1499        default: assert( 0 );
1500    }
1501
1502    return ret;
1503}
1504
1505static void
1506sendKeepalive( tr_peermsgs * msgs )
1507{
1508    dbgmsg( msgs, "sending a keepalive message" );
1509    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1510}
1511
1512/**
1513***
1514**/
1515
1516static int
1517isSwiftEnabled( const tr_peermsgs * msgs )
1518{
1519    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1520     * private trackers have ratios where you _want_ to feed deadbeats
1521     * as much as possible.  So we disable SWIFT on private torrents */
1522    return SWIFT_ENABLED
1523        && !tr_torrentIsSeed( msgs->torrent )
1524        && !tr_torrentIsPrivate( msgs->torrent );
1525}
1526
1527static size_t
1528getUploadMax( const tr_peermsgs * msgs )
1529{
1530    static const size_t maxval = ~0;
1531    const tr_torrent * tor = msgs->torrent;
1532    const int useSwift = isSwiftEnabled( msgs );
1533    const size_t swiftLeft = msgs->info->credit;
1534    size_t speedLeft;
1535    size_t bufLeft;
1536    size_t ret;
1537
1538    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1539        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1540    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1541        speedLeft = tr_rcBytesLeft( tor->upload );
1542    else
1543        speedLeft = ~0;
1544
1545    bufLeft = MAX_OUTBUF_SIZE - tr_peerIoWriteBytesWaiting( msgs->io );
1546    ret = MIN( speedLeft, bufLeft );
1547    if( useSwift)
1548        ret = MIN( ret, swiftLeft );
1549    return ret;
1550}
1551
1552static int
1553ratePulse( void * vmsgs )
1554{
1555    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1556    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1557    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1558    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), 100 );
1559    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1560    return TRUE;
1561}
1562
1563static struct peer_request*
1564popNextRequest( tr_peermsgs * msgs )
1565{
1566    struct peer_request * ret;
1567    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1568    if( !ret )
1569        ret = tr_list_pop_front( &msgs->peerAskedFor);
1570    return ret;
1571}
1572
1573static int
1574pulse( void * vmsgs )
1575{
1576    const time_t now = time( NULL );
1577    tr_peermsgs * msgs = vmsgs;
1578    struct peer_request * r;
1579
1580    tr_peerIoTryRead( msgs->io );
1581    pumpRequestQueue( msgs );
1582    expireOldRequests( msgs );
1583
1584    if( msgs->sendingBlock )
1585    {
1586        const size_t uploadMax = getUploadMax( msgs );
1587        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1588        const size_t outlen = MIN( len, uploadMax );
1589
1590        assert( len );
1591
1592        if( outlen )
1593        {
1594            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1595            evbuffer_drain( msgs->outBlock, outlen );
1596            peerGotBytes( msgs, outlen );
1597
1598            len -= outlen;
1599            msgs->clientSentAnythingAt = now;
1600            msgs->sendingBlock = len!=0;
1601
1602            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1603        }
1604        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1605    }
1606
1607    if( !msgs->sendingBlock )
1608    {
1609        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1610        {
1611            dbgmsg( msgs, "flushing outMessages..." );
1612            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1613            msgs->clientSentAnythingAt = now;
1614        }
1615        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1616            && (( r = popNextRequest( msgs )))
1617            && requestIsValid( msgs, r )
1618            && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1619        {
1620            uint8_t * buf = tr_new( uint8_t, r->length );
1621
1622            if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1623            {
1624                tr_peerIo * io = msgs->io;
1625                struct evbuffer * out = msgs->outBlock;
1626
1627                dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1628                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1629                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1630                tr_peerIoWriteUint32( io, out, r->index );
1631                tr_peerIoWriteUint32( io, out, r->offset );
1632                tr_peerIoWriteBytes ( io, out, buf, r->length );
1633                msgs->sendingBlock = 1;
1634            }
1635
1636            tr_free( buf );
1637            tr_free( r );
1638        }
1639        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1640        {
1641            sendKeepalive( msgs );
1642        }
1643    }
1644
1645    return TRUE; /* loop forever */
1646}
1647
1648static void
1649gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1650{
1651    if( what & EVBUFFER_TIMEOUT )
1652        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1653    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1654        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1655                what, errno, tr_strerror(errno) );
1656    fireError( vmsgs, TR_ERROR );
1657}
1658
1659static void
1660sendBitfield( tr_peermsgs * msgs )
1661{
1662    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1663    struct evbuffer * out = msgs->outMessages;
1664
1665    dbgmsg( msgs, "sending peer a bitfield message" );
1666    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1667    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1668    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1669}
1670
1671/**
1672***
1673**/
1674
1675/* some peers give us error messages if we send
1676   more than this many peers in a single pex message */
1677#define MAX_PEX_DIFFS 200
1678
1679typedef struct
1680{
1681    tr_pex * added;
1682    tr_pex * dropped;
1683    tr_pex * elements;
1684    int addedCount;
1685    int droppedCount;
1686    int elementCount;
1687    int diffCount;
1688}
1689PexDiffs;
1690
1691static void
1692pexAddedCb( void * vpex, void * userData )
1693{
1694    PexDiffs * diffs = (PexDiffs *) userData;
1695    tr_pex * pex = (tr_pex *) vpex;
1696    if( diffs->diffCount < MAX_PEX_DIFFS )
1697    {
1698        diffs->diffCount++;
1699        diffs->added[diffs->addedCount++] = *pex;
1700        diffs->elements[diffs->elementCount++] = *pex;
1701    }
1702}
1703
1704static void
1705pexRemovedCb( void * vpex, void * userData )
1706{
1707    PexDiffs * diffs = (PexDiffs *) userData;
1708    tr_pex * pex = (tr_pex *) vpex;
1709    if( diffs->diffCount < MAX_PEX_DIFFS )
1710    {
1711        diffs->diffCount++;
1712        diffs->dropped[diffs->droppedCount++] = *pex;
1713    }
1714}
1715
1716static void
1717pexElementCb( void * vpex, void * userData )
1718{
1719    PexDiffs * diffs = (PexDiffs *) userData;
1720    tr_pex * pex = (tr_pex *) vpex;
1721    if( diffs->diffCount < MAX_PEX_DIFFS )
1722    {
1723        diffs->diffCount++;
1724        diffs->elements[diffs->elementCount++] = *pex;
1725    }
1726}
1727
1728static void
1729sendPex( tr_peermsgs * msgs )
1730{
1731    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1732    {
1733        int i;
1734        tr_pex * newPex = NULL;
1735        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1736        PexDiffs diffs;
1737        tr_benc val, *added, *dropped, *flags;
1738        uint8_t *tmp, *walk;
1739        char * benc;
1740        int bencLen;
1741
1742        /* build the diffs */
1743        diffs.added = tr_new( tr_pex, newCount );
1744        diffs.addedCount = 0;
1745        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1746        diffs.droppedCount = 0;
1747        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1748        diffs.elementCount = 0;
1749        diffs.diffCount = 0;
1750        tr_set_compare( msgs->pex, msgs->pexCount,
1751                        newPex, newCount,
1752                        tr_pexCompare, sizeof(tr_pex),
1753                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1754        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1755
1756        /* update peer */
1757        tr_free( msgs->pex );
1758        msgs->pex = diffs.elements;
1759        msgs->pexCount = diffs.elementCount;
1760
1761        /* build the pex payload */
1762        tr_bencInit( &val, TYPE_DICT );
1763        tr_bencDictReserve( &val, 3 );
1764
1765        /* "added" */
1766        added = tr_bencDictAdd( &val, "added" );
1767        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1768        for( i=0; i<diffs.addedCount; ++i ) {
1769            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1770            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1771        }
1772        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1773        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1774
1775        /* "added.f" */
1776        flags = tr_bencDictAdd( &val, "added.f" );
1777        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1778        for( i=0; i<diffs.addedCount; ++i )
1779            *walk++ = diffs.added[i].flags;
1780        assert( ( walk - tmp ) == diffs.addedCount );
1781        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1782
1783        /* "dropped" */
1784        dropped = tr_bencDictAdd( &val, "dropped" );
1785        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1786        for( i=0; i<diffs.droppedCount; ++i ) {
1787            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1788            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1789        }
1790        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1791        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1792
1793        /* write the pex message */
1794        benc = tr_bencSave( &val, &bencLen );
1795        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1796        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1797        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1798        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1799
1800        /* cleanup */
1801        tr_free( benc );
1802        tr_bencFree( &val );
1803        tr_free( diffs.added );
1804        tr_free( diffs.dropped );
1805        tr_free( newPex );
1806
1807        msgs->clientSentPexAt = time( NULL );
1808    }
1809}
1810
1811static int
1812pexPulse( void * vpeer )
1813{
1814    sendPex( vpeer );
1815    return TRUE;
1816}
1817
1818/**
1819***
1820**/
1821
1822tr_peermsgs*
1823tr_peerMsgsNew( struct tr_torrent * torrent,
1824                struct tr_peer    * info,
1825                tr_delivery_func    func,
1826                void              * userData,
1827                tr_publisher_tag  * setme )
1828{
1829    tr_peermsgs * m;
1830
1831    assert( info != NULL );
1832    assert( info->io != NULL );
1833
1834    m = tr_new0( tr_peermsgs, 1 );
1835    m->publisher = tr_publisherNew( );
1836    m->info = info;
1837    m->handle = torrent->handle;
1838    m->torrent = torrent;
1839    m->io = info->io;
1840    m->info->clientIsChoked = 1;
1841    m->info->peerIsChoked = 1;
1842    m->info->clientIsInterested = 0;
1843    m->info->peerIsInterested = 0;
1844    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1845    m->state = AWAITING_BT_LENGTH;
1846    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1847    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1848    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1849    m->outMessages = evbuffer_new( );
1850    m->incoming.block = evbuffer_new( );
1851    m->outBlock = evbuffer_new( );
1852    m->peerAllowedPieces = NULL;
1853    m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1854    m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1855    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1856
1857    if ( tr_peerIoSupportsLTEP( m->io ) )
1858        sendLtepHandshake( m );
1859
1860    /* bitfield/have-all/have-none must preceed other non-handshake messages... */
1861    if ( !tr_peerIoSupportsFEXT( m->io ) )
1862        sendBitfield( m );
1863    else {
1864        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1865        float completion = tr_cpPercentComplete( m->torrent->completion );
1866        if ( completion == 0.0f ) {
1867            sendFastHave( m, 0 );
1868        } else if ( completion == 1.0f ) {
1869            sendFastHave( m, 1 );
1870        } else {
1871            sendBitfield( m );
1872        }
1873    }
1874   
1875    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1876    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1877    ratePulse( m );
1878
1879    return m;
1880}
1881
1882void
1883tr_peerMsgsFree( tr_peermsgs* msgs )
1884{
1885    if( msgs != NULL )
1886    {
1887        tr_timerFree( &msgs->pulseTimer );
1888        tr_timerFree( &msgs->rateTimer );
1889        tr_timerFree( &msgs->pexTimer );
1890        tr_publisherFree( &msgs->publisher );
1891        tr_list_free( &msgs->clientWillAskFor, tr_free );
1892        tr_list_free( &msgs->clientAskedFor, tr_free );
1893        tr_list_free( &msgs->peerAskedForFast, tr_free );
1894        tr_list_free( &msgs->peerAskedFor, tr_free );
1895        tr_bitfieldFree( msgs->peerAllowedPieces );
1896        tr_bitfieldFree( msgs->clientAllowedPieces );
1897        tr_bitfieldFree( msgs->clientSuggestedPieces );
1898        evbuffer_free( msgs->incoming.block );
1899        evbuffer_free( msgs->outMessages );
1900        evbuffer_free( msgs->outBlock );
1901        tr_free( msgs->pex );
1902
1903        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1904        tr_free( msgs );
1905    }
1906}
1907
1908tr_publisher_tag
1909tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1910                      tr_delivery_func    func,
1911                      void              * userData )
1912{
1913    return tr_publisherSubscribe( peer->publisher, func, userData );
1914}
1915
1916void
1917tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1918                        tr_publisher_tag    tag )
1919{
1920    tr_publisherUnsubscribe( peer->publisher, tag );
1921}
1922
1923int
1924tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1925                               uint32_t            index )
1926{
1927    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1928}
1929
1930int
1931tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1932                             uint32_t            index )
1933{
1934    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1935}
1936
Note: See TracBrowser for help on using the repository browser.