summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/es_isolate.rs4
-rw-r--r--core/examples/http_bench.rs69
-rw-r--r--core/isolate.rs23
-rw-r--r--core/ops.rs32
-rw-r--r--core/plugins.rs4
5 files changed, 67 insertions, 65 deletions
diff --git a/core/es_isolate.rs b/core/es_isolate.rs
index aa17c1a46..8c2e5b26d 100644
--- a/core/es_isolate.rs
+++ b/core/es_isolate.rs
@@ -581,12 +581,12 @@ pub mod tests {
let mut isolate = EsIsolate::new(loader, StartupData::None, false);
let dispatcher =
- move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> CoreOp {
+ move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
- Op::Async(futures::future::ok(buf).boxed())
+ Op::Async(futures::future::ready(buf).boxed())
};
isolate.register_op("test", dispatcher);
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index 159f23fb5..27fefc8bb 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -97,15 +97,38 @@ impl Isolate {
state: Default::default(),
};
- isolate.register_op("listen", op_listen);
+ isolate.register_sync_op("listen", op_listen);
isolate.register_op("accept", op_accept);
isolate.register_op("read", op_read);
isolate.register_op("write", op_write);
- isolate.register_op("close", op_close);
+ isolate.register_sync_op("close", op_close);
isolate
}
+ fn register_sync_op<F>(&mut self, name: &'static str, handler: F)
+ where
+ F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>,
+ {
+ let state = self.state.clone();
+ let core_handler =
+ move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op {
+ let state = state.clone();
+ let record = Record::from(control_buf);
+ let is_sync = record.promise_id == 0;
+ assert!(is_sync);
+
+ let result: i32 = match handler(state, record.rid, zero_copy_buf) {
+ Ok(r) => r as i32,
+ Err(_) => -1,
+ };
+ let buf = RecordBuf::from(Record { result, ..record })[..].into();
+ Op::Sync(buf)
+ };
+
+ self.core_isolate.register_op(name, core_handler);
+ }
+
fn register_op<F>(
&mut self,
name: &'static str,
@@ -117,10 +140,11 @@ impl Isolate {
{
let state = self.state.clone();
let core_handler =
- move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> CoreOp {
+ move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
+ assert!(!is_sync);
let fut = async move {
let op = handler(state, record.rid, zero_copy_buf);
@@ -128,14 +152,10 @@ impl Isolate {
.map_ok(|r| r.try_into().expect("op result does not fit in i32"))
.unwrap_or_else(|_| -1)
.await;
- Ok(RecordBuf::from(Record { result, ..record })[..].into())
+ RecordBuf::from(Record { result, ..record })[..].into()
};
- if is_sync {
- Op::Sync(futures::executor::block_on(fut).unwrap())
- } else {
- Op::Async(fut.boxed_local())
- }
+ Op::Async(fut.boxed_local())
};
self.core_isolate.register_op(name, core_handler);
@@ -154,32 +174,27 @@ fn op_close(
state: State,
rid: u32,
_buf: Option<ZeroCopyBuf>,
-) -> impl TryFuture<Ok = u32, Error = Error> {
+) -> Result<u32, Error> {
debug!("close rid={}", rid);
-
- async move {
- let resource_table = &mut state.borrow_mut().resource_table;
- resource_table
- .close(rid)
- .map(|_| 0)
- .ok_or_else(bad_resource)
- }
+ let resource_table = &mut state.borrow_mut().resource_table;
+ resource_table
+ .close(rid)
+ .map(|_| 0)
+ .ok_or_else(bad_resource)
}
fn op_listen(
state: State,
_rid: u32,
_buf: Option<ZeroCopyBuf>,
-) -> impl TryFuture<Ok = u32, Error = Error> {
+) -> Result<u32, Error> {
debug!("listen");
-
- async move {
- let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
- let listener = tokio::net::TcpListener::bind(&addr).await?;
- let resource_table = &mut state.borrow_mut().resource_table;
- let rid = resource_table.add("tcpListener", Box::new(listener));
- Ok(rid)
- }
+ let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
+ let std_listener = std::net::TcpListener::bind(&addr)?;
+ let listener = TcpListener::from_std(std_listener)?;
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let rid = resource_table.add("tcpListener", Box::new(listener));
+ Ok(rid)
}
fn op_accept(
diff --git a/core/isolate.rs b/core/isolate.rs
index 31a4c401c..18cd84eae 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -14,7 +14,6 @@ use crate::ops::*;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
use futures::stream::select;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
@@ -34,6 +33,8 @@ use std::sync::{Arc, Mutex, Once};
use std::task::Context;
use std::task::Poll;
+type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Buf)>>>;
+
/// A ZeroCopyBuf encapsulates a slice that's been borrowed from a JavaScript
/// ArrayBuffer object. JavaScript objects can normally be garbage collected,
/// but the existence of a ZeroCopyBuf inhibits this until it is dropped. It
@@ -344,7 +345,7 @@ impl Isolate {
/// Requires runtime to explicitly ask for op ids before using any of the ops.
pub fn register_op<F>(&self, name: &str, op: F) -> OpId
where
- F: Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static,
+ F: Fn(&[u8], Option<ZeroCopyBuf>) -> Op + 'static,
{
self.op_registry.register(name, op)
}
@@ -402,13 +403,13 @@ impl Isolate {
Some((op_id, buf))
}
Op::Async(fut) => {
- let fut2 = fut.map_ok(move |buf| (op_id, buf));
+ let fut2 = fut.map(move |buf| (op_id, buf));
self.pending_ops.push(fut2.boxed_local());
self.have_unpolled_ops = true;
None
}
Op::AsyncUnref(fut) => {
- let fut2 = fut.map_ok(move |buf| (op_id, buf));
+ let fut2 = fut.map(move |buf| (op_id, buf));
self.pending_unref_ops.push(fut2.boxed_local());
self.have_unpolled_ops = true;
None
@@ -528,10 +529,9 @@ impl Future for Isolate {
match select(&mut inner.pending_ops, &mut inner.pending_unref_ops)
.poll_next_unpin(cx)
{
- Poll::Ready(Some(Err(_))) => panic!("unexpected op error"),
Poll::Ready(None) => break,
Poll::Pending => break,
- Poll::Ready(Some(Ok((op_id, buf)))) => {
+ Poll::Ready(Some((op_id, buf))) => {
let successful_push = inner.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
@@ -769,14 +769,14 @@ pub mod tests {
let mut isolate = Isolate::new(StartupData::None, false);
let dispatcher =
- move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> CoreOp {
+ move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::Async => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8].into_boxed_slice();
- Op::Async(futures::future::ok(buf).boxed())
+ Op::Async(futures::future::ready(buf).boxed())
}
Mode::AsyncUnref => {
assert_eq!(control.len(), 1);
@@ -784,8 +784,7 @@ pub mod tests {
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
- let buf = vec![43u8].into_boxed_slice();
- Ok(buf)
+ vec![43u8].into_boxed_slice()
};
Op::AsyncUnref(fut.boxed())
}
@@ -806,7 +805,7 @@ pub mod tests {
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
- Op::Async(futures::future::ok(buf).boxed())
+ Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
@@ -815,7 +814,7 @@ pub mod tests {
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
- Op::Async(futures::future::ok(buf).boxed())
+ Op::Async(futures::future::ready(buf).boxed())
}
}
};
diff --git a/core/ops.rs b/core/ops.rs
index 16807196e..ab183f4de 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -10,30 +10,18 @@ pub type OpId = u32;
pub type Buf = Box<[u8]>;
-pub type OpAsyncFuture<E> = Pin<Box<dyn Future<Output = Result<Buf, E>>>>;
+pub type OpAsyncFuture = Pin<Box<dyn Future<Output = Buf>>>;
-pub(crate) type PendingOpFuture =
- Pin<Box<dyn Future<Output = Result<(OpId, Buf), CoreError>>>>;
-
-pub type OpResult<E> = Result<Op<E>, E>;
-
-// TODO(ry) Op::Async should be Op::Async(Pin<Box<dyn Future<Output = Buf>>>)
-// The error should be encoded in the Buf. Notice how Sync ops do not return a
-// result. The Sync and Async should be symmetrical!
-pub enum Op<E> {
+pub enum Op {
Sync(Buf),
- Async(OpAsyncFuture<E>),
+ Async(OpAsyncFuture),
/// AsyncUnref is the variation of Async, which doesn't block the program
/// exiting.
- AsyncUnref(OpAsyncFuture<E>),
+ AsyncUnref(OpAsyncFuture),
}
-pub type CoreError = ();
-
-pub type CoreOp = Op<CoreError>;
-
/// Main type describing op
-pub type OpDispatcher = dyn Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static;
+pub type OpDispatcher = dyn Fn(&[u8], Option<ZeroCopyBuf>) -> Op + 'static;
#[derive(Default)]
pub struct OpRegistry {
@@ -54,7 +42,7 @@ impl OpRegistry {
pub fn register<F>(&self, name: &str, op: F) -> OpId
where
- F: Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static,
+ F: Fn(&[u8], Option<ZeroCopyBuf>) -> Op + 'static,
{
let mut lock = self.dispatchers.write().unwrap();
let op_id = lock.len() as u32;
@@ -83,7 +71,7 @@ impl OpRegistry {
op_id: OpId,
control: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>,
- ) -> Option<CoreOp> {
+ ) -> Option<Op> {
// Op with id 0 has special meaning - it's a special op that is always
// provided to retrieve op id map. The map consists of name to `OpId`
// mappings.
@@ -113,7 +101,7 @@ fn test_op_registry() {
let test_id = op_registry.register("test", move |_, _| {
c_.fetch_add(1, atomic::Ordering::SeqCst);
- CoreOp::Sync(Box::new([]))
+ Op::Sync(Box::new([]))
});
assert!(test_id != 0);
@@ -149,9 +137,9 @@ fn register_op_during_call() {
let c__ = c_.clone();
op_registry_.register("test", move |_, _| {
c__.fetch_add(1, atomic::Ordering::SeqCst);
- CoreOp::Sync(Box::new([]))
+ Op::Sync(Box::new([]))
});
- CoreOp::Sync(Box::new([]))
+ Op::Sync(Box::new([]))
});
assert!(test_id != 0);
diff --git a/core/plugins.rs b/core/plugins.rs
index edb675120..c6e63c975 100644
--- a/core/plugins.rs
+++ b/core/plugins.rs
@@ -1,5 +1,5 @@
use crate::isolate::ZeroCopyBuf;
-use crate::ops::CoreOp;
+use crate::ops::Op;
pub type PluginInitFn = fn(context: &mut dyn PluginInitContext);
@@ -7,7 +7,7 @@ pub trait PluginInitContext {
fn register_op(
&mut self,
name: &str,
- op: Box<dyn Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static>,
+ op: Box<dyn Fn(&[u8], Option<ZeroCopyBuf>) -> Op + 'static>,
);
}