summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2022-03-08 20:10:34 +0530
committerGitHub <noreply@github.com>2022-03-08 20:10:34 +0530
commite166d7eed0e2ccda15e19eda016f9a188c4e1dbd (patch)
tree693b7bef98c6debe40e0b719c00a1b16c83a96cb
parent303d691a169e61a0ab38be48cdd2387c7573ed08 (diff)
feat(core): Event loop middlewares for Extensions (#13816)
-rw-r--r--core/examples/schedule_task.rs66
-rw-r--r--core/extensions.rs29
-rw-r--r--core/runtime.rs22
3 files changed, 117 insertions, 0 deletions
diff --git a/core/examples/schedule_task.rs b/core/examples/schedule_task.rs
new file mode 100644
index 000000000..2f4909b4f
--- /dev/null
+++ b/core/examples/schedule_task.rs
@@ -0,0 +1,66 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use deno_core::anyhow::Error;
+use deno_core::Extension;
+use deno_core::JsRuntime;
+use deno_core::OpState;
+use deno_core::RuntimeOptions;
+use futures::channel::mpsc;
+use futures::stream::StreamExt;
+use std::task::Poll;
+
+type Task = Box<dyn FnOnce()>;
+
+fn main() {
+ let my_ext = Extension::builder()
+ .ops(vec![(
+ "op_schedule_task",
+ deno_core::op_sync(op_schedule_task),
+ )])
+ .event_loop_middleware(|state, cx| {
+ let recv = state.borrow_mut::<mpsc::UnboundedReceiver<Task>>();
+ let mut ref_loop = false;
+ while let Poll::Ready(Some(call)) = recv.poll_next_unpin(cx) {
+ call();
+ ref_loop = true; // `call` can callback into runtime and schedule new callbacks :-)
+ }
+ ref_loop
+ })
+ .state(move |state| {
+ let (tx, rx) = mpsc::unbounded::<Task>();
+ state.put(tx);
+ state.put(rx);
+
+ Ok(())
+ })
+ .build();
+
+ // Initialize a runtime instance
+ let mut js_runtime = JsRuntime::new(RuntimeOptions {
+ extensions: vec![my_ext],
+ ..Default::default()
+ });
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+
+ let future = async move {
+ // Schedule 10 tasks.
+ js_runtime
+ .execute_script(
+ "<usage>",
+ r#"for (let i = 1; i <= 10; i++) Deno.core.opSync("op_schedule_task", i);"#
+ )
+ .unwrap();
+ js_runtime.run_event_loop(false).await
+ };
+ runtime.block_on(future).unwrap();
+}
+
+fn op_schedule_task(state: &mut OpState, i: u8, _: ()) -> Result<(), Error> {
+ let tx = state.borrow_mut::<mpsc::UnboundedSender<Task>>();
+ tx.unbounded_send(Box::new(move || println!("Hello, world! x{}", i)))
+ .expect("unbounded_send failed");
+ Ok(())
+}
diff --git a/core/extensions.rs b/core/extensions.rs
index 587a2d061..031cb073a 100644
--- a/core/extensions.rs
+++ b/core/extensions.rs
@@ -1,12 +1,14 @@
use crate::OpFn;
use crate::OpState;
use anyhow::Error;
+use std::task::Context;
pub type SourcePair = (&'static str, Box<SourceLoadFn>);
pub type SourceLoadFn = dyn Fn() -> Result<String, Error>;
pub type OpPair = (&'static str, Box<OpFn>);
pub type OpMiddlewareFn = dyn Fn(&'static str, Box<OpFn>) -> Box<OpFn>;
pub type OpStateFn = dyn Fn(&mut OpState) -> Result<(), Error>;
+pub type OpEventLoopFn = dyn Fn(&mut OpState, &mut Context) -> bool;
#[derive(Default)]
pub struct Extension {
@@ -14,6 +16,7 @@ pub struct Extension {
ops: Option<Vec<OpPair>>,
opstate_fn: Option<Box<OpStateFn>>,
middleware_fn: Option<Box<OpMiddlewareFn>>,
+ event_loop_middleware: Option<Box<OpEventLoopFn>>,
initialized: bool,
}
@@ -56,6 +59,22 @@ impl Extension {
pub fn init_middleware(&mut self) -> Option<Box<OpMiddlewareFn>> {
self.middleware_fn.take()
}
+
+ pub fn init_event_loop_middleware(&mut self) -> Option<Box<OpEventLoopFn>> {
+ self.event_loop_middleware.take()
+ }
+
+ pub fn run_event_loop_middleware(
+ &self,
+ op_state: &mut OpState,
+ cx: &mut Context,
+ ) -> bool {
+ self
+ .event_loop_middleware
+ .as_ref()
+ .map(|f| f(op_state, cx))
+ .unwrap_or(false)
+ }
}
// Provides a convenient builder pattern to declare Extensions
@@ -65,6 +84,7 @@ pub struct ExtensionBuilder {
ops: Vec<OpPair>,
state: Option<Box<OpStateFn>>,
middleware: Option<Box<OpMiddlewareFn>>,
+ event_loop_middleware: Option<Box<OpEventLoopFn>>,
}
impl ExtensionBuilder {
@@ -94,6 +114,14 @@ impl ExtensionBuilder {
self
}
+ pub fn event_loop_middleware<F>(&mut self, middleware_fn: F) -> &mut Self
+ where
+ F: Fn(&mut OpState, &mut Context) -> bool + 'static,
+ {
+ self.event_loop_middleware = Some(Box::new(middleware_fn));
+ self
+ }
+
pub fn build(&mut self) -> Extension {
let js_files = Some(std::mem::take(&mut self.js));
let ops = Some(std::mem::take(&mut self.ops));
@@ -102,6 +130,7 @@ impl ExtensionBuilder {
ops,
opstate_fn: self.state.take(),
middleware_fn: self.middleware.take(),
+ event_loop_middleware: self.event_loop_middleware.take(),
initialized: false,
}
}
diff --git a/core/runtime.rs b/core/runtime.rs
index b0ab92f46..42b66e13f 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -5,6 +5,7 @@ use crate::error::attach_handle_to_error;
use crate::error::generic_error;
use crate::error::ErrWithV8Handle;
use crate::error::JsError;
+use crate::extensions::OpEventLoopFn;
use crate::inspector::JsRuntimeInspector;
use crate::module_specifier::ModuleSpecifier;
use crate::modules::ModuleId;
@@ -80,6 +81,7 @@ pub struct JsRuntime {
has_snapshotted: bool,
allocations: IsolateAllocations,
extensions: Vec<Extension>,
+ event_loop_middlewares: Vec<Box<OpEventLoopFn>>,
}
struct DynImportModEvaluate {
@@ -381,6 +383,7 @@ impl JsRuntime {
snapshot_creator: maybe_snapshot_creator,
has_snapshotted: false,
allocations: IsolateAllocations::default(),
+ event_loop_middlewares: Vec::with_capacity(options.extensions.len()),
extensions: options.extensions,
};
@@ -481,6 +484,10 @@ impl JsRuntime {
for (name, opfn) in ops {
self.register_op(name, macroware(name, opfn));
}
+
+ if let Some(middleware) = e.init_event_loop_middleware() {
+ self.event_loop_middlewares.push(middleware);
+ }
}
// Restore extensions
self.extensions = extensions;
@@ -788,6 +795,18 @@ impl JsRuntime {
self.check_promise_exceptions()?;
}
+ // Event loop middlewares
+ let mut maybe_scheduling = false;
+ {
+ let state = state_rc.borrow();
+ let op_state = state.op_state.clone();
+ for f in &self.event_loop_middlewares {
+ if f(&mut op_state.borrow_mut(), cx) {
+ maybe_scheduling = true;
+ }
+ }
+ }
+
// Top level module
self.evaluate_pending_module();
@@ -815,6 +834,7 @@ impl JsRuntime {
&& !has_pending_module_evaluation
&& !has_pending_background_tasks
&& !has_tick_scheduled
+ && !maybe_scheduling
{
if wait_for_inspector && inspector_has_active_sessions {
return Poll::Pending;
@@ -833,6 +853,7 @@ impl JsRuntime {
if state.have_unpolled_ops
|| has_pending_background_tasks
|| has_tick_scheduled
+ || maybe_scheduling
{
state.waker.wake();
}
@@ -843,6 +864,7 @@ impl JsRuntime {
|| has_pending_dyn_module_evaluation
|| has_pending_background_tasks
|| has_tick_scheduled
+ || maybe_scheduling
{
// pass, will be polled again
} else {