summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2018-05-21 22:07:40 -0400
committerRyan Dahl <ry@tinyclouds.org>2018-05-21 22:07:40 -0400
commit08307fb84160602da12ba4d1b118c859e8a73cdb (patch)
treeb8f3bf7c4df3904937afc964ae36803567e78719
parent9a6621659937c55c6005a1fa6ce9641a4ceff385 (diff)
Add dispatch pub/sub
-rw-r--r--Makefile13
-rw-r--r--dispatch.go90
-rw-r--r--dispatch.ts61
-rw-r--r--handlers.go59
-rw-r--r--main.go35
-rw-r--r--main.ts46
-rw-r--r--msg.proto5
-rw-r--r--os.ts32
-rw-r--r--timers.ts27
9 files changed, 244 insertions, 124 deletions
diff --git a/Makefile b/Makefile
index 45d402c99..86a579d66 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,6 @@
TS_FILES = \
tsconfig.json \
+ dispatch.ts \
main.ts \
msg.pb.d.ts \
msg.pb.js \
@@ -10,7 +11,17 @@ TS_FILES = \
util.ts \
v8_source_maps.ts
-deno: assets.go msg.pb.go main.go
+GO_FILES = \
+ assets.go \
+ deno_dir.go \
+ dispatch.go \
+ handlers.go \
+ main.go \
+ main_test.go \
+ msg.pb.go \
+ util.go
+
+deno: $(GO_FILES)
go build -o deno
assets.go: dist/main.js
diff --git a/dispatch.go b/dispatch.go
new file mode 100644
index 000000000..9ac12d25c
--- /dev/null
+++ b/dispatch.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+ "github.com/golang/protobuf/proto"
+ "github.com/ry/v8worker2"
+ "sync"
+)
+
+// There is a single global worker for this process.
+// This file should be the only part of deno that directly access it, so that
+// all interaction with V8 can go through a single point.
+var worker *v8worker2.Worker
+
+var channels = make(map[string][]Subscriber)
+
+type Subscriber func(payload []byte) []byte
+
+func createWorker() {
+ worker = v8worker2.New(recv)
+}
+
+func recv(buf []byte) (response []byte) {
+ msg := &BaseMsg{}
+ check(proto.Unmarshal(buf, msg))
+ assert(len(msg.Payload) > 0, "BaseMsg has empty payload.")
+ subscribers, ok := channels[msg.Channel]
+ if !ok {
+ panic("No subscribers for channel " + msg.Channel)
+ }
+ for i := 0; i < len(subscribers); i++ {
+ s := subscribers[i]
+ r := s(msg.Payload)
+ if r != nil {
+ response = r
+ }
+ }
+ return response
+}
+
+func Sub(channel string, cb Subscriber) {
+ subscribers, ok := channels[channel]
+ if !ok {
+ subscribers = make([]Subscriber, 0)
+ }
+ subscribers = append(subscribers, cb)
+ channels[channel] = subscribers
+}
+
+func Pub(channel string, payload []byte) {
+ resChan <- &BaseMsg{
+ Channel: channel,
+ Payload: payload,
+ }
+}
+
+var resChan = make(chan *BaseMsg, 10)
+var doneChan = make(chan bool)
+var wg sync.WaitGroup
+
+func DispatchLoop() {
+ wg.Add(1)
+ first := true
+
+ // In a goroutine, we wait on for all goroutines to complete (for example
+ // timers). We use this to signal to the main thread to exit.
+ go func() {
+ wg.Wait()
+ doneChan <- true
+ }()
+
+ for {
+ select {
+ case msg := <-resChan:
+ out, err := proto.Marshal(msg)
+ err = worker.SendBytes(out)
+ check(err)
+ case <-doneChan:
+ // All goroutines have completed. Now we can exit main().
+ return
+ }
+
+ // We don't want to exit until we've received at least one message.
+ // This is so the program doesn't exit after sending the "start"
+ // message.
+ if first {
+ wg.Done()
+ }
+ first = false
+ }
+}
diff --git a/dispatch.ts b/dispatch.ts
new file mode 100644
index 000000000..859792b74
--- /dev/null
+++ b/dispatch.ts
@@ -0,0 +1,61 @@
+import { typedArrayToArrayBuffer } from "./util";
+import { _global } from "./globals";
+import { main as pb } from "./msg.pb";
+
+type MessageCallback = (msg: Uint8Array) => void;
+
+const send = V8Worker2.send;
+const channels = new Map<string, MessageCallback[]>();
+
+export function sub(channel: string, cb: MessageCallback): void {
+ let subscribers = channels.get(channel);
+ if (!subscribers) {
+ subscribers = [];
+ channels.set(channel, subscribers);
+ }
+ subscribers.push(cb);
+}
+
+export function pub(channel: string, payload: Uint8Array): null | ArrayBuffer {
+ const msg = pb.BaseMsg.fromObject({ channel, payload });
+ const ui8 = pb.BaseMsg.encode(msg).finish();
+ const ab = typedArrayToArrayBuffer(ui8);
+ return send(ab);
+}
+
+// Internal version of "pub".
+// TODO add internal version of "sub"
+// TODO rename to pubInternal()
+export function sendMsgFromObject(
+ channel: string,
+ obj: pb.IMsg
+): null | pb.Msg {
+ const msg = pb.Msg.fromObject(obj);
+ const ui8 = pb.Msg.encode(msg).finish();
+ const resBuf = pub(channel, ui8);
+ if (resBuf != null && resBuf.byteLength > 0) {
+ const res = pb.Msg.decode(new Uint8Array(resBuf));
+ if (res != null && res.error != null && res.error.length > 0) {
+ throw Error(res.error);
+ }
+ return res;
+ } else {
+ return null;
+ }
+}
+
+V8Worker2.recv((ab: ArrayBuffer) => {
+ const msg = pb.BaseMsg.decode(new Uint8Array(ab));
+ const subscribers = channels.get(msg.channel);
+ if (subscribers == null) {
+ throw Error(`No subscribers for channel "${msg.channel}".`);
+ }
+
+ for (const subscriber of subscribers) {
+ subscriber(msg.payload);
+ }
+});
+
+// Delete the V8Worker2 from the global object, so that no one else can receive
+// messages.
+_global["V8Worker2"] = null;
diff --git a/handlers.go b/handlers.go
index d65361595..237aa822e 100644
--- a/handlers.go
+++ b/handlers.go
@@ -10,29 +10,38 @@ import (
const assetPrefix string = "/$asset$/"
-func recv(buf []byte) []byte {
- msg := &Msg{}
- err := proto.Unmarshal(buf, msg)
- check(err)
- switch msg.Payload.(type) {
- case *Msg_Exit:
- payload := msg.GetExit()
- os.Exit(int(payload.Code))
- case *Msg_SourceCodeFetch:
- payload := msg.GetSourceCodeFetch()
- return HandleSourceCodeFetch(payload.ModuleSpecifier, payload.ContainingFile)
- case *Msg_SourceCodeCache:
- payload := msg.GetSourceCodeCache()
- return HandleSourceCodeCache(payload.Filename, payload.SourceCode,
- payload.OutputCode)
- case *Msg_TimerStart:
- payload := msg.GetTimerStart()
- return HandleTimerStart(payload.Id, payload.Interval, payload.Duration)
- default:
- panic("Unexpected message")
- }
+func InitHandlers() {
+ Sub("os", func(buf []byte) []byte {
+ msg := &Msg{}
+ check(proto.Unmarshal(buf, msg))
+ switch msg.Payload.(type) {
+ case *Msg_Exit:
+ payload := msg.GetExit()
+ os.Exit(int(payload.Code))
+ case *Msg_SourceCodeFetch:
+ payload := msg.GetSourceCodeFetch()
+ return HandleSourceCodeFetch(payload.ModuleSpecifier, payload.ContainingFile)
+ case *Msg_SourceCodeCache:
+ payload := msg.GetSourceCodeCache()
+ return HandleSourceCodeCache(payload.Filename, payload.SourceCode,
+ payload.OutputCode)
+ default:
+ panic("[os] Unexpected message " + string(buf))
+ }
+ return nil
+ })
- return nil
+ Sub("timers", func(buf []byte) []byte {
+ msg := &Msg{}
+ check(proto.Unmarshal(buf, msg))
+ switch msg.Payload.(type) {
+ case *Msg_TimerStart:
+ payload := msg.GetTimerStart()
+ return HandleTimerStart(payload.Id, payload.Interval, payload.Duration)
+ default:
+ panic("[timers] Unexpected message " + string(buf))
+ }
+ })
}
func HandleSourceCodeFetch(moduleSpecifier string, containingFile string) (out []byte) {
@@ -107,13 +116,15 @@ func HandleTimerStart(id int32, interval bool, duration int32) []byte {
go func() {
defer wg.Done()
time.Sleep(time.Duration(duration) * time.Millisecond)
- resChan <- &Msg{
+ payload, err := proto.Marshal(&Msg{
Payload: &Msg_TimerReady{
TimerReady: &TimerReadyMsg{
Id: id,
},
},
- }
+ })
+ check(err)
+ Pub("timers", payload)
}()
return nil
}
diff --git a/main.go b/main.go
index 9b4b6c120..409a7940d 100644
--- a/main.go
+++ b/main.go
@@ -8,7 +8,6 @@ import (
"net/url"
"os"
"path"
- "sync"
)
var flagReload = flag.Bool("reload", false, "Reload cached remote source code.")
@@ -19,9 +18,6 @@ var DenoDir string
var CompileDir string
var SrcDir string
-var wg sync.WaitGroup
-var resChan chan *Msg
-
func ResolveModule(moduleSpecifier string, containingFile string) (
moduleName string, filename string, err error) {
moduleUrl, err := url.Parse(moduleSpecifier)
@@ -58,7 +54,8 @@ func main() {
args = v8worker2.SetFlags(args)
createDirs()
- worker := v8worker2.New(recv)
+ createWorker()
+ InitHandlers()
main_js := stringAsset("main.js")
check(worker.Load("/main.js", main_js))
@@ -67,9 +64,6 @@ func main() {
cwd, err := os.Getwd()
check(err)
- resChan = make(chan *Msg)
- doneChan := make(chan bool)
-
out, err := proto.Marshal(&Msg{
Payload: &Msg_Start{
Start: &StartMsg{
@@ -82,28 +76,7 @@ func main() {
},
})
check(err)
- err = worker.SendBytes(out)
- if err != nil {
- os.Stderr.WriteString(err.Error())
- os.Exit(1)
- }
-
- // In a goroutine, we wait on for all goroutines to complete (for example
- // timers). We use this to signal to the main thread to exit.
- go func() {
- wg.Wait()
- doneChan <- true
- }()
+ Pub("start", out)
- for {
- select {
- case msg := <-resChan:
- out, err := proto.Marshal(msg)
- err = worker.SendBytes(out)
- check(err)
- case <-doneChan:
- // All goroutines have completed. Now we can exit main().
- return
- }
- }
+ DispatchLoop()
}
diff --git a/main.ts b/main.ts
index 28c1ecdff..4767f51ec 100644
--- a/main.ts
+++ b/main.ts
@@ -1,47 +1,33 @@
+import * as dispatch from "./dispatch";
import { main as pb } from "./msg.pb";
-import "./util";
+
import * as runtime from "./runtime";
-import * as timers from "./timers";
import * as util from "./util";
+// These have top-level functions that need to execute.
+import { initTimers } from "./timers";
+
// To control internal logging output
// Set with the -debug command-line flag.
export let debug = false;
+let startCalled = false;
+
+dispatch.sub("start", (payload: Uint8Array) => {
+ if (startCalled) {
+ throw Error("start message received more than once!");
+ }
+ startCalled = true;
+
+ const msg = pb.Msg.decode(payload);
+ const { cwd, argv, debugFlag, mainJs, mainMap } = msg.start;
-function start(
- cwd: string,
- argv: string[],
- debugFlag: boolean,
- mainJs: string,
- mainMap: string
-): void {
debug = debugFlag;
util.log("start", { cwd, argv, debugFlag });
+ initTimers();
runtime.setup(mainJs, mainMap);
const inputFn = argv[0];
const mod = runtime.resolveModule(inputFn, cwd + "/");
mod.compileAndRun();
-}
-
-V8Worker2.recv((ab: ArrayBuffer) => {
- const msg = pb.Msg.decode(new Uint8Array(ab));
- switch (msg.payload) {
- case "start":
- start(
- msg.start.cwd,
- msg.start.argv,
- msg.start.debugFlag,
- msg.start.mainJs,
- msg.start.mainMap
- );
- break;
- case "timerReady":
- timers.timerReady(msg.timerReady.id, msg.timerReady.done);
- break;
- default:
- console.log("Unknown message", msg);
- break;
- }
});
diff --git a/msg.proto b/msg.proto
index 821d00a20..3d90dc380 100644
--- a/msg.proto
+++ b/msg.proto
@@ -1,6 +1,11 @@
syntax = "proto3";
package main;
+message BaseMsg {
+ string channel = 1;
+ bytes payload = 2;
+}
+
message Msg {
string error = 1;
oneof payload {
diff --git a/os.ts b/os.ts
index 82baac25d..b2f9e93ba 100644
--- a/os.ts
+++ b/os.ts
@@ -1,18 +1,15 @@
-import { main as pb } from "./msg.pb";
import { ModuleInfo } from "./types";
-import { typedArrayToArrayBuffer } from "./util";
+import { sendMsgFromObject } from "./dispatch";
export function exit(code = 0): void {
- sendMsgFromObject({
- exit: { code }
- });
+ sendMsgFromObject("os", { exit: { code } });
}
export function sourceCodeFetch(
moduleSpecifier: string,
containingFile: string
): ModuleInfo {
- const res = sendMsgFromObject({
+ const res = sendMsgFromObject("os", {
sourceCodeFetch: { moduleSpecifier, containingFile }
});
return res.sourceCodeFetchRes;
@@ -23,28 +20,7 @@ export function sourceCodeCache(
sourceCode: string,
outputCode: string
): void {
- const res = sendMsgFromObject({
+ sendMsgFromObject("os", {
sourceCodeCache: { filename, sourceCode, outputCode }
});
- throwOnError(res);
-}
-
-export function sendMsgFromObject(obj: pb.IMsg): null | pb.Msg {
- const msg = pb.Msg.fromObject(obj);
- const ui8 = pb.Msg.encode(msg).finish();
- const ab = typedArrayToArrayBuffer(ui8);
- const resBuf = V8Worker2.send(ab);
- if (resBuf != null && resBuf.byteLength > 0) {
- const res = pb.Msg.decode(new Uint8Array(resBuf));
- throwOnError(res);
- return res;
- } else {
- return null;
- }
-}
-
-function throwOnError(res: pb.Msg) {
- if (res != null && res.error != null && res.error.length > 0) {
- throw Error(res.error);
- }
}
diff --git a/timers.ts b/timers.ts
index 6603b3d16..117fde2a8 100644
--- a/timers.ts
+++ b/timers.ts
@@ -1,4 +1,5 @@
-import { sendMsgFromObject } from "./os";
+import { main as pb } from "./msg.pb";
+import * as dispatch from "./dispatch";
let nextTimerId = 1;
@@ -14,6 +15,20 @@ interface Timer {
const timers = new Map<number, Timer>();
+export function initTimers() {
+ dispatch.sub("timers", onMessage);
+}
+
+function onMessage(payload: Uint8Array) {
+ const msg = pb.Msg.decode(payload);
+ const { id, done } = msg.timerReady;
+ const timer = timers.get(id);
+ timer.cb();
+ if (done) {
+ timers.delete(id);
+ }
+}
+
export function setTimeout(cb: TimerCallback, duration: number): number {
const timer = {
id: nextTimerId++,
@@ -22,7 +37,7 @@ export function setTimeout(cb: TimerCallback, duration: number): number {
cb
};
timers.set(timer.id, timer);
- sendMsgFromObject({
+ dispatch.sendMsgFromObject("timers", {
timerStart: {
id: timer.id,
interval: false,
@@ -31,11 +46,3 @@ export function setTimeout(cb: TimerCallback, duration: number): number {
});
return timer.id;
}
-
-export function timerReady(id: number, done: boolean): void {
- const timer = timers.get(id);
- timer.cb();
- if (done) {
- timers.delete(id);
- }
-}