Changeset 13954
- Timestamp:
- Feb 4, 2013, 4:23:33 PM (8 years ago)
- Location:
- trunk/libtransmission
- Files:
-
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/libtransmission/clients.c
r13625 r13954 144 144 } 145 145 146 void 146 char * 147 147 tr_clientForId (char * buf, size_t buflen, const void * id_in) 148 148 { … … 152 152 153 153 if (!id) 154 return ;154 return buf; 155 155 156 156 /* Azureus-style */ … … 309 309 310 310 if (*buf) 311 return ;311 return buf; 312 312 } 313 313 … … 332 332 333 333 if (*buf) 334 return ;334 return buf; 335 335 } 336 336 … … 340 340 if (*id=='M') mainline_style (buf, buflen, "BitTorrent", id); 341 341 if (*id=='Q') mainline_style (buf, buflen, "Queen Bee", id); 342 if (*buf) return ;342 if (*buf) return buf; 343 343 } 344 344 345 345 if (decodeBitCometClient (buf, buflen, id)) 346 return ;346 return buf; 347 347 348 348 /* Clients with no version */ … … 441 441 { 442 442 tr_snprintf (buf, buflen, "%s %d.%d.%d", name, a, b, c); 443 return ;443 return buf; 444 444 } 445 445 } … … 462 462 tr_strlcpy (buf, out, buflen); 463 463 } 464 } 464 465 return buf; 466 } -
trunk/libtransmission/clients.h
r13625 r13954 22 22 * @ingroup utils 23 23 */ 24 voidtr_clientForId (char * buf, size_t buflen, const void * peer_id);24 char* tr_clientForId (char * buf, size_t buflen, const void * peer_id); 25 25 26 26 #endif -
trunk/libtransmission/peer-common.h
r13949 r13954 27 27 * @{ 28 28 */ 29 30 struct tr_peer; 31 struct tr_swarm; 29 32 30 33 enum … … 73 76 extern const tr_peer_event TR_PEER_EVENT_INIT; 74 77 78 typedef void tr_peer_callback (struct tr_peer * peer, 79 const tr_peer_event * event, 80 void * client_data); 81 82 /*** 83 **** 84 ***/ 85 86 typedef void (*tr_peer_destruct_func)(struct tr_peer * peer); 87 typedef bool (*tr_peer_is_transferring_pieces_func)(const struct tr_peer * peer, 88 uint64_t now, 89 tr_direction direction, 90 unsigned int * Bps); 91 struct tr_peer_virtual_funcs 92 { 93 tr_peer_destruct_func destruct; 94 tr_peer_is_transferring_pieces_func is_transferring_pieces; 95 }; 96 75 97 /** 76 98 * State information about a connected peer. 77 99 * 78 100 * @see struct peer_atom 79 * @see tr_peer msgs101 * @see tr_peerMsgs 80 102 */ 81 103 typedef struct tr_peer … … 84 106 NOTE: private to peer-mgr.c */ 85 107 bool doPurge; 86 87 /* Whether or not we've choked this peer.88 Only applies to BitTorrent peers */89 bool peerIsChoked;90 91 /* whether or not the peer has indicated it will download from us.92 Only applies to BitTorrent peers */93 bool peerIsInterested;94 95 /* whether or the peer is choking us.96 Only applies to BitTorrent peers */97 bool clientIsChoked;98 99 /* whether or not we've indicated to the peer that we would download from them if unchoked.100 Only applies to BitTorrent peers */101 bool clientIsInterested;102 108 103 109 /* number of bad pieces they've contributed to */ … … 110 116 int pendingReqsToPeer; 111 117 112 struct tr_peerIo * io;113 114 118 /* Hook to private peer-mgr information */ 115 119 struct peer_atom * atom; 120 121 struct tr_swarm * swarm; 116 122 117 123 /** how complete the peer's copy of the torrent is. [0.0...1.0] */ … … 125 131 tr_quark client; 126 132 127 time_t chokeChangedAt;128 129 133 tr_recentHistory blocksSentToClient; 130 134 tr_recentHistory blocksSentToPeer; … … 133 137 tr_recentHistory cancelsSentToPeer; 134 138 135 struct tr_peermsgs * msgs;139 const struct tr_peer_virtual_funcs * funcs; 136 140 } 137 141 tr_peer; 138 142 139 typedef void tr_peer_callback (struct tr_peer * peer, 140 const tr_peer_event * event, 141 void * client_data); 143 144 void tr_peerConstruct (struct tr_peer * peer, const tr_torrent * tor); 145 146 void tr_peerDestruct (struct tr_peer * peer); 147 142 148 143 149 /** Update the tr_peer.progress field based on the 'have' bitset. */ -
trunk/libtransmission/peer-io.h
r13900 r13954 91 91 tr_port port; 92 92 int socket; 93 struct UTPSocket *utp_socket;93 struct UTPSocket * utp_socket; 94 94 95 95 int refCount; … … 139 139 struct UTPSocket * utp_socket); 140 140 141 void tr_peerIoRefImpl (const char * file,141 void tr_peerIoRefImpl (const char * file, 142 142 int line, 143 143 tr_peerIo * io); … … 145 145 #define tr_peerIoRef(io) tr_peerIoRefImpl (__FILE__, __LINE__, (io)); 146 146 147 void tr_peerIoUnrefImpl (const char * file,147 void tr_peerIoUnrefImpl (const char * file, 148 148 int line, 149 149 tr_peerIo * io); -
trunk/libtransmission/peer-mgr.c
r13903 r13954 115 115 * 116 116 * @see tr_peer 117 * @see tr_peer msgs117 * @see tr_peerMsgs 118 118 */ 119 119 struct peer_atom … … 187 187 tr_ptrArray outgoingHandshakes; /* tr_handshake */ 188 188 tr_ptrArray pool; /* struct peer_atom */ 189 tr_ptrArray peers; /* tr_peer */189 tr_ptrArray peers; /* tr_peerMsgs */ 190 190 tr_ptrArray webseeds; /* tr_webseed */ 191 191 … … 193 193 struct tr_peerMgr * manager; 194 194 195 tr_peer 195 tr_peerMsgs * optimistic; /* the optimistic peer, or NULL if none */ 196 196 int optimisticUnchokeTimeScaler; 197 197 … … 253 253 254 254 /** 255 *** 255 *** tr_peer virtual functions 256 256 **/ 257 257 258 static inline void 259 managerLock (const struct tr_peerMgr * manager) 260 { 261 tr_sessionLock (manager->session); 262 } 263 264 static inline void 265 managerUnlock (const struct tr_peerMgr * manager) 266 { 267 tr_sessionUnlock (manager->session); 268 } 269 270 static inline void 271 swarmLock (tr_swarm * swarm) 272 { 273 managerLock (swarm->manager); 274 } 275 276 static inline void 277 swarmUnlock (tr_swarm * swarm) 278 { 279 managerUnlock (swarm->manager); 280 } 281 282 static inline int 283 swarmIsLocked (const tr_swarm * swarm) 284 { 285 return tr_sessionIsLocked (swarm->manager->session); 258 static bool 259 tr_peerIsTransferringPieces (const tr_peer * peer, 260 uint64_t now, 261 tr_direction direction, 262 unsigned int * Bps) 263 { 264 assert (peer != NULL); 265 assert (peer->funcs != NULL); 266 267 return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps); 268 } 269 270 unsigned int 271 tr_peerGetPieceSpeed_Bps (const tr_peer * peer, 272 uint64_t now, 273 tr_direction direction) 274 { 275 unsigned int Bps = 0; 276 tr_peerIsTransferringPieces (peer, now, direction, &Bps); 277 return Bps; 278 } 279 280 static void 281 tr_peerFree (tr_peer * peer) 282 { 283 assert (peer != NULL); 284 assert (peer->funcs != NULL); 285 286 (*peer->funcs->destruct)(peer); 287 288 tr_free (peer); 289 } 290 291 void 292 tr_peerConstruct (tr_peer * peer, const tr_torrent * tor) 293 { 294 assert (peer != NULL); 295 assert (tr_isTorrent (tor)); 296 297 memset (peer, 0, sizeof (tr_peer)); 298 299 peer->client = TR_KEY_NONE; 300 peer->swarm = tor->swarm; 301 tr_bitfieldConstruct (&peer->have, tor->info.pieceCount); 302 tr_bitfieldConstruct (&peer->blame, tor->blockCount); 303 } 304 305 static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *); 306 307 void 308 tr_peerDestruct (tr_peer * peer) 309 { 310 assert (peer != NULL); 311 312 if (peer->swarm != NULL) 313 peerDeclinedAllRequests (peer->swarm, peer); 314 315 tr_bitfieldDestruct (&peer->have); 316 tr_bitfieldDestruct (&peer->blame); 317 318 if (peer->atom) 319 peer->atom->peer = NULL; 286 320 } 287 321 … … 290 324 **/ 291 325 292 static int 293 handshakeCompareToAddr (const void * va, const void * vb) 294 { 295 const tr_handshake * a = va; 296 297 return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb); 298 } 299 300 static int 301 handshakeCompare (const void * a, const void * b) 302 { 303 return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL)); 304 } 305 306 static inline tr_handshake* 307 getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr) 308 { 309 if (tr_ptrArrayEmpty (handshakes)) 310 return NULL; 311 312 return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr); 313 } 314 315 static int 316 comparePeerAtomToAddress (const void * va, const void * vb) 317 { 318 const struct peer_atom * a = va; 319 320 return tr_address_compare (&a->addr, vb); 321 } 322 323 static int 324 compareAtomsByAddress (const void * va, const void * vb) 325 { 326 const struct peer_atom * b = vb; 327 328 assert (tr_isAtom (b)); 329 330 return comparePeerAtomToAddress (va, &b->addr); 326 static inline void 327 managerLock (const struct tr_peerMgr * manager) 328 { 329 tr_sessionLock (manager->session); 330 } 331 332 static inline void 333 managerUnlock (const struct tr_peerMgr * manager) 334 { 335 tr_sessionUnlock (manager->session); 336 } 337 338 static inline void 339 swarmLock (tr_swarm * swarm) 340 { 341 managerLock (swarm->manager); 342 } 343 344 static inline void 345 swarmUnlock (tr_swarm * swarm) 346 { 347 managerUnlock (swarm->manager); 348 } 349 350 static inline int 351 swarmIsLocked (const tr_swarm * swarm) 352 { 353 return tr_sessionIsLocked (swarm->manager->session); 331 354 } 332 355 … … 335 358 **/ 336 359 360 static int 361 handshakeCompareToAddr (const void * va, const void * vb) 362 { 363 const tr_handshake * a = va; 364 365 return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb); 366 } 367 368 static int 369 handshakeCompare (const void * a, const void * b) 370 { 371 return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL)); 372 } 373 374 static inline tr_handshake* 375 getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr) 376 { 377 if (tr_ptrArrayEmpty (handshakes)) 378 return NULL; 379 380 return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr); 381 } 382 383 static int 384 comparePeerAtomToAddress (const void * va, const void * vb) 385 { 386 const struct peer_atom * a = va; 387 388 return tr_address_compare (&a->addr, vb); 389 } 390 391 static int 392 compareAtomsByAddress (const void * va, const void * vb) 393 { 394 const struct peer_atom * b = vb; 395 396 assert (tr_isAtom (b)); 397 398 return comparePeerAtomToAddress (va, &b->addr); 399 } 400 401 /** 402 *** 403 **/ 404 337 405 const tr_address * 338 406 tr_peerAddress (const tr_peer * peer) … … 376 444 } 377 445 378 void379 tr_peerConstruct (tr_peer * peer)380 {381 memset (peer, 0, sizeof (tr_peer));382 383 peer->have = TR_BITFIELD_INIT;384 }385 386 static tr_peer*387 peerNew (struct peer_atom * atom)388 {389 tr_peer * peer = tr_new (tr_peer, 1);390 tr_peerConstruct (peer);391 392 peer->atom = atom;393 atom->peer = peer;394 395 return peer;396 }397 398 static tr_peer*399 getPeer (tr_swarm * s, struct peer_atom * atom)400 {401 tr_peer * peer;402 403 assert (swarmIsLocked (s));404 405 peer = atom->peer;406 407 if (peer == NULL)408 {409 peer = peerNew (atom);410 tr_bitfieldConstruct (&peer->have, s->tor->info.pieceCount);411 tr_bitfieldConstruct (&peer->blame, s->tor->blockCount);412 tr_ptrArrayInsertSorted (&s->peers, peer, peerCompare);413 }414 415 return peer;416 }417 418 static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);419 420 void421 tr_peerDestruct (tr_torrent * tor, tr_peer * peer)422 {423 assert (peer != NULL);424 425 peerDeclinedAllRequests (tor->swarm, peer);426 427 if (peer->msgs != NULL)428 tr_peerMsgsFree (peer->msgs);429 430 if (peer->io)431 {432 tr_peerIoClear (peer->io);433 tr_peerIoUnref (peer->io); /* balanced by the ref in handshakeDoneCB () */434 }435 436 tr_bitfieldDestruct (&peer->have);437 tr_bitfieldDestruct (&peer->blame);438 439 if (peer->atom)440 peer->atom->peer = NULL;441 }442 443 static void444 peerDelete (tr_swarm * s, tr_peer * peer)445 {446 tr_peerDestruct (s->tor, peer);447 tr_free (peer);448 }449 450 446 static inline bool 451 447 replicationExists (const tr_swarm * s) … … 467 463 tr_piece_index_t piece_i; 468 464 const tr_piece_index_t piece_count = s->tor->info.pieceCount; 469 tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers); 470 const int peer_count = tr_ptrArraySize (&s->peers); 465 const int n = tr_ptrArraySize (&s->peers); 471 466 472 467 assert (!replicationExists (s)); … … 480 475 uint16_t r = 0; 481 476 482 for (peer_i=0; peer_i<peer_count; ++peer_i) 483 if (tr_bitfieldHas (&peers[peer_i]->have, piece_i)) 484 ++r; 477 for (peer_i=0; peer_i<n; ++peer_i) 478 { 479 tr_peer * peer = tr_ptrArrayNth (&s->peers, peer_i); 480 if (tr_bitfieldHas (&peer->have, piece_i)) 481 ++r; 482 } 485 483 486 484 s->pieceReplication[piece_i] = r; … … 499 497 assert (tr_ptrArrayEmpty (&s->peers)); 500 498 501 tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_ webseedFree);499 tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree); 502 500 tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free); 503 501 tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL); … … 520 518 521 519 /* clear the array */ 522 tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_ webseedFree);520 tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree); 523 521 s->webseeds = TR_PTR_ARRAY_INIT; 524 522 525 523 /* repopulate it */ 526 for (i = 0; i <inf->webseedCount; ++i)524 for (i=0; i<inf->webseedCount; ++i) 527 525 { 528 526 tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s); … … 599 597 600 598 static int 601 clientIsDownloadingFrom (const tr_torrent * tor, const tr_peer * peer)599 clientIsDownloadingFrom (const tr_torrent * tor, const tr_peerMsgs * p) 602 600 { 603 601 if (!tr_torrentHasMetadata (tor)) 604 602 return true; 605 603 606 return peer->clientIsInterested && !peer->clientIsChoked;604 return tr_peerMsgsIsClientInterested (p) && !tr_peerMsgsIsClientChoked (p); 607 605 } 608 606 609 607 static int 610 clientIsUploadingTo (const tr_peer * peer)611 { 612 return peer->peerIsInterested && !peer->peerIsChoked;608 clientIsUploadingTo (const tr_peerMsgs * p) 609 { 610 return tr_peerMsgsIsPeerInterested (p) && !tr_peerMsgsIsPeerChoked (p); 613 611 } 614 612 … … 869 867 870 868 static int 871 countActiveWebseeds (const tr_swarm * s) 872 { 869 countActiveWebseeds (tr_swarm * s) 870 { 871 int i; 873 872 int activeCount = 0; 874 const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase (&s->webseeds);875 const tr_webseed ** const wend = w + tr_ptrArraySize (&s->webseeds);876 877 for ( ; w!=wend; ++w)878 if (tr_ webseedIsActive (*w))873 const int n = tr_ptrArraySize (&s->webseeds); 874 const uint64_t now = tr_time_msec (); 875 876 for (i=0; i<n; ++i) 877 if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, NULL)) 879 878 ++activeCount; 880 879 … … 903 902 else if (!s->endgame) /* only recalculate when endgame first begins */ 904 903 { 904 int i; 905 905 int numDownloading = 0; 906 const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase (&s->peers); 907 const tr_peer ** const pend = p + tr_ptrArraySize (&s->peers); 906 const int n = tr_ptrArraySize (&s->peers); 908 907 909 908 /* add the active bittorrent peers... */ 910 for (; p!=pend; ++p) 911 if ((*p)->pendingReqsToPeer > 0) 912 ++numDownloading; 909 for (i=0; i<n; ++i) 910 { 911 const tr_peer * p = tr_ptrArrayNth (&s->peers, i); 912 if (p->pendingReqsToPeer > 0) 913 ++numDownloading; 914 } 913 915 914 916 /* add the active webseeds... */ … … 1341 1343 /* sanity clause */ 1342 1344 assert (tr_isTorrent (tor)); 1343 assert (peer->clientIsInterested);1344 assert (!peer->clientIsChoked);1345 1345 assert (numwant > 0); 1346 1346 … … 1516 1516 for (it=s->requests, end=it+n; it!=end; ++it) 1517 1517 { 1518 if ((it->sentAt <= too_old) && it->peer->msgs && !tr_peerMsgsIsReadingBlock (it->peer->msgs, it->block)) 1518 tr_peerMsgs * msgs = PEER_MSGS(it->peer); 1519 1520 if ((msgs !=NULL) && (it->sentAt <= too_old) && !tr_peerMsgsIsReadingBlock (msgs, it->block)) 1519 1521 cancel[cancelCount++] = *it; 1520 1522 else … … 1530 1532 1531 1533 /* send cancel messages for all the "cancel" ones */ 1532 for (it=cancel, end=it+cancelCount; it!=end; ++it) { 1533 if ((it->peer != NULL) && (it->peer->msgs != NULL)) { 1534 tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1); 1535 tr_peerMsgsCancel (it->peer->msgs, it->block); 1536 decrementPendingReqCount (it); 1534 for (it=cancel, end=it+cancelCount; it!=end; ++it) 1535 { 1536 tr_peerMsgs * msgs = PEER_MSGS(it->peer); 1537 1538 if (msgs != NULL) 1539 { 1540 tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1); 1541 tr_peerMsgsCancel (msgs, it->block); 1542 decrementPendingReqCount (it); 1537 1543 } 1538 1544 } … … 1656 1662 tr_peer * p = peers[i]; 1657 1663 1658 if ((p != no_notify) && (p ->msgs!= NULL))1664 if ((p != no_notify) && (p != NULL)) 1659 1665 { 1660 1666 tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1); 1661 tr_peerMsgsCancel ( p->msgs, block);1667 tr_peerMsgsCancel (PEER_MSGS(p), block); 1662 1668 } 1663 1669 … … 1671 1677 tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p) 1672 1678 { 1679 int i; 1673 1680 bool pieceCameFromPeers = false; 1674 1681 tr_swarm * const s = tor->swarm; 1675 const tr_peer ** peer = (const tr_peer **) tr_ptrArrayBase (&s->peers); 1676 const tr_peer ** const pend = peer + tr_ptrArraySize (&s->peers); 1682 const int n = tr_ptrArraySize (&s->peers); 1677 1683 1678 1684 /* walk through our peers */ 1679 for ( ; peer!=pend; ++peer) 1680 { 1685 for (i=0; i<n; ++i) 1686 { 1687 tr_peer * peer = tr_ptrArrayNth (&s->peers, i); 1688 1681 1689 /* notify the peer that we now have this piece */ 1682 tr_peerMsgsHave ( (*peer)->msgs, p);1690 tr_peerMsgsHave (PEER_MSGS(peer), p); 1683 1691 1684 1692 if (!pieceCameFromPeers) 1685 pieceCameFromPeers = tr_bitfieldHas (& (*peer)->blame, p);1693 pieceCameFromPeers = tr_bitfieldHas (&peer->blame, p); 1686 1694 } 1687 1695 … … 1902 1910 } 1903 1911 1912 1913 static void 1914 createBitTorrentPeer (tr_torrent * tor, 1915 struct tr_peerIo * io, 1916 struct peer_atom * atom, 1917 tr_quark client) 1918 { 1919 tr_peer * peer; 1920 tr_swarm * swarm; 1921 1922 assert (atom != NULL); 1923 assert (tr_isTorrent (tor)); 1924 assert (tor->swarm != NULL); 1925 1926 swarm = tor->swarm; 1927 1928 peer = (tr_peer*) tr_peerMsgsNew (tor, io, peerCallbackFunc, swarm); 1929 peer->atom = atom; 1930 peer->client = client; 1931 atom->peer = peer; 1932 1933 tr_ptrArrayInsertSorted (&swarm->peers, peer, peerCompare); 1934 } 1935 1936 1904 1937 /* FIXME: this is kind of a mess. */ 1905 1938 static bool … … 1998 2031 else 1999 2032 { 2000 peer = getPeer (s, atom); 2001 2002 if (!peer_id) 2003 peer->client = TR_KEY_NONE; 2033 tr_quark client; 2034 tr_peerIo * io; 2035 char buf[128]; 2036 2037 if (peer_id != NULL) 2038 client = tr_quark_new (tr_clientForId (buf, sizeof (buf), peer_id), -1); 2004 2039 else 2005 { 2006 char client[128]; 2007 tr_clientForId (client, sizeof (client), peer_id); 2008 peer->client = tr_quark_new (client, -1); 2009 } 2010 2011 peer->io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is 2012 balanced by our unref in peerDelete () */ 2013 tr_peerIoSetParent (peer->io, &s->tor->bandwidth); 2014 tr_peerMsgsNew (s->tor, peer, peerCallbackFunc, s); 2040 client = TR_KEY_NONE; 2041 2042 io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is 2043 balanced by our unref in peerDelete () */ 2044 tr_peerIoSetParent (io, &s->tor->bandwidth); 2045 createBitTorrentPeer (s->tor, io, atom, client); 2015 2046 2016 2047 success = true; … … 2401 2432 /* disconnect the peers. */ 2402 2433 while ((peer = tr_ptrArrayPop (&swarm->peers))) 2403 peerDelete (swarm,peer);2434 tr_peerFree (peer); 2404 2435 2405 2436 /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (), … … 2534 2565 peerIsSeed (const tr_peer * peer) 2535 2566 { 2536 2537 2538 2539 2540 2541 2542 2567 if (peer->progress >= 1.0) 2568 return true; 2569 2570 if (peer->atom && atomIsSeed (peer->atom)) 2571 return true; 2572 2573 return false; 2543 2574 } 2544 2575 … … 2596 2627 { 2597 2628 int i; 2598 int size;2629 int n; 2599 2630 tr_swarm * s; 2600 const tr_peer ** peers;2601 2631 2602 2632 assert (tr_isTorrent (tor)); … … 2608 2638 2609 2639 s = tor->swarm; 2610 size = tr_ptrArraySize (&s->peers); 2611 peers = (const tr_peer **) tr_ptrArrayBase (&s->peers); 2640 n = tr_ptrArraySize (&s->peers); 2612 2641 2613 2642 for (i=0; i<TR_PEER_FROM__MAX; ++i) 2614 2643 setmePeersFrom[i] = 0; 2615 2644 2616 for (i=0; i<size; ++i) 2617 { 2618 const tr_peer * peer = peers[i]; 2645 for (i=0; i<n; ++i) 2646 { 2647 tr_peer * peer = tr_ptrArrayNth (&s->peers, i); 2648 tr_peerMsgs * msgs = PEER_MSGS (peer); 2619 2649 const struct peer_atom * atom = peer->atom; 2620 2650 2621 if (peer->io == NULL) /* not connected */ 2622 continue; 2651 assert (msgs != NULL); 2623 2652 2624 2653 ++*setmePeersConnected; … … 2626 2655 ++setmePeersFrom[atom->fromFirst]; 2627 2656 2628 if (clientIsDownloadingFrom (tor, peer))2657 if (clientIsDownloadingFrom (tor, msgs)) 2629 2658 ++*setmePeersSendingToUs; 2630 2659 2631 if (clientIsUploadingTo ( peer))2660 if (clientIsUploadingTo (msgs)) 2632 2661 ++*setmePeersGettingFromUs; 2633 2662 } … … 2640 2669 { 2641 2670 unsigned int i; 2642 unsigned int webseedCount; 2643 const tr_swarm * s; 2644 const tr_webseed ** webseeds; 2671 tr_swarm * s; 2672 unsigned int n; 2645 2673 double * ret = NULL; 2646 2674 const uint64_t now = tr_time_msec (); … … 2649 2677 2650 2678 s = tor->swarm; 2651 webseedCount = tr_ptrArraySize (&s->webseeds); 2652 webseeds = (const tr_webseed**) tr_ptrArrayBase (&s->webseeds); 2653 ret = tr_new0 (double, webseedCount); 2679 n = tr_ptrArraySize (&s->webseeds); 2680 ret = tr_new0 (double, n); 2654 2681 2655 2682 assert (s->manager != NULL); 2656 assert ( webseedCount== tor->info.webseedCount);2657 2658 for (i=0; i< webseedCount; ++i)2659 { 2660 unsigned int Bps ;2661 if (tr_ webseedGetSpeed_Bps (webseeds[i], now, &Bps))2683 assert (n == tor->info.webseedCount); 2684 2685 for (i=0; i<n; ++i) 2686 { 2687 unsigned int Bps = 0; 2688 if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, &Bps)) 2662 2689 ret[i] = Bps / (double)tr_speed_K; 2663 2690 else … … 2666 2693 2667 2694 return ret; 2668 }2669 2670 unsigned int2671 tr_peerGetPieceSpeed_Bps (const tr_peer * peer, uint64_t now, tr_direction direction)2672 {2673 return peer->io ? tr_peerIoGetPieceSpeed_Bps (peer->io, now, direction) : 0.0;2674 2695 } 2675 2696 … … 2681 2702 tr_peer_stat * ret; 2682 2703 const tr_swarm * s; 2683 consttr_peer ** peers;2704 tr_peer ** peers; 2684 2705 const time_t now = tr_time (); 2685 2706 const uint64_t now_msec = tr_time_msec (); … … 2689 2710 2690 2711 s = tor->swarm; 2691 peers = ( consttr_peer**) tr_ptrArrayBase (&s->peers);2712 peers = (tr_peer**) tr_ptrArrayBase (&s->peers); 2692 2713 size = tr_ptrArraySize (&s->peers); 2693 2714 ret = tr_new0 (tr_peer_stat, size); … … 2696 2717 { 2697 2718 char * pch; 2698 const tr_peer * peer = peers[i]; 2719 tr_peer * peer = peers[i]; 2720 tr_peerMsgs * msgs = PEER_MSGS (peer); 2699 2721 const struct peer_atom * atom = peer->atom; 2700 2722 tr_peer_stat * stat = ret + i; … … 2705 2727 stat->from = atom->fromFirst; 2706 2728 stat->progress = peer->progress; 2707 stat->isUTP = peer->io->utp_socket != NULL;2708 stat->isEncrypted = tr_peer IoIsEncrypted (peer->io) ? 1 : 0;2729 stat->isUTP = tr_peerMsgsIsUtpConnection (msgs); 2730 stat->isEncrypted = tr_peerMsgsIsEncrypted (msgs); 2709 2731 stat->rateToPeer_KBps = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER)); 2710 2732 stat->rateToClient_KBps = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT)); 2711 stat->peerIsChoked = peer->peerIsChoked;2712 stat->peerIsInterested = peer->peerIsInterested;2713 stat->clientIsChoked = peer->clientIsChoked;2714 stat->clientIsInterested = peer->clientIsInterested;2715 stat->isIncoming = tr_peer IoIsIncoming (peer->io);2716 stat->isDownloadingFrom = clientIsDownloadingFrom (tor, peer);2717 stat->isUploadingTo = clientIsUploadingTo ( peer);2733 stat->peerIsChoked = tr_peerMsgsIsPeerChoked (msgs); 2734 stat->peerIsInterested = tr_peerMsgsIsPeerInterested (msgs); 2735 stat->clientIsChoked = tr_peerMsgsIsClientChoked (msgs); 2736 stat->clientIsInterested = tr_peerMsgsIsClientInterested (msgs); 2737 stat->isIncoming = tr_peerMsgsIsIncomingConnection (msgs); 2738 stat->isDownloadingFrom = clientIsDownloadingFrom (tor, msgs); 2739 stat->isUploadingTo = clientIsUploadingTo (msgs); 2718 2740 stat->isSeed = peerIsSeed (peer); 2719 2741 … … 2728 2750 pch = stat->flagStr; 2729 2751 if (stat->isUTP) *pch++ = 'T'; 2730 if (s->optimistic == peer) *pch++ = 'O';2752 if (s->optimistic == msgs) *pch++ = 'O'; 2731 2753 if (stat->isDownloadingFrom) *pch++ = 'D'; 2732 2754 else if (stat->clientIsInterested) *pch++ = 'd'; … … 2762 2784 2763 2785 for (i=0; i<peerCount; ++i) 2764 { 2765 const tr_peer * peer = tr_ptrArrayNth (&s->peers, i); 2766 tr_peerMsgsSetInterested (peer->msgs, false); 2767 } 2786 tr_peerMsgsSetInterested (tr_ptrArrayNth (&s->peers, i), false); 2768 2787 } 2769 2788 … … 2921 2940 if (!isPeerInteresting (s->tor, piece_is_interesting, peer)) 2922 2941 { 2923 tr_peerMsgsSetInterested ( peer->msgs, false);2942 tr_peerMsgsSetInterested (PEER_MSGS(peer), false); 2924 2943 } 2925 2944 else … … 2958 2977 s->interestedCount = MIN (maxPeers, rechoke_count); 2959 2978 for (i=0; i<rechoke_count; ++i) 2960 tr_peerMsgsSetInterested ( rechoke[i].peer->msgs, i<s->interestedCount);2979 tr_peerMsgsSetInterested (PEER_MSGS(rechoke[i].peer), i<s->interestedCount); 2961 2980 2962 2981 /* cleanup */ … … 2970 2989 struct ChokeData 2971 2990 { 2972 bool isInterested;2973 bool wasChoked;2974 bool isChoked;2975 int rate;2976 int salt;2977 tr_peer * peer;2991 bool isInterested; 2992 bool wasChoked; 2993 bool isChoked; 2994 int rate; 2995 int salt; 2996 tr_peerMsgs * msgs; 2978 2997 }; 2979 2998 … … 2997 3016 2998 3017 /* is this a new connection? */ 2999 static int3000 isNew (const tr_peer * peer)3001 { 3002 return peer && peer->io && tr_peerIoGetAge (peer->io) < 45;3018 static bool 3019 isNew (const tr_peerMsgs * msgs) 3020 { 3021 return (msgs != NULL) && (tr_peerMsgsGetConnectionAge (msgs) < 45); 3003 3022 } 3004 3023 … … 3066 3085 { 3067 3086 tr_peer * peer = peers[i]; 3087 tr_peerMsgs * msgs = PEER_MSGS (peer); 3088 3068 3089 struct peer_atom * atom = peer->atom; 3069 3090 3070 3091 if (peerIsSeed (peer)) /* choke seeds and partial seeds */ 3071 3092 { 3072 tr_peerMsgsSetChoke ( peer->msgs, true);3093 tr_peerMsgsSetChoke (PEER_MSGS(peer), true); 3073 3094 } 3074 3095 else if (chokeAll) /* choke everyone if we're not uploading */ 3075 3096 { 3076 tr_peerMsgsSetChoke ( peer->msgs, true);3097 tr_peerMsgsSetChoke (PEER_MSGS(peer), true); 3077 3098 } 3078 else if ( peer!= s->optimistic)3099 else if (msgs != s->optimistic) 3079 3100 { 3080 3101 struct ChokeData * n = &choke[size++]; 3081 n-> peer = peer;3082 n->isInterested = peer->peerIsInterested;3083 n->wasChoked = peer->peerIsChoked;3102 n->msgs = msgs; 3103 n->isInterested = tr_peerMsgsIsPeerInterested (msgs); 3104 n->wasChoked = tr_peerMsgsIsPeerChoked (msgs); 3084 3105 n->rate = getRate (s->tor, atom, now); 3085 3106 n->salt = tr_cryptoWeakRandInt (INT_MAX); … … 3124 3145 if (choke[i].isInterested) 3125 3146 { 3126 const tr_peer * peer = choke[i].peer;3147 const tr_peerMsgs * msgs = choke[i].msgs; 3127 3148 int x = 1, y; 3128 if (isNew ( peer)) x *= 3;3149 if (isNew (msgs)) x *= 3; 3129 3150 for (y=0; y<x; ++y) 3130 3151 tr_ptrArrayAppend (&randPool, &choke[i]); … … 3136 3157 c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n)); 3137 3158 c->isChoked = false; 3138 s->optimistic = c-> peer;3159 s->optimistic = c->msgs; 3139 3160 s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER; 3140 3161 } … … 3144 3165 3145 3166 for (i=0; i<size; ++i) 3146 tr_peerMsgsSetChoke (choke[i]. peer->msgs, choke[i].isChoked);3167 tr_peerMsgsSetChoke (choke[i].msgs, choke[i].isChoked); 3147 3168 3148 3169 /* cleanup */ … … 3306 3327 3307 3328 assert (removed == peer); 3308 peerDelete (s,removed);3329 tr_peerFree (removed); 3309 3330 } 3310 3331 … … 3333 3354 } 3334 3355 3335 tordbg (s, "removing bad peer %s", tr_ peerIoGetAddrStr (peer->io));3356 tordbg (s, "removing bad peer %s", tr_atomAddrStr (peer->atom)); 3336 3357 removePeer (s, peer); 3337 3358 } … … 3555 3576 3556 3577 for (j=0; j<tr_ptrArraySize (&s->peers); ++j) 3557 { 3558 tr_peer * peer = tr_ptrArrayNth (&s->peers, j); 3559 tr_peerMsgsPulse (peer->msgs); 3560 } 3578 tr_peerMsgsPulse (tr_ptrArrayNth (&s->peers, j)); 3561 3579 } 3562 3580 } -
trunk/libtransmission/peer-mgr.h
r13948 r13954 66 66 tr_pex; 67 67 68 68 struct peer_atom; 69 69 struct tr_peerIo; 70 struct tr_peermsgs; 71 72 /* opaque forward declaration */ 73 struct peer_atom; 74 75 void tr_peerConstruct (struct tr_peer * peer); 76 77 void tr_peerDestruct (tr_torrent * tor, struct tr_peer * peer); 78 70 struct tr_peerMsgs; 71 struct tr_swarm; 79 72 80 73 static inline bool 81 74 tr_isPex (const tr_pex * pex) 82 75 { 83 76 return pex && tr_address_is_valid (&pex->addr); 84 77 } 85 78 … … 88 81 int tr_pexCompare (const void * a, const void * b); 89 82 90 tr_peerMgr * tr_peerMgrNew (tr_session *);91 92 void tr_peerMgrFree (tr_peerMgr* manager);93 94 bool tr_peerMgrPeerIsSeed (const tr_torrent* tor,95 const tr_address* addr);96 97 void tr_peerMgrSetUtpSupported (tr_torrent* tor,98 const tr_address* addr);99 100 void tr_peerMgrSetUtpFailed (tr_torrent *tor,101 const tr_address *addr,102 boolfailed);103 104 void tr_peerMgrGetNextRequests(tr_torrent * torrent,105 tr_peer * peer,106 int numwant,107 tr_block_index_t * setme,108 int * numgot,109 bool get_intervals);110 111 bool tr_peerMgrDidPeerRequest (const tr_torrent* torrent,112 const tr_peer* peer,113 tr_block_index_tblock);114 115 void tr_peerMgrRebuildRequests (tr_torrent* torrent);116 117 void tr_peerMgrAddIncoming (tr_peerMgr* manager,118 tr_address* addr,119 tr_portport,120 intsocket,121 struct UTPSocket *utp_socket);122 123 tr_pex * tr_peerMgrCompactToPex (const void* compact,124 size_tcompactLen,125 const uint8_t* added_f,126 size_tadded_f_len,127 size_t* setme_pex_count);128 129 tr_pex * tr_peerMgrCompact6ToPex (const void* compact,130 size_tcompactLen,131 const uint8_t* added_f,132 size_tadded_f_len,133 size_t* pexCount);134 135 tr_pex * tr_peerMgrArrayToPex (const void* array,136 size_tarrayLen,137 size_t* setme_pex_count);83 tr_peerMgr * tr_peerMgrNew (tr_session * session); 84 85 void tr_peerMgrFree (tr_peerMgr * manager); 86 87 bool tr_peerMgrPeerIsSeed (const tr_torrent * tor, 88 const tr_address * addr); 89 90 void tr_peerMgrSetUtpSupported (tr_torrent * tor, 91 const tr_address * addr); 92 93 void tr_peerMgrSetUtpFailed (tr_torrent * tor, 94 const tr_address * addr, 95 bool failed); 96 97 void tr_peerMgrGetNextRequests (tr_torrent * torrent, 98 tr_peer * peer, 99 int numwant, 100 tr_block_index_t * setme, 101 int * numgot, 102 bool get_intervals); 103 104 bool tr_peerMgrDidPeerRequest (const tr_torrent * torrent, 105 const tr_peer * peer, 106 tr_block_index_t block); 107 108 void tr_peerMgrRebuildRequests (tr_torrent * torrent); 109 110 void tr_peerMgrAddIncoming (tr_peerMgr * manager, 111 tr_address * addr, 112 tr_port port, 113 int socket, 114 struct UTPSocket * utp_socket); 115 116 tr_pex * tr_peerMgrCompactToPex (const void * compact, 117 size_t compactLen, 118 const uint8_t * added_f, 119 size_t added_f_len, 120 size_t * setme_pex_count); 121 122 tr_pex * tr_peerMgrCompact6ToPex (const void * compact, 123 size_t compactLen, 124 const uint8_t * added_f, 125 size_t added_f_len, 126 size_t * pexCount); 127 128 tr_pex * tr_peerMgrArrayToPex (const void * array, 129 size_t arrayLen, 130 size_t * setme_pex_count); 138 131 139 132 /** 140 133 * @param seedProbability [0..100] for likelihood that the peer is a seed; -1 for unknown 141 134 */ 142 void tr_peerMgrAddPex (tr_torrent* tor,143 uint8_tfrom,144 const tr_pex* pex,145 int8_tseedProbability);146 147 void tr_peerMgrMarkAllAsSeeds (tr_torrent* tor);135 void tr_peerMgrAddPex (tr_torrent * tor, 136 uint8_t from, 137 const tr_pex * pex, 138 int8_t seedProbability); 139 140 void tr_peerMgrMarkAllAsSeeds (tr_torrent * tor); 148 141 149 142 enum 150 143 { 151 152 144 TR_PEERS_CONNECTED, 145 TR_PEERS_INTERESTING 153 146 }; 154 147 155 int tr_peerMgrGetPeers (tr_torrent * tor, 156 tr_pex ** setme_pex, 157 uint8_t address_type, 158 uint8_t peer_list_mode, 159 int max_peer_count); 160 161 void tr_peerMgrStartTorrent (tr_torrent * tor); 162 163 void tr_peerMgrStopTorrent (tr_torrent * tor); 164 165 void tr_peerMgrAddTorrent (tr_peerMgr * manager, 166 struct tr_torrent * tor); 167 168 void tr_peerMgrRemoveTorrent (tr_torrent * tor); 169 170 void tr_peerMgrTorrentAvailability (const tr_torrent * tor, 171 int8_t * tab, 172 unsigned int tabCount); 173 174 uint64_t tr_peerMgrGetDesiredAvailable (const tr_torrent * tor); 175 176 void tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor); 177 178 void tr_peerMgrOnBlocklistChanged (tr_peerMgr * manager); 179 180 void tr_peerMgrTorrentStats (tr_torrent * tor, 181 int * setmePeersConnected, 182 int * setmeWebseedsSendingToUs, 183 int * setmePeersSendingToUs, 184 int * setmePeersGettingFromUs, 185 int * setmePeersFrom); /* TR_PEER_FROM__MAX */ 186 187 struct tr_peer_stat* tr_peerMgrPeerStats (const tr_torrent * tor, 188 int * setmeCount); 189 190 double* tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor); 191 192 193 unsigned int tr_peerGetPieceSpeed_Bps (const tr_peer * peer, 194 uint64_t now, 195 tr_direction direction); 196 197 void tr_peerMgrClearInterest (tr_torrent * tor); 198 199 void tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex); 200 201 void tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t pieceIndex); 148 int tr_peerMgrGetPeers (tr_torrent * tor, 149 tr_pex ** setme_pex, 150 uint8_t address_type, 151 uint8_t peer_list_mode, 152 int max_peer_count); 153 154 void tr_peerMgrStartTorrent (tr_torrent * tor); 155 156 void tr_peerMgrStopTorrent (tr_torrent * tor); 157 158 void tr_peerMgrAddTorrent (tr_peerMgr * manager, 159 struct tr_torrent * tor); 160 161 void tr_peerMgrRemoveTorrent (tr_torrent * tor); 162 163 void tr_peerMgrTorrentAvailability (const tr_torrent * tor, 164 int8_t * tab, 165 unsigned int tabCount); 166 167 uint64_t tr_peerMgrGetDesiredAvailable (const tr_torrent * tor); 168 169 void tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor); 170 171 void tr_peerMgrOnBlocklistChanged (tr_peerMgr * manager); 172 173 void tr_peerMgrTorrentStats (tr_torrent * tor, 174 int * setmePeersConnected, 175 int * setmeWebseedsSendingToUs, 176 int * setmePeersSendingToUs, 177 int * setmePeersGettingFromUs, 178 int * setmePeersFrom); /* TR_PEER_FROM__MAX */ 179 180 struct tr_peer_stat * tr_peerMgrPeerStats (const tr_torrent * tor, 181 int * setmeCount); 182 183 double * tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor); 184 185 unsigned int tr_peerGetPieceSpeed_Bps (const tr_peer * peer, 186 uint64_t now, 187 tr_direction direction); 188 189 void tr_peerMgrClearInterest (tr_torrent * tor); 190 191 void tr_peerMgrGotBadPiece (tr_torrent * tor, 192 tr_piece_index_t pieceIndex); 193 194 void tr_peerMgrPieceCompleted (tr_torrent * tor, 195 tr_piece_index_t pieceIndex); 202 196 203 197 -
trunk/libtransmission/peer-msgs.c
r13949 r13954 43 43 enum 44 44 { 45 BT_CHOKE = 0, 46 BT_UNCHOKE = 1, 47 BT_INTERESTED = 2, 48 BT_NOT_INTERESTED = 3, 49 BT_HAVE = 4, 50 BT_BITFIELD = 5, 51 BT_REQUEST = 6, 52 BT_PIECE = 7, 53 BT_CANCEL = 8, 54 BT_PORT = 9, 55 56 BT_FEXT_SUGGEST = 13, 57 BT_FEXT_HAVE_ALL = 14, 58 BT_FEXT_HAVE_NONE = 15, 59 BT_FEXT_REJECT = 16, 60 BT_FEXT_ALLOWED_FAST = 17, 61 62 BT_LTEP = 20, 63 64 LTEP_HANDSHAKE = 0, 65 66 UT_PEX_ID = 1, 67 UT_METADATA_ID = 3, 68 69 MAX_PEX_PEER_COUNT = 50, 70 71 MIN_CHOKE_PERIOD_SEC = 10, 72 73 /* idle seconds before we send a keepalive */ 74 KEEPALIVE_INTERVAL_SECS = 100, 75 76 PEX_INTERVAL_SECS = 90, /* sec between sendPex () calls */ 77 78 REQQ = 512, 79 80 METADATA_REQQ = 64, 81 82 /* used in lowering the outMessages queue period */ 83 IMMEDIATE_PRIORITY_INTERVAL_SECS = 0, 84 HIGH_PRIORITY_INTERVAL_SECS = 2, 85 LOW_PRIORITY_INTERVAL_SECS = 10, 86 87 /* number of pieces we'll allow in our fast set */ 88 MAX_FAST_SET_SIZE = 3, 89 90 /* how many blocks to keep prefetched per peer */ 91 PREFETCH_SIZE = 18, 92 93 /* when we're making requests from another peer, 94 batch them together to send enough requests to 95 meet our bandwidth goals for the next N seconds */ 96 REQUEST_BUF_SECS = 10, 97 98 /* defined in BEP #9 */ 99 METADATA_MSG_TYPE_REQUEST = 0, 100 METADATA_MSG_TYPE_DATA = 1, 101 METADATA_MSG_TYPE_REJECT = 2 45 BT_CHOKE = 0, 46 BT_UNCHOKE = 1, 47 BT_INTERESTED = 2, 48 BT_NOT_INTERESTED = 3, 49 BT_HAVE = 4, 50 BT_BITFIELD = 5, 51 BT_REQUEST = 6, 52 BT_PIECE = 7, 53 BT_CANCEL = 8, 54 BT_PORT = 9, 55 56 BT_FEXT_SUGGEST = 13, 57 BT_FEXT_HAVE_ALL = 14, 58 BT_FEXT_HAVE_NONE = 15, 59 BT_FEXT_REJECT = 16, 60 BT_FEXT_ALLOWED_FAST = 17, 61 62 BT_LTEP = 20, 63 64 LTEP_HANDSHAKE = 0, 65 66 UT_PEX_ID = 1, 67 UT_METADATA_ID = 3, 68 69 MAX_PEX_PEER_COUNT = 50, 70 71 MIN_CHOKE_PERIOD_SEC = 10, 72 73 /* idle seconds before we send a keepalive */ 74 KEEPALIVE_INTERVAL_SECS = 100, 75 76 PEX_INTERVAL_SECS = 90, /* sec between sendPex () calls */ 77 78 REQQ = 512, 79 80 METADATA_REQQ = 64, 81 82 MAGIC_NUMBER = 21549, 83 84 /* used in lowering the outMessages queue period */ 85 IMMEDIATE_PRIORITY_INTERVAL_SECS = 0, 86 HIGH_PRIORITY_INTERVAL_SECS = 2, 87 LOW_PRIORITY_INTERVAL_SECS = 10, 88 89 /* number of pieces we'll allow in our fast set */ 90 MAX_FAST_SET_SIZE = 3, 91 92 /* how many blocks to keep prefetched per peer */ 93 PREFETCH_SIZE = 18, 94 95 /* when we're making requests from another peer, 96 batch them together to send enough requests to 97 meet our bandwidth goals for the next N seconds */ 98 REQUEST_BUF_SECS = 10, 99 100 /* defined in BEP #9 */ 101 METADATA_MSG_TYPE_REQUEST = 0, 102 METADATA_MSG_TYPE_DATA = 1, 103 METADATA_MSG_TYPE_REJECT = 2 102 104 }; 103 105 104 106 enum 105 107 { 106 107 108 109 108 AWAITING_BT_LENGTH, 109 AWAITING_BT_ID, 110 AWAITING_BT_MESSAGE, 111 AWAITING_BT_PIECE 110 112 }; 111 113 … … 118 120 encryption_preference_t; 119 121 120 121 122 122 /** 123 123 *** … … 126 126 struct peer_request 127 127 { 128 uint32_tindex;129 uint32_toffset;130 uint32_tlength;128 uint32_t index; 129 uint32_t offset; 130 uint32_t length; 131 131 }; 132 132 … … 136 136 struct peer_request * setme) 137 137 { 138 139 140 138 tr_torrentGetBlockLocation (tor, block, &setme->index, 139 &setme->offset, 140 &setme->length); 141 141 } 142 142 … … 149 149 struct tr_incoming 150 150 { 151 152 153 154 struct evbuffer *block; /* piece data for incoming blocks */151 uint8_t id; 152 uint32_t length; /* includes the +1 for id length */ 153 struct peer_request blockReq; /* metadata for incoming blocks */ 154 struct evbuffer * block; /* piece data for incoming blocks */ 155 155 }; 156 156 … … 169 169 * @see tr_peer 170 170 */ 171 struct tr_peermsgs 172 { 173 bool peerSupportsPex; 174 bool peerSupportsMetadataXfer; 175 bool clientSentLtepHandshake; 176 bool peerSentLtepHandshake; 177 178 /*bool haveFastSet;*/ 179 180 int desiredRequestCount; 181 182 int prefetchCount; 183 184 /* how long the outMessages batch should be allowed to grow before 185 * it's flushed -- some messages (like requests >:) should be sent 186 * very quickly; others aren't as urgent. */ 187 int8_t outMessagesBatchPeriod; 188 189 uint8_t state; 190 uint8_t ut_pex_id; 191 uint8_t ut_metadata_id; 192 uint16_t pexCount; 193 uint16_t pexCount6; 194 195 tr_port dht_port; 196 197 encryption_preference_t encryption_preference; 198 199 size_t metadata_size_hint; 171 struct tr_peerMsgs 172 { 173 struct tr_peer peer; /* parent */ 174 175 uint16_t magic_number; 176 177 /* Whether or not we've choked this peer. */ 178 bool peer_is_choked; 179 180 /* whether or not the peer has indicated it will download from us. */ 181 bool peer_is_interested; 182 183 /* whether or the peer is choking us. */ 184 bool client_is_choked; 185 186 /* whether or not we've indicated to the peer that we would download from them if unchoked. */ 187 bool client_is_interested; 188 189 190 bool peerSupportsPex; 191 bool peerSupportsMetadataXfer; 192 bool clientSentLtepHandshake; 193 bool peerSentLtepHandshake; 194 195 /*bool haveFastSet;*/ 196 197 int desiredRequestCount; 198 199 int prefetchCount; 200 201 /* how long the outMessages batch should be allowed to grow before 202 * it's flushed -- some messages (like requests >:) should be sent 203 * very quickly; others aren't as urgent. */ 204 int8_t outMessagesBatchPeriod; 205 206 uint8_t state; 207 uint8_t ut_pex_id; 208 uint8_t ut_metadata_id; 209 uint16_t pexCount; 210 uint16_t pexCount6; 211 212 tr_port dht_port; 213 214 encryption_preference_t encryption_preference; 215 216 size_t metadata_size_hint; 200 217 #if 0 201 202 218 size_t fastsetSize; 219 tr_piece_index_t fastset[MAX_FAST_SET_SIZE]; 203 220 #endif 204 221 205 tr_peer * peer; 206 207 tr_torrent * torrent; 208 209 tr_peer_callback * callback; 210 void * callbackData; 211 212 struct evbuffer * outMessages; /* all the non-piece messages */ 213 214 struct peer_request peerAskedFor[REQQ]; 215 216 int peerAskedForMetadata[METADATA_REQQ]; 217 int peerAskedForMetadataCount; 218 219 tr_pex * pex; 220 tr_pex * pex6; 221 222 /*time_t clientSentPexAt;*/ 223 time_t clientSentAnythingAt; 224 225 /* when we started batching the outMessages */ 226 time_t outMessagesBatchedAt; 227 228 struct tr_incoming incoming; 229 230 /* if the peer supports the Extension Protocol in BEP 10 and 231 supplied a reqq argument, it's stored here. Otherwise, the 232 value is zero and should be ignored. */ 233 int64_t reqq; 234 235 struct event * pexTimer; 222 tr_torrent * torrent; 223 224 tr_peer_callback * callback; 225 void * callbackData; 226 227 struct evbuffer * outMessages; /* all the non-piece messages */ 228 229 struct peer_request peerAskedFor[REQQ]; 230 231 int peerAskedForMetadata[METADATA_REQQ]; 232 int peerAskedForMetadataCount; 233 234 tr_pex * pex; 235 tr_pex * pex6; 236 237 /*time_t clientSentPexAt;*/ 238 time_t clientSentAnythingAt; 239 240 time_t chokeChangedAt; 241 242 /* when we started batching the outMessages */ 243 time_t outMessagesBatchedAt; 244 245 struct tr_incoming incoming; 246 247 /* if the peer supports the Extension Protocol in BEP 10 and 248 supplied a reqq argument, it's stored here. Otherwise, the 249 value is zero and should be ignored. */ 250 int64_t reqq; 251 252 struct event * pexTimer; 253 254 struct tr_peerIo * io; 236 255 }; 237 256 … … 241 260 242 261 static inline tr_session* 243 getSession (struct tr_peer msgs * msgs)244 { 245 262 getSession (struct tr_peerMsgs * msgs) 263 { 264 return msgs->torrent->session; 246 265 } 247 266 … … 252 271 static void 253 272 myDebug (const char * file, int line, 254 const struct tr_peer msgs * msgs,273 const struct tr_peerMsgs * msgs, 255 274 const char * fmt, ...) 256 275 { 257 258 259 260 { 261 262 263 264 265 266 267 268 269 270 tr_peerIoGetAddrStr (msgs->peer->io),271 tr_quark_get_string (msgs->peer->client, NULL));272 273 274 275 276 277 278 279 280 281 276 FILE * fp = tr_logGetFile (); 277 278 if (fp) 279 { 280 va_list args; 281 char timestr[64]; 282 struct evbuffer * buf = evbuffer_new (); 283 char * base = tr_basename (file); 284 char * message; 285 286 evbuffer_add_printf (buf, "[%s] %s - %s [%s]: ", 287 tr_logGetTimeStr (timestr, sizeof (timestr)), 288 tr_torrentName (msgs->torrent), 289 tr_peerIoGetAddrStr (msgs->io), 290 tr_quark_get_string (msgs->peer.client, NULL)); 291 va_start (args, fmt); 292 evbuffer_add_vprintf (buf, fmt, args); 293 va_end (args); 294 evbuffer_add_printf (buf, " (%s:%d)\n", base, line); 295 296 message = evbuffer_free_to_str (buf); 297 fputs (message, fp); 298 299 tr_free (base); 300 tr_free (message); 282 301 } 283 302 } … … 296 315 297 316 static void 298 pokeBatchPeriod (tr_peer msgs * msgs, int interval)299 { 300 301 { 302 303 304 } 305 } 306 307 static void 308 dbgOutMessageLen (tr_peer msgs * msgs)309 { 310 311 } 312 313 static void 314 protocolSendReject (tr_peer msgs * msgs, const struct peer_request * req)315 { 316 317 318 assert (tr_peerIoSupportsFEXT (msgs->peer->io));319 320 321 322 323 324 325 326 327 328 } 329 330 static void 331 protocolSendRequest (tr_peer msgs * msgs, const struct peer_request * req)332 { 333 334 335 336 337 338 339 340 341 342 343 344 } 345 346 static void 347 protocolSendCancel (tr_peer msgs * msgs, const struct peer_request * req)348 { 349 350 351 352 353 354 355 356 357 358 359 360 } 361 362 static void 363 protocolSendPort (tr_peer msgs *msgs, uint16_t port)364 { 365 366 367 368 369 370 371 } 372 373 static void 374 protocolSendHave (tr_peer msgs * msgs, uint32_t index)375 { 376 377 378 379 380 381 382 383 384 317 pokeBatchPeriod (tr_peerMsgs * msgs, int interval) 318 { 319 if (msgs->outMessagesBatchPeriod > interval) 320 { 321 msgs->outMessagesBatchPeriod = interval; 322 dbgmsg (msgs, "lowering batch interval to %d seconds", interval); 323 } 324 } 325 326 static void 327 dbgOutMessageLen (tr_peerMsgs * msgs) 328 { 329 dbgmsg (msgs, "outMessage size is now %zu", evbuffer_get_length (msgs->outMessages)); 330 } 331 332 static void 333 protocolSendReject (tr_peerMsgs * msgs, const struct peer_request * req) 334 { 335 struct evbuffer * out = msgs->outMessages; 336 337 assert (tr_peerIoSupportsFEXT (msgs->io)); 338 339 evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t)); 340 evbuffer_add_uint8 (out, BT_FEXT_REJECT); 341 evbuffer_add_uint32 (out, req->index); 342 evbuffer_add_uint32 (out, req->offset); 343 evbuffer_add_uint32 (out, req->length); 344 345 dbgmsg (msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length); 346 dbgOutMessageLen (msgs); 347 } 348 349 static void 350 protocolSendRequest (tr_peerMsgs * msgs, const struct peer_request * req) 351 { 352 struct evbuffer * out = msgs->outMessages; 353 354 evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t)); 355 evbuffer_add_uint8 (out, BT_REQUEST); 356 evbuffer_add_uint32 (out, req->index); 357 evbuffer_add_uint32 (out, req->offset); 358 evbuffer_add_uint32 (out, req->length); 359 360 dbgmsg (msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length); 361 dbgOutMessageLen (msgs); 362 pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); 363 } 364 365 static void 366 protocolSendCancel (tr_peerMsgs * msgs, const struct peer_request * req) 367 { 368 struct evbuffer * out = msgs->outMessages; 369 370 evbuffer_add_uint32 (out, sizeof (uint8_t) + 3 * sizeof (uint32_t)); 371 evbuffer_add_uint8 (out, BT_CANCEL); 372 evbuffer_add_uint32 (out, req->index); 373 evbuffer_add_uint32 (out, req->offset); 374 evbuffer_add_uint32 (out, req->length); 375 376 dbgmsg (msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length); 377 dbgOutMessageLen (msgs); 378 pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); 379 } 380 381 static void 382 protocolSendPort (tr_peerMsgs *msgs, uint16_t port) 383 { 384 struct evbuffer * out = msgs->outMessages; 385 386 dbgmsg (msgs, "sending Port %u", port); 387 evbuffer_add_uint32 (out, 3); 388 evbuffer_add_uint8 (out, BT_PORT); 389 evbuffer_add_uint16 (out, port); 390 } 391 392 static void 393 protocolSendHave (tr_peerMsgs * msgs, uint32_t index) 394 { 395 struct evbuffer * out = msgs->outMessages; 396 397 evbuffer_add_uint32 (out, sizeof (uint8_t) + sizeof (uint32_t)); 398 evbuffer_add_uint8 (out, BT_HAVE); 399 evbuffer_add_uint32 (out, index); 400 401 dbgmsg (msgs, "sending Have %u", index); 402 dbgOutMessageLen (msgs); 403 pokeBatchPeriod (msgs, LOW_PRIORITY_INTERVAL_SECS); 385 404 } 386 405 387 406 #if 0 388 407 static void 389 protocolSendAllowedFast (tr_peer msgs * msgs, uint32_t pieceIndex)390 { 391 tr_peerIo * io = msgs->peer->io;392 393 394 assert (tr_peerIoSupportsFEXT (msgs->peer->io));395 396 397 398 399 400 401 408 protocolSendAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex) 409 { 410 tr_peerIo * io = msgs->io; 411 struct evbuffer * out = msgs->outMessages; 412 413 assert (tr_peerIoSupportsFEXT (msgs->io)); 414 415 evbuffer_add_uint32 (io, out, sizeof (uint8_t) + sizeof (uint32_t)); 416 evbuffer_add_uint8 (io, out, BT_FEXT_ALLOWED_FAST); 417 evbuffer_add_uint32 (io, out, pieceIndex); 418 419 dbgmsg (msgs, "sending Allowed Fast %u...", pieceIndex); 420 dbgOutMessageLen (msgs); 402 421 } 403 422 #endif 404 423 405 424 static void 406 protocolSendChoke (tr_peer msgs * msgs, int choke)407 { 408 409 410 411 412 413 414 415 416 } 417 418 static void 419 protocolSendHaveAll (tr_peer msgs * msgs)420 { 421 422 423 assert (tr_peerIoSupportsFEXT (msgs->peer->io));424 425 426 427 428 429 430 431 } 432 433 static void 434 protocolSendHaveNone (tr_peer msgs * msgs)435 { 436 437 438 assert (tr_peerIoSupportsFEXT (msgs->peer->io));439 440 441 442 443 444 445 425 protocolSendChoke (tr_peerMsgs * msgs, int choke) 426 { 427 struct evbuffer * out = msgs->outMessages; 428 429 evbuffer_add_uint32 (out, sizeof (uint8_t)); 430 evbuffer_add_uint8 (out, choke ? BT_CHOKE : BT_UNCHOKE); 431 432 dbgmsg (msgs, "sending %s...", choke ? "Choke" : "Unchoke"); 433 dbgOutMessageLen (msgs); 434 pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); 435 } 436 437 static void 438 protocolSendHaveAll (tr_peerMsgs * msgs) 439 { 440 struct evbuffer * out = msgs->outMessages; 441 442 assert (tr_peerIoSupportsFEXT (msgs->io)); 443 444 evbuffer_add_uint32 (out, sizeof (uint8_t)); 445 evbuffer_add_uint8 (out, BT_FEXT_HAVE_ALL); 446 447 dbgmsg (msgs, "sending HAVE_ALL..."); 448 dbgOutMessageLen (msgs); 449 pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); 450 } 451 452 static void 453 protocolSendHaveNone (tr_peerMsgs * msgs) 454 { 455 struct evbuffer * out = msgs->outMessages; 456 457 assert (tr_peerIoSupportsFEXT (msgs->io)); 458 459 evbuffer_add_uint32 (out, sizeof (uint8_t)); 460 evbuffer_add_uint8 (out, BT_FEXT_HAVE_NONE); 461 462 dbgmsg (msgs, "sending HAVE_NONE..."); 463 dbgOutMessageLen (msgs); 464 pokeBatchPeriod (msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); 446 465 } 447 466 … … 451 470 452 471 static void 453 publish (tr_peermsgs * msgs, tr_peer_event * e) 454 { 455 assert (msgs->peer); 456 assert (msgs->peer->msgs == msgs); 457 458 if (msgs->callback != NULL) 459 msgs->callback (msgs->peer, e, msgs->callbackData); 460 } 461 462 static void 463 fireError (tr_peermsgs * msgs, int err) 464 { 465 tr_peer_event e = TR_PEER_EVENT_INIT; 466 e.eventType = TR_PEER_ERROR; 467 e.err = err; 468 publish (msgs, &e); 469 } 470 471 static void 472 fireGotBlock (tr_peermsgs * msgs, const struct peer_request * req) 473 { 474 tr_peer_event e = TR_PEER_EVENT_INIT; 475 e.eventType = TR_PEER_CLIENT_GOT_BLOCK; 476 e.pieceIndex = req->index; 477 e.offset = req->offset; 478 e.length = req->length; 479 publish (msgs, &e); 480 } 481 482 static void 483 fireGotRej (tr_peermsgs * msgs, const struct peer_request * req) 484 { 485 tr_peer_event e = TR_PEER_EVENT_INIT; 486 e.eventType = TR_PEER_CLIENT_GOT_REJ; 487 e.pieceIndex = req->index; 488 e.offset = req->offset; 489 e.length = req->length; 490 publish (msgs, &e); 491 } 492 493 static void 494 fireGotChoke (tr_peermsgs * msgs) 495 { 496 tr_peer_event e = TR_PEER_EVENT_INIT; 497 e.eventType = TR_PEER_CLIENT_GOT_CHOKE; 498 publish (msgs, &e); 499 } 500 501 static void 502 fireClientGotHaveAll (tr_peermsgs * msgs) 503 { 504 tr_peer_event e = TR_PEER_EVENT_INIT; 505 e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL; 506 publish (msgs, &e); 507 } 508 509 static void 510 fireClientGotHaveNone (tr_peermsgs * msgs) 511 { 512 tr_peer_event e = TR_PEER_EVENT_INIT; 513 e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE; 514 publish (msgs, &e); 515 } 516 517 static void 518 fireClientGotPieceData (tr_peermsgs * msgs, uint32_t length) 519 { 520 tr_peer_event e = TR_PEER_EVENT_INIT; 521 e.length = length; 522 e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA; 523 publish (msgs, &e); 524 } 525 526 static void 527 firePeerGotPieceData (tr_peermsgs * msgs, uint32_t length) 528 { 529 tr_peer_event e = TR_PEER_EVENT_INIT; 530 e.length = length; 531 e.eventType = TR_PEER_PEER_GOT_PIECE_DATA; 532 publish (msgs, &e); 533 } 534 535 static void 536 fireClientGotSuggest (tr_peermsgs * msgs, uint32_t pieceIndex) 537 { 538 tr_peer_event e = TR_PEER_EVENT_INIT; 539 e.eventType = TR_PEER_CLIENT_GOT_SUGGEST; 540 e.pieceIndex = pieceIndex; 541 publish (msgs, &e); 542 } 543 544 static void 545 fireClientGotPort (tr_peermsgs * msgs, tr_port port) 546 { 547 tr_peer_event e = TR_PEER_EVENT_INIT; 548 e.eventType = TR_PEER_CLIENT_GOT_PORT; 549 e.port = port; 550 publish (msgs, &e); 551 } 552 553 static void 554 fireClientGotAllowedFast (tr_peermsgs * msgs, uint32_t pieceIndex) 555 { 556 tr_peer_event e = TR_PEER_EVENT_INIT; 557 e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST; 558 e.pieceIndex = pieceIndex; 559 publish (msgs, &e); 560 } 561 562 static void 563 fireClientGotBitfield (tr_peermsgs * msgs, tr_bitfield * bitfield) 564 { 565 tr_peer_event e = TR_PEER_EVENT_INIT; 566 e.eventType = TR_PEER_CLIENT_GOT_BITFIELD; 567 e.bitfield = bitfield; 568 publish (msgs, &e); 569 } 570 571 static void 572 fireClientGotHave (tr_peermsgs * msgs, tr_piece_index_t index) 573 { 574 tr_peer_event e = TR_PEER_EVENT_INIT; 575 e.eventType = TR_PEER_CLIENT_GOT_HAVE; 576 e.pieceIndex = index; 577 publish (msgs, &e); 472 publish (tr_peerMsgs * msgs, tr_peer_event * e) 473 { 474 if (msgs->callback != NULL) 475 msgs->callback (&msgs->peer, e, msgs->callbackData); 476 } 477 478 static void 479 fireError (tr_peerMsgs * msgs, int err) 480 { 481 tr_peer_event e = TR_PEER_EVENT_INIT; 482 e.eventType = TR_PEER_ERROR; 483 e.err = err; 484 publish (msgs, &e); 485 } 486 487 static void 488 fireGotBlock (tr_peerMsgs * msgs, const struct peer_request * req) 489 { 490 tr_peer_event e = TR_PEER_EVENT_INIT; 491 e.eventType = TR_PEER_CLIENT_GOT_BLOCK; 492 e.pieceIndex = req->index; 493 e.offset = req->offset; 494 e.length = req->length; 495 publish (msgs, &e); 496 } 497 498 static void 499 fireGotRej (tr_peerMsgs * msgs, const struct peer_request * req) 500 { 501 tr_peer_event e = TR_PEER_EVENT_INIT; 502 e.eventType = TR_PEER_CLIENT_GOT_REJ; 503 e.pieceIndex = req->index; 504 e.offset = req->offset; 505 e.length = req->length; 506 publish (msgs, &e); 507 } 508 509 static void 510 fireGotChoke (tr_peerMsgs * msgs) 511 { 512 tr_peer_event e = TR_PEER_EVENT_INIT; 513 e.eventType = TR_PEER_CLIENT_GOT_CHOKE; 514 publish (msgs, &e); 515 } 516 517 static void 518 fireClientGotHaveAll (tr_peerMsgs * msgs) 519 { 520 tr_peer_event e = TR_PEER_EVENT_INIT; 521 e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL; 522 publish (msgs, &e); 523 } 524 525 static void 526 fireClientGotHaveNone (tr_peerMsgs * msgs) 527 { 528 tr_peer_event e = TR_PEER_EVENT_INIT; 529 e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE; 530 publish (msgs, &e); 531 } 532 533 static void 534 fireClientGotPieceData (tr_peerMsgs * msgs, uint32_t length) 535 { 536 tr_peer_event e = TR_PEER_EVENT_INIT; 537 e.length = length; 538 e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA; 539 publish (msgs, &e); 540 } 541 542 static void 543 firePeerGotPieceData (tr_peerMsgs * msgs, uint32_t length) 544 { 545 tr_peer_event e = TR_PEER_EVENT_INIT; 546 e.length = length; 547 e.eventType = TR_PEER_PEER_GOT_PIECE_DATA; 548 publish (msgs, &e); 549 } 550 551 static void 552 fireClientGotSuggest (tr_peerMsgs * msgs, uint32_t pieceIndex) 553 { 554 tr_peer_event e = TR_PEER_EVENT_INIT; 555 e.eventType = TR_PEER_CLIENT_GOT_SUGGEST; 556 e.pieceIndex = pieceIndex; 557 publish (msgs, &e); 558 } 559 560 static void 561 fireClientGotPort (tr_peerMsgs * msgs, tr_port port) 562 { 563 tr_peer_event e = TR_PEER_EVENT_INIT; 564 e.eventType = TR_PEER_CLIENT_GOT_PORT; 565 e.port = port; 566 publish (msgs, &e); 567 } 568 569 static void 570 fireClientGotAllowedFast (tr_peerMsgs * msgs, uint32_t pieceIndex) 571 { 572 tr_peer_event e = TR_PEER_EVENT_INIT; 573 e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST; 574 e.pieceIndex = pieceIndex; 575 publish (msgs, &e); 576 } 577 578 static void 579 fireClientGotBitfield (tr_peerMsgs * msgs, tr_bitfield * bitfield) 580 { 581 tr_peer_event e = TR_PEER_EVENT_INIT; 582 e.eventType = TR_PEER_CLIENT_GOT_BITFIELD; 583 e.bitfield = bitfield; 584 publish (msgs, &e); 585 } 586 587 static void 588 fireClientGotHave (tr_peerMsgs * msgs, tr_piece_index_t index) 589 { 590 tr_peer_event e = TR_PEER_EVENT_INIT; 591 e.eventType = TR_PEER_CLIENT_GOT_HAVE; 592 e.pieceIndex = index; 593 publish (msgs, &e); 578 594 } 579 595 … … 640 656 641 657 static void 642 updateFastSet (tr_peer msgs * msgs UNUSED)643 { 644 const bool fext = tr_peerIoSupportsFEXT (msgs-> peer->io);658 updateFastSet (tr_peerMsgs * msgs UNUSED) 659 { 660 const bool fext = tr_peerIoSupportsFEXT (msgs->io); 645 661 const int peerIsNeedy = msgs->peer->progress < 0.10; 646 662 … … 648 664 { 649 665 size_t i; 650 const struct tr_address * addr = tr_peerIoGetAddress (msgs-> peer->io, NULL);666 const struct tr_address * addr = tr_peerIoGetAddress (msgs->io, NULL); 651 667 const tr_info * inf = &msgs->torrent->info; 652 668 const size_t numwant = MIN (MAX_FAST_SET_SIZE, inf->pieceCount); … … 668 684 669 685 static void 670 sendInterest (tr_peer msgs * msgs, bool clientIsInterested)671 { 672 673 674 675 assert (tr_isBool (clientIsInterested));676 677 msgs->peer->clientIsInterested = clientIsInterested;678 dbgmsg (msgs, "Sending %s", clientIsInterested? "Interested" : "Not Interested");679 680 evbuffer_add_uint8 (out, clientIsInterested? BT_INTERESTED : BT_NOT_INTERESTED);681 682 683 684 } 685 686 static void 687 updateInterest (tr_peer msgs * msgs UNUSED)686 sendInterest (tr_peerMsgs * msgs, bool b) 687 { 688 struct evbuffer * out = msgs->outMessages; 689 690 assert (msgs); 691 assert (tr_isBool (b)); 692 693 msgs->client_is_interested = b; 694 dbgmsg (msgs, "Sending %s", b ? "Interested" : "Not Interested"); 695 evbuffer_add_uint32 (out, sizeof (uint8_t)); 696 evbuffer_add_uint8 (out, b ? BT_INTERESTED : BT_NOT_INTERESTED); 697 698 pokeBatchPeriod (msgs, HIGH_PRIORITY_INTERVAL_SECS); 699 dbgOutMessageLen (msgs); 700 } 701 702 static void 703 updateInterest (tr_peerMsgs * msgs UNUSED) 688 704 { 689 705 /* FIXME -- might need to poke the mgr on startup */ … … 691 707 692 708 void 693 tr_peerMsgsSetInterested (tr_peer msgs * msgs, bool clientIsInterested)694 { 695 assert (tr_isBool (clientIsInterested));696 697 if (clientIsInterested != msgs->peer->clientIsInterested)698 sendInterest (msgs, clientIsInterested);709 tr_peerMsgsSetInterested (tr_peerMsgs * msgs, bool b) 710 { 711 assert (tr_isBool (b)); 712 713 if (msgs->client_is_interested != b) 714 sendInterest (msgs, b); 699 715 } 700 716 701 717 static bool 702 popNextMetadataRequest (tr_peer msgs * msgs, int * piece)703 { 704 705 706 707 708 709 710 711 712 718 popNextMetadataRequest (tr_peerMsgs * msgs, int * piece) 719 { 720 if (msgs->peerAskedForMetadataCount == 0) 721 return false; 722 723 *piece = msgs->peerAskedForMetadata[0]; 724 725 tr_removeElementFromArray (msgs->peerAskedForMetadata, 0, sizeof (int), 726 msgs->peerAskedForMetadataCount--); 727 728 return true; 713 729 } 714 730 715 731 static bool 716 popNextRequest (tr_peermsgs * msgs, struct peer_request * setme) 717 { 718 if (msgs->peer->pendingReqsToClient == 0) 719 return false; 720 721 *setme = msgs->peerAskedFor[0]; 722 723 tr_removeElementFromArray (msgs->peerAskedFor, 0, sizeof (struct peer_request), 724 msgs->peer->pendingReqsToClient--); 725 726 return true; 727 } 728 729 static void 730 cancelAllRequestsToClient (tr_peermsgs * msgs) 731 { 732 struct peer_request req; 733 const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->peer->io); 734 735 while (popNextRequest (msgs, &req)) 736 if (mustSendCancel) 737 protocolSendReject (msgs, &req); 732 popNextRequest (tr_peerMsgs * msgs, struct peer_request * setme) 733 { 734 if (msgs->peer.pendingReqsToClient == 0) 735 return false; 736 737 *setme = msgs->peerAskedFor[0]; 738 739 tr_removeElementFromArray (msgs->peerAskedFor, 740 0, 741 sizeof (struct peer_request), 742 msgs->peer.pendingReqsToClient--); 743 744 return true; 745 } 746 747 static void 748 cancelAllRequestsToClient (tr_peerMsgs * msgs) 749 { 750 struct peer_request req; 751 const int mustSendCancel = tr_peerIoSupportsFEXT (msgs->io); 752 753 while (popNextRequest (msgs, &req)) 754 if (mustSendCancel) 755 protocolSendReject (msgs, &req); 738 756 } 739 757 740 758 void 741 tr_peerMsgsSetChoke (tr_peermsgs * msgs, bool peerIsChoked) 742 { 743 const time_t now = tr_time (); 744 const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC; 745 746 assert (msgs); 747 assert (msgs->peer); 748 assert (tr_isBool (peerIsChoked)); 749 750 if (msgs->peer->chokeChangedAt > fibrillationTime) 751 { 752 dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peerIsChoked); 753 } 754 else if (msgs->peer->peerIsChoked != peerIsChoked) 755 { 756 msgs->peer->peerIsChoked = peerIsChoked; 757 if (peerIsChoked) 758 cancelAllRequestsToClient (msgs); 759 protocolSendChoke (msgs, peerIsChoked); 760 msgs->peer->chokeChangedAt = now; 759 tr_peerMsgsSetChoke (tr_peerMsgs * msgs, bool peer_is_choked) 760 { 761 const time_t now = tr_time (); 762 const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC; 763 764 assert (msgs != NULL); 765 assert (tr_isBool (peer_is_choked)); 766 767 if (msgs->chokeChangedAt > fibrillationTime) 768 { 769 dbgmsg (msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked); 770 } 771 else if (msgs->peer_is_choked != peer_is_choked) 772 { 773 msgs->peer_is_choked = peer_is_choked; 774 if (peer_is_choked) 775 cancelAllRequestsToClient (msgs); 776 protocolSendChoke (msgs, peer_is_choked); 777 msgs->chokeChangedAt = now; 761 778 } 762 779 } … … 767 784 768 785 void 769 tr_peerMsgsHave (tr_peer msgs * msgs, uint32_t index)770 { 771 772 773 774 786 tr_peerMsgsHave (tr_peerMsgs * msgs, uint32_t index) 787 { 788 protocolSendHave (msgs, index); 789 790 /* since we have more pieces now, we might not be interested in this peer */ 791 updateInterest (msgs); 775 792 } 776 793 … … 780 797 781 798 static bool 782 reqIsValid (const tr_peer msgs * peer,799 reqIsValid (const tr_peerMsgs * peer, 783 800 uint32_t index, 784 801 uint32_t offset, … … 789 806 790 807 static bool 791 requestIsValid (const tr_peer msgs * msgs, const struct peer_request * req)808 requestIsValid (const tr_peerMsgs * msgs, const struct peer_request * req) 792 809 { 793 810 return reqIsValid (msgs, req->index, req->offset, req->length); … … 795 812 796 813 void 797 tr_peerMsgsCancel (tr_peer msgs * msgs, tr_block_index_t block)814 tr_peerMsgsCancel (tr_peerMsgs * msgs, tr_block_index_t block) 798 815 { 799 816 struct peer_request req; … … 808 825 809 826 static void 810 sendLtepHandshake (tr_peer msgs * msgs)827 sendLtepHandshake (tr_peerMsgs * msgs) 811 828 { 812 829 tr_variant val; … … 875 892 876 893 static void 877 parseLtepHandshake (tr_peer msgs * msgs, int len, struct evbuffer * inbuf)894 parseLtepHandshake (tr_peerMsgs * msgs, int len, struct evbuffer * inbuf) 878 895 { 879 896 int64_t i; … … 887 904 memset (&pex, 0, sizeof (tr_pex)); 888 905 889 tr_peerIoReadBytes (msgs-> peer->io, inbuf, tmp, len);906 tr_peerIoReadBytes (msgs->io, inbuf, tmp, len); 890 907 msgs->peerSentLtepHandshake = 1; 891 908 … … 926 943 it implies support for µTP, so use it to indicate that. */ 927 944 tr_peerMgrSetUtpFailed (msgs->torrent, 928 tr_peerIoGetAddress (msgs-> peer->io, NULL),945 tr_peerIoGetAddress (msgs->io, NULL), 929 946 false); 930 947 } … … 948 965 } 949 966 950 if (tr_peerIoIsIncoming (msgs-> peer->io)967 if (tr_peerIoIsIncoming (msgs->io) 951 968 && tr_variantDictFindRaw (&val, TR_KEY_ipv4, &addr, &addr_len) 952 969 && (addr_len == 4)) … … 957 974 } 958 975 959 if (tr_peerIoIsIncoming (msgs-> peer->io)976 if (tr_peerIoIsIncoming (msgs->io) 960 977 && tr_variantDictFindRaw (&val, TR_KEY_ipv6, &addr, &addr_len) 961 978 && (addr_len == 16)) … … 975 992 976 993 static void 977 parseUtMetadata (tr_peer msgs * msgs, int msglen, struct evbuffer * inbuf)994 parseUtMetadata (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf) 978 995 { 979 996 tr_variant dict; … … 985 1002 uint8_t * tmp = tr_new (uint8_t, msglen); 986 1003 987 tr_peerIoReadBytes (msgs-> peer->io, inbuf, tmp, msglen);1004 tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen); 988 1005 msg_end = (char*)tmp + msglen; 989 1006 … … 1052 1069 1053 1070 static void 1054 parseUtPex (tr_peer msgs * msgs, int msglen, struct evbuffer * inbuf)1071 parseUtPex (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf) 1055 1072 { 1056 1073 int loaded = 0; … … 1061 1078 size_t added_len; 1062 1079 1063 tr_peerIoReadBytes (msgs-> peer->io, inbuf, tmp, msglen);1080 tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen); 1064 1081 1065 1082 if (tr_torrentAllowsPex (tor) … … 1114 1131 } 1115 1132 1116 static void sendPex (tr_peer msgs * msgs);1117 1118 static void 1119 parseLtep (tr_peer msgs * msgs, int msglen, struct evbuffer * inbuf)1133 static void sendPex (tr_peerMsgs * msgs); 1134 1135 static void 1136 parseLtep (tr_peerMsgs * msgs, int msglen, struct evbuffer * inbuf) 1120 1137 { 1121 1138 uint8_t ltep_msgid; 1122 1139 1123 tr_peerIoReadUint8 (msgs-> peer->io, inbuf, <ep_msgid);1140 tr_peerIoReadUint8 (msgs->io, inbuf, <ep_msgid); 1124 1141 msglen--; 1125 1142 … … 1128 1145 dbgmsg (msgs, "got ltep handshake"); 1129 1146 parseLtepHandshake (msgs, msglen, inbuf); 1130 if (tr_peerIoSupportsLTEP (msgs-> peer->io))1147 if (tr_peerIoSupportsLTEP (msgs->io)) 1131 1148 { 1132 1149 sendLtepHandshake (msgs); … … 1154 1171 1155 1172 static int 1156 readBtLength (tr_peer msgs * msgs, struct evbuffer * inbuf, size_t inlen)1173 readBtLength (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen) 1157 1174 { 1158 1175 uint32_t len; … … 1161 1178 return READ_LATER; 1162 1179 1163 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &len);1180 tr_peerIoReadUint32 (msgs->io, inbuf, &len); 1164 1181 1165 1182 if (len == 0) /* peer sent us a keepalive message */ … … 1174 1191 } 1175 1192 1176 static int readBtMessage (tr_peer msgs *, struct evbuffer *, size_t);1193 static int readBtMessage (tr_peerMsgs *, struct evbuffer *, size_t); 1177 1194 1178 1195 static int 1179 readBtId (tr_peer msgs * msgs, struct evbuffer * inbuf, size_t inlen)1196 readBtId (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen) 1180 1197 { 1181 1198 uint8_t id; … … 1184 1201 return READ_LATER; 1185 1202 1186 tr_peerIoReadUint8 (msgs-> peer->io, inbuf, &id);1203 tr_peerIoReadUint8 (msgs->io, inbuf, &id); 1187 1204 msgs->incoming.id = id; 1188 1205 dbgmsg (msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length); … … 1202 1219 1203 1220 static void 1204 updatePeerProgress (tr_peer msgs * msgs)1205 { 1206 tr_peerUpdateProgress (msgs->torrent,msgs->peer);1207 1208 1209 1210 } 1211 1212 static void 1213 prefetchPieces (tr_peer msgs *msgs)1214 { 1215 1216 1217 1218 1219 1220 for (i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<PREFETCH_SIZE; ++i)1221 { 1222 1223 1221 updatePeerProgress (tr_peerMsgs * msgs) 1222 { 1223 tr_peerUpdateProgress (msgs->torrent, &msgs->peer); 1224 1225 /*updateFastSet (msgs);*/ 1226 updateInterest (msgs); 1227 } 1228 1229 static void 1230 prefetchPieces (tr_peerMsgs *msgs) 1231 { 1232 int i; 1233 1234 if (!getSession (msgs)->isPrefetchEnabled) 1235 return; 1236 1237 for (i=msgs->prefetchCount; i<msgs->peer.pendingReqsToClient && i<PREFETCH_SIZE; ++i) 1238 { 1239 const struct peer_request * req = msgs->peerAskedFor + i; 1240 if (requestIsValid (msgs, req)) 1224 1241 { 1225 1226 1242 tr_cachePrefetchBlock (getSession (msgs)->cache, msgs->torrent, req->index, req->offset, req->length); 1243 ++msgs->prefetchCount; 1227 1244 } 1228 1245 } … … 1230 1247 1231 1248 static void 1232 peerMadeRequest (tr_peer msgs * msgs, const struct peer_request * req)1233 { 1234 const bool fext = tr_peerIoSupportsFEXT (msgs-> peer->io);1249 peerMadeRequest (tr_peerMsgs * msgs, const struct peer_request * req) 1250 { 1251 const bool fext = tr_peerIoSupportsFEXT (msgs->io); 1235 1252 const int reqIsValid = requestIsValid (msgs, req); 1236 1253 const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete (&msgs->torrent->completion, req->index); 1237 const int peerIsChoked = msgs->peer ->peerIsChoked;1254 const int peerIsChoked = msgs->peer_is_choked; 1238 1255 1239 1256 int allow = false; … … 1245 1262 else if (peerIsChoked) 1246 1263 dbgmsg (msgs, "rejecting request from choked peer"); 1247 else if (msgs->peer ->pendingReqsToClient + 1 >= REQQ)1264 else if (msgs->peer.pendingReqsToClient + 1 >= REQQ) 1248 1265 dbgmsg (msgs, "rejecting request ... reqq is full"); 1249 1266 else … … 1251 1268 1252 1269 if (allow) { 1253 msgs->peerAskedFor[msgs->peer ->pendingReqsToClient++] = *req;1270 msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req; 1254 1271 prefetchPieces (msgs); 1255 1272 } else if (fext) { … … 1259 1276 1260 1277 static bool 1261 messageLengthIsCorrect (const tr_peer msgs * msg, uint8_t id, uint32_t len)1278 messageLengthIsCorrect (const tr_peerMsgs * msg, uint8_t id, uint32_t len) 1262 1279 { 1263 1280 switch (id) … … 1304 1321 } 1305 1322 1306 static int clientGotBlock (tr_peer msgs * msgs,1323 static int clientGotBlock (tr_peerMsgs * msgs, 1307 1324 struct evbuffer * block, 1308 1325 const struct peer_request * req); 1309 1326 1310 1327 static int 1311 readBtPiece (tr_peer msgs * msgs,1328 readBtPiece (tr_peerMsgs * msgs, 1312 1329 struct evbuffer * inbuf, 1313 1330 size_t inlen, … … 1324 1341 return READ_LATER; 1325 1342 1326 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &req->index);1327 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &req->offset);1343 tr_peerIoReadUint32 (msgs->io, inbuf, &req->index); 1344 tr_peerIoReadUint32 (msgs->io, inbuf, &req->offset); 1328 1345 req->length = msgs->incoming.length - 9; 1329 1346 dbgmsg (msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length); … … 1345 1362 n = MIN (nLeft, inlen); 1346 1363 1347 tr_peerIoReadBytesToBuf (msgs-> peer->io, inbuf, block_buffer, n);1364 tr_peerIoReadBytesToBuf (msgs->io, inbuf, block_buffer, n); 1348 1365 1349 1366 fireClientGotPieceData (msgs, n); … … 1366 1383 } 1367 1384 1368 static void updateDesiredRequestCount (tr_peer msgs * msgs);1385 static void updateDesiredRequestCount (tr_peerMsgs * msgs); 1369 1386 1370 1387 static int 1371 readBtMessage (tr_peer msgs * msgs, struct evbuffer * inbuf, size_t inlen)1388 readBtMessage (tr_peerMsgs * msgs, struct evbuffer * inbuf, size_t inlen) 1372 1389 { 1373 1390 uint32_t ui32; … … 1377 1394 const size_t startBufLen = evbuffer_get_length (inbuf); 1378 1395 #endif 1379 const bool fext = tr_peerIoSupportsFEXT (msgs-> peer->io);1396 const bool fext = tr_peerIoSupportsFEXT (msgs->io); 1380 1397 1381 1398 --msglen; /* id length */ … … 1397 1414 case BT_CHOKE: 1398 1415 dbgmsg (msgs, "got Choke"); 1399 msgs-> peer->clientIsChoked = 1;1416 msgs->client_is_choked = true; 1400 1417 if (!fext) 1401 1418 fireGotChoke (msgs); … … 1404 1421 case BT_UNCHOKE: 1405 1422 dbgmsg (msgs, "got Unchoke"); 1406 msgs-> peer->clientIsChoked = 0;1423 msgs->client_is_choked = false; 1407 1424 updateDesiredRequestCount (msgs); 1408 1425 break; … … 1410 1427 case BT_INTERESTED: 1411 1428 dbgmsg (msgs, "got Interested"); 1412 msgs->peer ->peerIsInterested = 1;1429 msgs->peer_is_interested = true; 1413 1430 break; 1414 1431 1415 1432 case BT_NOT_INTERESTED: 1416 1433 dbgmsg (msgs, "got Not Interested"); 1417 msgs->peer ->peerIsInterested = 0;1434 msgs->peer_is_interested = false; 1418 1435 break; 1419 1436 1420 1437 case BT_HAVE: 1421 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &ui32);1438 tr_peerIoReadUint32 (msgs->io, inbuf, &ui32); 1422 1439 dbgmsg (msgs, "got Have: %u", ui32); 1423 1440 if (tr_torrentHasMetadata (msgs->torrent) … … 1429 1446 1430 1447 /* a peer can send the same HAVE message twice... */ 1431 if (!tr_bitfieldHas (&msgs->peer ->have, ui32)) {1432 tr_bitfieldAdd (&msgs->peer ->have, ui32);1448 if (!tr_bitfieldHas (&msgs->peer.have, ui32)) { 1449 tr_bitfieldAdd (&msgs->peer.have, ui32); 1433 1450 fireClientGotHave (msgs, ui32); 1434 1451 } … … 1439 1456 uint8_t * tmp = tr_new (uint8_t, msglen); 1440 1457 dbgmsg (msgs, "got a bitfield"); 1441 tr_peerIoReadBytes (msgs-> peer->io, inbuf, tmp, msglen);1442 tr_bitfieldSetRaw (&msgs->peer ->have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent));1443 fireClientGotBitfield (msgs, &msgs->peer ->have);1458 tr_peerIoReadBytes (msgs->io, inbuf, tmp, msglen); 1459 tr_bitfieldSetRaw (&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata (msgs->torrent)); 1460 fireClientGotBitfield (msgs, &msgs->peer.have); 1444 1461 updatePeerProgress (msgs); 1445 1462 tr_free (tmp); … … 1450 1467 { 1451 1468 struct peer_request r; 1452 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &r.index);1453 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &r.offset);1454 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &r.length);1469 tr_peerIoReadUint32 (msgs->io, inbuf, &r.index); 1470 tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset); 1471 tr_peerIoReadUint32 (msgs->io, inbuf, &r.length); 1455 1472 dbgmsg (msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length); 1456 1473 peerMadeRequest (msgs, &r); … … 1462 1479 int i; 1463 1480 struct peer_request r; 1464 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &r.index);1465 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &r.offset);1466 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &r.length);1467 tr_historyAdd (&msgs->peer ->cancelsSentToClient, tr_time (), 1);1481 tr_peerIoReadUint32 (msgs->io, inbuf, &r.index); 1482 tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset); 1483 tr_peerIoReadUint32 (msgs->io, inbuf, &r.length); 1484 tr_historyAdd (&msgs->peer.cancelsSentToClient, tr_time (), 1); 1468 1485 dbgmsg (msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length); 1469 1486 1470 for (i=0; i<msgs->peer ->pendingReqsToClient; ++i) {1487 for (i=0; i<msgs->peer.pendingReqsToClient; ++i) { 1471 1488 const struct peer_request * req = msgs->peerAskedFor + i; 1472 1489 if ((req->index == r.index) && (req->offset == r.offset) && (req->length == r.length)) … … 1474 1491 } 1475 1492 1476 if (i < msgs->peer ->pendingReqsToClient)1493 if (i < msgs->peer.pendingReqsToClient) 1477 1494 tr_removeElementFromArray (msgs->peerAskedFor, i, sizeof (struct peer_request), 1478 msgs->peer ->pendingReqsToClient--);1495 msgs->peer.pendingReqsToClient--); 1479 1496 break; 1480 1497 } … … 1486 1503 case BT_PORT: 1487 1504 dbgmsg (msgs, "Got a BT_PORT"); 1488 tr_peerIoReadUint16 (msgs-> peer->io, inbuf, &msgs->dht_port);1505 tr_peerIoReadUint16 (msgs->io, inbuf, &msgs->dht_port); 1489 1506 if (msgs->dht_port > 0) 1490 1507 tr_dhtAddNode (getSession (msgs), 1491 tr_peerAddress ( msgs->peer),1508 tr_peerAddress (&msgs->peer), 1492 1509 msgs->dht_port, 0); 1493 1510 break; … … 1495 1512 case BT_FEXT_SUGGEST: 1496 1513 dbgmsg (msgs, "Got a BT_FEXT_SUGGEST"); 1497 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &ui32);1514 tr_peerIoReadUint32 (msgs->io, inbuf, &ui32); 1498 1515 if (fext) 1499 1516 fireClientGotSuggest (msgs, ui32); … … 1506 1523 case BT_FEXT_ALLOWED_FAST: 1507 1524 dbgmsg (msgs, "Got a BT_FEXT_ALLOWED_FAST"); 1508 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &ui32);1525 tr_peerIoReadUint32 (msgs->io, inbuf, &ui32); 1509 1526 if (fext) 1510 1527 fireClientGotAllowedFast (msgs, ui32); … … 1518 1535 dbgmsg (msgs, "Got a BT_FEXT_HAVE_ALL"); 1519 1536 if (fext) { 1520 tr_bitfieldSetHasAll (&msgs->peer ->have);1521 assert (tr_bitfieldHasAll (&msgs->peer ->have));1537 tr_bitfieldSetHasAll (&msgs->peer.have); 1538 assert (tr_bitfieldHasAll (&msgs->peer.have)); 1522 1539 fireClientGotHaveAll (msgs); 1523 1540 updatePeerProgress (msgs); … … 1531 1548 dbgmsg (msgs, "Got a BT_FEXT_HAVE_NONE"); 1532 1549 if (fext) { 1533 tr_bitfieldSetHasNone (&msgs->peer ->have);1550 tr_bitfieldSetHasNone (&msgs->peer.have); 1534 1551 fireClientGotHaveNone (msgs); 1535 1552 updatePeerProgress (msgs); … … 1544 1561 struct peer_request r; 1545 1562 dbgmsg (msgs, "Got a BT_FEXT_REJECT"); 1546 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &r.index);1547 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &r.offset);1548 tr_peerIoReadUint32 (msgs-> peer->io, inbuf, &r.length);1563 tr_peerIoReadUint32 (msgs->io, inbuf, &r.index); 1564 tr_peerIoReadUint32 (msgs->io, inbuf, &r.offset); 1565 tr_peerIoReadUint32 (msgs->io, inbuf, &r.length); 1549 1566 if (fext) 1550 1567 fireGotRej (msgs, &r); … … 1563 1580 default: 1564 1581 dbgmsg (msgs, "peer sent us an UNKNOWN: %d", (int)id); 1565 tr_peerIoDrain (msgs-> peer->io, inbuf, msglen);1582 tr_peerIoDrain (msgs->io, inbuf, msglen); 1566 1583 break; 1567 1584 } … … 1576 1593 /* returns 0 on success, or an errno on failure */ 1577 1594 static int 1578 clientGotBlock (tr_peer msgs * msgs,1595 clientGotBlock (tr_peerMsgs * msgs, 1579 1596 struct evbuffer * data, 1580 1597 const struct peer_request * req) … … 1595 1612 dbgmsg (msgs, "got block %u:%u->%u", req->index, req->offset, req->length); 1596 1613 1597 if (!tr_peerMgrDidPeerRequest (msgs->torrent, msgs->peer, block)) {1614 if (!tr_peerMgrDidPeerRequest (msgs->torrent, &msgs->peer, block)) { 1598 1615 dbgmsg (msgs, "we didn't ask for this message..."); 1599 1616 return 0; … … 1611 1628 return err; 1612 1629 1613 tr_bitfieldAdd (&msgs->peer ->blame, req->index);1630 tr_bitfieldAdd (&msgs->peer.blame, req->index); 1614 1631 fireGotBlock (msgs, req); 1615 1632 return 0; … … 1621 1638 didWrite (tr_peerIo * io UNUSED, size_t bytesWritten, bool wasPieceData, void * vmsgs) 1622 1639 { 1623 tr_peer msgs * msgs = vmsgs;1640 tr_peerMsgs * msgs = vmsgs; 1624 1641 1625 1642 if (wasPieceData) … … 1634 1651 { 1635 1652 ReadState ret; 1636 tr_peer msgs * msgs = vmsgs;1653 tr_peerMsgs * msgs = vmsgs; 1637 1654 struct evbuffer * in = tr_peerIoGetReadBuffer (io); 1638 1655 const size_t inlen = evbuffer_get_length (in); … … 1670 1687 1671 1688 int 1672 tr_peerMsgsIsReadingBlock (const tr_peer msgs * msgs, tr_block_index_t block)1689 tr_peerMsgsIsReadingBlock (const tr_peerMsgs * msgs, tr_block_index_t block) 1673 1690 { 1674 1691 if (msgs->state != AWAITING_BT_PIECE) … … 1685 1702 1686 1703 static void 1687 updateDesiredRequestCount (tr_peer msgs * msgs)1704 updateDesiredRequestCount (tr_peerMsgs * msgs) 1688 1705 { 1689 1706 tr_torrent * const torrent = msgs->torrent; … … 1691 1708 /* there are lots of reasons we might not want to request any blocks... */ 1692 1709 if (tr_torrentIsSeed (torrent) || !tr_torrentHasMetadata (torrent) 1693 || msgs-> peer->clientIsChoked1694 || !msgs-> peer->clientIsInterested)1710 || msgs->client_is_choked 1711 || !msgs->client_is_interested) 1695 1712 { 1696 1713 msgs->desiredRequestCount = 0; … … 1707 1724 /* Get the rate limit we should use. 1708 1725 * FIXME: this needs to consider all the other peers as well... */ 1709 rate_Bps = tr_peerGetPieceSpeed_Bps ( msgs->peer, now, TR_PEER_TO_CLIENT);1726 rate_Bps = tr_peerGetPieceSpeed_Bps (&msgs->peer, now, TR_PEER_TO_CLIENT); 1710 1727 if (tr_torrentUsesSpeedLimit (torrent, TR_PEER_TO_CLIENT)) 1711 1728 rate_Bps = MIN (rate_Bps, tr_torrentGetSpeedLimit_Bps (torrent, TR_PEER_TO_CLIENT)); … … 1729 1746 1730 1747 static void 1731 updateMetadataRequests (tr_peer msgs * msgs, time_t now)1748 updateMetadataRequests (tr_peerMsgs * msgs, time_t now) 1732 1749 { 1733 1750 int piece; … … 1763 1780 1764 1781 static void 1765 updateBlockRequests (tr_peer msgs * msgs)1782 updateBlockRequests (tr_peerMsgs * msgs) 1766 1783 { 1767 1784 if (tr_torrentIsPieceTransferAllowed (msgs->torrent, TR_PEER_TO_CLIENT) 1768 1785 && (msgs->desiredRequestCount > 0) 1769 && (msgs->peer ->pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66)))1786 && (msgs->peer.pendingReqsToPeer <= (msgs->desiredRequestCount * 0.66))) 1770 1787 { 1771 1788 int i; 1772 1789 int n; 1773 const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer; 1774 tr_block_index_t * blocks = tr_new (tr_block_index_t, numwant); 1775 1776 tr_peerMgrGetNextRequests (msgs->torrent, msgs->peer, numwant, blocks, &n, false); 1790 tr_block_index_t * blocks; 1791 const int numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer; 1792 1793 assert (tr_peerMsgsIsClientInterested (msgs)); 1794 assert (!tr_peerMsgsIsClientChoked (msgs)); 1795 1796 blocks = tr_new (tr_block_index_t, numwant); 1797 tr_peerMgrGetNextRequests (msgs->torrent, &msgs->peer, numwant, blocks, &n, false); 1777 1798 1778 1799 for (i=0; i<n; ++i) … … 1788 1809 1789 1810 static size_t 1790 fillOutputBuffer (tr_peer msgs * msgs, time_t now)1811 fillOutputBuffer (tr_peerMsgs * msgs, time_t now) 1791 1812 { 1792 1813 int piece; … … 1794 1815 struct peer_request req; 1795 1816 const bool haveMessages = evbuffer_get_length (msgs->outMessages) != 0; 1796 const bool fext = tr_peerIoSupportsFEXT (msgs-> peer->io);1817 const bool fext = tr_peerIoSupportsFEXT (msgs->io); 1797 1818 1798 1819 /** … … 1809 1830 const size_t len = evbuffer_get_length (msgs->outMessages); 1810 1831 /* flush the protocol messages */ 1811 dbgmsg (msgs, "flushing outMessages... to %p (length is %zu)", msgs-> peer->io, len);1812 tr_peerIoWriteBuf (msgs-> peer->io, msgs->outMessages, false);1832 dbgmsg (msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len); 1833 tr_peerIoWriteBuf (msgs->io, msgs->outMessages, false); 1813 1834 msgs->clientSentAnythingAt = now; 1814 1835 msgs->outMessagesBatchedAt = 0; … … 1821 1842 **/ 1822 1843 1823 if ((tr_peerIoGetWriteBufferSpace (msgs-> peer->io, now) >= METADATA_PIECE_SIZE)1844 if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= METADATA_PIECE_SIZE) 1824 1845 && popNextMetadataRequest (msgs, &piece)) 1825 1846 { … … 1887 1908 **/ 1888 1909 1889 if ((tr_peerIoGetWriteBufferSpace (msgs-> peer->io, now) >= msgs->torrent->blockSize)1910 if ((tr_peerIoGetWriteBufferSpace (msgs->io, now) >= msgs->torrent->blockSize) 1890 1911 && popNextRequest (msgs, &req)) 1891 1912 { … … 1928 1949 dbgmsg (msgs, "sending block %u:%u->%u", req.index, req.offset, req.length); 1929 1950 assert (n == msglen); 1930 tr_peerIoWriteBuf (msgs-> peer->io, out, true);1951 tr_peerIoWriteBuf (msgs->io, out, true); 1931 1952 bytesWritten += n; 1932 1953 msgs->clientSentAnythingAt = now; 1933 tr_historyAdd (&msgs->peer ->blocksSentToPeer, tr_time (), 1);1954 tr_historyAdd (&msgs->peer.blocksSentToPeer, tr_time (), 1); 1934 1955 } 1935 1956 … … 1970 1991 peerPulse (void * vmsgs) 1971 1992 { 1972 tr_peer msgs * msgs = vmsgs;1993 tr_peerMsgs * msgs = vmsgs; 1973 1994 const time_t now = tr_time (); 1974 1995 1975 if (tr_isPeerIo (msgs-> peer->io)) {1996 if (tr_isPeerIo (msgs->io)) { 1976 1997 updateDesiredRequestCount (msgs); 1977 1998 updateBlockRequests (msgs); … … 1987 2008 1988 2009 void 1989 tr_peerMsgsPulse (tr_peer msgs * msgs)2010 tr_peerMsgsPulse (tr_peerMsgs * msgs) 1990 2011 { 1991 2012 if (msgs != NULL) … … 2005 2026 2006 2027 static void 2007 sendBitfield (tr_peer msgs * msgs)2028 sendBitfield (tr_peerMsgs * msgs) 2008 2029 { 2009 2030 void * bytes; … … 2024 2045 2025 2046 static void 2026 tellPeerWhatWeHave (tr_peer msgs * msgs)2027 { 2028 const bool fext = tr_peerIoSupportsFEXT (msgs-> peer->io);2047 tellPeerWhatWeHave (tr_peerMsgs * msgs) 2048 { 2049 const bool fext = tr_peerIoSupportsFEXT (msgs->io); 2029 2050 2030 2051 if (fext && tr_cpHasAll (&msgs->torrent->completion)) … … 2165 2186 2166 2187 static void 2167 sendPex (tr_peer msgs * msgs)2188 sendPex (tr_peerMsgs * msgs) 2168 2189 { 2169 2190 if (msgs->peerSupportsPex && tr_torrentAllowsPex (msgs->torrent)) … … 2334 2355 pexPulse (int foo UNUSED, short bar UNUSED, void * vmsgs) 2335 2356 { 2336 struct tr_peer msgs * msgs = vmsgs;2357 struct tr_peerMsgs * msgs = vmsgs; 2337 2358 2338 2359 sendPex (msgs); … … 2342 2363 } 2343 2364 2344 /** 2345 *** 2346 **/ 2347 2348 tr_peermsgs* 2365 /*** 2366 **** tr_peer virtual functions 2367 ***/ 2368 2369 static bool 2370 peermsgs_is_transferring_pieces (const struct tr_peer * peer, 2371 uint64_t now, 2372 tr_direction direction, 2373 unsigned int * setme_Bps) 2374 { 2375 unsigned int Bps = 0; 2376 2377 if (tr_isPeerMsgs (peer)) 2378 { 2379 const tr_peerMsgs * msgs = (const tr_peerMsgs *) peer; 2380 Bps = tr_peerIoGetPieceSpeed_Bps (msgs->io, now, direction); 2381 } 2382 2383 if (setme_Bps != NULL) 2384 *setme_Bps = Bps; 2385 2386 return Bps > 0; 2387 } 2388 2389 static void 2390 peermsgs_destruct (tr_peer * peer) 2391 { 2392 tr_peerMsgs * msgs = PEER_MSGS (peer); 2393 2394 assert (msgs != NULL); 2395 2396 if (msgs->pexTimer != NULL) 2397 event_free (msgs->pexTimer); 2398 2399 if (msgs->incoming.block != NULL) 2400 evbuffer_free (msgs->incoming.block); 2401 2402 if (msgs->io) 2403 { 2404 tr_peerIoClear (msgs->io); 2405 tr_peerIoUnref (msgs->io); /* balanced by the ref in handshakeDoneCB () */ 2406 } 2407 2408 evbuffer_free (msgs->outMessages); 2409 tr_free (msgs->pex6); 2410 tr_free (msgs->pex); 2411 2412 tr_peerDestruct (&msgs->peer); 2413 2414 memset (msgs, ~0, sizeof (tr_peerMsgs)); 2415 } 2416 2417 static const struct tr_peer_virtual_funcs my_funcs = 2418 { 2419 .destruct = peermsgs_destruct, 2420 .is_transferring_pieces = peermsgs_is_transferring_pieces 2421 }; 2422 2423 /*** 2424 **** 2425 ***/ 2426 2427 time_t 2428 tr_peerMsgsGetConnectionAge (const tr_peerMsgs * msgs) 2429 { 2430 assert (tr_isPeerMsgs (msgs)); 2431 2432 return tr_peerIoGetAge (msgs->io); 2433 } 2434 2435 bool 2436 tr_peerMsgsIsPeerChoked (const tr_peerMsgs * msgs) 2437 { 2438 assert (tr_isPeerMsgs (msgs)); 2439 2440 return msgs->peer_is_choked; 2441 } 2442 2443 bool 2444 tr_peerMsgsIsPeerInterested (const tr_peerMsgs * msgs) 2445 { 2446 assert (tr_isPeerMsgs (msgs)); 2447 2448 return msgs->peer_is_interested; 2449 } 2450 2451 bool 2452 tr_peerMsgsIsClientChoked (const tr_peerMsgs * msgs) 2453 { 2454 assert (tr_isPeerMsgs (msgs)); 2455 2456 return msgs->client_is_choked; 2457 } 2458 2459 bool 2460 tr_peerMsgsIsClientInterested (const tr_peerMsgs * msgs) 2461 { 2462 assert (tr_isPeerMsgs (msgs)); 2463 2464 return msgs->client_is_interested; 2465 } 2466 2467 bool 2468 tr_peerMsgsIsUtpConnection (const tr_peerMsgs * msgs) 2469 { 2470 assert (tr_isPeerMsgs (msgs)); 2471 2472 return msgs->io->utp_socket != NULL; 2473 } 2474 2475 bool 2476 tr_peerMsgsIsEncrypted (const tr_peerMsgs * msgs) 2477 { 2478 assert (tr_isPeerMsgs (msgs)); 2479 2480 return tr_peerIoIsEncrypted (msgs->io); 2481 } 2482 2483 bool 2484 tr_peerMsgsIsIncomingConnection (const tr_peerMsgs * msgs) 2485 { 2486 assert (tr_isPeerMsgs (msgs)); 2487 2488 return tr_peerIoIsIncoming (msgs->io); 2489 } 2490 2491 /*** 2492 **** 2493 ***/ 2494 2495 bool 2496 tr_isPeerMsgs (const void * msgs) 2497 { 2498 /* FIXME: this is pretty crude */ 2499 return (msgs != NULL) 2500 && (((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER); 2501 } 2502 2503 tr_peerMsgs * 2504 tr_peerMsgsCast (void * vm) 2505 { 2506 return tr_isPeerMsgs(vm) ? vm : NULL; 2507 } 2508 2509 tr_peerMsgs * 2349 2510 tr_peerMsgsNew (struct tr_torrent * torrent, 2350 struct tr_peer * peer,2511 struct tr_peerIo * io, 2351 2512 tr_peer_callback * callback, 2352 2513 void * callbackData) 2353 2514 { 2354 tr_peermsgs * m; 2355 2356 assert (peer); 2357 assert (peer->io); 2358 2359 m = tr_new0 (tr_peermsgs, 1); 2360 m->callback = callback; 2361 m->callbackData = callbackData; 2362 m->peer = peer; 2363 m->torrent = torrent; 2364 m->peer->clientIsChoked = 1; 2365 m->peer->peerIsChoked = 1; 2366 m->peer->clientIsInterested = 0; 2367 m->peer->peerIsInterested = 0; 2368 m->state = AWAITING_BT_LENGTH; 2369 m->outMessages = evbuffer_new (); 2370 m->outMessagesBatchedAt = 0; 2371 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 2372 peer->msgs = m; 2373 2374 if (tr_torrentAllowsPex (torrent)) { 2375 m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m); 2376 tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0); 2377 } 2378 2379 if (tr_peerIoSupportsUTP (peer->io)) { 2380 const tr_address * addr = tr_peerIoGetAddress (peer->io, NULL); 2381 tr_peerMgrSetUtpSupported (torrent, addr); 2382 tr_peerMgrSetUtpFailed (torrent, addr, false); 2383 } 2384 2385 if (tr_peerIoSupportsLTEP (peer->io)) 2386 sendLtepHandshake (m); 2387 2388 tellPeerWhatWeHave (m); 2389 2390 if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (peer->io)) 2391 { 2392 /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */ 2393 const struct tr_address *addr = tr_peerIoGetAddress (peer->io, NULL); 2394 if (addr->type == TR_AF_INET || tr_globalIPv6 ()) { 2395 protocolSendPort (m, tr_dhtPort (torrent->session)); 2396 } 2397 } 2398 2399 tr_peerIoSetIOFuncs (m->peer->io, canRead, didWrite, gotError, m); 2400 updateDesiredRequestCount (m); 2401 2402 return m; 2403 } 2404 2405 void 2406 tr_peerMsgsFree (tr_peermsgs* msgs) 2407 { 2408 if (msgs) 2409 { 2410 if (msgs->pexTimer != NULL) 2411 event_free (msgs->pexTimer); 2412 2413 if (msgs->incoming.block != NULL) 2414 evbuffer_free (msgs->incoming.block); 2415 2416 evbuffer_free (msgs->outMessages); 2417 tr_free (msgs->pex6); 2418 tr_free (msgs->pex); 2419 2420 memset (msgs, ~0, sizeof (tr_peermsgs)); 2421 tr_free (msgs); 2422 } 2423 } 2515 tr_peerMsgs * m; 2516 2517 assert (io != NULL); 2518 2519 m = tr_new0 (tr_peerMsgs, 1); 2520 2521 tr_peerConstruct (&m->peer, torrent); 2522 m->peer.funcs = &my_funcs; 2523 2524 m->magic_number = MAGIC_NUMBER; 2525 m->client_is_choked = true; 2526 m->peer_is_choked = true; 2527 m->client_is_interested = false; 2528 m->peer_is_interested = false; 2529 m->callback = callback; 2530 m->callbackData = callbackData; 2531 m->io = io; 2532 m->torrent = torrent; 2533 m->state = AWAITING_BT_LENGTH; 2534 m->outMessages = evbuffer_new (); 2535 m->outMessagesBatchedAt = 0; 2536 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 2537 2538 if (tr_torrentAllowsPex (torrent)) 2539 { 2540 m->pexTimer = evtimer_new (torrent->session->event_base, pexPulse, m); 2541 tr_timerAdd (m->pexTimer, PEX_INTERVAL_SECS, 0); 2542 } 2543 2544 if (tr_peerIoSupportsUTP (m->io)) 2545 { 2546 const tr_address * addr = tr_peerIoGetAddress (m->io, NULL); 2547 tr_peerMgrSetUtpSupported (torrent, addr); 2548 tr_peerMgrSetUtpFailed (torrent, addr, false); 2549 } 2550 2551 if (tr_peerIoSupportsLTEP (m->io)) 2552 sendLtepHandshake (m); 2553 2554 tellPeerWhatWeHave (m); 2555 2556 if (tr_dhtEnabled (torrent->session) && tr_peerIoSupportsDHT (m->io)) 2557 { 2558 /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */ 2559 const struct tr_address *addr = tr_peerIoGetAddress (m->io, NULL); 2560 if (addr->type == TR_AF_INET || tr_globalIPv6 ()) 2561 protocolSendPort (m, tr_dhtPort (torrent->session)); 2562 } 2563 2564 tr_peerIoSetIOFuncs (m->io, canRead, didWrite, gotError, m); 2565 updateDesiredRequestCount (m); 2566 2567 return m; 2568 } -
trunk/libtransmission/peer-msgs.h
r13936 r13954 12 12 13 13 #ifndef __TRANSMISSION__ 14 #error only libtransmission should #include this header.14 #error only libtransmission should #include this header. 15 15 #endif 16 16 … … 24 24 struct tr_bitfield; 25 25 struct tr_peer; 26 struct tr_peerIo; 26 27 struct tr_torrent; 27 28 … … 31 32 */ 32 33 33 typedef struct tr_peer msgs tr_peermsgs;34 typedef struct tr_peerMsgs tr_peerMsgs; 34 35 35 tr_peermsgs* tr_peerMsgsNew (struct tr_torrent * torrent, 36 struct tr_peer * peer, 37 tr_peer_callback * callback, 38 void * callback_data); 36 #define PEER_MSGS(o) (tr_peerMsgsCast(o)) 39 37 40 void tr_peerMsgsSetChoke (tr_peermsgs * msgs, 41 bool peerIsChoked); 38 bool tr_isPeerMsgs (const void * msgs); 42 39 43 int tr_peerMsgsIsReadingBlock (const tr_peermsgs * msgs, 44 tr_block_index_t block); 40 tr_peerMsgs* tr_peerMsgsCast (void * msgs); 45 41 46 void tr_peerMsgsSetInterested (tr_peermsgs * msgs, 47 bool clientIsInterested); 42 tr_peerMsgs* tr_peerMsgsNew (struct tr_torrent * torrent, 43 struct tr_peerIo * io, 44 tr_peer_callback * callback, 45 void * callback_data); 48 46 49 void tr_peerMsgsHave (tr_peermsgs * msgs, 50 uint32_t pieceIndex); 47 bool tr_peerMsgsIsPeerChoked (const tr_peerMsgs * msgs); 51 48 52 void tr_peerMsgsPulse (tr_peermsgs* msgs);49 bool tr_peerMsgsIsPeerInterested (const tr_peerMsgs * msgs); 53 50 54 void tr_peerMsgsCancel (tr_peermsgs * msgs, 55 tr_block_index_t block); 51 bool tr_peerMsgsIsClientChoked (const tr_peerMsgs * msgs); 56 52 57 void tr_peerMsgsFree (tr_peermsgs* msgs);53 bool tr_peerMsgsIsClientInterested (const tr_peerMsgs * msgs); 58 54 59 size_t tr_generateAllowedSet (tr_piece_index_t * setmePieces, 60 size_t desiredSetSize, 61 size_t pieceCount, 62 const uint8_t * infohash, 63 const struct tr_address * addr); 55 time_t tr_peerMsgsGetConnectionAge (const tr_peerMsgs * msgs); 56 57 bool tr_peerMsgsIsUtpConnection (const tr_peerMsgs * msgs); 58 59 bool tr_peerMsgsIsEncrypted (const tr_peerMsgs * msgs); 60 61 bool tr_peerMsgsIsIncomingConnection (const tr_peerMsgs * msgs); 62 63 void tr_peerMsgsSetChoke (tr_peerMsgs * msgs, 64 bool peerIsChoked); 65 66 int tr_peerMsgsIsReadingBlock (const tr_peerMsgs * msgs, 67 tr_block_index_t block); 68 69 void tr_peerMsgsSetInterested (tr_peerMsgs * msgs, 70 bool clientIsInterested); 71 72 void tr_peerMsgsHave (tr_peerMsgs * msgs, 73 uint32_t pieceIndex); 74 75 void tr_peerMsgsPulse (tr_peerMsgs * msgs); 76 77 void tr_peerMsgsCancel (tr_peerMsgs * msgs, 78 tr_block_index_t block); 79 80 size_t tr_generateAllowedSet (tr_piece_index_t * setmePieces, 81 size_t desiredSetSize, 82 size_t pieceCount, 83 const uint8_t * infohash, 84 const struct tr_address * addr); 64 85 65 86 -
trunk/libtransmission/webseed.c
r13900 r13954 30 30 struct tr_webseed_task 31 31 { 32 struct evbuffer * content; 33 struct tr_webseed * webseed; 34 tr_block_index_t block; 35 tr_piece_index_t piece_index; 36 uint32_t piece_offset; 37 uint32_t length; 38 tr_block_index_t blocks_done; 39 uint32_t block_size; 40 struct tr_web_task * web_task; 41 long response_code; 32 bool dead; 33 struct evbuffer * content; 34 struct tr_webseed * webseed; 35 tr_session * session; 36 tr_block_index_t block; 37 tr_piece_index_t piece_index; 38 uint32_t piece_offset; 39 uint32_t length; 40 tr_block_index_t blocks_done; 41 uint32_t block_size; 42 struct tr_web_task * web_task; 43 long response_code; 42 44 }; 43 45 44 46 struct tr_webseed 45 47 { 46 tr_peer parent; 47 tr_bandwidth bandwidth; 48 tr_session * session; 49 tr_peer_callback * callback; 50 void * callback_data; 51 tr_list * tasks; 52 struct event * timer; 53 char * base_url; 54 size_t base_url_len; 55 int torrent_id; 56 bool is_stopping; 57 int consecutive_failures; 58 int retry_tickcount; 59 int retry_challenge; 60 int idle_connections; 61 int active_transfers; 62 char ** file_urls; 48 tr_peer parent; 49 tr_bandwidth bandwidth; 50 tr_session * session; 51 tr_peer_callback * callback; 52 void * callback_data; 53 tr_list * tasks; 54 struct event * timer; 55 char * base_url; 56 size_t base_url_len; 57 int torrent_id; 58 int consecutive_failures; 59 int retry_tickcount; 60 int retry_challenge; 61 int idle_connections; 62 int active_transfers; 63 char ** file_urls; 63 64 }; 64 65 65 66 enum 66 67 { 67 68 69 70 71 72 73 68 TR_IDLE_TIMER_MSEC = 2000, 69 70 FAILURE_RETRY_INTERVAL = 150, 71 72 MAX_CONSECUTIVE_FAILURES = 5, 73 74 MAX_WEBSEED_CONNECTIONS = 4 74 75 }; 75 76 static void77 webseed_free (struct tr_webseed * w)78 {79 tr_torrent * tor = tr_torrentFindFromId (w->session, w->torrent_id);80 const tr_info * inf = tr_torrentInfo (tor);81 tr_file_index_t i;82 83 /* if we have an array of file URLs, free it */84 if (w->file_urls != NULL) {85 for (i=0; i<inf->fileCount; ++i)86 tr_free (w->file_urls[i]);87 tr_free (w->file_urls);88 }89 90 /* webseed destruct */91 event_free (w->timer);92 tr_bandwidthDestruct (&w->bandwidth);93 tr_free (w->base_url);94 95 /* parent class destruct */96 tr_peerDestruct (tor, &w->parent);97 98 tr_free (w);99 }100 76 101 77 /*** … … 111 87 112 88 static void 113 fire_client_got_rejs (tr_torrent * tor, tr_webseed * w, 114 tr_block_index_t block, tr_block_index_t count) 115 { 116 tr_block_index_t i; 117 tr_peer_event e = TR_PEER_EVENT_INIT; 118 e.eventType = TR_PEER_CLIENT_GOT_REJ; 119 tr_torrentGetBlockLocation (tor, block, &e.pieceIndex, &e.offset, &e.length); 120 for (i = 1; i <= count; i++) { 121 if (i == count) 122 e.length = tr_torBlockCountBytes (tor, block + count - 1); 123 publish (w, &e); 124 e.offset += e.length; 125 } 126 } 127 128 static void 129 fire_client_got_blocks (tr_torrent * tor, tr_webseed * w, 130 tr_block_index_t block, tr_block_index_t count) 131 { 132 tr_block_index_t i; 133 tr_peer_event e = TR_PEER_EVENT_INIT; 134 e.eventType = TR_PEER_CLIENT_GOT_BLOCK; 135 tr_torrentGetBlockLocation (tor, block, &e.pieceIndex, &e.offset, &e.length); 136 for (i = 1; i <= count; i++) { 137 if (i == count) 138 e.length = tr_torBlockCountBytes (tor, block + count - 1); 139 publish (w, &e); 140 e.offset += e.length; 89 fire_client_got_rejs (tr_torrent * tor, 90 tr_webseed * w, 91 tr_block_index_t block, 92 tr_block_index_t count) 93 { 94 tr_block_index_t i; 95 tr_peer_event e = TR_PEER_EVENT_INIT; 96 e.eventType = TR_PEER_CLIENT_GOT_REJ; 97 tr_torrentGetBlockLocation (tor, block, &e.pieceIndex, &e.offset, &e.length); 98 for (i = 1; i <= count; i++) 99 { 100 if (i == count) 101 e.length = tr_torBlockCountBytes (tor, block + count - 1); 102 publish (w, &e); 103 e.offset += e.length; 104 } 105 } 106 107 static void 108 fire_client_got_blocks (tr_torrent * tor, 109 tr_webseed * w, 110 tr_block_index_t block, 111 tr_block_index_t count) 112 { 113 tr_block_index_t i; 114 tr_peer_event e = TR_PEER_EVENT_INIT; 115 e.eventType = TR_PEER_CLIENT_GOT_BLOCK; 116 tr_torrentGetBlockLocation (tor, block, &e.pieceIndex, &e.offset, &e.length); 117 for (i = 1; i <= count; i++) 118 { 119 if (i == count) 120 e.length = tr_torBlockCountBytes (tor, block + count - 1); 121 publish (w, &e); 122 e.offset += e.length; 141 123 } 142 124 } … … 145 127 fire_client_got_piece_data (tr_webseed * w, uint32_t length) 146 128 { 147 148 149 150 129 tr_peer_event e = TR_PEER_EVENT_INIT; 130 e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA; 131 e.length = length; 132 publish (w, &e); 151 133 } 152 134 … … 157 139 struct write_block_data 158 140 { 159 struct tr_webseed * webseed; 160 struct evbuffer * content; 161 tr_piece_index_t piece_index; 162 tr_block_index_t block_index; 163 tr_block_index_t count; 164 uint32_t block_offset; 141 tr_session * session; 142 int torrent_id; 143 struct tr_webseed * webseed; 144 struct evbuffer * content; 145 tr_piece_index_t piece_index; 146 tr_block_index_t block_index; 147 tr_block_index_t count; 148 uint32_t block_offset; 165 149 }; 166 150 … … 168 152 write_block_func (void * vdata) 169 153 { 170 171 172 173 174 175 tor = tr_torrentFindFromId (w->session, w->torrent_id);176 if (tor)177 { 178 179 180 181 tr_cache * cache = w->session->cache;182 183 184 154 struct write_block_data * data = vdata; 155 struct tr_webseed * w = data->webseed; 156 struct evbuffer * buf = data->content; 157 struct tr_torrent * tor; 158 159 tor = tr_torrentFindFromId (data->session, data->torrent_id); 160 if (tor != NULL) 161 { 162 const uint32_t block_size = tor->blockSize; 163 uint32_t len = evbuffer_get_length (buf); 164 const uint32_t offset_end = data->block_offset + len; 165 tr_cache * cache = data->session->cache; 166 const tr_piece_index_t piece = data->piece_index; 167 168 while (len > 0) 185 169 { 186 187 188 170 const uint32_t bytes_this_pass = MIN (len, block_size); 171 tr_cacheWriteBlock (cache, tor, piece, offset_end - len, bytes_this_pass, buf); 172 len -= bytes_this_pass; 189 173 } 190 174 191 192 } 193 194 195 175 fire_client_got_blocks (tor, w, data->block_index, data->count); 176 } 177 178 evbuffer_free (buf); 179 tr_free (data); 196 180 } 197 181 … … 202 186 struct connection_succeeded_data 203 187 { 204 205 206 207 188 struct tr_webseed * webseed; 189 char * real_url; 190 tr_piece_index_t piece_index; 191 uint32_t piece_offset; 208 192 }; 209 193 … … 211 195 connection_succeeded (void * vdata) 212 196 { 213 tr_torrent * tor; 214 struct connection_succeeded_data * data = vdata; 215 struct tr_webseed * w = data->webseed; 216 217 if (++w->active_transfers >= w->retry_challenge && w->retry_challenge) 218 /* the server seems to be accepting more connections now */ 219 w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0; 220 221 if (data->real_url && 222 (tor = tr_torrentFindFromId (w->session, w->torrent_id))) 223 { 224 uint64_t file_offset; 225 tr_file_index_t file_index; 226 227 tr_ioFindFileLocation (tor, data->piece_index, data->piece_offset, 228 &file_index, &file_offset); 229 tr_free (w->file_urls[file_index]); 230 w->file_urls[file_index] = data->real_url; 231 } 197 tr_torrent * tor; 198 struct connection_succeeded_data * data = vdata; 199 struct tr_webseed * w = data->webseed; 200 201 if (++w->active_transfers >= w->retry_challenge && w->retry_challenge) 202 /* the server seems to be accepting more connections now */ 203 w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0; 204 205 if (data->real_url && (tor = tr_torrentFindFromId (w->session, w->torrent_id))) 206 { 207 uint64_t file_offset; 208 tr_file_index_t file_index; 209 210 tr_ioFindFileLocation (tor, data->piece_index, data->piece_offset, 211 &file_index, &file_offset); 212 tr_free (w->file_urls[file_index]); 213 w->file_urls[file_index] = data->real_url; 214 data->real_url = NULL; 215 } 216 217 tr_free (data->real_url); 218 tr_free (data); 232 219 } 233 220 … … 241 228 void * vtask) 242 229 { 243 uint32_t len; 244 const size_t n_added = info->n_added; 245 struct tr_webseed_task * task = vtask; 246 struct tr_webseed * w = task->webseed; 247 248 if (n_added <= 0) 249 return; 250 251 if (!w->is_stopping) 252 { 253 tr_bandwidthUsed (&w->bandwidth, TR_DOWN, n_added, true, tr_time_msec ()); 254 fire_client_got_piece_data (w, n_added); 255 } 256 257 len = evbuffer_get_length (buf); 258 259 if (!task->response_code) 260 { 261 tr_webGetTaskInfo (task->web_task, TR_WEB_GET_CODE, &task->response_code); 262 263 if (task->response_code == 206) 230 const size_t n_added = info->n_added; 231 struct tr_webseed_task * task = vtask; 232 tr_session * session = task->session; 233 234 tr_sessionLock (session); 235 236 if (!task->dead && (n_added>0)) 237 { 238 uint32_t len; 239 struct tr_webseed * w = task->webseed; 240 241 tr_bandwidthUsed (&w->bandwidth, TR_DOWN, n_added, true, tr_time_msec ()); 242 fire_client_got_piece_data (w, n_added); 243 len = evbuffer_get_length (buf); 244 245 if (!task->response_code) 264 246 { 265 const char * url; 266 struct connection_succeeded_data * data; 267 268 url = NULL; 269 tr_webGetTaskInfo (task->web_task, TR_WEB_GET_REAL_URL, &url); 270 271 data = tr_new (struct connection_succeeded_data, 1); 272 data->webseed = w; 273 data->real_url = tr_strdup (url); 274 data->piece_index = task->piece_index; 275 data->piece_offset = task->piece_offset 276 + (task->blocks_done * task->block_size) 277 + (len - 1); 278 279 /* processing this uses a tr_torrent pointer, 280 so push the work to the libevent thread... */ 281 tr_runInEventThread (w->session, connection_succeeded, data); 247 tr_webGetTaskInfo (task->web_task, TR_WEB_GET_CODE, &task->response_code); 248 249 if (task->response_code == 206) 250 { 251 const char * url; 252 struct connection_succeeded_data * data; 253 254 url = NULL; 255 tr_webGetTaskInfo (task->web_task, TR_WEB_GET_REAL_URL, &url); 256 257 data = tr_new (struct connection_succeeded_data, 1); 258 data->webseed = w; 259 data->real_url = tr_strdup (url); 260 data->piece_index = task->piece_index; 261 data->piece_offset = task->piece_offset + (task->blocks_done * task->block_size) + (len - 1); 262 263 /* processing this uses a tr_torrent pointer, 264 so push the work to the libevent thread... */ 265 tr_runInEventThread (w->session, connection_succeeded, data); 266 } 282 267 } 283 } 284 285 if ((task->response_code == 206) && (len >= task->block_size)) 286 { 287 /* once we've got at least one full block, save it */ 288 289 struct write_block_data * data; 290 const uint32_t block_size = task->block_size; 291 const tr_block_index_t completed = len / block_size; 292 293 data = tr_new (struct write_block_data, 1); 294 data->webseed = task->webseed; 295 data->piece_index = task->piece_index; 296 data->block_index = task->block + task->blocks_done; 297 data->count = completed; 298 data->block_offset = task->piece_offset + task->blocks_done * block_size; 299 data->content = evbuffer_new (); 300 301 /* we don't use locking on this evbuffer so we must copy out the data 302 that will be needed when writing the block in a different thread */ 303 evbuffer_remove_buffer (task->content, data->content, 304 block_size * completed); 305 306 tr_runInEventThread (w->session, write_block_func, data); 307 task->blocks_done += completed; 308 } 268 269 if ((task->response_code == 206) && (len >= task->block_size)) 270 { 271 /* once we've got at least one full block, save it */ 272 273 struct write_block_data * data; 274 const uint32_t block_size = task->block_size; 275 const tr_block_index_t completed = len / block_size; 276 277 data = tr_new (struct write_block_data, 1); 278 data->webseed = task->webseed; 279 data->piece_index = task->piece_index; 280 data->block_index = task->block + task->blocks_done; 281 data->count = completed; 282 data->block_offset = task->piece_offset + task->blocks_done * block_size; 283 data->content = evbuffer_new (); 284 data->torrent_id = w->torrent_id; 285 data->session = w->session; 286 287 /* we don't use locking on this evbuffer so we must copy out the data 288 that will be needed when writing the block in a different thread */ 289 evbuffer_remove_buffer (task->content, data->content, 290 block_size * completed); 291 292 tr_runInEventThread (w->session, write_block_func, data); 293 task->blocks_done += completed; 294 } 295 } 296 297 tr_sessionUnlock (session); 309 298 } 310 299 311 300 static void task_request_next_chunk (struct tr_webseed_task * task); 312 301 313 static bool314 webseed_has_tasks (const tr_webseed * w)315 {316 return w->tasks != NULL;317 }318 319 320 302 static void 321 303 on_idle (tr_webseed * w) 322 304 { 323 tr_torrent * tor = tr_torrentFindFromId (w->session, w->torrent_id); 324 int want, running_tasks = tr_list_size (w->tasks); 325 326 if (w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES) { 327 want = w->idle_connections; 328 329 if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL) { 330 /* some time has passed since our connection attempts failed. try again */ 331 ++want; 332 /* if this challenge is fulfilled we will reset consecutive_failures */ 333 w->retry_challenge = running_tasks + want; 305 int want; 306 int running_tasks = tr_list_size (w->tasks); 307 tr_torrent * tor = tr_torrentFindFromId (w->session, w->torrent_id); 308 309 if (w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES) 310 { 311 want = w->idle_connections; 312 313 if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL) 314 { 315 /* some time has passed since our connection attempts failed. try again */ 316 ++want; 317 /* if this challenge is fulfilled we will reset consecutive_failures */ 318 w->retry_challenge = running_tasks + want; 334 319 } 335 320 } 336 else { 337 want = MAX_WEBSEED_CONNECTIONS - running_tasks; 338 w->retry_challenge = running_tasks + w->idle_connections + 1; 339 } 340 341 if (w->is_stopping && !webseed_has_tasks (w)) 342 { 343 webseed_free (w); 344 } 345 else if (!w->is_stopping && tor 346 && tor->isRunning 347 && !tr_torrentIsSeed (tor) 348 && want) 349 { 350 int i; 351 int got = 0; 352 tr_block_index_t * blocks = NULL; 353 354 blocks = tr_new (tr_block_index_t, want*2); 355 tr_peerMgrGetNextRequests (tor, &w->parent, want, blocks, &got, true); 356 357 w->idle_connections -= MIN (w->idle_connections, got); 358 if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL && got == want) 359 w->retry_tickcount = 0; 360 361 for (i=0; i<got; ++i) 321 else 322 { 323 want = MAX_WEBSEED_CONNECTIONS - running_tasks; 324 w->retry_challenge = running_tasks + w->idle_connections + 1; 325 } 326 327 if (tor && tor->isRunning && !tr_torrentIsSeed (tor) && want) 328 { 329 int i; 330 int got = 0; 331 tr_block_index_t * blocks = NULL; 332 333 blocks = tr_new (tr_block_index_t, want*2); 334 tr_peerMgrGetNextRequests (tor, &w->parent, want, blocks, &got, true); 335 336 w->idle_connections -= MIN (w->idle_connections, got); 337 if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL && got == want) 338 w->retry_tickcount = 0; 339 340 for (i=0; i<got; ++i) 362 341 { 363 const tr_block_index_t b = blocks[i*2]; 364 const tr_block_index_t be = blocks[i*2+1]; 365 struct tr_webseed_task * task = tr_new (struct tr_webseed_task, 1); 366 task->webseed = w; 367 task->block = b; 368 task->piece_index = tr_torBlockPiece (tor, b); 369 task->piece_offset = (tor->blockSize * b) 370 - (tor->info.pieceSize * task->piece_index); 371 task->length = (be - b) * tor->blockSize + tr_torBlockCountBytes (tor, be); 372 task->blocks_done = 0; 373 task->response_code = 0; 374 task->block_size = tor->blockSize; 375 task->content = evbuffer_new (); 376 evbuffer_add_cb (task->content, on_content_changed, task); 377 tr_list_append (&w->tasks, task); 378 task_request_next_chunk (task); 342 const tr_block_index_t b = blocks[i*2]; 343 const tr_block_index_t be = blocks[i*2+1]; 344 struct tr_webseed_task * task; 345 346 task = tr_new0 (struct tr_webseed_task, 1); 347 task->session = tor->session; 348 task->webseed = w; 349 task->block = b; 350 task->piece_index = tr_torBlockPiece (tor, b); 351 task->piece_offset = (tor->blockSize * b) - (tor->info.pieceSize * task->piece_index); 352 task->length = (be - b) * tor->blockSize + tr_torBlockCountBytes (tor, be); 353 task->blocks_done = 0; 354 task->response_code = 0; 355 task->block_size = tor->blockSize; 356 task->content = evbuffer_new (); 357 evbuffer_add_cb (task->content, on_content_changed, task); 358 tr_list_append (&w->tasks, task); 359 task_request_next_chunk (task); 379 360 } 380 361 381 362 tr_free (blocks); 382 363 } 383 364 } … … 393 374 void * vtask) 394 375 { 395 struct tr_webseed_task * t = vtask; 396 tr_webseed * w = t->webseed; 397 tr_torrent * tor = tr_torrentFindFromId (session, w->torrent_id); 398 const int success = (response_code == 206); 399 400 if (tor) 401 { 402 /* active_transfers was only increased if the connection was successful */ 403 if (t->response_code == 206) 404 --w->active_transfers; 405 406 if (!success) 376 tr_webseed * w; 377 tr_torrent * tor; 378 struct tr_webseed_task * t = vtask; 379 const int success = (response_code == 206); 380 381 if (t->dead) 382 { 383 evbuffer_free (t->content); 384 tr_free (t); 385 return; 386 } 387 388 w = t->webseed; 389 tor = tr_torrentFindFromId (session, w->torrent_id); 390 if (tor != NULL) 391 { 392 /* active_transfers was only increased if the connection was successful */ 393 if (t->response_code == 206) 394 --w->active_transfers; 395 396 if (!success) 407 397 { 408 398 const tr_block_index_t blocks_remain = (t->length + tor->blockSize - 1) 409 399 / tor->blockSize - t->blocks_done; 410 400 411 412 413 414 415 416 417 418 419 420 421 422 401 if (blocks_remain) 402 fire_client_got_rejs (tor, w, t->block + t->blocks_done, blocks_remain); 403 404 if (t->blocks_done) 405 ++w->idle_connections; 406 else if (++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && !w->retry_tickcount) 407 /* now wait a while until retrying to establish a connection */ 408 ++w->retry_tickcount; 409 410 tr_list_remove_data (&w->tasks, t); 411 evbuffer_free (t->content); 412 tr_free (t); 423 413 } 424 414 else 425 415 { 426 427 428 429 416 const uint32_t bytes_done = t->blocks_done * tor->blockSize; 417 const uint32_t buf_len = evbuffer_get_length (t->content); 418 419 if (bytes_done + buf_len < t->length) 430 420 { 431 432 means we've reached the end of a file and need to request the next one */433 434 421 /* request finished successfully but there's still data missing. that 422 means we've reached the end of a file and need to request the next one */ 423 t->response_code = 0; 424 task_request_next_chunk (t); 435 425 } 436 426 else 437 427 { 438 if (buf_len) { 439 /* on_content_changed () will not write a block if it is smaller than 440 the torrent's block size, i.e. the torrent's very last block */ 441 tr_cacheWriteBlock (session->cache, tor, 442 t->piece_index, t->piece_offset + bytes_done, 443 buf_len, t->content); 444 445 fire_client_got_blocks (tor, t->webseed, 446 t->block + t->blocks_done, 1); 428 if (buf_len) 429 { 430 /* on_content_changed () will not write a block if it is smaller than 431 the torrent's block size, i.e. the torrent's very last block */ 432 tr_cacheWriteBlock (session->cache, tor, 433 t->piece_index, t->piece_offset + bytes_done, 434 buf_len, t->content); 435 436 fire_client_got_blocks (tor, t->webseed, 437 t->block + t->blocks_done, 1); 447 438 } 448 439 449 450 451 452 453 454 455 440 ++w->idle_connections; 441 442 tr_list_remove_data (&w->tasks, t); 443 evbuffer_free (t->content); 444 tr_free (t); 445 446 on_idle (w); 456 447 } 457 448 } … … 462 453 make_url (tr_webseed * w, const tr_file * file) 463 454 { 464 465 466 467 468 469 470 471 472 455 struct evbuffer * buf = evbuffer_new (); 456 457 evbuffer_add (buf, w->base_url, w->base_url_len); 458 459 /* if url ends with a '/', add the torrent name */ 460 if (w->base_url[w->base_url_len - 1] == '/' && file->name) 461 tr_http_escape (buf, file->name, strlen (file->name), false); 462 463 return buf; 473 464 } 474 465 … … 476 467 task_request_next_chunk (struct tr_webseed_task * t) 477 468 { 478 tr_webseed * w = t->webseed; 479 tr_torrent * tor = tr_torrentFindFromId (w->session, w->torrent_id); 480 if (tor != NULL) 481 { 482 char range[64]; 483 char ** urls = t->webseed->file_urls; 484 485 const tr_info * inf = tr_torrentInfo (tor); 486 const uint64_t remain = t->length - t->blocks_done * tor->blockSize 487 - evbuffer_get_length (t->content); 488 489 const uint64_t total_offset = tr_pieceOffset (tor, t->piece_index, 490 t->piece_offset, 491 t->length - remain); 492 const tr_piece_index_t step_piece = total_offset / inf->pieceSize; 493 const uint64_t step_piece_offset 494 = total_offset - (inf->pieceSize * step_piece); 495 496 tr_file_index_t file_index; 497 const tr_file * file; 498 uint64_t file_offset; 499 uint64_t this_pass; 500 501 tr_ioFindFileLocation (tor, step_piece, step_piece_offset, 502 &file_index, &file_offset); 503 file = &inf->files[file_index]; 504 this_pass = MIN (remain, file->length - file_offset); 505 506 if (!urls[file_index]) 507 urls[file_index] = evbuffer_free_to_str (make_url (t->webseed, file)); 508 509 tr_snprintf (range, sizeof range, "%"PRIu64"-%"PRIu64, 510 file_offset, file_offset + this_pass - 1); 511 t->web_task = tr_webRunWithBuffer (w->session, urls[file_index], 512 range, NULL, web_response_func, t, t->content); 513 } 514 } 515 516 bool 517 tr_webseedGetSpeed_Bps (const tr_webseed * w, 518 uint64_t now, 519 unsigned int * setme_Bps) 520 { 521 const bool is_active = webseed_has_tasks (w); 522 *setme_Bps = is_active ? tr_bandwidthGetPieceSpeed_Bps (&w->bandwidth, now, TR_DOWN) : 0; 523 return is_active; 524 } 525 526 bool 527 tr_webseedIsActive (const tr_webseed * w) 528 { 529 unsigned int Bps = 0; 530 return tr_webseedGetSpeed_Bps (w, tr_time_msec (), &Bps) && (Bps > 0); 469 tr_webseed * w = t->webseed; 470 tr_torrent * tor = tr_torrentFindFromId (w->session, w->torrent_id); 471 if (tor != NULL) 472 { 473 char range[64]; 474 char ** urls = t->webseed->file_urls; 475 476 const tr_info * inf = tr_torrentInfo (tor); 477 const uint64_t remain = t->length - t->blocks_done * tor->blockSize 478 - evbuffer_get_length (t->content); 479 480 const uint64_t total_offset = tr_pieceOffset (tor, t->piece_index, 481 t->piece_offset, 482 t->length - remain); 483 const tr_piece_index_t step_piece = total_offset / inf->pieceSize; 484 const uint64_t step_piece_offset = total_offset - (inf->pieceSize * step_piece); 485 486 tr_file_index_t file_index; 487 const tr_file * file; 488 uint64_t file_offset; 489 uint64_t this_pass; 490 491 tr_ioFindFileLocation (tor, step_piece, step_piece_offset, 492 &file_index, &file_offset); 493 file = &inf->files[file_index]; 494 this_pass = MIN (remain, file->length - file_offset); 495 496 if (!urls[file_index]) 497 urls[file_index] = evbuffer_free_to_str (make_url (t->webseed, file)); 498 499 tr_snprintf (range, sizeof range, "%"PRIu64"-%"PRIu64, 500 file_offset, file_offset + this_pass - 1); 501 t->web_task = tr_webRunWithBuffer (w->session, urls[file_index], 502 range, NULL, web_response_func, t, t->content); 503 } 531 504 } 532 505 … … 538 511 webseed_timer_func (evutil_socket_t foo UNUSED, short bar UNUSED, void * vw) 539 512 { 540 tr_webseed * w = vw; 541 if (w->retry_tickcount) 542 ++w->retry_tickcount; 543 on_idle (w); 544 tr_timerAddMsec (w->timer, TR_IDLE_TIMER_MSEC); 545 } 513 tr_webseed * w = vw; 514 515 if (w->retry_tickcount) 516 ++w->retry_tickcount; 517 518 on_idle (w); 519 520 tr_timerAddMsec (w->timer, TR_IDLE_TIMER_MSEC); 521 } 522 523 /*** 524 **** tr_peer virtual functions 525 ***/ 526 527 static bool 528 webseed_is_transferring_pieces (const tr_peer * peer, 529 uint64_t now, 530 tr_direction direction, 531 unsigned int * setme_Bps) 532 { 533 unsigned int Bps = 0; 534 bool is_active = false; 535 536 if (direction == TR_DOWN) 537 { 538 const tr_webseed * w = (const tr_webseed *) peer; 539 is_active = w->tasks != NULL; 540 Bps = tr_bandwidthGetPieceSpeed_Bps (&w->bandwidth, now, direction); 541 } 542 543 if (setme_Bps != NULL) 544 *setme_Bps = Bps; 545 546 return is_active; 547 } 548 549 static void 550 webseed_destruct (tr_peer * peer) 551 { 552 tr_list * l; 553 tr_webseed * w = (tr_webseed *) peer; 554 555 /* flag all the pending tasks as dead */ 556 for (l=w->tasks; l!=NULL; l=l->next) 557 { 558 struct tr_webseed_task * task = l->data; 559 task->dead = true; 560 } 561 tr_list_free (&w->tasks, NULL); 562 563 /* if we have an array of file URLs, free it */ 564 if (w->file_urls != NULL) 565 { 566 tr_file_index_t i; 567 tr_torrent * tor = tr_torrentFindFromId (w->session, w->torrent_id); 568 const tr_info * inf = tr_torrentInfo (tor); 569 570 for (i=0; i<inf->fileCount; ++i) 571 tr_free (w->file_urls[i]); 572 tr_free (w->file_urls); 573 } 574 575 /* webseed destruct */ 576 event_free (w->timer); 577 tr_bandwidthDestruct (&w->bandwidth); 578 tr_free (w->base_url); 579 580 /* parent class destruct */ 581 tr_peerDestruct (&w->parent); 582 } 583 584 static const struct tr_peer_virtual_funcs my_funcs = 585 { 586 .destruct = webseed_destruct, 587 .is_transferring_pieces = webseed_is_transferring_pieces 588 }; 589 590 /*** 591 **** 592 ***/ 546 593 547 594 tr_webseed* … … 551 598 void * callback_data) 552 599 { 553 tr_webseed * w = tr_new0 (tr_webseed, 1); 554 tr_peer * peer = &w->parent; 555 const tr_info * inf = tr_torrentInfo (tor); 556 557 /* construct parent class */ 558 tr_peerConstruct (peer); 559 peer->peerIsChoked = true; 560 peer->clientIsInterested = !tr_torrentIsSeed (tor); 561 peer->client = TR_KEY_webseeds; 562 tr_bitfieldSetHasAll (&peer->have); 563 tr_peerUpdateProgress (tor, peer); 564 565 w->torrent_id = tr_torrentId (tor); 566 w->session = tor->session; 567 w->base_url_len = strlen (url); 568 w->base_url = tr_strndup (url, w->base_url_len); 569 w->callback = callback; 570 w->callback_data = callback_data; 571 w->file_urls = tr_new0 (char *, inf->fileCount); 572 //tr_rcConstruct (&w->download_rate); 573 tr_bandwidthConstruct (&w->bandwidth, tor->session, &tor->bandwidth); 574 w->timer = evtimer_new (w->session->event_base, webseed_timer_func, w); 575 tr_timerAddMsec (w->timer, TR_IDLE_TIMER_MSEC); 576 return w; 577 } 578 579 void 580 tr_webseedFree (tr_webseed * w) 581 { 582 if (w) 583 { 584 if (webseed_has_tasks (w)) 585 w->is_stopping = true; 586 else 587 webseed_free (w); 588 } 589 } 600 tr_webseed * w = tr_new0 (tr_webseed, 1); 601 tr_peer * peer = &w->parent; 602 const tr_info * inf = tr_torrentInfo (tor); 603 604 /* construct parent class */ 605 tr_peerConstruct (peer, tor); 606 peer->client = TR_KEY_webseeds; 607 peer->funcs = &my_funcs; 608 tr_bitfieldSetHasAll (&peer->have); 609 tr_peerUpdateProgress (tor, peer); 610 611 w->torrent_id = tr_torrentId (tor); 612 w->session = tor->session; 613 w->base_url_len = strlen (url); 614 w->base_url = tr_strndup (url, w->base_url_len); 615 w->callback = callback; 616 w->callback_data = callback_data; 617 w->file_urls = tr_new0 (char *, inf->fileCount); 618 //tr_rcConstruct (&w->download_rate); 619 tr_bandwidthConstruct (&w->bandwidth, tor->session, &tor->bandwidth); 620 w->timer = evtimer_new (w->session->event_base, webseed_timer_func, w); 621 tr_timerAddMsec (w->timer, TR_IDLE_TIMER_MSEC); 622 return w; 623 } -
trunk/libtransmission/webseed.h
r13625 r13954 27 27 void * callback_data); 28 28 29 void tr_webseedFree (tr_webseed *);30 31 /** @return true if a request is being processed, or false if idle */32 bool tr_webseedGetSpeed_Bps (const tr_webseed * w,33 uint64_t now,34 unsigned int * setme_Bps);35 36 /** @return true if a request is being processed, or false if idle */37 bool tr_webseedIsActive (const tr_webseed * w);38 39 40 29 #endif
Note: See TracChangeset
for help on using the changeset viewer.