# Extension pipelining

`websocket-extensions` models the extension negotiation and processing pipeline
of the WebSocket protocol. Between the driver parsing messages from the TCP
stream and handing those messages off to the application, there may exist a
stack of extensions that transform the message somehow.

In the parlance of this framework, a *session* refers to a single instance of an
extension, acting on a particular socket on either the server or the client
side. A session may transform messages both incoming to the application and
outgoing from the application; for example, the `permessage-deflate` extension
compresses outgoing messages and decompresses incoming messages. Message streams
in either direction are independent; that is, incoming and outgoing messages
cannot be assumed to 'pair up' as in a request-response protocol.

Asynchronous processing of messages poses a number of problems that this
pipeline construction is intended to solve.

## Overview

Logically, we have the following:

    +-------------+  out  +---+     +---+     +---+       +--------+
    |             |------>|   |---->|   |---->|   |------>|        |
    | Application |       | A |     | B |     | C |       | Driver |
    |             |<------|   |<----|   |<----|   |<------|        |
    +-------------+  in   +---+     +---+     +---+       +--------+

                           \                       /
                            +----------o----------+
                                       |
                                    sessions

For outgoing messages, the driver receives the result of

    C.outgoing(B.outgoing(A.outgoing(message)))

or,

    [A, B, C].reduce(((m, ext) => ext.outgoing(m)), message)

For incoming messages, the application receives the result of

    A.incoming(B.incoming(C.incoming(message)))

or,

    [C, B, A].reduce(((m, ext) => ext.incoming(m)), message)

A session is of the following type, to borrow notation from pseudo-Haskell:

    type Session = {
      incoming :: Message -> Message
      outgoing :: Message -> Message
      close    :: () -> ()
    }

(That `() -> ()` syntax is intended to mean that `close()` is a nullary void
method; I apologise to any Haskell readers for not using the right monad.)

The `incoming()` and `outgoing()` methods perform message transformation in the
respective directions; `close()` is called when a socket closes so the session
can release any resources it's holding, for example a DEFLATE de/compression
context.

However, because this is JavaScript, the `incoming()` and `outgoing()` methods
may be asynchronous (indeed, `permessage-deflate` is based on `zlib`, whose API
is stream-based). So their interface is strictly:

    type Session = {
      incoming :: Message -> Callback -> ()
      outgoing :: Message -> Callback -> ()
      close    :: () -> ()
    }

    type Callback = Either Error Message -> ()

This means a message *m2* can be pushed into a session while it's still
processing the preceding message *m1*. The messages can be processed
concurrently but they *must* be given to the next session in line (or to the
application) in the same order they came in. Applications will expect to receive
messages in the order they arrived over the wire, and sessions require this too.
So ordering of messages must be preserved throughout the pipeline.

Consider the following highly simplified extension that deflates messages on the
wire. `message` is a value conforming to the type:

    type Message = {
      rsv1   :: Boolean
      rsv2   :: Boolean
      rsv3   :: Boolean
      opcode :: Number
      data   :: Buffer
    }

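For instance, an uncompressed text frame (opcode `1` in the WebSocket protocol)
carrying the string `'hi'` might be represented as:

```js
var message = {
  rsv1:   false,
  rsv2:   false,
  rsv3:   false,
  opcode: 1,
  data:   new Buffer('hi')
};
```
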
Here's the extension:

```js
var zlib = require('zlib');

var deflate = {
  outgoing: function(message, callback) {
    zlib.deflateRaw(message.data, function(error, result) {
      message.rsv1 = true;
      message.data = result;
      callback(error, message);
    });
  },

  incoming: function(message, callback) {
    // decompress inbound messages (elided)
  },

  close: function() {
    // no state to clean up
  }
};
```

We can call it with a large message followed by a small one, and the small one
will be returned first:

```js
var crypto = require('crypto'),
    large  = crypto.randomBytes(1 << 14),
    small  = new Buffer('hi');

deflate.outgoing({data: large}, function() {
  console.log(1, 'large');
});

deflate.outgoing({data: small}, function() {
  console.log(2, 'small');
});

/* prints:  2 'small'
            1 'large' */
```

So a session that processes messages asynchronously may fail to preserve message
ordering.

Now, this extension is stateless, so it can process messages in any order and
still produce the same output. But some extensions are stateful and require
message order to be preserved.

For example, when using `permessage-deflate` without `no_context_takeover` set,
the session retains a DEFLATE de/compression context between messages, which
accumulates state as it consumes data (later messages can refer to sections of
previous ones to improve compression). Reordering parts of the DEFLATE stream
will result in a failed decompression. Messages must be decompressed in the same
order they were compressed by the peer in order for the DEFLATE protocol to
work.

Finally, there is the problem of closing a socket. When a WebSocket is closed by
the application, or receives a closing request from the other peer, there may be
messages outgoing from the application and incoming from the peer in the
pipeline. If we close the socket and pipeline immediately, two problems arise:

* We may send our own closing frame to the peer before all prior messages we
  sent have been written to the socket, and before we have finished processing
  all prior messages from the peer
* The session may be instructed to close its resources (e.g. its de/compression
  context) while it's in the middle of processing a message, or before it has
  received messages that are upstream of it in the pipeline

Essentially, we must defer closing the sessions and sending a closing frame
until after all prior messages have exited the pipeline.

## Design goals

* Message order must be preserved between the protocol driver, the extension
  sessions, and the application
* Messages should be handed off to sessions and endpoints as soon as possible,
  to maximise throughput of stateless sessions
* The closing procedure should block any further messages from entering the
  pipeline, and should allow all existing messages to drain
* Sessions should be closed as soon as possible to prevent them holding memory
  and other resources when they have no more messages to handle
* The closing API should allow the caller to detect when the pipeline is empty
  and it is safe to continue the WebSocket closing procedure
* Individual extensions should remain as simple as possible to facilitate
  modularity and independent authorship

The final point about modularity is an important one: this framework is designed
to facilitate extensions existing as plugins, by decoupling the protocol driver,
extensions, and application. In an ideal world, plugins should only need to
contain code for their specific functionality, and not solve these problems that
apply to all sessions. Also, solving some of these problems requires
consideration of all active sessions collectively, which an individual session
is incapable of doing.

For example, it is entirely possible to take the simple `deflate` extension
above and wrap its `incoming()` and `outgoing()` methods in two `Transform`
streams, producing this type:

    type Session = {
      incoming :: TransformStream
      outgoing :: TransformStream
      close    :: () -> ()
    }

The `Transform` class makes it easy to wrap an async function such that message
order is preserved:

```js
var stream  = require('stream'),
    session = new stream.Transform({objectMode: true});

session._transform = function(message, _, callback) {
  var self = this;
  deflate.outgoing(message, function(error, result) {
    self.push(result);
    callback();
  });
};
```

However, this has a negative impact on throughput: it works by deferring
`callback()` until the async function has 'returned', which blocks `Transform`
from passing further input into the `_transform()` method until the current
message is dealt with completely. This would prevent sessions from processing
messages concurrently, and would unnecessarily reduce the throughput of
stateless extensions.

So, input should be handed off to sessions as soon as possible, and all we need
is a mechanism to reorder the output so that message order is preserved for the
next session in line.

## Solution

We now describe the model implemented here and how it meets the above design
goals. The above diagram where a stack of extensions sit between the driver and
application describes the data flow, but not the object graph. That looks like
this:

    +--------+
    | Driver |
    +---o----+
        |
        V
    +------------+      +----------+
    | Extensions o----->| Pipeline |
    +------------+      +-----o----+
                              |
              +---------------+---------------+
              |               |               |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+

A driver using this framework holds an instance of the `Extensions` class, which
it uses to register extension plugins, negotiate headers and transform messages.
The `Extensions` instance itself holds a `Pipeline`, which contains an array of
`Cell` objects, each of which wraps one of the sessions.

### Message processing

Both the `Pipeline` and `Cell` classes have `incoming()` and `outgoing()`
methods; the `Pipeline` interface pushes messages into the pipe, delegates the
message to each `Cell` in turn, then returns it back to the driver. Outgoing
messages pass through `A` then `B` then `C`, and incoming messages in the
reverse order.

Internally, a `Cell` contains two `Functor` objects. A `Functor` wraps an async
function and makes sure its output messages maintain the order of its input
messages. This name is due to [@fronx](https://github.com/fronx), on the basis
that, by preserving message order, the abstraction preserves the *mapping*
between input and output messages. To use our simple `deflate` extension from
above:

```js
var functor = new Functor(deflate, 'outgoing');

functor.call({data: large}, function() {
  console.log(1, 'large');
});

functor.call({data: small}, function() {
  console.log(2, 'small');
});

/* -> 1 'large'
      2 'small' */
```

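For illustration, here is one minimal way such an order-preserving wrapper
could be written (a sketch only; the real `Functor` differs in its details).
Each call claims a slot in a queue, and completed results are only released
from the front of that queue:

```js
// Sketch of an order-preserving async wrapper
function Functor(session, method) {
  this._session = session;
  this._method  = method;
  this._queue   = [];
}

Functor.prototype.call = function(message, callback) {
  var record = {done: false, error: null, message: null, callback: callback},
      self   = this;

  this._queue.push(record);

  this._session[this._method](message, function(error, result) {
    record.done    = true;
    record.error   = error;
    record.message = result;
    self._flush();
  });
};

Functor.prototype._flush = function() {
  var record;
  // Release completed records, but only from the front of the queue, so
  // that output order matches input order
  while (this._queue.length > 0 && this._queue[0].done) {
    record = this._queue.shift();
    record.callback(record.error, record.message);
  }
};
```
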
A `Cell` contains two of these, one for each direction:

                            +-----------------------+
                      +---->| Functor [A, incoming] |
    +----------+      |     +-----------------------+
    | Cell [A] o------+
    +----------+      |     +-----------------------+
                      +---->| Functor [A, outgoing] |
                            +-----------------------+

This satisfies the message transformation requirements: the `Pipeline` simply
loops over the cells in the appropriate direction to transform each message.
Because each `Cell` will preserve message order, we can pass a message to the
next `Cell` in line as soon as the current `Cell` returns it. This gives each
`Cell` all the messages in order while maximising throughput.

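As an illustration, the outgoing path might be wired up like this (a
hypothetical simplification; the real `Pipeline` also has to handle closing and
errors). Each cell hands its result to the next as soon as it is released, and
the per-cell `Functor`s guarantee those hand-offs happen in arrival order:

```js
// Hypothetical sketch of the delegation loop; not the real Pipeline.
// `cells` is the array [A, B, C]; incoming traffic walks it in reverse.
function Pipeline(cells) {
  this._cells = cells;
}

Pipeline.prototype.outgoing = function(message, callback) {
  var cells = this._cells;

  var step = function(index, msg) {
    if (index === cells.length) return callback(null, msg);

    cells[index].outgoing(msg, function(error, result) {
      // Simplified: the real pipeline feeds errors through the remaining
      // cells in order rather than short-circuiting like this
      if (error) return callback(error);
      step(index + 1, result);   // hand off to the next cell immediately
    });
  };

  step(0, message);
};
```
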
### Session closing

We want to close each session as soon as possible, after all existing messages
have drained. To do this, each `Cell` begins with a pending message counter in
each direction, labelled `in` and `out` below.

                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 0          out: 0          out: 0

When a message *m1* enters the pipeline, say in the `outgoing` direction, we
increment the `pending.out` counter on all cells immediately.

                    +----------+
              m1 => | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 1          out: 1          out: 1

*m1* is handed off to `A`; meanwhile, a second message *m2* arrives in the same
direction. All `pending.out` counters are again incremented.

                    +----------+
              m2 => | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
       m1 |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 2          out: 2          out: 2

When the first cell's `A.outgoing` functor finishes processing *m1*, the first
`pending.out` counter is decremented and *m1* is handed off to cell `B`.

                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
       m2 |            m1 |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 1          out: 2          out: 2

As `B` finishes with *m1*, and as `A` finishes with *m2*, the `pending.out`
counters continue to decrement.

                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |            m2 |            m1 |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 0          out: 1          out: 2

Say `C` is a little slow, and begins processing *m2* while still processing
*m1*. That's fine; the `Functor` mechanism will keep *m1* ahead of *m2* in the
output.

                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |            m2 | m1
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 0          out: 0          out: 2

Once all messages are dealt with, the counters return to `0`.

                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 0          out: 0          out: 0

The same process applies in the `incoming` direction, the only difference being
that messages are passed to `C` first.

This makes closing the sessions quite simple. When the driver wants to close the
socket, it calls `Pipeline.close()`. This *immediately* calls `close()` on all
the cells. If a cell has `in == out == 0`, then it immediately calls
`session.close()`. Otherwise, it stores the closing call and defers it until
`in` and `out` have both ticked down to zero. The pipeline will not accept new
messages after `close()` has been called, so we know the pending counts will not
increase after this point.

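A minimal sketch of that deferral logic might look like the following
(hypothetical names; the real `Cell` differs in its details). The pipeline bumps
a cell's counters as messages enter the pipe and ticks them down as the cell
finishes each message, and `close()` only reaches the session once both
counters are zero:

```js
// Hypothetical sketch of deferred closing; not the real Cell
function Cell(session) {
  this._session = session;
  this._pending = {incoming: 0, outgoing: 0};
  this._closed  = null;
}

Cell.prototype.close = function() {
  var self = this;
  // Remember that close was requested, then close now if already drained
  this._closed = this._closed || new Promise(function(resolve) {
    self._resolve = resolve;
  });
  this._tryClose();
  return this._closed;
};

// Called by the pipeline when a message enters in the given direction
Cell.prototype.increment = function(direction) {
  this._pending[direction] += 1;
};

// Called when this cell finishes processing a message
Cell.prototype.decrement = function(direction) {
  this._pending[direction] -= 1;
  this._tryClose();
};

Cell.prototype._tryClose = function() {
  if (!this._closed) return;   // close() has not been requested yet
  if (this._pending.incoming > 0 || this._pending.outgoing > 0) return;
  this._session.close();       // all messages have drained; release resources
  this._resolve();
};
```
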
This means each session is closed as soon as possible: `A` can close while the
slow `C` session is still working, because it knows there are no more messages
on the way. Similarly, `C` will defer closing if `close()` is called while *m1*
is still in `B`, and *m2* in `A`, because its pending count means it knows it
has work yet to do, even if it has not received those messages yet. This concern
cannot be addressed by extensions acting only on their own local state, unless
we pollute individual extensions by making them all implement this same
mechanism.

The actual closing API at each level is slightly different:

    type Session = {
      close :: () -> ()
    }

    type Cell = {
      close :: () -> Promise ()
    }

    type Pipeline = {
      close :: Callback -> ()
    }

This might appear inconsistent, so it's worth explaining. Remember that a
`Pipeline` holds a list of `Cell` objects, each wrapping a `Session`. The driver
talks (via the `Extensions` API) to the `Pipeline` interface, and it wants
`Pipeline.close()` to do two things: close all the sessions, and tell it when
it's safe to start the closing procedure (i.e. when all messages have drained
from the pipe and been handed off to the application or socket). A callback API
works well for that.

At the other end of the stack, `Session.close()` is a nullary void method with
no callback or promise API because we don't care what it does, and whatever it
does do will not block the WebSocket protocol; we're not going to hold off
processing messages while a session closes its de/compression context. We just
tell it to close itself, and don't want to wait while it does that.

In the middle, `Cell.close()` returns a promise rather than using a callback.
This is for two reasons. First, `Cell.close()` might not do anything
immediately; it might have to defer its effect while messages drain. So, if
given a callback, it would have to store it in a queue for later execution.
Callbacks work fine if your method does something and can then invoke the
callback itself, but if you need to store callbacks somewhere so another method
can execute them, a promise is a better fit. Second, it better serves the
purposes of `Pipeline.close()`: it wants to call `close()` on each of a list of
cells, and wait for all of them to finish. This is simple and idiomatic using
promises:

```js
var closed = cells.map((cell) => cell.close());
Promise.all(closed).then(callback);
```

(We don't actually use a full *Promises/A+* compatible promise here; we use a
much simplified construction that acts as a callback aggregator, resolves
synchronously and does not support chaining, but the principle is the same.)

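Such a construction might look something like this (a hypothetical sketch, not
the actual code):

```js
// A minimal callback aggregator with a promise-like surface: then()
// registers callbacks, resolve() fires them synchronously, and there is
// no chaining or error channel
function Aggregator() {
  this._callbacks = [];
  this._resolved  = false;
}

Aggregator.prototype.then = function(callback) {
  if (this._resolved) callback();
  else this._callbacks.push(callback);
};

Aggregator.prototype.resolve = function() {
  if (this._resolved) return;
  this._resolved = true;
  this._callbacks.splice(0).forEach(function(cb) { cb(); });
};
```
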
### Error handling

We've not mentioned error handling so far but it bears some explanation. The
above counter system still applies, but behaves slightly differently in the
presence of errors.

Say we push three messages into the pipe in the outgoing direction:

                    +----------+
      m3, m2, m1 => | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 3          out: 3          out: 3

They pass through the cells successfully up to this point:

                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
       m3 |            m2 |            m1 |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 1          out: 2          out: 3

At this point, session `B` produces an error while processing *m2*; that is,
*m2* becomes *e2*. *m1* is still in the pipeline, and *m3* is queued behind
*m2*. What ought to happen is that *m1* is handed off to the socket, then *m2*
is released to the driver, which will detect the error and begin closing the
socket. No further processing should be done on *m3* and it should not be
released to the driver after the error is emitted.

To handle this, we allow errors to pass down the pipeline just like messages do,
to maintain ordering. But, once a cell sees its session produce an error, or it
receives an error from upstream, it should refuse to accept any further
messages. Session `B` might have begun processing *m3* by the time it produces
the error *e2*, but `C` will have been given *e2* before it receives *m3*, and
can simply drop *m3*.

Now, say *e2* reaches the slow session `C` while *m1* is still present. `C` will
never receive *m3*, since it was dropped upstream. Under the present model, its
`out` counter will be `3` but it is only going to emit two more values: *m1* and
*e2*. In order for closing to work, we need to decrement `out` to reflect this.
The situation should look like this:

                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |            e2 | m1
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
        in:  0          in:  0          in:  0
        out: 0          out: 0          out: 2

When a cell sees its session emit an error, or when it receives an error from
upstream, it sets its pending count in the appropriate direction to equal the
number of messages it is *currently* processing. It will not accept any messages
after it sees the error, so this will allow the counter to reach zero.

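In sketch form, extending the hypothetical `Cell` above (again, not the real
implementation), the reaction to an error might look like this:

```js
// Assume the hypothetical Cell constructor also sets:
//   this._errored = {incoming: false, outgoing: false};  // seen an error yet?
//   this._active  = {incoming: 0, outgoing: 0};          // in-flight messages

Cell.prototype._handleError = function(direction) {
  if (this._errored[direction]) return;   // only the first error matters
  this._errored[direction] = true;

  // Messages queued behind the error will be dropped upstream and never
  // arrive, so only the in-flight messages can still tick the counter down
  this._pending[direction] = this._active[direction];
};

// Any message arriving after the error is silently dropped
Cell.prototype._accepts = function(direction) {
  return !this._errored[direction];
};
```
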
Note that while *e2* is in the pipeline, `Pipeline` should drop any further
messages in the outgoing direction, but should continue to accept incoming
messages. Until *e2* makes it out of the pipe to the driver, behind previous
successful messages, the driver does not know an error has happened, and a
message may arrive over the socket and make it all the way through the incoming
pipe in the meantime. We only halt processing in the affected direction to avoid
doing unnecessary work, since messages arriving after an error should not be
processed.

Some unnecessary work may happen; for example, any messages already in the
pipeline following *m2* will be processed by `A`, since it's upstream of the
error. Those messages will be dropped by `B`.

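At the pipeline level, the same idea could be sketched as a gate that is checked
per direction (hypothetical, building on the `Pipeline` sketch above), so that a
poisoned outgoing pipe still accepts incoming traffic:

```js
// Hypothetical: each direction is gated independently, so an error in the
// outgoing pipe does not block incoming messages
Pipeline.prototype._accepts = function(direction) {
  return !this._closing && !this._errored[direction];
};

Pipeline.prototype.incoming = function(message, callback) {
  if (!this._accepts('incoming')) return;   // dropped, never processed
  // ... delegate through the cells in reverse order, as for outgoing
};
```
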
## Alternative ideas

I am considering implementing `Functor` as an object-mode transform stream
rather than what is essentially an async function. Being object-mode, a stream
would preserve message boundaries and would also possibly help address
back-pressure. I'm not sure whether this would require external API changes so
that such streams could be connected to the downstream driver's streams.

## Acknowledgements

Credit is due to [@mnowster](https://github.com/mnowster) for helping with the
design and to [@fronx](https://github.com/fronx) for helping name things.