diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2018-05-21 22:07:40 -0400 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2018-05-21 22:07:40 -0400 |
commit | 08307fb84160602da12ba4d1b118c859e8a73cdb (patch) | |
tree | b8f3bf7c4df3904937afc964ae36803567e78719 /dispatch.ts | |
parent | 9a6621659937c55c6005a1fa6ce9641a4ceff385 (diff) |
Add dispatch pub/sub
Diffstat (limited to 'dispatch.ts')
-rw-r--r-- | dispatch.ts | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/dispatch.ts b/dispatch.ts new file mode 100644 index 000000000..859792b74 --- /dev/null +++ b/dispatch.ts @@ -0,0 +1,61 @@ +import { typedArrayToArrayBuffer } from "./util"; +import { _global } from "./globals"; +import { main as pb } from "./msg.pb"; + +type MessageCallback = (msg: Uint8Array) => void; + +const send = V8Worker2.send; +const channels = new Map<string, MessageCallback[]>(); + +export function sub(channel: string, cb: MessageCallback): void { + let subscribers = channels.get(channel); + if (!subscribers) { + subscribers = []; + channels.set(channel, subscribers); + } + subscribers.push(cb); +} + +export function pub(channel: string, payload: Uint8Array): null | ArrayBuffer { + const msg = pb.BaseMsg.fromObject({ channel, payload }); + const ui8 = pb.BaseMsg.encode(msg).finish(); + const ab = typedArrayToArrayBuffer(ui8); + return send(ab); +} + +// Internal version of "pub". +// TODO add internal version of "sub" +// TODO rename to pubInternal() +export function sendMsgFromObject( + channel: string, + obj: pb.IMsg +): null | pb.Msg { + const msg = pb.Msg.fromObject(obj); + const ui8 = pb.Msg.encode(msg).finish(); + const resBuf = pub(channel, ui8); + if (resBuf != null && resBuf.byteLength > 0) { + const res = pb.Msg.decode(new Uint8Array(resBuf)); + if (res != null && res.error != null && res.error.length > 0) { + throw Error(res.error); + } + return res; + } else { + return null; + } +} + +V8Worker2.recv((ab: ArrayBuffer) => { + const msg = pb.BaseMsg.decode(new Uint8Array(ab)); + const subscribers = channels.get(msg.channel); + if (subscribers == null) { + throw Error(`No subscribers for channel "${msg.channel}".`); + } + + for (const subscriber of subscribers) { + subscriber(msg.payload); + } +}); + +// Delete the V8Worker2 from the global object, so that no one else can receive +// messages. +_global["V8Worker2"] = null; |