diff options
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 102 |
1 files changed, 40 insertions, 62 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index 20b8b8021..b804ff449 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -24,76 +24,55 @@ use std::task::Poll; use tokio::sync::Mutex as AsyncMutex; use url::Url; -/// Wraps mpsc channels so they can be referenced -/// from ops and used to facilitate parent-child communication -/// for workers. -#[derive(Clone)] -pub struct WorkerChannels { - pub sender: mpsc::Sender<Buf>, - pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>, +/// Events that are sent to host from child +/// worker. +pub enum WorkerEvent { + Message(Buf), + Error(ErrBox), } -impl WorkerChannels { - /// Post message to worker as a host. - pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> { - let mut sender = self.sender.clone(); - sender.send(buf).map_err(ErrBox::from).await - } - - /// Get message from worker as a host. - pub fn get_message(&self) -> Pin<Box<dyn Future<Output = Option<Buf>>>> { - let receiver_mutex = self.receiver.clone(); - - async move { - let mut receiver = receiver_mutex.lock().await; - receiver.next().await - } - .boxed_local() - } +pub struct WorkerChannelsInternal { + pub sender: mpsc::Sender<WorkerEvent>, + pub receiver: mpsc::Receiver<Buf>, } -pub struct WorkerChannelsInternal(WorkerChannels); - -impl Deref for WorkerChannelsInternal { - type Target = WorkerChannels; - fn deref(&self) -> &Self::Target { - &self.0 - } +#[derive(Clone)] +pub struct WorkerHandle { + pub sender: mpsc::Sender<Buf>, + pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>, + // terminate_channel } -impl DerefMut for WorkerChannelsInternal { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 +impl WorkerHandle { + pub fn terminate(&self) { + todo!() } -} - -#[derive(Clone)] -pub struct WorkerChannelsExternal(WorkerChannels); -impl Deref for WorkerChannelsExternal { - type Target = WorkerChannels; - fn deref(&self) -> &Self::Target { - &self.0 + /// Post message to worker as a host. + pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> { + let mut sender = self.sender.clone(); + sender.send(buf).map_err(ErrBox::from).await } -} -impl DerefMut for WorkerChannelsExternal { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + // TODO: should use `try_lock` and return error if + // more than one listener tries to get event + pub async fn get_event(&self) -> Option<WorkerEvent> { + let mut receiver = self.receiver.lock().await; + receiver.next().await } } -fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) { +fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) { let (in_tx, in_rx) = mpsc::channel::<Buf>(1); - let (out_tx, out_rx) = mpsc::channel::<Buf>(1); - let internal_channels = WorkerChannelsInternal(WorkerChannels { + let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1); + let internal_channels = WorkerChannelsInternal { sender: out_tx, - receiver: Arc::new(AsyncMutex::new(in_rx)), - }); - let external_channels = WorkerChannelsExternal(WorkerChannels { + receiver: in_rx, + }; + let external_channels = WorkerHandle { sender: in_tx, receiver: Arc::new(AsyncMutex::new(out_rx)), - }); + }; (internal_channels, external_channels) } @@ -113,7 +92,9 @@ pub struct Worker { pub name: String, pub isolate: Box<deno_core::EsIsolate>, pub state: State, - external_channels: WorkerChannelsExternal, + pub waker: AtomicWaker, + pub(crate) internal_channels: WorkerChannelsInternal, + external_channels: WorkerHandle, } impl Worker { @@ -127,15 +108,13 @@ impl Worker { }); let (internal_channels, external_channels) = create_channels(); - { - let mut state = state.borrow_mut(); - state.worker_channels_internal = Some(internal_channels); - } Self { name, isolate, state, + waker: AtomicWaker::new(), + internal_channels, external_channels, } } @@ -174,7 +153,7 @@ impl Worker { } /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WorkerChannelsExternal { + pub fn thread_safe_handle(&self) -> WorkerHandle { self.external_channels.clone() } } @@ -184,8 +163,7 @@ impl Future for Worker { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { let inner = self.get_mut(); - let waker = AtomicWaker::new(); - waker.register(cx.waker()); + inner.waker.register(cx.waker()); inner.isolate.poll_unpin(cx) } } @@ -224,7 +202,7 @@ impl MainWorker { ops::signal::init(isolate, &state); ops::timers::init(isolate, &state); ops::worker_host::init(isolate, &state); - ops::web_worker::init(isolate, &state); + ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); } Self(worker) } |