summaryrefslogtreecommitdiff
path: root/cli/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/worker.rs')
-rw-r--r--cli/worker.rs102
1 files changed, 40 insertions, 62 deletions
diff --git a/cli/worker.rs b/cli/worker.rs
index 20b8b8021..b804ff449 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -24,76 +24,55 @@ use std::task::Poll;
use tokio::sync::Mutex as AsyncMutex;
use url::Url;
-/// Wraps mpsc channels so they can be referenced
-/// from ops and used to facilitate parent-child communication
-/// for workers.
-#[derive(Clone)]
-pub struct WorkerChannels {
- pub sender: mpsc::Sender<Buf>,
- pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>,
+/// Events that are sent to host from child
+/// worker.
+pub enum WorkerEvent {
+ Message(Buf),
+ Error(ErrBox),
}
-impl WorkerChannels {
- /// Post message to worker as a host.
- pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
- let mut sender = self.sender.clone();
- sender.send(buf).map_err(ErrBox::from).await
- }
-
- /// Get message from worker as a host.
- pub fn get_message(&self) -> Pin<Box<dyn Future<Output = Option<Buf>>>> {
- let receiver_mutex = self.receiver.clone();
-
- async move {
- let mut receiver = receiver_mutex.lock().await;
- receiver.next().await
- }
- .boxed_local()
- }
+pub struct WorkerChannelsInternal {
+ pub sender: mpsc::Sender<WorkerEvent>,
+ pub receiver: mpsc::Receiver<Buf>,
}
-pub struct WorkerChannelsInternal(WorkerChannels);
-
-impl Deref for WorkerChannelsInternal {
- type Target = WorkerChannels;
- fn deref(&self) -> &Self::Target {
- &self.0
- }
+#[derive(Clone)]
+pub struct WorkerHandle {
+ pub sender: mpsc::Sender<Buf>,
+ pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
+ // terminate_channel
}
-impl DerefMut for WorkerChannelsInternal {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
+impl WorkerHandle {
+ pub fn terminate(&self) {
+ todo!()
}
-}
-
-#[derive(Clone)]
-pub struct WorkerChannelsExternal(WorkerChannels);
-impl Deref for WorkerChannelsExternal {
- type Target = WorkerChannels;
- fn deref(&self) -> &Self::Target {
- &self.0
+ /// Post message to worker as a host.
+ pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
+ let mut sender = self.sender.clone();
+ sender.send(buf).map_err(ErrBox::from).await
}
-}
-impl DerefMut for WorkerChannelsExternal {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
+ // TODO: should use `try_lock` and return error if
+ // more than one listener tries to get event
+ pub async fn get_event(&self) -> Option<WorkerEvent> {
+ let mut receiver = self.receiver.lock().await;
+ receiver.next().await
}
}
-fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) {
+fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) {
let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
- let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
- let internal_channels = WorkerChannelsInternal(WorkerChannels {
+ let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1);
+ let internal_channels = WorkerChannelsInternal {
sender: out_tx,
- receiver: Arc::new(AsyncMutex::new(in_rx)),
- });
- let external_channels = WorkerChannelsExternal(WorkerChannels {
+ receiver: in_rx,
+ };
+ let external_channels = WorkerHandle {
sender: in_tx,
receiver: Arc::new(AsyncMutex::new(out_rx)),
- });
+ };
(internal_channels, external_channels)
}
@@ -113,7 +92,9 @@ pub struct Worker {
pub name: String,
pub isolate: Box<deno_core::EsIsolate>,
pub state: State,
- external_channels: WorkerChannelsExternal,
+ pub waker: AtomicWaker,
+ pub(crate) internal_channels: WorkerChannelsInternal,
+ external_channels: WorkerHandle,
}
impl Worker {
@@ -127,15 +108,13 @@ impl Worker {
});
let (internal_channels, external_channels) = create_channels();
- {
- let mut state = state.borrow_mut();
- state.worker_channels_internal = Some(internal_channels);
- }
Self {
name,
isolate,
state,
+ waker: AtomicWaker::new(),
+ internal_channels,
external_channels,
}
}
@@ -174,7 +153,7 @@ impl Worker {
}
/// Returns a way to communicate with the Worker from other threads.
- pub fn thread_safe_handle(&self) -> WorkerChannelsExternal {
+ pub fn thread_safe_handle(&self) -> WorkerHandle {
self.external_channels.clone()
}
}
@@ -184,8 +163,7 @@ impl Future for Worker {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
- let waker = AtomicWaker::new();
- waker.register(cx.waker());
+ inner.waker.register(cx.waker());
inner.isolate.poll_unpin(cx)
}
}
@@ -224,7 +202,7 @@ impl MainWorker {
ops::signal::init(isolate, &state);
ops::timers::init(isolate, &state);
ops::worker_host::init(isolate, &state);
- ops::web_worker::init(isolate, &state);
+ ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
}
Self(worker)
}