1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
// Copyright 2018 Ryan Dahl <ry@tinyclouds.org>
// All rights reserved. MIT License.
package main
import (
"github.com/golang/protobuf/proto"
"github.com/ry/v8worker2"
"sync"
)
// resChan queues the messages handed to Pub until DispatchLoop picks them up
// and forwards them to the V8 worker. It is buffered with room for 10
// messages, so Pub only blocks once that many are pending.
var resChan = make(chan *BaseMsg, 10)
// doneChan is unbuffered. The goroutine started by DispatchLoop sends on it
// once wg.Wait() returns, which tells the loop to stop.
var doneChan = make(chan bool)
// wg counts outstanding work. Pub adds one per published message and
// DispatchLoop marks it done after the message has been sent to the worker.
// DispatchLoop also holds one extra count until its first loop iteration
// completes.
var wg sync.WaitGroup
// stats holds simple counters describing the traffic exchanged with the V8
// worker. The fields are updated by recv and DispatchLoop.
var stats struct {
// Number of worker.SendBytes calls made by DispatchLoop.
v8workerSend int
// Number of recv calls that returned a non-nil response.
v8workerRespond int
// Number of times recv was invoked.
v8workerRecv int
// Total bytes handed to the worker: buffers sent by DispatchLoop plus
// non-nil responses returned from recv.
v8workerBytesSent int
// Total bytes of the buffers passed to recv.
v8workerBytesRecv int
}
// There is a single global worker for this process.
// This file should be the only part of deno that directly accesses it, so
// that all interaction with V8 goes through a single point.
// It is assigned by createWorker.
var worker *v8worker2.Worker
// channels maps a channel name to the subscribers registered for it with Sub,
// in registration order. recv looks subscribers up here by msg.Channel.
var channels = make(map[string][]Subscriber)
// Subscriber is a callback invoked by recv with the payload of a message
// received on a channel it subscribed to. A non-nil return value is used as
// the response to that message; return nil to send no response.
type Subscriber func(payload []byte) []byte
// createWorker builds the process-wide V8 worker, passing recv to
// v8worker2.New as its callback, and stores it in the global worker variable.
func createWorker() {
	w := v8worker2.New(recv)
	worker = w
}
// recv is the callback given to v8worker2.New. It decodes buf as a BaseMsg,
// hands the message payload to every subscriber of msg.Channel in
// registration order, and returns the last non-nil value any of them
// produced (nil when none responded).
//
// It panics when no subscriber is registered for the channel. Decoding
// errors and empty payloads are passed to the check and assert helpers
// (defined elsewhere; presumably they abort — confirm).
func recv(buf []byte) (response []byte) {
	// Account for the incoming message before doing anything else.
	stats.v8workerRecv++
	stats.v8workerBytesRecv += len(buf)

	msg := new(BaseMsg)
	check(proto.Unmarshal(buf, msg))
	assert(len(msg.Payload) > 0, "BaseMsg has empty payload.")

	subscribers, found := channels[msg.Channel]
	if !found {
		panic("No subscribers for channel " + msg.Channel)
	}

	// Every subscriber sees the payload; a later non-nil reply overrides an
	// earlier one.
	for _, handle := range subscribers {
		if reply := handle(msg.Payload); reply != nil {
			response = reply
		}
	}

	if response == nil {
		return nil
	}

	// Only actual responses count toward the respond/sent statistics.
	stats.v8workerRespond++
	stats.v8workerBytesSent += len(response)
	return response
}
// Sub registers cb as a subscriber of channel. Subscribers of the same
// channel are kept in the order they were added, which is the order recv
// invokes them in.
func Sub(channel string, cb Subscriber) {
	// Appending to the nil slice of a not-yet-seen channel creates its list.
	channels[channel] = append(channels[channel], cb)
}
// Pub queues payload for delivery to the V8 worker on the given channel.
// The message is wrapped in a BaseMsg and placed on resChan, from which
// DispatchLoop sends it to the worker.
func Pub(channel string, payload []byte) {
	// Register the pending message first; DispatchLoop calls wg.Done() once
	// it has been sent.
	wg.Add(1)
	msg := &BaseMsg{Channel: channel, Payload: payload}
	resChan <- msg
}
// PubMsg serializes msg with proto.Marshal and publishes the resulting bytes
// on channel via Pub. A marshalling error is passed to the check helper.
func PubMsg(channel string, msg *Msg) {
	encoded, err := proto.Marshal(msg)
	check(err)
	Pub(channel, encoded)
}
// DispatchLoop forwards every message queued on resChan to the V8 worker and
// returns once all outstanding work tracked by wg has completed.
//
// The wait group acts like a reference count: wg.Add(1) is comparable to
// uv_ref in Node and wg.Done() to uv_unref. The loop itself holds one
// reference until its first iteration has finished, so that the program does
// not exit right after sending the "start" message.
func DispatchLoop() {
	wg.Add(1)
	awaitingFirst := true

	// A helper goroutine waits for all outstanding work (for example timers)
	// to complete and then signals this loop that it may exit.
	go func() {
		wg.Wait()
		doneChan <- true
	}()

	for {
		select {
		case <-doneChan:
			// All goroutines have completed. Now we can exit main().
			checkChanEmpty()
			return
		case pending := <-resChan:
			encoded, err := proto.Marshal(pending)
			if err != nil {
				panic(err)
			}
			err = worker.SendBytes(encoded)
			stats.v8workerSend++
			stats.v8workerBytesSent += len(encoded)
			exitOnError(err)
			wg.Done() // Pairs with the wg.Add(1) in Pub().
		}

		// Release the loop's own reference after the first message has been
		// handled.
		if awaitingFirst {
			wg.Done()
			awaitingFirst = false
		}
	}
}
// checkChanEmpty is a sanity check run after the done signal has been
// received: it does a non-blocking read on resChan and panics if a message is
// still queued or if the channel has been closed. It returns normally when
// nothing is ready.
func checkChanEmpty() {
	select {
	case _, open := <-resChan:
		if !open {
			panic("resChan closed. Unexpected.")
		}
		panic("Read a message from resChan after doneChan closed.")
	default:
		// Nothing pending; resChan is empty as expected.
	}
}
|