source: trunk/libtransmission/peer-msgs.c @ 3295

Last change on this file since 3295 was 3295, checked in by charles, 15 years ago
  • add a per-peer request queue to hold the next 10-15 seconds' worth of requests so that we always have more requests at hand when the current requests start to run low.
  • increase the tracker `numwant' variable to grow our peer pool
  • bugfixes in cancelling requests.
  • make the debug log sexy and readable like uTorrent's ;)
  • Property svn:keywords set to Date Rev Author Id
File size: 49.6 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3295 2007-10-06 18:20:52Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
42                                    threshold for fast-allowing others */
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56    BT_SUGGEST              = 13,
57    BT_HAVE_ALL             = 14,
58    BT_HAVE_NONE            = 15,
59    BT_REJECT               = 16,
60    BT_ALLOWED_FAST         = 17,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    OUR_LTEP_PEX            = 1,
66
67    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
68
69    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (66),        /* msec between calls to pulse() */
72    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
73};
74
75enum
76{
77    AWAITING_BT_LENGTH,
78    AWAITING_BT_MESSAGE,
79    READING_BT_PIECE
80};
81
82struct peer_request
83{
84    uint32_t index;
85    uint32_t offset;
86    uint32_t length;
87    time_t time_requested;
88};
89
90static int
91compareRequest( const void * va, const void * vb )
92{
93    struct peer_request * a = (struct peer_request*) va;
94    struct peer_request * b = (struct peer_request*) vb;
95    if( a->index != b->index ) return a->index - b->index;
96    if( a->offset != b->offset ) return a->offset - b->offset;
97    if( a->length != b->length ) return a->length - b->length;
98    return 0;
99}
100
101struct tr_peermsgs
102{
103    tr_peer * info;
104
105    tr_handle * handle;
106    tr_torrent * torrent;
107    tr_peerIo * io;
108
109    tr_publisher_t * publisher;
110
111    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
112    struct evbuffer * inBlock;     /* the block we're currently receiving */
113    tr_list * peerAskedFor;
114    tr_list * clientAskedFor;
115    tr_list * clientWillAskFor;
116
117    tr_timer * rateTimer;
118    tr_timer * pulseTimer;
119    tr_timer * pexTimer;
120
121    struct peer_request blockToUs; /* the block currntly being sent to us */
122
123    time_t lastReqAddedAt;
124    time_t clientSentPexAt;
125    time_t clientSentAnythingAt;
126
127    unsigned int notListening             : 1;
128    unsigned int peerSupportsPex          : 1;
129    unsigned int clientSentLtepHandshake  : 1;
130    unsigned int peerSentLtepHandshake    : 1;
131   
132    tr_bitfield * clientAllowedPieces;
133    tr_bitfield * peerAllowedPieces;
134   
135    uint8_t state;
136    uint8_t ut_pex_id;
137    uint16_t pexCount;
138    uint32_t incomingMessageLength;
139    uint32_t maxActiveRequests;
140    uint32_t minActiveRequests;
141
142    tr_pex * pex;
143};
144
145/**
146***
147**/
148
149static void
150myDebug( const char * file, int line,
151         const struct tr_peermsgs * msgs,
152         const char * fmt, ... )
153{
154    FILE * fp = tr_getLog( );
155    if( fp != NULL )
156    {
157        va_list args;
158        struct evbuffer * buf = evbuffer_new( );
159        char timestr[64];
160        evbuffer_add_printf( buf, "[%s] %s [%s]: ",
161                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
162                             tr_peerIoGetAddrStr( msgs->io ),
163                             msgs->info->client );
164        va_start( args, fmt );
165        evbuffer_add_vprintf( buf, fmt, args );
166        va_end( args );
167        evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
168
169        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
170        evbuffer_free( buf );
171    }
172}
173
174#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
175
176/**
177***
178**/
179
180static void
181protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
182{
183    tr_peerIo * io = msgs->io;
184    struct evbuffer * out = msgs->outMessages;
185
186    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
187    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
188    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
189    tr_peerIoWriteUint32( io, out, req->index );
190    tr_peerIoWriteUint32( io, out, req->offset );
191    tr_peerIoWriteUint32( io, out, req->length );
192}
193
194static void
195protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
196{
197    tr_peerIo * io = msgs->io;
198    struct evbuffer * out = msgs->outMessages;
199
200    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
201    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
202    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
203    tr_peerIoWriteUint32( io, out, req->index );
204    tr_peerIoWriteUint32( io, out, req->offset );
205    tr_peerIoWriteUint32( io, out, req->length );
206}
207
208static void
209protocolSendHave( tr_peermsgs * msgs, uint32_t index )
210{
211    tr_peerIo * io = msgs->io;
212    struct evbuffer * out = msgs->outMessages;
213
214    dbgmsg( msgs, "sending Have %u", index );
215    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
216    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
217    tr_peerIoWriteUint32( io, out, index );
218}
219
220static void
221protocolSendChoke( tr_peermsgs * msgs, int choke )
222{
223    tr_peerIo * io = msgs->io;
224    struct evbuffer * out = msgs->outMessages;
225
226    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
227    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
228    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
229}
230
231/**
232***  EVENTS
233**/
234
235static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
236
237static void
238publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
239{
240    tr_publisherPublish( msgs->publisher, msgs->info, e );
241}
242
243static void
244fireGotError( tr_peermsgs * msgs )
245{
246    tr_peermsgs_event e = blankEvent;
247    e.eventType = TR_PEERMSG_GOT_ERROR;
248    publish( msgs, &e );
249}
250
251static void
252fireNeedReq( tr_peermsgs * msgs )
253{
254    tr_peermsgs_event e = blankEvent;
255    e.eventType = TR_PEERMSG_NEED_REQ;
256    dbgmsg( msgs, "firing NEED_REQ" );
257    publish( msgs, &e );
258}
259
260static void
261firePeerProgress( tr_peermsgs * msgs )
262{
263    tr_peermsgs_event e = blankEvent;
264    e.eventType = TR_PEERMSG_PEER_PROGRESS;
265    e.progress = msgs->info->progress;
266    publish( msgs, &e );
267}
268
269static void
270fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
271{
272    tr_peermsgs_event e = blankEvent;
273    e.eventType = TR_PEERMSG_CLIENT_HAVE;
274    e.pieceIndex = pieceIndex;
275    publish( msgs, &e );
276}
277
278static void
279fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
280{
281    tr_peermsgs_event e = blankEvent;
282    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
283    e.pieceIndex = pieceIndex;
284    e.offset = offset;
285    e.length = length;
286    publish( msgs, &e );
287}
288
289static void
290fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
291{
292    tr_peermsgs_event e = blankEvent;
293    e.eventType = TR_PEERMSG_CANCEL;
294    e.pieceIndex = req->index;
295    e.offset = req->offset;
296    e.length = req->length;
297    publish( msgs, &e );
298}
299
300/**
301***  INTEREST
302**/
303
304static int
305isPieceInteresting( const tr_peermsgs   * peer,
306                    int                   piece )
307{
308    const tr_torrent * torrent = peer->torrent;
309    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
310        return FALSE;
311    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
312        return FALSE;
313    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
314        return FALSE;
315    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
316        return FALSE;
317    return TRUE;
318}
319
320/* "interested" means we'll ask for piece data if they unchoke us */
321static int
322isPeerInteresting( const tr_peermsgs * msgs )
323{
324    int i;
325    const tr_torrent * torrent;
326    const tr_bitfield * bitfield;
327    const int clientIsSeed =
328        tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
329
330    if( clientIsSeed )
331        return FALSE;
332
333    torrent = msgs->torrent;
334    bitfield = tr_cpPieceBitfield( torrent->completion );
335
336    if( !msgs->info->have )
337        return TRUE;
338
339    assert( bitfield->len == msgs->info->have->len );
340    for( i=0; i<torrent->info.pieceCount; ++i )
341        if( isPieceInteresting( msgs, i ) )
342            return TRUE;
343
344    return FALSE;
345}
346
347static void
348sendInterest( tr_peermsgs * msgs, int weAreInterested )
349{
350    assert( msgs != NULL );
351    assert( weAreInterested==0 || weAreInterested==1 );
352
353    msgs->info->clientIsInterested = weAreInterested;
354    dbgmsg( msgs, "Sending %s",
355            weAreInterested ? "Interested" : "Not Interested");
356
357    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
358    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
359                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
360}
361
362static void
363updateInterest( tr_peermsgs * msgs )
364{
365    const int i = isPeerInteresting( msgs );
366    if( i != msgs->info->clientIsInterested )
367        sendInterest( msgs, i );
368    if( i )
369        fireNeedReq( msgs );
370}
371
372void
373tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
374{
375    assert( msgs != NULL );
376    assert( msgs->info != NULL );
377    assert( choke==0 || choke==1 );
378
379    if( msgs->info->peerIsChoked != choke )
380    {
381        msgs->info->peerIsChoked = choke;
382        tr_list * walk;
383       
384        if( choke )
385            for( walk = msgs->peerAskedFor; walk != NULL; )
386            {
387                tr_list * next = walk->next;
388                /* don't reject a peer's fast allowed requests at choke */
389                struct peer_request *req = walk->data;
390                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
391                {
392                    tr_list_remove_data( &msgs->peerAskedFor, req );
393                    tr_free( req );
394                }
395                walk = next;
396            }
397
398        protocolSendChoke( msgs, choke );
399        msgs->info->chokeChangedAt = time( NULL );
400    }
401}
402
403/**
404***
405**/
406
407void
408tr_peerMsgsHave( tr_peermsgs * msgs,
409                 uint32_t      index )
410{
411    protocolSendHave( msgs, index );
412
413    /* since we have more pieces now, we might not be interested in this peer */
414    updateInterest( msgs );
415}
416#if 0
417static void
418sendFastSuggest( tr_peermsgs * msgs,
419                 uint32_t      pieceIndex )
420{
421    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
422    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
423    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
424    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
425   
426    updateInterest( msgs );
427}
428#endif
429static void
430sendFastHave( tr_peermsgs * msgs,
431              int           all)
432{
433    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
434    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
435    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
436   
437    updateInterest( msgs );
438}
439
440static void
441sendFastReject( tr_peermsgs * msgs,
442                uint32_t      pieceIndex,
443                uint32_t      offset,
444                uint32_t      length )
445{
446    assert( msgs != NULL );
447    assert( length > 0 );
448   
449    /* reject the request */
450    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
451    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
452    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
453    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
454    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
455    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
456}
457
458static void
459sendFastAllowed( tr_peermsgs * msgs,
460                 uint32_t      pieceIndex)
461{
462    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
463    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
464    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
465    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
466}
467
468
469
470static void
471sendFastAllowedSet( tr_peermsgs * msgs )
472{
473    int i = 0;
474    while (i <= msgs->torrent->info.pieceCount )
475    {
476        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
477            sendFastAllowed( msgs, i );
478        i++;
479    }
480}
481
482
483/**
484***
485**/
486
487static int
488reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
489{
490    const tr_torrent * tor = msgs->torrent;
491
492    if( index >= (uint32_t) tor->info.pieceCount )
493        return FALSE;
494    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
495        return FALSE;
496    if( length > MAX_REQUEST_BYTE_COUNT )
497        return FALSE;
498    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
499        return FALSE;
500
501    return TRUE;
502}
503
504static int
505requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
506{
507    return reqIsValid( msgs, req->index, req->offset, req->length );
508}
509
510static void
511pumpRequestQueue( tr_peermsgs * msgs )
512{
513    const int max = msgs->maxActiveRequests;
514    const int min = msgs->minActiveRequests;
515    int count = tr_list_size( msgs->clientAskedFor );
516    int sent = 0;
517
518    if( count > min )
519        return;
520    if( msgs->info->clientIsChoked )
521        return;
522
523    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
524    {
525        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
526        protocolSendRequest( msgs, req );
527        req->time_requested = msgs->lastReqAddedAt = time( NULL );
528        tr_list_append( &msgs->clientAskedFor, req );
529        ++count;
530        ++sent;
531    }
532
533    if( sent )
534        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
535                sent,
536                tr_list_size(msgs->clientAskedFor),
537                tr_list_size(msgs->clientWillAskFor) );
538
539    if( count < max )
540        fireNeedReq( msgs );
541}
542
543int
544tr_peerMsgsAddRequest( tr_peermsgs * msgs,
545                       uint32_t      index, 
546                       uint32_t      offset, 
547                       uint32_t      length )
548{
549    const int req_max = msgs->maxActiveRequests;
550    struct peer_request tmp, *req;
551
552    assert( msgs != NULL );
553    assert( msgs->torrent != NULL );
554    assert( reqIsValid( msgs, index, offset, length ) );
555
556    /**
557    ***  Reasons to decline the request
558    **/
559
560    /* don't send requests to choked clients */
561    if( msgs->info->clientIsChoked ) {
562        dbgmsg( msgs, "declining request because they're choking us" );
563        return TR_ADDREQ_CLIENT_CHOKED;
564    }
565
566    /* peer doesn't have this piece */
567    if( !tr_bitfieldHas( msgs->info->have, index ) )
568        return TR_ADDREQ_MISSING;
569
570    /* peer's queue is full */
571    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
572        dbgmsg( msgs, "declining request because we're full" );
573        return TR_ADDREQ_FULL;
574    }
575
576    /* have we already asked for this piece? */
577    tmp.index = index;
578    tmp.offset = offset;
579    tmp.length = length;
580    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
581        dbgmsg( msgs, "declining because it's a duplicate" );
582        return TR_ADDREQ_DUPLICATE;
583    }
584    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
585        dbgmsg( msgs, "declining because it's a duplicate" );
586        return TR_ADDREQ_DUPLICATE;
587    }
588
589    /**
590    ***  Accept this request
591    **/
592
593    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
594    req = tr_new0( struct peer_request, 1 );
595    *req = tmp;
596    tr_list_append( &msgs->clientWillAskFor, req );
597    return TR_ADDREQ_OK;
598}
599
600static void
601tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
602{
603    struct peer_request * req;
604
605    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
606    {
607        fireCancelledReq( msgs, req );
608        tr_free( req );
609    }
610
611    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
612    {
613        fireCancelledReq( msgs, req );
614        protocolSendCancel( msgs, req );
615        tr_free( req );
616    }
617}
618
619void
620tr_peerMsgsCancel( tr_peermsgs * msgs,
621                   uint32_t      pieceIndex,
622                   uint32_t      offset,
623                   uint32_t      length )
624{
625    struct peer_request *req, tmp;
626
627    assert( msgs != NULL );
628    assert( length > 0 );
629
630    /* have we asked the peer for this piece? */
631    tmp.index = pieceIndex;
632    tmp.offset = offset;
633    tmp.length = length;
634
635    /* if it's only in the queue and hasn't been sent yet, free it */
636    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
637    {
638        fireCancelledReq( msgs, req );
639        tr_free( req );
640    }
641
642    /* if it's already been sent, send a cancel message too */
643    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
644    {
645        protocolSendCancel( msgs, req );
646        fireCancelledReq( msgs, req );
647        tr_free( req );
648    }
649}
650
651/**
652***
653**/
654
655static void
656sendLtepHandshake( tr_peermsgs * msgs )
657{
658    benc_val_t val, *m;
659    char * buf;
660    int len;
661    int pex;
662    const char * v = TR_NAME " " USERAGENT_PREFIX;
663    const int port = tr_getPublicPort( msgs->handle );
664    struct evbuffer * outbuf;
665
666    if( msgs->clientSentLtepHandshake )
667        return;
668
669    outbuf = evbuffer_new( );
670    dbgmsg( msgs, "sending an ltep handshake" );
671    msgs->clientSentLtepHandshake = 1;
672
673    /* decide if we want to advertise pex support */
674    if( msgs->torrent->pexDisabled )
675        pex = 0;
676    else if( msgs->peerSentLtepHandshake )
677        pex = msgs->peerSupportsPex ? 1 : 0;
678    else
679        pex = 1;
680
681    tr_bencInit( &val, TYPE_DICT );
682    tr_bencDictReserve( &val, 4 );
683    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
684    m  = tr_bencDictAdd( &val, "m" );
685    tr_bencInit( m, TYPE_DICT );
686    if( pex ) {
687        tr_bencDictReserve( m, 1 );
688        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
689    }
690    if( port > 0 )
691        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
692    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
693    buf = tr_bencSaveMalloc( &val,  &len );
694
695    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
696    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
697    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
698    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
699
700    tr_peerIoWriteBuf( msgs->io, outbuf );
701
702#if 0
703    dbgmsg( msgs, "here is the ltep handshake we sent:" );
704    tr_bencPrint( &val );
705#endif
706
707    /* cleanup */
708    tr_bencFree( &val );
709    tr_free( buf );
710    evbuffer_free( outbuf );
711}
712
713static void
714parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
715{
716    benc_val_t val, * sub;
717    uint8_t * tmp = tr_new( uint8_t, len );
718
719    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
720    msgs->peerSentLtepHandshake = 1;
721
722    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
723        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
724        tr_free( tmp );
725        return;
726    }
727
728#if 0
729    dbgmsg( msgs, "here is the ltep handshake we read:" );
730    tr_bencPrint( &val );
731#endif
732
733    /* does the peer prefer encrypted connections? */
734    sub = tr_bencDictFind( &val, "e" );
735    if( tr_bencIsInt( sub ) )
736        msgs->info->encryption_preference = sub->val.i
737                                      ? ENCRYPTION_PREFERENCE_YES
738                                      : ENCRYPTION_PREFERENCE_NO;
739
740    /* check supported messages for utorrent pex */
741    sub = tr_bencDictFind( &val, "m" );
742    if( tr_bencIsDict( sub ) ) {
743        sub = tr_bencDictFind( sub, "ut_pex" );
744        if( tr_bencIsInt( sub ) ) {
745            msgs->peerSupportsPex = 1;
746            msgs->ut_pex_id = (uint8_t) sub->val.i;
747            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
748        }
749    }
750
751    /* get peer's listening port */
752    sub = tr_bencDictFind( &val, "p" );
753    if( tr_bencIsInt( sub ) ) {
754        msgs->info->port = htons( (uint16_t)sub->val.i );
755        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
756    }
757
758    tr_bencFree( &val );
759    tr_free( tmp );
760}
761
762static void
763parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
764{
765    benc_val_t val, * sub;
766    uint8_t * tmp;
767
768    if( msgs->torrent->pexDisabled ) /* no sharing! */
769        return;
770
771    tmp = tr_new( uint8_t, msglen );
772    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
773
774    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
775        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
776        tr_free( tmp );
777        return;
778    }
779
780    sub = tr_bencDictFind( &val, "added" );
781    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
782        const int n = sub->val.s.i / 6 ;
783        dbgmsg( msgs, "got %d peers from uT pex", n );
784        tr_peerMgrAddPeers( msgs->handle->peerMgr,
785                            msgs->torrent->info.hash,
786                            TR_PEER_FROM_PEX,
787                            (uint8_t*)sub->val.s.s, n );
788    }
789
790    tr_bencFree( &val );
791    tr_free( tmp );
792}
793
794static void
795sendPex( tr_peermsgs * msgs );
796
797static void
798parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
799{
800    uint8_t ltep_msgid;
801
802    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
803    msglen--;
804
805    if( ltep_msgid == LTEP_HANDSHAKE )
806    {
807        dbgmsg( msgs, "got ltep handshake" );
808        parseLtepHandshake( msgs, msglen, inbuf );
809        sendLtepHandshake( msgs );
810        sendPex( msgs );
811    }
812    else if( ltep_msgid == msgs->ut_pex_id )
813    {
814        dbgmsg( msgs, "got ut pex" );
815        msgs->peerSupportsPex = 1;
816        parseUtPex( msgs, msglen, inbuf );
817    }
818    else
819    {
820        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
821        evbuffer_drain( inbuf, msglen );
822    }
823}
824
825static int
826readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
827{
828    uint32_t len;
829    const size_t needlen = sizeof(uint32_t);
830
831    if( EVBUFFER_LENGTH(inbuf) < needlen )
832        return READ_MORE;
833
834    tr_peerIoReadUint32( msgs->io, inbuf, &len );
835
836    if( len == 0 ) /* peer sent us a keepalive message */
837        dbgmsg( msgs, "got KeepAlive" );
838    else {
839        msgs->incomingMessageLength = len;
840        msgs->state = AWAITING_BT_MESSAGE;
841    }
842
843    return READ_AGAIN;
844}
845
846static void
847updatePeerProgress( tr_peermsgs * msgs )
848{
849    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
850    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
851    updateInterest( msgs );
852    firePeerProgress( msgs );
853}
854
855static int
856readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
857{
858    uint8_t id;
859    uint32_t ui32;
860    uint32_t msglen = msgs->incomingMessageLength;
861
862    if( EVBUFFER_LENGTH(inbuf) < msglen )
863        return READ_MORE;
864
865    tr_peerIoReadUint8( msgs->io, inbuf, &id );
866    msglen--;
867    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
868
869    switch( id )
870    {
871        case BT_CHOKE:
872            dbgmsg( msgs, "got Choke" );
873            assert( msglen == 0 );
874            msgs->info->clientIsChoked = 1;
875#if 0
876            tr_list * walk;
877            for( walk = msgs->peerAskedFor; walk != NULL; )
878            {
879                tr_list * next = walk->next;
880                /* We shouldn't reject a peer's fast allowed requests at choke */
881                struct peer_request *req = walk->data;
882                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
883                {
884                    tr_list_remove_data( &msgs->peerAskedFor, req );
885                    tr_free( req );
886                }
887                walk = next;
888            }
889#endif
890            tr_peerMsgsCancelAllRequests( msgs );
891            break;
892
893        case BT_UNCHOKE:
894            dbgmsg( msgs, "got Unchoke" );
895            assert( msglen == 0 );
896            msgs->info->clientIsChoked = 0;
897            fireNeedReq( msgs );
898            break;
899
900        case BT_INTERESTED:
901            dbgmsg( msgs, "got Interested" );
902            assert( msglen == 0 );
903            msgs->info->peerIsInterested = 1;
904            tr_peerMsgsSetChoke( msgs, 0 );
905            break;
906
907        case BT_NOT_INTERESTED:
908            dbgmsg( msgs, "got Not Interested" );
909            assert( msglen == 0 );
910            msgs->info->peerIsInterested = 0;
911            break;
912
913        case BT_HAVE:
914            assert( msglen == 4 );
915            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
916            tr_bitfieldAdd( msgs->info->have, ui32 );
917            updatePeerProgress( msgs );
918            dbgmsg( msgs, "got Have: %u", ui32 );
919            break;
920
921        case BT_BITFIELD: {
922            const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
923            assert( msglen == msgs->info->have->len );
924            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
925            updatePeerProgress( msgs );
926            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
927            fireNeedReq( msgs );
928            break;
929        }
930
931        case BT_REQUEST: {
932            struct peer_request * req;
933            assert( msglen == 12 );
934            req = tr_new( struct peer_request, 1 );
935            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
936            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
937            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
938            dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
939           
940            if ( !requestIsValid( msgs, req ) )
941            {
942                dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
943                tr_free( req );
944                break;
945            }
946            /*
947                If we're not choking him -> continue
948                If we're choking him
949                    it doesn't support FPE -> He's deaf, reCHOKE and bail...
950                    it support FPE
951                        If the asked piece is not allowed
952                            OR he's above our threshold
953                            OR we don't have the requested piece -> Reject
954                        Else
955                        Asked piece allowed AND he's below our threshold -> continue...
956             */
957   
958
959            if ( msgs->info->peerIsChoked )
960            {
961                if ( !tr_peerIoSupportsFEXT( msgs->io ) )
962                {
963                    dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
964                    /* Didn't he get it? */
965                    tr_peerMsgsSetChoke( msgs, 1 );
966                    tr_free( req );
967                    break;
968                }
969                else
970                {
971                    if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
972                         || ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
973                         || !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
974                    {
975                        dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
976                        sendFastReject( msgs, req->index, req->offset, req->length );
977                        tr_free( req );
978                        break;
979                    }
980                    dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
981                }   
982            }
983           
984            tr_list_append( &msgs->peerAskedFor, req );
985            break;
986        }
987
988        case BT_CANCEL: {
989            struct peer_request req;
990            void * data;
991            assert( msglen == 12 );
992            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
993            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
994            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
995            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
996            data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
997            tr_free( data );
998            break;
999        }
1000
1001        case BT_PIECE: {
1002            dbgmsg( msgs, "got a Piece!" );
1003            assert( msgs->blockToUs.length == 0 );
1004            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1005            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1006            msgs->blockToUs.length = msglen - 8;
1007            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1008            //evbuffer_drain( msgs->inBlock, ~0 );
1009            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1010            return READ_AGAIN;
1011            break;
1012        }
1013
1014        case BT_PORT: {
1015            dbgmsg( msgs, "Got a BT_PORT" );
1016            assert( msglen == 2 );
1017            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1018            break;
1019        }
1020       
1021        case BT_SUGGEST: {
1022            /* tiennou TODO */
1023            break;
1024        }
1025           
1026        case BT_HAVE_ALL: {
1027            assert( msglen == 0 );
1028            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1029            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1030            updatePeerProgress( msgs );
1031            break;
1032        }
1033           
1034        case BT_HAVE_NONE: {
1035            assert( msglen == 0 );
1036            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1037            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1038            updatePeerProgress( msgs );
1039            break;
1040        }
1041           
1042        case BT_REJECT: {
1043            struct peer_request req;
1044            tr_list * node;
1045            assert( msglen == 12 );
1046            dbgmsg( msgs, "Got a BT_REJECT" );
1047            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1048            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1049            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1050            node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
1051            if( node != NULL ) {
1052                void * data = node->data;
1053                tr_list_remove_data( &msgs->peerAskedFor, data );
1054                tr_free( data );
1055                dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
1056            }
1057            break;
1058        }
1059           
1060        case BT_ALLOWED_FAST: {
1061            assert( msglen == 4 );
1062            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1063            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1064            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1065            break;
1066        }
1067           
1068        case BT_LTEP:
1069            dbgmsg( msgs, "Got a BT_LTEP" );
1070            parseLtep( msgs, msglen, inbuf );
1071            break;
1072
1073        default:
1074            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1075            tr_peerIoDrain( msgs->io, inbuf, msglen );
1076            assert( 0 );
1077    }
1078
1079    msgs->incomingMessageLength = -1;
1080    msgs->state = AWAITING_BT_LENGTH;
1081    return READ_AGAIN;
1082}
1083
1084static void
1085clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1086{
1087    tr_torrent * tor = msgs->torrent;
1088    tor->activityDate = tr_date( );
1089    tor->downloadedCur += byteCount;
1090    msgs->info->pieceDataActivityDate = time( NULL );
1091    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1092    tr_rcTransferred( tor->download, byteCount );
1093    tr_rcTransferred( tor->handle->download, byteCount );
1094}
1095
1096static void
1097peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1098{
1099    tr_torrent * tor = msgs->torrent;
1100    tor->activityDate = tr_date( );
1101    tor->uploadedCur += byteCount;
1102    msgs->info->pieceDataActivityDate = time( NULL );
1103    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1104    tr_rcTransferred( tor->upload, byteCount );
1105    tr_rcTransferred( tor->handle->upload, byteCount );
1106}
1107
1108static int
1109canDownload( const tr_peermsgs * msgs )
1110{
1111    tr_torrent * tor = msgs->torrent;
1112
1113    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1114        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1115
1116    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1117        return tr_rcCanTransfer( tor->download );
1118
1119    return TRUE;
1120}
1121
1122static void
1123reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1124{
1125    tr_torrent * tor = msgs->torrent;
1126
1127    /* increment the `corrupt' field */
1128    tor->corruptCur += byteCount;
1129
1130    /* decrement the `downloaded' field */
1131    if( tor->downloadedCur >= byteCount )
1132        tor->downloadedCur -= byteCount;
1133    else
1134        tor->downloadedCur = 0;
1135}
1136
1137
1138static void
1139gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1140{
1141    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1142    reassignBytesToCorrupt( msgs, byteCount );
1143}
1144
1145static void
1146gotUnwantedBlock( tr_peermsgs * msgs, uint32_t index UNUSED, uint32_t offset UNUSED, uint32_t length )
1147{
1148    reassignBytesToCorrupt( msgs, length );
1149}
1150
1151static void
1152addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1153{
1154    if( !msgs->info->blame )
1155         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1156    tr_bitfieldAdd( msgs->info->blame, index );
1157}
1158
1159static void
1160gotBlock( tr_peermsgs      * msgs,
1161          struct evbuffer  * inbuf,
1162          uint32_t           index,
1163          uint32_t           offset,
1164          uint32_t           length )
1165{
1166    tr_torrent * tor = msgs->torrent;
1167    const int block = _tr_block( tor, index, offset );
1168    struct peer_request key, *req;
1169
1170    /**
1171    *** Remove the block from our `we asked for this' list
1172    **/
1173
1174    key.index = index;
1175    key.offset = offset;
1176    key.length = length;
1177    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1178                                                 compareRequest );
1179    if( req == NULL ) {
1180        gotUnwantedBlock( msgs, index, offset, length );
1181        dbgmsg( msgs, "we didn't ask for this message..." );
1182        return;
1183    }
1184    dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)", 
1185                     req->index, req->offset, req->length,
1186                     (int)(time(NULL) - req->time_requested) );
1187    tr_free( req );
1188    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1189                  tr_list_size(msgs->clientAskedFor));
1190
1191    /**
1192    *** Error checks
1193    **/
1194
1195    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1196        dbgmsg( msgs, "have this block already..." );
1197        tr_dbg( "have this block already..." );
1198        gotUnwantedBlock( msgs, index, offset, length );
1199        return;
1200    }
1201
1202    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1203        dbgmsg( msgs, "block is the wrong length..." );
1204        tr_dbg( "block is the wrong length..." );
1205        gotUnwantedBlock( msgs, index, offset, length );
1206        return;
1207    }
1208
1209    /**
1210    ***  Write the block
1211    **/
1212
1213    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
1214        return;
1215
1216    tr_cpBlockAdd( tor->completion, block );
1217
1218    addUsToBlamefield( msgs, index );
1219
1220    fireGotBlock( msgs, index, offset, length );
1221
1222    /**
1223    ***  Handle if this was the last block in the piece
1224    **/
1225
1226    if( tr_cpPieceIsComplete( tor->completion, index ) )
1227    {
1228        if( tr_ioHash( tor, index ) )
1229        {
1230            gotBadPiece( msgs, index );
1231            return;
1232        }
1233
1234        fireClientHave( msgs, index );
1235    }
1236}
1237
1238
1239static ReadState
1240readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1241{
1242    uint32_t inlen;
1243    uint8_t * tmp;
1244
1245    assert( msgs != NULL );
1246    assert( msgs->blockToUs.length > 0 );
1247    assert( inbuf != NULL );
1248    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1249
1250    /* read from the inbuf into our block buffer */
1251    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1252    tmp = tr_new( uint8_t, inlen );
1253    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1254    evbuffer_add( msgs->inBlock, tmp, inlen );
1255
1256    /* update our tables accordingly */
1257    assert( inlen >= msgs->blockToUs.length );
1258    msgs->blockToUs.length -= inlen;
1259    msgs->info->peerSentPieceDataAt = time( NULL );
1260    clientGotBytes( msgs, inlen );
1261
1262    /* if this was the entire block, save it */
1263    if( !msgs->blockToUs.length )
1264    {
1265        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
1266        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1267        gotBlock( msgs, msgs->inBlock,
1268                        msgs->blockToUs.index,
1269                        msgs->blockToUs.offset,
1270                        EVBUFFER_LENGTH( msgs->inBlock ) );
1271        evbuffer_drain( msgs->inBlock, ~0 );
1272        msgs->state = AWAITING_BT_LENGTH;
1273    }
1274
1275    /* cleanup */
1276    tr_free( tmp );
1277    return READ_AGAIN;
1278}
1279
1280static ReadState
1281canRead( struct bufferevent * evin, void * vmsgs )
1282{
1283    ReadState ret;
1284    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1285    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1286
1287    if( !canDownload( msgs ) )
1288    {
1289        msgs->notListening = 1;
1290        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1291        ret = READ_DONE;
1292    }
1293    else switch( msgs->state )
1294    {
1295        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1296        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1297        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
1298        default: assert( 0 );
1299    }
1300
1301    return ret;
1302}
1303
1304static void
1305sendKeepalive( tr_peermsgs * msgs )
1306{
1307    dbgmsg( msgs, "sending a keepalive message" );
1308    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1309}
1310
1311/**
1312***
1313**/
1314
1315static int
1316canWrite( const tr_peermsgs * msgs )
1317{
1318    /* don't let our outbuffer get too large */
1319    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
1320        return FALSE;
1321
1322    return TRUE;
1323}
1324
1325static int
1326canUpload( const tr_peermsgs * msgs )
1327{
1328    const tr_torrent * tor = msgs->torrent;
1329
1330    if( !canWrite( msgs ) )
1331        return FALSE;
1332
1333    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1334        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1335
1336    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1337        return tr_rcCanTransfer( tor->upload );
1338
1339    return TRUE;
1340}
1341
1342static int
1343ratePulse( void * vmsgs )
1344{
1345    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1346    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1347    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1348    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
1349    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
1350    return TRUE;
1351}
1352
1353static int
1354pulse( void * vmsgs )
1355{
1356    const time_t now = time( NULL );
1357    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1358    size_t len;
1359
1360    /* if we froze out a downloaded block because of speed limits,
1361       start listening to the peer again */
1362    if( msgs->notListening && canDownload( msgs ) )
1363    {
1364        msgs->notListening = 0;
1365        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1366    }
1367
1368    pumpRequestQueue( msgs );
1369
1370    if( !canWrite( msgs ) )
1371    {
1372    }
1373    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1374    {
1375        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1376        msgs->clientSentAnythingAt = now;
1377    }
1378    else if(( msgs->peerAskedFor ))
1379    {
1380        if( canUpload( msgs ) )
1381        {
1382            struct peer_request * r = tr_list_pop_front( &msgs->peerAskedFor );
1383            uint8_t * tmp = tr_new( uint8_t, r->length );
1384            const uint32_t msglen = sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length;
1385            struct evbuffer * out = evbuffer_new( );
1386            assert( requestIsValid( msgs, r ) );
1387
1388            tr_peerIoWriteUint32( msgs->io, out, msglen );
1389            tr_peerIoWriteUint8 ( msgs->io, out, BT_PIECE );
1390            tr_peerIoWriteUint32( msgs->io, out, r->index );
1391            tr_peerIoWriteUint32( msgs->io, out, r->offset );
1392            tr_peerIoWriteBuf( msgs->io, out );
1393
1394            tr_ioRead( msgs->torrent, r->index, r->offset, r->length, tmp );
1395            tr_peerIoWrite( msgs->io, tmp, r->length );
1396            peerGotBytes( msgs, r->length );
1397
1398            dbgmsg( msgs, "Sending block %u:%u->%u (%d blocks left to send)", r->index, r->offset, r->length, tr_list_size(msgs->peerAskedFor) );
1399
1400            tr_free( r );
1401            tr_free( tmp );
1402            evbuffer_free( out );
1403        }
1404    }
1405    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1406    {
1407        sendKeepalive( msgs );
1408    }
1409
1410    return TRUE; /* loop forever */
1411}
1412
1413static void
1414didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1415{
1416    pulse( vmsgs );
1417}
1418
1419static void
1420gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
1421{
1422    fireGotError( vpeer );
1423}
1424
1425static void
1426sendBitfield( tr_peermsgs * msgs )
1427{
1428    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1429    struct evbuffer * out = msgs->outMessages;
1430
1431    dbgmsg( msgs, "sending peer a bitfield message" );
1432    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1433    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1434    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1435}
1436
1437/**
1438***
1439**/
1440
1441/* some peers give us error messages if we send
1442   more than this many peers in a single pex message */
1443#define MAX_PEX_DIFFS 200
1444
1445typedef struct
1446{
1447    tr_pex * added;
1448    tr_pex * dropped;
1449    tr_pex * elements;
1450    int addedCount;
1451    int droppedCount;
1452    int elementCount;
1453    int diffCount;
1454}
1455PexDiffs;
1456
1457static void
1458pexAddedCb( void * vpex, void * userData )
1459{
1460    PexDiffs * diffs = (PexDiffs *) userData;
1461    tr_pex * pex = (tr_pex *) vpex;
1462    if( diffs->diffCount < MAX_PEX_DIFFS )
1463    {
1464        diffs->diffCount++;
1465        diffs->added[diffs->addedCount++] = *pex;
1466        diffs->elements[diffs->elementCount++] = *pex;
1467    }
1468}
1469
1470static void
1471pexRemovedCb( void * vpex, void * userData )
1472{
1473    PexDiffs * diffs = (PexDiffs *) userData;
1474    tr_pex * pex = (tr_pex *) vpex;
1475    if( diffs->diffCount < MAX_PEX_DIFFS )
1476    {
1477        diffs->diffCount++;
1478        diffs->dropped[diffs->droppedCount++] = *pex;
1479    }
1480}
1481
1482static void
1483pexElementCb( void * vpex, void * userData )
1484{
1485    PexDiffs * diffs = (PexDiffs *) userData;
1486    tr_pex * pex = (tr_pex *) vpex;
1487    if( diffs->diffCount < MAX_PEX_DIFFS )
1488    {
1489        diffs->diffCount++;
1490        diffs->elements[diffs->elementCount++] = *pex;
1491    }
1492}
1493
1494static void
1495sendPex( tr_peermsgs * msgs )
1496{
1497    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1498    {
1499        int i;
1500        tr_pex * newPex = NULL;
1501        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1502        PexDiffs diffs;
1503        benc_val_t val, *added, *dropped, *flags;
1504        uint8_t *tmp, *walk;
1505        char * benc;
1506        int bencLen;
1507
1508        /* build the diffs */
1509        diffs.added = tr_new( tr_pex, newCount );
1510        diffs.addedCount = 0;
1511        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1512        diffs.droppedCount = 0;
1513        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1514        diffs.elementCount = 0;
1515        diffs.diffCount = 0;
1516        tr_set_compare( msgs->pex, msgs->pexCount,
1517                        newPex, newCount,
1518                        tr_pexCompare, sizeof(tr_pex),
1519                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1520        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1521
1522        /* update peer */
1523        tr_free( msgs->pex );
1524        msgs->pex = diffs.elements;
1525        msgs->pexCount = diffs.elementCount;
1526
1527        /* build the pex payload */
1528        tr_bencInit( &val, TYPE_DICT );
1529        tr_bencDictReserve( &val, 3 );
1530
1531        /* "added" */
1532        added = tr_bencDictAdd( &val, "added" );
1533        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1534        for( i=0; i<diffs.addedCount; ++i ) {
1535            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1536            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1537        }
1538        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1539        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1540
1541        /* "added.f" */
1542        flags = tr_bencDictAdd( &val, "added.f" );
1543        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1544        for( i=0; i<diffs.addedCount; ++i )
1545            *walk++ = diffs.added[i].flags;
1546        assert( ( walk - tmp ) == diffs.addedCount );
1547        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1548
1549        /* "dropped" */
1550        dropped = tr_bencDictAdd( &val, "dropped" );
1551        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1552        for( i=0; i<diffs.droppedCount; ++i ) {
1553            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1554            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1555        }
1556        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1557        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1558
1559        /* write the pex message */
1560        benc = tr_bencSaveMalloc( &val, &bencLen );
1561        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1562        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1563        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1564        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1565
1566        /* cleanup */
1567        tr_free( benc );
1568        tr_bencFree( &val );
1569        tr_free( diffs.added );
1570        tr_free( diffs.dropped );
1571        tr_free( newPex );
1572
1573        msgs->clientSentPexAt = time( NULL );
1574    }
1575}
1576
1577static int
1578pexPulse( void * vpeer )
1579{
1580    sendPex( vpeer );
1581    return TRUE;
1582}
1583
1584/**
1585***
1586**/
1587
1588tr_peermsgs*
1589tr_peerMsgsNew( struct tr_torrent * torrent,
1590                struct tr_peer    * info,
1591                tr_delivery_func    func,
1592                void              * userData,
1593                tr_publisher_tag  * setme )
1594{
1595    tr_peermsgs * m;
1596
1597    assert( info != NULL );
1598    assert( info->io != NULL );
1599
1600    m = tr_new0( tr_peermsgs, 1 );
1601    m->publisher = tr_publisherNew( );
1602    m->info = info;
1603    m->handle = torrent->handle;
1604    m->torrent = torrent;
1605    m->io = info->io;
1606    m->info->clientIsChoked = 1;
1607    m->info->peerIsChoked = 1;
1608    m->info->clientIsInterested = 0;
1609    m->info->peerIsInterested = 0;
1610    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1611    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1612    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1613    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1614    m->outMessages = evbuffer_new( );
1615    m->inBlock = evbuffer_new( );
1616    m->peerAllowedPieces = NULL;
1617    m->clientAllowedPieces = NULL;
1618    setme = tr_publisherSubscribe( m->publisher, func, userData );
1619   
1620    if ( tr_peerIoSupportsFEXT( m->io ) )
1621    {
1622        /* This peer is fastpeer-enabled, generate its allowed set
1623         * (before registering our callbacks) */
1624        if ( !m->peerAllowedPieces ) {
1625            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1626           
1627            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1628                                                                 m->torrent->info.pieceCount,
1629                                                                 m->torrent->info.hash,
1630                                                                 peerAddr );
1631        }
1632        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1633    }
1634   
1635    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1636    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1637    ratePulse( m );
1638
1639    /**
1640    ***  If we initiated this connection,
1641    ***  we may need to send LTEP/AZMP handshakes.
1642    ***  Otherwise we'll wait for the peer to send theirs first.
1643    **/
1644    if( !tr_peerIoIsIncoming( m->io ) )
1645    {
1646        if ( tr_peerIoSupportsLTEP( m->io ) ) {
1647            sendLtepHandshake( m );
1648           
1649        } else if ( tr_peerIoSupportsAZMP( m->io ) ) {
1650            dbgmsg( m, "FIXME: need to send AZMP handshake" );
1651           
1652        } else {
1653            /* no-op */
1654        }
1655    }
1656   
1657    if ( tr_peerIoSupportsFEXT( m->io ) )
1658    {
1659        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1660        float completion = tr_cpPercentComplete( m->torrent->completion );
1661        if ( completion == 0.0f ) {
1662            sendFastHave( m, 0 );
1663        } else if ( completion == 1.0f ) {
1664            sendFastHave( m, 1 );
1665        } else {
1666            sendBitfield( m );
1667        }
1668        uint32_t peerProgress = m->torrent->info.pieceCount * m->info->progress;
1669       
1670        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1671            sendFastAllowedSet( m );
1672    } else {
1673        sendBitfield( m );
1674    }
1675    return m;
1676}
1677
1678void
1679tr_peerMsgsFree( tr_peermsgs* msgs )
1680{
1681    if( msgs != NULL )
1682    {
1683        tr_timerFree( &msgs->pulseTimer );
1684        tr_timerFree( &msgs->rateTimer );
1685        tr_timerFree( &msgs->pexTimer );
1686        tr_publisherFree( &msgs->publisher );
1687        tr_list_free( &msgs->clientWillAskFor, tr_free );
1688        tr_list_free( &msgs->clientAskedFor, tr_free );
1689        tr_list_free( &msgs->peerAskedFor, tr_free );
1690        evbuffer_free( msgs->outMessages );
1691        evbuffer_free( msgs->inBlock );
1692        tr_free( msgs->pex );
1693        msgs->pexCount = 0;
1694        tr_free( msgs );
1695    }
1696}
1697
1698tr_publisher_tag
1699tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1700                      tr_delivery_func    func,
1701                      void              * userData )
1702{
1703    return tr_publisherSubscribe( peer->publisher, func, userData );
1704}
1705
1706void
1707tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1708                        tr_publisher_tag    tag )
1709{
1710    tr_publisherUnsubscribe( peer->publisher, tag );
1711}
Note: See TracBrowser for help on using the repository browser.