source: trunk/libtransmission/peer-msgs.c @ 6071

Last change on this file since 6071 was 6071, checked in by charles, 13 years ago

minor refactoring of tr_bitfield to (a) simplify the tests and (b) make things easier to read

  • Property svn:keywords set to Date Rev Author Id
File size: 56.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 6071 2008-06-07 01:44:54Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "inout.h"
27#include "peer-io.h"
28#include "peer-mgr.h"
29#include "peer-mgr-private.h"
30#include "peer-msgs.h"
31#include "ratecontrol.h"
32#include "stats.h"
33#include "torrent.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41enum
42{
43    BT_CHOKE                = 0,
44    BT_UNCHOKE              = 1,
45    BT_INTERESTED           = 2,
46    BT_NOT_INTERESTED       = 3,
47    BT_HAVE                 = 4,
48    BT_BITFIELD             = 5,
49    BT_REQUEST              = 6,
50    BT_PIECE                = 7,
51    BT_CANCEL               = 8,
52    BT_PORT                 = 9,
53    BT_SUGGEST              = 13,
54    BT_HAVE_ALL             = 14,
55    BT_HAVE_NONE            = 15,
56    BT_REJECT               = 16,
57    BT_ALLOWED_FAST         = 17,
58    BT_LTEP                 = 20,
59
60    LTEP_HANDSHAKE          = 0,
61
62    TR_LTEP_PEX             = 1,
63
64    MIN_CHOKE_PERIOD_SEC    = (10),
65
66    /* idle seconds before we send a keepalive */
67    KEEPALIVE_INTERVAL_SECS = 100,
68
69    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
70    PEER_PULSE_INTERVAL     = (50),        /* msec between pulse() calls */
71    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
72
73    MAX_QUEUE_SIZE          = (100),
74     
75    /* (fast peers) max number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_COUNT   = 10,
77
78    /* (fast peers) max threshold for allowing fast-pieces requests */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,
80
81    /* how long an unsent request can stay queued before it's returned
82       back to the peer-mgr's pool of requests */
83    QUEUED_REQUEST_TTL_SECS = 20,
84
85    /* how long a sent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    SENT_REQUEST_TTL_SECS = 120,
88
89    /* used in lowering the outMessages queue period */
90    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
91    HIGH_PRIORITY_INTERVAL_SECS = 5,
92    LOW_PRIORITY_INTERVAL_SECS = 30
93};
94
95/**
96***  REQUEST MANAGEMENT
97**/
98
99enum
100{
101    AWAITING_BT_LENGTH,
102    AWAITING_BT_ID,
103    AWAITING_BT_MESSAGE,
104    AWAITING_BT_PIECE
105};
106
107struct peer_request
108{
109    uint32_t index;
110    uint32_t offset;
111    uint32_t length;
112    time_t time_requested;
113};
114
115static int
116compareRequest( const void * va, const void * vb )
117{
118    int i;
119    const struct peer_request * a = va;
120    const struct peer_request * b = vb;
121    if(( i = tr_compareUint32( a->index, b->index ))) return i;
122    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
123    if(( i = tr_compareUint32( a->length, b->length ))) return i;
124    return 0;
125}
126
127struct request_list
128{
129    uint16_t count;
130    uint16_t max;
131    struct peer_request * requests;
132};
133
134static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
135
136static void
137reqListReserve( struct request_list * list, uint16_t max )
138{
139    if( list->max < max )
140    {
141        list->max = max;
142        list->requests = tr_renew( struct peer_request,
143                                   list->requests,
144                                   list->max );
145    }
146}
147
148static void
149reqListClear( struct request_list * list )
150{
151    tr_free( list->requests );
152    *list = REQUEST_LIST_INIT;
153}
154
155static void
156reqListCopy( struct request_list * dest, const struct request_list * src )
157{
158    dest->count = src->count;
159    dest->max = src->max;
160    dest->requests = tr_new( struct peer_request, dest->max );
161    memcpy( dest->requests, src->requests, sizeof( struct peer_request ) * dest->count );
162}
163
164static void
165reqListRemoveOne( struct request_list * list, int i )
166{
167    assert( 0<=i && i<list->count );
168
169    memmove( &list->requests[i],
170             &list->requests[i+1],
171             sizeof( struct peer_request ) * ( --list->count - i ) );
172}
173
174static void
175reqListAppend( struct request_list * list, const struct peer_request * req )
176{
177    if( ++list->count >= list->max )
178        reqListReserve( list, list->max + 8 );
179
180    list->requests[list->count-1] = *req;
181}
182
183static tr_errno
184reqListPop( struct request_list * list, struct peer_request * setme )
185{
186    tr_errno err;
187
188    if( !list->count )
189        err = TR_ERROR;
190    else {
191        *setme = list->requests[0];
192        reqListRemoveOne( list, 0 );
193        err = TR_OK;
194    }
195
196    return err;
197}
198
199static int
200reqListFind( struct request_list * list, const struct peer_request * key )
201{
202    uint16_t i;
203    for( i=0; i<list->count; ++i )
204        if( !compareRequest( key, list->requests+i ) )
205            return i;
206    return -1;
207}
208
209static tr_errno
210reqListRemove( struct request_list * list, const struct peer_request * key )
211{
212    tr_errno err;
213    const int i = reqListFind( list, key );
214
215    if( i < 0 )
216        err = TR_ERROR;
217    else {
218        err = TR_OK;
219        reqListRemoveOne( list, i );
220    }
221
222    return err;
223}
224
225/**
226***
227**/
228
229/* this is raw, unchanged data from the peer regarding
230 * the current message that it's sending us. */
231struct tr_incoming
232{
233    uint8_t id;
234    uint32_t length; /* includes the +1 for id length */
235    struct peer_request blockReq; /* metadata for incoming blocks */
236    struct evbuffer * block; /* piece data for incoming blocks */
237};
238
239struct tr_peermsgs
240{
241    unsigned int peerSentBitfield         : 1;
242    unsigned int peerSupportsPex          : 1;
243    unsigned int clientSentLtepHandshake  : 1;
244    unsigned int peerSentLtepHandshake    : 1;
245    unsigned int sendingBlock             : 1;
246   
247    uint8_t state;
248    uint8_t ut_pex_id;
249    uint16_t pexCount;
250    uint16_t maxActiveRequests;
251    uint16_t minActiveRequests;
252
253    tr_peer * info;
254
255    tr_handle * handle;
256    tr_torrent * torrent;
257    tr_peerIo * io;
258
259    tr_publisher_t * publisher;
260
261    struct evbuffer * outBlock;    /* buffer of the current piece message */
262    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
263
264    struct request_list peerAskedFor;
265    struct request_list peerAskedForFast;
266    struct request_list clientAskedFor;
267    struct request_list clientWillAskFor;
268
269    tr_timer * rateTimer;
270    tr_timer * pulseTimer;
271    tr_timer * pexTimer;
272
273    time_t clientSentPexAt;
274    time_t clientSentAnythingAt;
275
276    /* when we started batching the outMessages */
277    time_t outMessagesBatchedAt;
278
279    /* how long the outMessages batch should be allowed to grow before
280     * it's flushed -- some messages (like requests >:) should be sent
281     * very quickly; others aren't as urgent. */
282    int outMessagesBatchPeriod;
283   
284    tr_bitfield * peerAllowedPieces;
285
286    struct tr_incoming incoming; 
287
288    tr_pex * pex;
289};
290
291/**
292***
293**/
294
295static void
296myDebug( const char * file, int line,
297         const struct tr_peermsgs * msgs,
298         const char * fmt, ... )
299{
300    FILE * fp = tr_getLog( );
301    if( fp != NULL )
302    {
303        va_list args;
304        char timestr[64];
305        struct evbuffer * buf = evbuffer_new( );
306        char * myfile = tr_strdup( file );
307
308        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
309                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
310                             msgs->torrent->info.name,
311                             tr_peerIoGetAddrStr( msgs->io ),
312                             msgs->info->client );
313        va_start( args, fmt );
314        evbuffer_add_vprintf( buf, fmt, args );
315        va_end( args );
316        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
317        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
318
319        tr_free( myfile );
320        evbuffer_free( buf );
321    }
322}
323
324#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
325
326/**
327***
328**/
329
330static void
331pokeBatchPeriod( tr_peermsgs * msgs, int interval )
332{
333    if( msgs->outMessagesBatchPeriod > interval )
334    {
335        msgs->outMessagesBatchPeriod = interval;
336        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
337    }
338}
339
340static void
341protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
342{
343    tr_peerIo * io = msgs->io;
344    struct evbuffer * out = msgs->outMessages;
345
346    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
347    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
348    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
349    tr_peerIoWriteUint32( io, out, req->index );
350    tr_peerIoWriteUint32( io, out, req->offset );
351    tr_peerIoWriteUint32( io, out, req->length );
352    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
353}
354
355static void
356protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
357{
358    tr_peerIo * io = msgs->io;
359    struct evbuffer * out = msgs->outMessages;
360
361    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
362    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
363    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
364    tr_peerIoWriteUint32( io, out, req->index );
365    tr_peerIoWriteUint32( io, out, req->offset );
366    tr_peerIoWriteUint32( io, out, req->length );
367    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
368}
369
370static void
371protocolSendHave( tr_peermsgs * msgs, uint32_t index )
372{
373    tr_peerIo * io = msgs->io;
374    struct evbuffer * out = msgs->outMessages;
375
376    dbgmsg( msgs, "sending Have %u", index );
377    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
378    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
379    tr_peerIoWriteUint32( io, out, index );
380    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
381}
382
383static void
384protocolSendChoke( tr_peermsgs * msgs, int choke )
385{
386    tr_peerIo * io = msgs->io;
387    struct evbuffer * out = msgs->outMessages;
388
389    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
390    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
391    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
392    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
393}
394
395/**
396***  EVENTS
397**/
398
399static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
400
401static void
402publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
403{
404    tr_publisherPublish( msgs->publisher, msgs->info, e );
405}
406
407static void
408fireError( tr_peermsgs * msgs, tr_errno err )
409{
410    tr_peermsgs_event e = blankEvent;
411    e.eventType = TR_PEERMSG_ERROR;
412    e.err = err;
413    publish( msgs, &e );
414}
415
416static void
417fireNeedReq( tr_peermsgs * msgs )
418{
419    tr_peermsgs_event e = blankEvent;
420    e.eventType = TR_PEERMSG_NEED_REQ;
421    publish( msgs, &e );
422}
423
424static void
425firePeerProgress( tr_peermsgs * msgs )
426{
427    tr_peermsgs_event e = blankEvent;
428    e.eventType = TR_PEERMSG_PEER_PROGRESS;
429    e.progress = msgs->info->progress;
430    publish( msgs, &e );
431}
432
433static void
434fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
435{
436    tr_peermsgs_event e = blankEvent;
437    e.eventType = TR_PEERMSG_CLIENT_HAVE;
438    e.pieceIndex = pieceIndex;
439    publish( msgs, &e );
440}
441
442static void
443fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
444{
445    tr_peermsgs_event e = blankEvent;
446    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
447    e.pieceIndex = req->index;
448    e.offset = req->offset;
449    e.length = req->length;
450    publish( msgs, &e );
451}
452
453static void
454firePieceData( tr_peermsgs * msgs )
455{
456    tr_peermsgs_event e = blankEvent;
457    e.eventType = TR_PEERMSG_PIECE_DATA;
458    publish( msgs, &e );
459}
460
461static void
462fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
463{
464    tr_peermsgs_event e = blankEvent;
465    e.eventType = TR_PEERMSG_CANCEL;
466    e.pieceIndex = req->index;
467    e.offset = req->offset;
468    e.length = req->length;
469    publish( msgs, &e );
470}
471
472/**
473***  INTEREST
474**/
475
476static int
477isPieceInteresting( const tr_peermsgs   * peer,
478                    int                   piece )
479{
480    const tr_torrent * torrent = peer->torrent;
481
482    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
483          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
484          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
485}
486
487/* "interested" means we'll ask for piece data if they unchoke us */
488static int
489isPeerInteresting( const tr_peermsgs * msgs )
490{
491    tr_piece_index_t i;
492    const tr_torrent * torrent;
493    const tr_bitfield * bitfield;
494    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
495
496    if( clientIsSeed )
497        return FALSE;
498
499    torrent = msgs->torrent;
500    bitfield = tr_cpPieceBitfield( torrent->completion );
501
502    if( !msgs->info->have )
503        return TRUE;
504
505    assert( bitfield->byteCount == msgs->info->have->byteCount );
506
507    for( i=0; i<torrent->info.pieceCount; ++i )
508        if( isPieceInteresting( msgs, i ) )
509            return TRUE;
510
511    return FALSE;
512}
513
514static void
515sendInterest( tr_peermsgs * msgs, int weAreInterested )
516{
517    assert( msgs != NULL );
518    assert( weAreInterested==0 || weAreInterested==1 );
519
520    msgs->info->clientIsInterested = weAreInterested;
521    dbgmsg( msgs, "Sending %s",
522            weAreInterested ? "Interested" : "Not Interested");
523
524    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
525    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
526                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
527    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
528}
529
530static void
531updateInterest( tr_peermsgs * msgs )
532{
533    const int i = isPeerInteresting( msgs );
534    if( i != msgs->info->clientIsInterested )
535        sendInterest( msgs, i );
536    if( i )
537        fireNeedReq( msgs );
538}
539
540static void
541cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
542{
543    reqListClear( &msgs->peerAskedFor );
544}
545
546void
547tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
548{
549    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
550
551    assert( msgs != NULL );
552    assert( msgs->info != NULL );
553    assert( choke==0 || choke==1 );
554
555    if( msgs->info->chokeChangedAt > fibrillationTime )
556    {
557        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
558    }
559    else if( msgs->info->peerIsChoked != choke )
560    {
561        msgs->info->peerIsChoked = choke;
562        if( choke )
563            cancelAllRequestsToClientExceptFast( msgs );
564        protocolSendChoke( msgs, choke );
565        msgs->info->chokeChangedAt = time( NULL );
566    }
567}
568
569/**
570***
571**/
572
573void
574tr_peerMsgsHave( tr_peermsgs * msgs,
575                 uint32_t      index )
576{
577    protocolSendHave( msgs, index );
578
579    /* since we have more pieces now, we might not be interested in this peer */
580    updateInterest( msgs );
581}
582#if 0
583static void
584sendFastSuggest( tr_peermsgs * msgs,
585                 uint32_t      pieceIndex )
586{
587    assert( msgs != NULL );
588   
589    if( tr_peerIoSupportsFEXT( msgs->io ) )
590    {
591        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
592        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
593        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
594    }
595}
596static void
597sendFastHave( tr_peermsgs * msgs, int all )
598{
599    assert( msgs != NULL );
600   
601    if( tr_peerIoSupportsFEXT( msgs->io ) )
602    {
603        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
604        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
605                                                                : BT_HAVE_NONE ) );
606        updateInterest( msgs );
607    }
608}
609#endif
610
611static void
612sendFastReject( tr_peermsgs * msgs,
613                uint32_t      pieceIndex,
614                uint32_t      offset,
615                uint32_t      length )
616{
617    assert( msgs != NULL );
618
619    if( tr_peerIoSupportsFEXT( msgs->io ) )
620    {
621        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
622        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
623        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
624        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
625        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
626        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
627        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
628    }
629}
630
631static tr_bitfield*
632getPeerAllowedPieces( tr_peermsgs * msgs )
633{
634    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
635    {
636        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
637            MAX_FAST_ALLOWED_COUNT,
638            msgs->torrent->info.pieceCount,
639            msgs->torrent->info.hash,
640            tr_peerIoGetAddress( msgs->io, NULL ) );
641    }
642
643    return msgs->peerAllowedPieces;
644}
645
646static void
647sendFastAllowed( tr_peermsgs * msgs,
648                 uint32_t      pieceIndex)
649{
650    assert( msgs != NULL );
651   
652    if( tr_peerIoSupportsFEXT( msgs->io ) )
653    {
654        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
655        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
656        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
657        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
658    }
659}
660
661static void
662sendFastAllowedSet( tr_peermsgs * msgs )
663{
664    tr_piece_index_t i = 0;
665
666    while (i <= msgs->torrent->info.pieceCount )
667    {
668        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
669            sendFastAllowed( msgs, i );
670        i++;
671    }
672}
673
674static void
675maybeSendFastAllowedSet( tr_peermsgs * msgs )
676{
677    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
678        sendFastAllowedSet( msgs );
679}
680
681
682/**
683***
684**/
685
686static int
687reqIsValid( const tr_peermsgs   * msgs,
688            uint32_t              index,
689            uint32_t              offset,
690            uint32_t              length )
691{
692    return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
693}
694
695static int
696requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
697{
698    return reqIsValid( msgs, req->index, req->offset, req->length );
699}
700
701static void
702expireOldRequests( tr_peermsgs * msgs )
703{
704    int i;
705    time_t oldestAllowed;
706    struct request_list tmp = REQUEST_LIST_INIT;
707
708    /* cancel requests that have been queued for too long */
709    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
710    reqListCopy( &tmp, &msgs->clientWillAskFor );
711    for( i=0; i<tmp.count; ++i ) {
712        const struct peer_request * req = &tmp.requests[i];
713        if( req->time_requested < oldestAllowed )
714            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
715    }
716    reqListClear( &tmp );
717
718    /* cancel requests that were sent too long ago */
719    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
720    reqListCopy( &tmp, &msgs->clientAskedFor );
721    for( i=0; i<tmp.count; ++i ) {
722        const struct peer_request * req = &tmp.requests[i];
723        if( req->time_requested < oldestAllowed )
724            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
725    }
726    reqListClear( &tmp );
727}
728
729static void
730pumpRequestQueue( tr_peermsgs * msgs )
731{
732    const int max = msgs->maxActiveRequests;
733    const int min = msgs->minActiveRequests;
734    const time_t now = time( NULL );
735    int sent = 0;
736    int count = msgs->clientAskedFor.count;
737    struct peer_request req;
738
739    if( count > min )
740        return;
741    if( msgs->info->clientIsChoked )
742        return;
743
744    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
745    {
746        assert( requestIsValid( msgs, &req ) );
747        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
748
749        protocolSendRequest( msgs, &req );
750        req.time_requested = now;
751        reqListAppend( &msgs->clientAskedFor, &req );
752
753        ++count;
754        ++sent;
755    }
756
757    if( sent )
758        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
759                sent,
760                msgs->clientAskedFor.count,
761                msgs->clientWillAskFor.count );
762
763    if( count < max )
764        fireNeedReq( msgs );
765}
766
767static int
768pulse( void * vmsgs );
769
770int
771tr_peerMsgsAddRequest( tr_peermsgs * msgs,
772                       uint32_t      index, 
773                       uint32_t      offset, 
774                       uint32_t      length )
775{
776    const int req_max = msgs->maxActiveRequests;
777    struct peer_request req;
778
779    assert( msgs != NULL );
780    assert( msgs->torrent != NULL );
781    assert( reqIsValid( msgs, index, offset, length ) );
782
783    /**
784    ***  Reasons to decline the request
785    **/
786
787    /* don't send requests to choked clients */
788    if( msgs->info->clientIsChoked ) {
789        dbgmsg( msgs, "declining request because they're choking us" );
790        return TR_ADDREQ_CLIENT_CHOKED;
791    }
792
793    /* peer doesn't have this piece */
794    if( !tr_bitfieldHas( msgs->info->have, index ) )
795        return TR_ADDREQ_MISSING;
796
797    /* peer's queue is full */
798    if( msgs->clientWillAskFor.count >= req_max ) {
799        dbgmsg( msgs, "declining request because we're full" );
800        return TR_ADDREQ_FULL;
801    }
802
803    /* have we already asked for this piece? */
804    req.index = index;
805    req.offset = offset;
806    req.length = length;
807    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
808        dbgmsg( msgs, "declining because it's a duplicate" );
809        return TR_ADDREQ_DUPLICATE;
810    }
811    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
812        dbgmsg( msgs, "declining because it's a duplicate" );
813        return TR_ADDREQ_DUPLICATE;
814    }
815
816    /**
817    ***  Accept this request
818    **/
819
820    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
821    req.time_requested = time( NULL );
822    reqListAppend( &msgs->clientWillAskFor, &req );
823    return TR_ADDREQ_OK;
824}
825
826static void
827cancelAllRequestsToPeer( tr_peermsgs * msgs )
828{
829    int i;
830    struct request_list a = msgs->clientWillAskFor;
831    struct request_list b = msgs->clientAskedFor;
832
833    msgs->clientAskedFor = REQUEST_LIST_INIT;
834    msgs->clientWillAskFor = REQUEST_LIST_INIT;
835
836    for( i=0; i<a.count; ++i )
837        fireCancelledReq( msgs, &a.requests[i] );
838
839    for( i=0; i<b.count; ++i ) {
840        fireCancelledReq( msgs, &b.requests[i] );
841        protocolSendCancel( msgs, &b.requests[i] );
842    }
843
844    reqListClear( &a );
845    reqListClear( &b );
846}
847
848void
849tr_peerMsgsCancel( tr_peermsgs * msgs,
850                   uint32_t      pieceIndex,
851                   uint32_t      offset,
852                   uint32_t      length )
853{
854    struct peer_request req;
855
856    assert( msgs != NULL );
857    assert( length > 0 );
858
859    /* have we asked the peer for this piece? */
860    req.index = pieceIndex;
861    req.offset = offset;
862    req.length = length;
863
864    /* if it's only in the queue and hasn't been sent yet, free it */
865    if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
866        fireCancelledReq( msgs, &req );
867
868    /* if it's already been sent, send a cancel message too */
869    if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
870        protocolSendCancel( msgs, &req );
871        fireCancelledReq( msgs, &req );
872    }
873}
874
875/**
876***
877**/
878
879static void
880sendLtepHandshake( tr_peermsgs * msgs )
881{
882    tr_benc val, *m;
883    char * buf;
884    int len;
885    int pex;
886    struct evbuffer * outbuf;
887
888    if( msgs->clientSentLtepHandshake )
889        return;
890
891    outbuf = evbuffer_new( );
892    dbgmsg( msgs, "sending an ltep handshake" );
893    msgs->clientSentLtepHandshake = 1;
894
895    /* decide if we want to advertise pex support */
896    if( !tr_torrentAllowsPex( msgs->torrent ) )
897        pex = 0;
898    else if( msgs->peerSentLtepHandshake )
899        pex = msgs->peerSupportsPex ? 1 : 0;
900    else
901        pex = 1;
902
903    tr_bencInitDict( &val, 4 );
904    tr_bencDictAddInt( &val, "e", msgs->handle->encryptionMode != TR_PLAINTEXT_PREFERRED );
905    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->handle ) );
906    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
907    m  = tr_bencDictAddDict( &val, "m", 1 );
908    if( pex )
909        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
910    buf = tr_bencSave( &val, &len );
911
912    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
913    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
914    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
915    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
916
917    tr_peerIoWriteBuf( msgs->io, outbuf );
918
919    dbgmsg( msgs, "here is the ltep handshake we sent [%*.*s]", len, len, buf );
920
921    /* cleanup */
922    tr_bencFree( &val );
923    tr_free( buf );
924    evbuffer_free( outbuf );
925}
926
927static void
928parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
929{
930    tr_benc val, * sub;
931    uint8_t * tmp = tr_new( uint8_t, len );
932
933    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
934    msgs->peerSentLtepHandshake = 1;
935
936    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
937        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
938        tr_free( tmp );
939        return;
940    }
941
942    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
943
944    /* does the peer prefer encrypted connections? */
945    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
946        msgs->info->encryption_preference = sub->val.i
947                                      ? ENCRYPTION_PREFERENCE_YES
948                                      : ENCRYPTION_PREFERENCE_NO;
949
950    /* check supported messages for utorrent pex */
951    msgs->peerSupportsPex = 0;
952    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
953        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
954            msgs->ut_pex_id = (uint8_t) sub->val.i;
955            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
956            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
957        }
958    }
959
960    /* get peer's listening port */
961    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
962        msgs->info->port = htons( (uint16_t)sub->val.i );
963        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
964    }
965
966    tr_bencFree( &val );
967    tr_free( tmp );
968}
969
970static void
971parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
972{
973    int loaded = 0;
974    uint8_t * tmp = tr_new( uint8_t, msglen );
975    tr_benc val, *added;
976    const tr_torrent * tor = msgs->torrent;
977    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
978
979    if( tr_torrentAllowsPex( tor )
980        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
981        && (( added = tr_bencDictFindType( &val, "added", TYPE_STR ))))
982    {
983        const char * added_f = NULL;
984        tr_pex * pex;
985        size_t i, n;
986        tr_bencDictFindStr( &val, "added.f", &added_f );
987        pex = tr_peerMgrCompactToPex( added->val.s.s, added->val.s.i, added_f, &n );
988        for( i=0; i<n; ++i )
989            tr_peerMgrAddPex( msgs->handle->peerMgr, tor->info.hash,
990                              TR_PEER_FROM_PEX, pex+i );
991        tr_free( pex );
992    }
993
994    if( loaded )
995        tr_bencFree( &val );
996    tr_free( tmp );
997}
998
999static void
1000sendPex( tr_peermsgs * msgs );
1001
1002static void
1003parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1004{
1005    uint8_t ltep_msgid;
1006
1007    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1008    msglen--;
1009
1010    if( ltep_msgid == LTEP_HANDSHAKE )
1011    {
1012        dbgmsg( msgs, "got ltep handshake" );
1013        parseLtepHandshake( msgs, msglen, inbuf );
1014        if( tr_peerIoSupportsLTEP( msgs->io ) )
1015        {
1016            sendLtepHandshake( msgs );
1017            sendPex( msgs );
1018        }
1019    }
1020    else if( ltep_msgid == TR_LTEP_PEX )
1021    {
1022        dbgmsg( msgs, "got ut pex" );
1023        msgs->peerSupportsPex = 1;
1024        parseUtPex( msgs, msglen, inbuf );
1025    }
1026    else
1027    {
1028        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1029        evbuffer_drain( inbuf, msglen );
1030    }
1031}
1032
1033static int
1034readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1035{
1036    uint32_t len;
1037
1038    if( inlen < sizeof(len) )
1039        return READ_MORE;
1040
1041    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1042
1043    if( len == 0 ) /* peer sent us a keepalive message */
1044        dbgmsg( msgs, "got KeepAlive" );
1045    else {
1046        msgs->incoming.length = len;
1047        msgs->state = AWAITING_BT_ID;
1048    }
1049
1050    return READ_AGAIN;
1051}
1052
1053static int
1054readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1055
1056static int
1057readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1058{
1059    uint8_t id;
1060
1061    if( inlen < sizeof(uint8_t) )
1062        return READ_MORE;
1063
1064    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1065    msgs->incoming.id = id;
1066
1067    if( id==BT_PIECE )
1068    {
1069        msgs->state = AWAITING_BT_PIECE;
1070        return READ_AGAIN;
1071    }
1072    else if( msgs->incoming.length != 1 )
1073    {
1074        msgs->state = AWAITING_BT_MESSAGE;
1075        return READ_AGAIN;
1076    }
1077    else return readBtMessage( msgs, inbuf, inlen-1 );
1078}
1079
1080static void
1081updatePeerProgress( tr_peermsgs * msgs )
1082{
1083    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1084                         / (float)msgs->torrent->info.pieceCount;
1085    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1086    updateInterest( msgs );
1087    firePeerProgress( msgs );
1088}
1089
1090static int
1091clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1092{
1093    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1094    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1095        return FALSE;
1096   
1097    /* ...or if we don't have ourself enough pieces */
1098    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1099        return FALSE;
1100
1101    /* Maybe a bandwidth limit ? */
1102    return TRUE;
1103}
1104
1105static void
1106peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1107{
1108    const int reqIsValid = requestIsValid( msgs, req );
1109    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1110    const int peerIsChoked = msgs->info->peerIsChoked;
1111    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1112    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1113    const int canSendFast = clientCanSendFastBlock( msgs );
1114
1115    if( !reqIsValid ) /* bad request */
1116    {
1117        dbgmsg( msgs, "rejecting an invalid request." );
1118        sendFastReject( msgs, req->index, req->offset, req->length );
1119    }
1120    else if( !clientHasPiece ) /* we don't have it */
1121    {
1122        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1123        sendFastReject( msgs, req->index, req->offset, req->length );
1124    }
1125    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1126    {
1127        tr_peerMsgsSetChoke( msgs, 1 );
1128        sendFastReject( msgs, req->index, req->offset, req->length );
1129    }
1130    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1131    {
1132        sendFastReject( msgs, req->index, req->offset, req->length );
1133    }
1134    else /* YAY */
1135    {
1136        if( peerIsFast && pieceIsFast )
1137            reqListAppend( &msgs->peerAskedForFast, req );
1138        else
1139            reqListAppend( &msgs->peerAskedFor, req );
1140    }
1141}
1142
1143static int
1144messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1145{
1146    switch( id )
1147    {
1148        case BT_CHOKE:
1149        case BT_UNCHOKE:
1150        case BT_INTERESTED:
1151        case BT_NOT_INTERESTED:
1152        case BT_HAVE_ALL:
1153        case BT_HAVE_NONE:
1154            return len==1;
1155
1156        case BT_HAVE:
1157        case BT_SUGGEST:
1158        case BT_ALLOWED_FAST:
1159            return len==5;
1160
1161        case BT_BITFIELD:
1162            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1163       
1164        case BT_REQUEST:
1165        case BT_CANCEL:
1166        case BT_REJECT:
1167            return len==13;
1168
1169        case BT_PIECE:
1170            return len>9 && len<=16393;
1171
1172        case BT_PORT:
1173            return len==3;
1174
1175        case BT_LTEP:
1176            return len >= 2;
1177
1178        default:
1179            return FALSE;
1180    }
1181}
1182
1183static int
1184clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1185
1186static void
1187clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1188{
1189    const time_t now = time( NULL );
1190    tr_torrent * tor = msgs->torrent;
1191    tor->activityDate = now;
1192    tor->downloadedCur += byteCount;
1193    msgs->info->pieceDataActivityDate = now;
1194    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1195    tr_rcTransferred( tor->download, byteCount );
1196    tr_rcTransferred( tor->handle->download, byteCount );
1197    tr_statsAddDownloaded( msgs->handle, byteCount );
1198    firePieceData( msgs );
1199}
1200
1201static int
1202readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1203{
1204    struct peer_request * req = &msgs->incoming.blockReq;
1205    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1206    dbgmsg( msgs, "In readBtPiece" );
1207
1208    if( !req->length )
1209    {
1210        if( inlen < 8 )
1211            return READ_MORE;
1212
1213        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1214        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1215        req->length = msgs->incoming.length - 9;
1216        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1217        return READ_AGAIN;
1218    }
1219    else
1220    {
1221        int err;
1222
1223        /* read in another chunk of data */
1224        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1225        size_t n = MIN( nLeft, inlen );
1226        uint8_t * buf = tr_new( uint8_t, n );
1227        assert( EVBUFFER_LENGTH(inbuf) >= n );
1228        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1229        evbuffer_add( msgs->incoming.block, buf, n );
1230        clientGotBytes( msgs, n );
1231        tr_free( buf );
1232        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1233               (int)n, req->index, req->offset, req->length,
1234               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1235        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1236            return READ_MORE;
1237
1238        /* we've got the whole block ... process it */
1239        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1240
1241        /* cleanup */
1242        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1243        req->length = 0;
1244        msgs->state = AWAITING_BT_LENGTH;
1245        if( !err )
1246            return READ_AGAIN;
1247        else {
1248            fireError( msgs, err );
1249            return READ_DONE;
1250        }
1251    }
1252}
1253
1254static int
1255readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1256{
1257    uint32_t ui32;
1258    uint32_t msglen = msgs->incoming.length;
1259    const uint8_t id = msgs->incoming.id;
1260    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1261
1262    --msglen; /* id length */
1263
1264    if( inlen < msglen )
1265        return READ_MORE;
1266
1267    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1268
1269    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1270    {
1271        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1272        fireError( msgs, TR_ERROR );
1273        return READ_DONE;
1274    }
1275
1276    switch( id )
1277    {
1278        case BT_CHOKE:
1279            dbgmsg( msgs, "got Choke" );
1280            msgs->info->clientIsChoked = 1;
1281            cancelAllRequestsToPeer( msgs );
1282            cancelAllRequestsToClientExceptFast( msgs );
1283            break;
1284
1285        case BT_UNCHOKE:
1286            dbgmsg( msgs, "got Unchoke" );
1287            msgs->info->clientIsChoked = 0;
1288            fireNeedReq( msgs );
1289            break;
1290
1291        case BT_INTERESTED:
1292            dbgmsg( msgs, "got Interested" );
1293            msgs->info->peerIsInterested = 1;
1294            tr_peerMsgsSetChoke( msgs, 0 );
1295            break;
1296
1297        case BT_NOT_INTERESTED:
1298            dbgmsg( msgs, "got Not Interested" );
1299            msgs->info->peerIsInterested = 0;
1300            break;
1301           
1302        case BT_HAVE:
1303            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1304            dbgmsg( msgs, "got Have: %u", ui32 );
1305            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1306                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1307            updatePeerProgress( msgs );
1308            tr_rcTransferred( msgs->torrent->swarmSpeed, msgs->torrent->info.pieceSize );
1309            break;
1310
1311        case BT_BITFIELD: {
1312            dbgmsg( msgs, "got a bitfield" );
1313            msgs->peerSentBitfield = 1;
1314            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1315            updatePeerProgress( msgs );
1316            maybeSendFastAllowedSet( msgs );
1317            fireNeedReq( msgs );
1318            break;
1319        }
1320
1321        case BT_REQUEST: {
1322            struct peer_request r;
1323            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1324            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1325            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1326            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1327            peerMadeRequest( msgs, &r );
1328            break;
1329        }
1330
1331        case BT_CANCEL: {
1332            struct peer_request r;
1333            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1334            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1335            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1336            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1337            reqListRemove( &msgs->peerAskedForFast, &r );
1338            reqListRemove( &msgs->peerAskedFor, &r );
1339            break;
1340        }
1341
1342        case BT_PIECE:
1343            assert( 0 ); /* handled elsewhere! */
1344            break;
1345       
1346        case BT_PORT:
1347            dbgmsg( msgs, "Got a BT_PORT" );
1348            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1349            break;
1350           
1351        case BT_SUGGEST: {
1352            dbgmsg( msgs, "Got a BT_SUGGEST" );
1353            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1354            /* we don't do anything with this yet */
1355            break;
1356        }
1357           
1358        case BT_HAVE_ALL:
1359            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1360            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1361            updatePeerProgress( msgs );
1362            maybeSendFastAllowedSet( msgs );
1363            break;
1364       
1365       
1366        case BT_HAVE_NONE:
1367            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1368            tr_bitfieldClear( msgs->info->have );
1369            updatePeerProgress( msgs );
1370            maybeSendFastAllowedSet( msgs );
1371            break;
1372       
1373        case BT_REJECT: {
1374            struct peer_request r;
1375            dbgmsg( msgs, "Got a BT_REJECT" );
1376            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1377            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1378            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1379            reqListRemove( &msgs->clientAskedFor, &r );
1380            break;
1381        }
1382
1383        case BT_ALLOWED_FAST: {
1384            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1385            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1386            /* we don't do anything with this yet */
1387            break;
1388        }
1389
1390        case BT_LTEP:
1391            dbgmsg( msgs, "Got a BT_LTEP" );
1392            parseLtep( msgs, msglen, inbuf );
1393            break;
1394
1395        default:
1396            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1397            tr_peerIoDrain( msgs->io, inbuf, msglen );
1398            break;
1399    }
1400
1401    assert( msglen + 1 == msgs->incoming.length );
1402    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1403
1404    msgs->state = AWAITING_BT_LENGTH;
1405    return READ_AGAIN;
1406}
1407
1408static void
1409peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1410{
1411    const time_t now = time( NULL );
1412    tr_torrent * tor = msgs->torrent;
1413    tor->activityDate = now;
1414    tor->uploadedCur += byteCount;
1415    msgs->info->pieceDataActivityDate = now;
1416    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1417    tr_rcTransferred( tor->upload, byteCount );
1418    tr_rcTransferred( tor->handle->upload, byteCount );
1419    tr_statsAddUploaded( msgs->handle, byteCount );
1420    firePieceData( msgs );
1421}
1422
1423static size_t
1424getDownloadMax( const tr_peermsgs * msgs )
1425{
1426    static const size_t maxval = ~0;
1427    const tr_torrent * tor = msgs->torrent;
1428
1429    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1430        return tor->handle->useDownloadLimit
1431            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1432
1433    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1434        return tr_rcBytesLeft( tor->download );
1435
1436    return maxval;
1437}
1438
1439static void
1440decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1441{
1442    tr_torrent * tor = msgs->torrent;
1443    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1444}
1445
1446static void
1447reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1448{
1449    tr_torrent * tor = msgs->torrent;
1450
1451    tor->corruptCur += byteCount;
1452
1453    decrementDownloadedCount( msgs, byteCount );
1454}
1455
1456static void
1457gotBadPiece( tr_peermsgs * msgs, tr_piece_index_t pieceIndex )
1458{
1459    const uint32_t byteCount =
1460        tr_torPieceCountBytes( msgs->torrent, pieceIndex );
1461    reassignBytesToCorrupt( msgs, byteCount );
1462}
1463
1464static void
1465clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1466{
1467    decrementDownloadedCount( msgs, req->length );
1468}
1469
1470static void
1471addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1472{
1473    if( !msgs->info->blame )
1474         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1475    tr_bitfieldAdd( msgs->info->blame, index );
1476}
1477
1478static tr_errno
1479clientGotBlock( tr_peermsgs                * msgs,
1480                const uint8_t              * data,
1481                const struct peer_request  * req )
1482{
1483    int err;
1484    tr_torrent * tor = msgs->torrent;
1485    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1486
1487    assert( msgs != NULL );
1488    assert( req != NULL );
1489
1490    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1491    {
1492        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1493                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1494        return TR_ERROR;
1495    }
1496
1497    /* save the block */
1498    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1499
1500    /**
1501    *** Remove the block from our `we asked for this' list
1502    **/
1503
1504    if( reqListRemove( &msgs->clientAskedFor, req ) )
1505    {
1506        clientGotUnwantedBlock( msgs, req );
1507        dbgmsg( msgs, "we didn't ask for this message..." );
1508        return 0;
1509    }
1510
1511    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1512                  msgs->clientAskedFor.count );
1513
1514    /**
1515    *** Error checks
1516    **/
1517
1518    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1519        dbgmsg( msgs, "we have this block already..." );
1520        clientGotUnwantedBlock( msgs, req );
1521        return 0;
1522    }
1523
1524    /**
1525    ***  Save the block
1526    **/
1527
1528    msgs->info->peerSentPieceDataAt = time( NULL );
1529    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1530        return err;
1531
1532    tr_cpBlockAdd( tor->completion, block );
1533
1534    addPeerToBlamefield( msgs, req->index );
1535
1536    fireGotBlock( msgs, req );
1537
1538    /**
1539    ***  Handle if this was the last block in the piece
1540    **/
1541
1542    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1543    {
1544        const tr_errno err = tr_ioTestPiece( tor, req->index );
1545
1546        if( err )
1547            tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
1548                       (unsigned long)req->index,
1549                       tr_errorString( err ) );
1550
1551        tr_torrentSetHasPiece( tor, req->index, !err );
1552        tr_torrentSetPieceChecked( tor, req->index, TRUE );
1553        tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, req->index, !err );
1554
1555        if( !err )
1556            fireClientHave( msgs, req->index );
1557        else
1558            gotBadPiece( msgs, req->index );
1559    }
1560
1561    return 0;
1562}
1563
1564static void
1565didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1566{
1567    pulse( vmsgs );
1568}
1569
1570static ReadState
1571canRead( struct bufferevent * evin, void * vmsgs )
1572{
1573    ReadState ret;
1574    tr_peermsgs * msgs = vmsgs;
1575    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1576    const size_t inlen = EVBUFFER_LENGTH( in );
1577
1578    if( !inlen )
1579    {
1580        ret = READ_DONE;
1581    }
1582    else if( msgs->state == AWAITING_BT_PIECE )
1583    {
1584        const size_t downloadMax = getDownloadMax( msgs );
1585        const size_t n = MIN( inlen, downloadMax );
1586        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1587    }
1588    else switch( msgs->state )
1589    {
1590        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1591        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1592        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1593        default:                  assert( 0 );
1594    }
1595
1596    return ret;
1597}
1598
1599static void
1600sendKeepalive( tr_peermsgs * msgs )
1601{
1602    dbgmsg( msgs, "sending a keepalive message" );
1603    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1604    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1605}
1606
1607/**
1608***
1609**/
1610
1611static size_t
1612getUploadMax( const tr_peermsgs * msgs )
1613{
1614    static const size_t maxval = ~0;
1615    const tr_torrent * tor = msgs->torrent;
1616    int speedLeft;
1617    int bufLeft;
1618
1619    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1620        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1621    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1622        speedLeft = tr_rcBytesLeft( tor->upload );
1623    else
1624        speedLeft = INT_MAX;
1625
1626    /* this basically says the outbuf shouldn't have more than one block
1627     * queued up in it... blocksize, +13 for the size of the BT protocol's
1628     * block message overhead */
1629    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1630    return MIN( speedLeft, bufLeft );
1631}
1632
1633static int
1634ratePulse( void * vmsgs )
1635{
1636    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1637    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1638    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1639    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
1640    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1641    return TRUE;
1642}
1643
1644static tr_errno
1645popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1646{
1647    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1648        return 0;
1649    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1650        return 0;
1651
1652    return TR_ERROR;
1653}
1654
1655static int
1656pulse( void * vmsgs )
1657{
1658    const time_t now = time( NULL );
1659    tr_peermsgs * msgs = vmsgs;
1660
1661    tr_peerIoTryRead( msgs->io );
1662    pumpRequestQueue( msgs );
1663    expireOldRequests( msgs );
1664
1665    if( msgs->sendingBlock )
1666    {
1667        const size_t uploadMax = getUploadMax( msgs );
1668        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1669        const size_t outlen = MIN( len, uploadMax );
1670
1671        assert( len );
1672
1673        if( outlen )
1674        {
1675            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1676            evbuffer_drain( msgs->outBlock, outlen );
1677            peerGotBytes( msgs, outlen );
1678
1679            len -= outlen;
1680            msgs->clientSentAnythingAt = now;
1681            msgs->sendingBlock = len!=0;
1682
1683            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1684        }
1685        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1686    }
1687
1688    if( !msgs->sendingBlock )
1689    {
1690        struct peer_request req;
1691        int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1692
1693        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1694        {
1695            dbgmsg( msgs, "started an outMessages batch (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1696            msgs->outMessagesBatchedAt = now;
1697        }
1698        else if( haveMessages
1699                 && ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1700        {
1701            dbgmsg( msgs, "flushing outMessages... (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1702            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1703            msgs->clientSentAnythingAt = now;
1704            msgs->outMessagesBatchedAt = 0;
1705            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1706        }
1707        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1708            && !popNextRequest( msgs, &req )
1709            && requestIsValid( msgs, &req )
1710            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1711        {
1712            uint8_t * buf = tr_new( uint8_t, req.length );
1713
1714            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1715            {
1716                tr_peerIo * io = msgs->io;
1717                struct evbuffer * out = msgs->outBlock;
1718
1719                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1720                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1721                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1722                tr_peerIoWriteUint32( io, out, req.index );
1723                tr_peerIoWriteUint32( io, out, req.offset );
1724                tr_peerIoWriteBytes ( io, out, buf, req.length );
1725                msgs->sendingBlock = 1;
1726            }
1727
1728            tr_free( buf );
1729        }
1730        else if(    ( !haveMessages )
1731                 && ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1732        {
1733            sendKeepalive( msgs );
1734        }
1735    }
1736
1737    return TRUE; /* loop forever */
1738}
1739
1740static void
1741gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1742{
1743    if( what & EVBUFFER_TIMEOUT )
1744        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1745    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1746        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1747                what, errno, tr_strerror(errno) );
1748    fireError( vmsgs, TR_ERROR );
1749}
1750
1751static void
1752sendBitfield( tr_peermsgs * msgs )
1753{
1754    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1755    struct evbuffer * out = msgs->outMessages;
1756
1757    dbgmsg( msgs, "sending peer a bitfield message" );
1758    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->byteCount );
1759    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1760    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->byteCount );
1761    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1762}
1763
1764/**
1765***
1766**/
1767
1768/* some peers give us error messages if we send
1769   more than this many peers in a single pex message
1770   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1771#define MAX_PEX_ADDED 50
1772#define MAX_PEX_DROPPED 50
1773
1774typedef struct
1775{
1776    tr_pex * added;
1777    tr_pex * dropped;
1778    tr_pex * elements;
1779    int addedCount;
1780    int droppedCount;
1781    int elementCount;
1782}
1783PexDiffs;
1784
1785static void
1786pexAddedCb( void * vpex, void * userData )
1787{
1788    PexDiffs * diffs = userData;
1789    tr_pex * pex = vpex;
1790    if( diffs->addedCount < MAX_PEX_ADDED )
1791    {
1792        diffs->added[diffs->addedCount++] = *pex;
1793        diffs->elements[diffs->elementCount++] = *pex;
1794    }
1795}
1796
1797static void
1798pexDroppedCb( void * vpex, void * userData )
1799{
1800    PexDiffs * diffs = userData;
1801    tr_pex * pex = vpex;
1802    if( diffs->droppedCount < MAX_PEX_DROPPED )
1803    {
1804        diffs->dropped[diffs->droppedCount++] = *pex;
1805    }
1806}
1807
1808static void
1809pexElementCb( void * vpex, void * userData )
1810{
1811    PexDiffs * diffs = userData;
1812    tr_pex * pex = vpex;
1813    diffs->elements[diffs->elementCount++] = *pex;
1814}
1815
1816static void
1817sendPex( tr_peermsgs * msgs )
1818{
1819    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1820    {
1821        int i;
1822        tr_pex * newPex = NULL;
1823        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1824        PexDiffs diffs;
1825        tr_benc val, *added, *dropped;
1826        uint8_t *tmp, *walk;
1827        char * benc;
1828        int bencLen;
1829
1830        /* build the diffs */
1831        diffs.added = tr_new( tr_pex, newCount );
1832        diffs.addedCount = 0;
1833        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1834        diffs.droppedCount = 0;
1835        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1836        diffs.elementCount = 0;
1837        tr_set_compare( msgs->pex, msgs->pexCount,
1838                        newPex, newCount,
1839                        tr_pexCompare, sizeof(tr_pex),
1840                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1841        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1842
1843        /* update peer */
1844        tr_free( msgs->pex );
1845        msgs->pex = diffs.elements;
1846        msgs->pexCount = diffs.elementCount;
1847
1848        /* build the pex payload */
1849        tr_bencInitDict( &val, 3 );
1850
1851        /* "added" */
1852        added = tr_bencDictAdd( &val, "added" );
1853        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1854        for( i=0; i<diffs.addedCount; ++i ) {
1855            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1856            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1857        }
1858        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1859        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1860
1861        /* "added.f" */
1862        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1863        for( i=0; i<diffs.addedCount; ++i )
1864            *walk++ = diffs.added[i].flags;
1865        assert( ( walk - tmp ) == diffs.addedCount );
1866        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1867
1868        /* "dropped" */
1869        dropped = tr_bencDictAdd( &val, "dropped" );
1870        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1871        for( i=0; i<diffs.droppedCount; ++i ) {
1872            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1873            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1874        }
1875        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1876        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1877
1878        /* write the pex message */
1879        benc = tr_bencSave( &val, &bencLen );
1880        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1881        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1882        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1883        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1884        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1885
1886        /* cleanup */
1887        tr_free( benc );
1888        tr_bencFree( &val );
1889        tr_free( diffs.added );
1890        tr_free( diffs.dropped );
1891        tr_free( newPex );
1892
1893        msgs->clientSentPexAt = time( NULL );
1894    }
1895}
1896
1897static int
1898pexPulse( void * vpeer )
1899{
1900    sendPex( vpeer );
1901    return TRUE;
1902}
1903
1904/**
1905***
1906**/
1907
1908tr_peermsgs*
1909tr_peerMsgsNew( struct tr_torrent * torrent,
1910                struct tr_peer    * info,
1911                tr_delivery_func    func,
1912                void              * userData,
1913                tr_publisher_tag  * setme )
1914{
1915    tr_peermsgs * m;
1916
1917    assert( info != NULL );
1918    assert( info->io != NULL );
1919
1920    m = tr_new0( tr_peermsgs, 1 );
1921    m->publisher = tr_publisherNew( );
1922    m->info = info;
1923    m->handle = torrent->handle;
1924    m->torrent = torrent;
1925    m->io = info->io;
1926    m->info->clientIsChoked = 1;
1927    m->info->peerIsChoked = 1;
1928    m->info->clientIsInterested = 0;
1929    m->info->peerIsInterested = 0;
1930    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1931    m->state = AWAITING_BT_LENGTH;
1932    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1933    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1934    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1935    m->outMessages = evbuffer_new( );
1936    m->outMessagesBatchedAt = 0;
1937    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1938    m->incoming.block = evbuffer_new( );
1939    m->outBlock = evbuffer_new( );
1940    m->peerAllowedPieces = NULL;
1941    m->peerAskedFor = REQUEST_LIST_INIT;
1942    m->peerAskedForFast = REQUEST_LIST_INIT;
1943    m->clientAskedFor = REQUEST_LIST_INIT;
1944    m->clientWillAskFor = REQUEST_LIST_INIT;
1945    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1946
1947    if ( tr_peerIoSupportsLTEP( m->io ) )
1948        sendLtepHandshake( m );
1949
1950    sendBitfield( m );
1951   
1952    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1953    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1954    ratePulse( m );
1955
1956    return m;
1957}
1958
1959void
1960tr_peerMsgsFree( tr_peermsgs* msgs )
1961{
1962    if( msgs != NULL )
1963    {
1964        tr_timerFree( &msgs->pulseTimer );
1965        tr_timerFree( &msgs->rateTimer );
1966        tr_timerFree( &msgs->pexTimer );
1967        tr_publisherFree( &msgs->publisher );
1968        reqListClear( &msgs->clientWillAskFor );
1969        reqListClear( &msgs->clientAskedFor );
1970        reqListClear( &msgs->peerAskedForFast );
1971        reqListClear( &msgs->peerAskedFor );
1972        tr_bitfieldFree( msgs->peerAllowedPieces );
1973        evbuffer_free( msgs->incoming.block );
1974        evbuffer_free( msgs->outMessages );
1975        evbuffer_free( msgs->outBlock );
1976        tr_free( msgs->pex );
1977
1978        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1979        tr_free( msgs );
1980    }
1981}
1982
1983tr_publisher_tag
1984tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1985                      tr_delivery_func    func,
1986                      void              * userData )
1987{
1988    return tr_publisherSubscribe( peer->publisher, func, userData );
1989}
1990
1991void
1992tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1993                        tr_publisher_tag    tag )
1994{
1995    tr_publisherUnsubscribe( peer->publisher, tag );
1996}
Note: See TracBrowser for help on using the repository browser.