diff options
author | Bert Belder <bertbelder@gmail.com> | 2020-11-25 00:38:23 +0100 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2020-11-25 01:15:14 +0100 |
commit | 8d12653738066facfc228b1d0d9e31b76c6d9de0 (patch) | |
tree | e45c90cc79607fd5c8a29d0231f9cc43353399ac /core | |
parent | 605874ee98b52f5de7d1d1284507d5a9cb9eea9d (diff) |
core: implement 'AsyncRefCell' and 'ResourceTable2' (#8273)
Diffstat (limited to 'core')
-rw-r--r-- | core/Cargo.toml | 4 | ||||
-rw-r--r-- | core/async_cell.rs | 713 | ||||
-rw-r--r-- | core/examples/http_bench_bin_ops.js | 1 | ||||
-rw-r--r-- | core/examples/http_bench_bin_ops.rs | 145 | ||||
-rw-r--r-- | core/examples/http_bench_json_ops.rs | 155 | ||||
-rw-r--r-- | core/lib.rs | 11 | ||||
-rw-r--r-- | core/ops.rs | 6 | ||||
-rw-r--r-- | core/resources.rs | 5 | ||||
-rw-r--r-- | core/resources2.rs | 140 |
9 files changed, 1075 insertions, 105 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml index 84a14f73d..ed1f71c75 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,7 +14,7 @@ path = "lib.rs" [dependencies] anyhow = "1.0.32" -futures = "0.3.5" +futures = "0.3.8" indexmap = "1.6.0" lazy_static = "1.4.0" libc = "0.2.77" @@ -35,4 +35,4 @@ path = "examples/http_bench_json_ops.rs" # These dependendencies are only used for the 'http_bench_*_ops' examples. [dev-dependencies] -tokio = { version = "0.2.22", features = ["full"] } +tokio = { version = "0.3.4", features = ["full"] } diff --git a/core/async_cell.rs b/core/async_cell.rs new file mode 100644 index 000000000..a140dceb1 --- /dev/null +++ b/core/async_cell.rs @@ -0,0 +1,713 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use std::any::Any; +use std::borrow::Borrow; +use std::cell::Cell; +use std::cell::UnsafeCell; +use std::collections::VecDeque; +use std::ops::Deref; +use std::rc::Rc; + +use self::internal as i; + +pub type AsyncRef<T> = i::AsyncBorrowImpl<T, i::Shared>; +pub type AsyncMut<T> = i::AsyncBorrowImpl<T, i::Exclusive>; + +pub type AsyncRefFuture<T> = i::AsyncBorrowFutureImpl<T, i::Shared>; +pub type AsyncMutFuture<T> = i::AsyncBorrowFutureImpl<T, i::Exclusive>; + +pub struct AsyncRefCell<T> { + value: UnsafeCell<T>, + borrow_count: Cell<i::BorrowCount>, + waiters: Cell<VecDeque<Option<i::Waiter>>>, + turn: Cell<usize>, +} + +impl<T: 'static> AsyncRefCell<T> { + /// Create a new `AsyncRefCell` that encapsulates the specified value. + /// Note that in order to borrow the inner value, the `AsyncRefCell` + /// needs to be wrapped in an `Rc` or an `RcRef`. These can be created + /// either manually, or by using the convenience method + /// `AsyncRefCell::new_rc()`. + pub fn new(value: T) -> Self { + Self { + value: UnsafeCell::new(value), + borrow_count: Default::default(), + waiters: Default::default(), + turn: Default::default(), + } + } + + pub fn new_rc(value: T) -> Rc<Self> { + Rc::new(Self::new(value)) + } + + pub fn as_ptr(&self) -> *mut T { + self.value.get() + } +} + +impl<T: Default + 'static> Default for AsyncRefCell<T> { + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl<T: Default + 'static> AsyncRefCell<T> { + pub fn default_rc() -> Rc<Self> { + Rc::new(Default::default()) + } +} + +impl<T: 'static> From<T> for AsyncRefCell<T> { + fn from(value: T) -> Self { + Self::new(value) + } +} + +impl<T> AsyncRefCell<T> { + pub fn borrow(self: &Rc<Self>) -> AsyncRefFuture<T> { + AsyncRefFuture::new(self) + } + + pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<T> { + AsyncMutFuture::new(self) + } + + pub fn try_borrow(self: &Rc<Self>) -> Option<AsyncRef<T>> { + Self::borrow_sync(self) + } + + pub fn try_borrow_mut(self: &Rc<Self>) -> Option<AsyncMut<T>> { + Self::borrow_sync(self) + } +} + +impl<T> RcRef<AsyncRefCell<T>> { + pub fn borrow(&self) -> AsyncRefFuture<T> { + AsyncRefFuture::new(self) + } + + pub fn borrow_mut(&self) -> AsyncMutFuture<T> { + AsyncMutFuture::new(self) + } + + pub fn try_borrow(&self) -> Option<AsyncRef<T>> { + AsyncRefCell::<T>::borrow_sync(self) + } + + pub fn try_borrow_mut(&self) -> Option<AsyncMut<T>> { + AsyncRefCell::<T>::borrow_sync(self) + } +} + +/// An `RcRef` encapsulates a reference counted pointer, just like a regular +/// `std::rc::Rc`. However, unlike a regular `Rc`, it can be remapped so that +/// it dereferences to any value that's reachable through the reference-counted +/// pointer. This is achieved through the associated method, `RcRef::map()`, +/// similar to how `std::cell::Ref::map()` works. Example: +/// +/// ```rust +/// # use std::rc::Rc; +/// # use deno_core::async_cell::RcRef; +/// +/// struct Stuff { +/// foo: u32, +/// bar: String, +/// } +/// +/// let stuff_rc = Rc::new(Stuff { +/// foo: 42, +/// bar: "hello".to_owned(), +/// }); +/// +/// // `foo_rc` and `bar_rc` dereference to different types, however +/// // they share a reference count. +/// let foo_rc: RcRef<u32> = RcRef::map(stuff_rc.clone(), |v| &v.foo); +/// let bar_rc: RcRef<String> = RcRef::map(stuff_rc, |v| &v.bar); +/// ``` +pub struct RcRef<T> { + rc: Rc<dyn Any>, + value: *const T, +} + +impl<T: 'static> RcRef<T> { + pub fn new(value: T) -> Self { + Self::from(Rc::new(value)) + } + + pub fn map<S: 'static, R: i::RcLike<S>, F: FnOnce(&S) -> &T>( + source: R, + map_fn: F, + ) -> RcRef<T> { + let RcRef::<S> { rc, value } = source.into(); + let value = map_fn(unsafe { &*value }); + RcRef { rc, value } + } +} + +impl<T: Default + 'static> Default for RcRef<T> { + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl<T: 'static> From<Rc<T>> for RcRef<T> { + fn from(rc: Rc<T>) -> Self { + Self { + value: &*rc, + rc: rc as Rc<_>, + } + } +} + +impl<T> Clone for RcRef<T> { + fn clone(&self) -> Self { + Self { + rc: self.rc.clone(), + value: self.value, + } + } +} + +impl<T> Deref for RcRef<T> { + type Target = T; + fn deref(&self) -> &Self::Target { + unsafe { &*self.value } + } +} + +impl<T> Borrow<T> for RcRef<T> { + fn borrow(&self) -> &T { + &**self + } +} + +impl<T> AsRef<T> for RcRef<T> { + fn as_ref(&self) -> &T { + &**self + } +} + +mod internal { + use super::AsyncRefCell; + use super::RcRef; + use futures::future::Future; + use futures::ready; + use futures::task::Context; + use futures::task::Poll; + use futures::task::Waker; + use std::borrow::Borrow; + use std::borrow::BorrowMut; + use std::fmt::Debug; + use std::marker::PhantomData; + use std::ops::Deref; + use std::ops::DerefMut; + use std::pin::Pin; + use std::rc::Rc; + + impl<T> AsyncRefCell<T> { + /// Borrow the cell's contents synchronouslym without creating an + /// intermediate future. If the cell has already been borrowed and either + /// the existing or the requested borrow is exclusive, this function returns + /// `None`. + pub(super) fn borrow_sync< + M: BorrowModeTrait, + R: RcLike<AsyncRefCell<T>>, + >( + cell: &R, + ) -> Option<AsyncBorrowImpl<T, M>> { + // Don't allow synchronous borrows to cut in line; if there are any + // enqueued waiters, return `None`, even if the current borrow is a shared + // one and the requested borrow is too. + let waiters = unsafe { &mut *cell.waiters.as_ptr() }; + if waiters.is_empty() { + // There are no enqueued waiters, but it is still possible that the cell + // is currently borrowed. If there are no current borrows, or both the + // existing and requested ones are shared, `try_add()` returns the + // adjusted borrow count. + let new_borrow_count = + cell.borrow_count.get().try_add(M::borrow_mode())?; + cell.borrow_count.set(new_borrow_count); + Some(AsyncBorrowImpl::<T, M>::new(cell.clone().into())) + } else { + None + } + } + + fn drop_borrow<M: BorrowModeTrait>(&self) { + let new_borrow_count = self.borrow_count.get().remove(M::borrow_mode()); + self.borrow_count.set(new_borrow_count); + + if new_borrow_count.is_empty() { + self.wake_waiters() + } + } + + fn create_waiter<M: BorrowModeTrait>(&self) -> usize { + let waiter = Waiter::new(M::borrow_mode()); + let turn = self.turn.get(); + let index = { + let waiters = unsafe { &mut *self.waiters.as_ptr() }; + waiters.push_back(Some(waiter)); + waiters.len() - 1 + }; + if index == 0 { + // SAFETY: the `waiters` reference used above *must* be dropped here. + self.wake_waiters() + } + // Return the new waiter's id. + turn + index + } + + fn poll_waiter<M: BorrowModeTrait>( + &self, + id: usize, + cx: &mut Context, + ) -> Poll<()> { + let borrow_count = self.borrow_count.get(); + let turn = self.turn.get(); + if id < turn { + // This waiter made it to the front of the line; we reserved a borrow + // for it, woke its Waker, and removed the waiter from the queue. + // Assertion: BorrowCount::remove() will panic if `mode` is incorrect. + let _ = borrow_count.remove(M::borrow_mode()); + Poll::Ready(()) + } else { + // This waiter is still in line and has not yet been woken. + let waiters = unsafe { &mut *self.waiters.as_ptr() }; + // Sanity check: id cannot be higher than the last queue element. + assert!(id < turn + waiters.len()); + // Sanity check: since we always call wake_waiters() when the queue head + // is updated, it should be impossible to add it to the current borrow. + assert!(id > turn || borrow_count.try_add(M::borrow_mode()).is_none()); + // Save or update the waiter's Waker. + // TODO(piscisaureus): Use will_wake() to make this more efficient. + let waiter_mut = waiters[id - turn].as_mut().unwrap(); + waiter_mut.set_waker(cx.waker().clone()); + Poll::Pending + } + } + + fn wake_waiters(&self) { + let mut borrow_count = self.borrow_count.get(); + let waiters = unsafe { &mut *self.waiters.as_ptr() }; + let mut turn = self.turn.get(); + + loop { + let waiter_entry = match waiters.front().map(Option::as_ref) { + None => break, // Queue empty. + Some(w) => w, + }; + let borrow_mode = match waiter_entry { + None => { + // Queue contains a hole. This happens when a Waiter is dropped + // before it makes it to the front of the queue. + waiters.pop_front(); + turn += 1; + continue; + } + Some(waiter) => waiter.borrow_mode(), + }; + // See if the waiter at the front of the queue can borrow the cell's + // value now. If it does, `try_add()` returns the new borrow count, + // effectively "reserving" the borrow until the associated + // AsyncBorrowFutureImpl future gets polled and produces the actual + // borrow. + borrow_count = match borrow_count.try_add(borrow_mode) { + None => break, // Can't borrow yet. + Some(b) => b, + }; + // Drop from queue. + let mut waiter = waiters.pop_front().unwrap().unwrap(); + turn += 1; + // Wake this waiter, so the AsyncBorrowFutureImpl future gets polled. + if let Some(waker) = waiter.take_waker() { + waker.wake() + } + } + // Save updated counters. + self.borrow_count.set(borrow_count); + self.turn.set(turn); + } + + fn drop_waiter<M: BorrowModeTrait>(&self, id: usize) { + let turn = self.turn.get(); + if id < turn { + // We already made a borrow count reservation for this waiter but the + // borrow will never be picked up and removesequently, never dropped. + // Therefore, call the borrow drop handler here. + self.drop_borrow::<M>(); + } else { + // This waiter is still in the queue, take it out and leave a "hole". + let waiters = unsafe { &mut *self.waiters.as_ptr() }; + waiters[id - turn].take().unwrap(); + } + + if id == turn { + // Since the first entry in the waiter queue was touched we have to + // reprocess the waiter queue. + self.wake_waiters() + } + } + } + + pub struct AsyncBorrowFutureImpl<T: 'static, M: BorrowModeTrait> { + cell: Option<RcRef<AsyncRefCell<T>>>, + id: usize, + _phantom: PhantomData<M>, + } + + impl<T, M: BorrowModeTrait> AsyncBorrowFutureImpl<T, M> { + pub fn new<R: RcLike<AsyncRefCell<T>>>(cell: &R) -> Self { + Self { + cell: Some(cell.clone().into()), + id: cell.create_waiter::<M>(), + _phantom: PhantomData, + } + } + } + + impl<T: 'static, M: BorrowModeTrait> Future for AsyncBorrowFutureImpl<T, M> { + type Output = AsyncBorrowImpl<T, M>; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + ready!(self.cell.as_ref().unwrap().poll_waiter::<M>(self.id, cx)); + let self_mut = unsafe { Pin::get_unchecked_mut(self) }; + let cell = self_mut.cell.take().unwrap(); + Poll::Ready(AsyncBorrowImpl::<T, M>::new(cell)) + } + } + + impl<T, M: BorrowModeTrait> Drop for AsyncBorrowFutureImpl<T, M> { + fn drop(&mut self) { + // The expected mode of operation is that this future gets polled until it + // is ready and yields a value of type `AsyncBorrowImpl`, which has a drop + // handler that adjusts the `AsyncRefCell` borrow counter. However if the + // `cell` field still holds a value at this point, it means that the + // future was never polled to completion and no `AsyncBorrowImpl` was ever + // created, so we have to adjust the borrow count here. + if let Some(cell) = self.cell.take() { + cell.drop_waiter::<M>(self.id) + } + } + } + + pub struct AsyncBorrowImpl<T: 'static, M: BorrowModeTrait> { + cell: RcRef<AsyncRefCell<T>>, + _phantom: PhantomData<M>, + } + + impl<T, M: BorrowModeTrait> AsyncBorrowImpl<T, M> { + fn new(cell: RcRef<AsyncRefCell<T>>) -> Self { + Self { + cell, + _phantom: PhantomData, + } + } + } + + impl<T, M: BorrowModeTrait> Deref for AsyncBorrowImpl<T, M> { + type Target = T; + fn deref(&self) -> &Self::Target { + unsafe { &*self.cell.as_ptr() } + } + } + + impl<T, M: BorrowModeTrait> Borrow<T> for AsyncBorrowImpl<T, M> { + fn borrow(&self) -> &T { + &**self + } + } + + impl<T, M: BorrowModeTrait> AsRef<T> for AsyncBorrowImpl<T, M> { + fn as_ref(&self) -> &T { + &**self + } + } + + impl<T> DerefMut for AsyncBorrowImpl<T, Exclusive> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.cell.as_ptr() } + } + } + + impl<T> BorrowMut<T> for AsyncBorrowImpl<T, Exclusive> { + fn borrow_mut(&mut self) -> &mut T { + &mut **self + } + } + + impl<T> AsMut<T> for AsyncBorrowImpl<T, Exclusive> { + fn as_mut(&mut self) -> &mut T { + &mut **self + } + } + + impl<T, M: BorrowModeTrait> Drop for AsyncBorrowImpl<T, M> { + fn drop(&mut self) { + self.cell.drop_borrow::<M>() + } + } + + #[derive(Copy, Clone, Debug, Eq, PartialEq)] + pub enum BorrowMode { + Shared, + Exclusive, + } + + pub trait BorrowModeTrait: Copy { + fn borrow_mode() -> BorrowMode; + } + + #[derive(Copy, Clone, Debug)] + pub struct Shared; + + impl BorrowModeTrait for Shared { + fn borrow_mode() -> BorrowMode { + BorrowMode::Shared + } + } + + #[derive(Copy, Clone, Debug)] + pub struct Exclusive; + + impl BorrowModeTrait for Exclusive { + fn borrow_mode() -> BorrowMode { + BorrowMode::Exclusive + } + } + + #[derive(Copy, Clone, Debug, Eq, PartialEq)] + pub enum BorrowCount { + Shared(usize), + Exclusive, + } + + impl Default for BorrowCount { + fn default() -> Self { + Self::Shared(0) + } + } + + impl BorrowCount { + pub fn is_empty(self) -> bool { + matches!(self, BorrowCount::Shared(0)) + } + + pub fn try_add(self, mode: BorrowMode) -> Option<BorrowCount> { + match (self, mode) { + (BorrowCount::Shared(refs), BorrowMode::Shared) => { + Some(BorrowCount::Shared(refs + 1)) + } + (BorrowCount::Shared(0), BorrowMode::Exclusive) => { + Some(BorrowCount::Exclusive) + } + _ => None, + } + } + + #[allow(dead_code)] + pub fn add(self, mode: BorrowMode) -> BorrowCount { + match self.try_add(mode) { + Some(value) => value, + None => panic!("Can't add {:?} to {:?}", mode, self), + } + } + + pub fn try_remove(self, mode: BorrowMode) -> Option<BorrowCount> { + match (self, mode) { + (BorrowCount::Shared(refs), BorrowMode::Shared) if refs > 0 => { + Some(BorrowCount::Shared(refs - 1)) + } + (BorrowCount::Exclusive, BorrowMode::Exclusive) => { + Some(BorrowCount::Shared(0)) + } + _ => None, + } + } + + pub fn remove(self, mode: BorrowMode) -> BorrowCount { + match self.try_remove(mode) { + Some(value) => value, + None => panic!("Can't remove {:?} from {:?}", mode, self), + } + } + } + + /// The `waiters` queue that is associated with an individual `AsyncRefCell` + /// contains elements of the `Waiter` type. + pub struct Waiter { + borrow_mode: BorrowMode, + waker: Option<Waker>, + } + + impl Waiter { + pub fn new(borrow_mode: BorrowMode) -> Self { + Self { + borrow_mode, + waker: None, + } + } + + pub fn borrow_mode(&self) -> BorrowMode { + self.borrow_mode + } + + pub fn set_waker(&mut self, waker: Waker) { + self.waker.replace(waker); + } + + pub fn take_waker(&mut self) -> Option<Waker> { + self.waker.take() + } + } + + /// The `RcLike` trait provides an abstraction over `std::rc::Rc` and `RcRef`, + /// so that applicable methods can operate on either type. + pub trait RcLike<T>: Clone + Deref<Target = T> + Into<RcRef<T>> {} + + impl<T: 'static> RcLike<T> for Rc<T> {} + impl<T: 'static> RcLike<T> for RcRef<T> {} +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Default)] + struct Thing { + touch_count: usize, + _private: (), + } + + impl Thing { + pub fn look(&self) -> usize { + self.touch_count + } + + pub fn touch(&mut self) -> usize { + self.touch_count += 1; + self.touch_count + } + } + + #[tokio::test] + async fn async_ref_cell_borrow() { + let cell = AsyncRefCell::<Thing>::default_rc(); + + let fut1 = cell.borrow(); + let fut2 = cell.borrow_mut(); + let fut3 = cell.borrow(); + let fut4 = cell.borrow(); + let fut5 = cell.borrow(); + let fut6 = cell.borrow(); + let fut7 = cell.borrow_mut(); + let fut8 = cell.borrow(); + + // The `try_borrow` and `try_borrow_mut` methods should always return `None` + // if there's a queue of async borrowers. + assert!(cell.try_borrow().is_none()); + assert!(cell.try_borrow_mut().is_none()); + + assert_eq!(fut1.await.look(), 0); + + assert_eq!(fut2.await.touch(), 1); + + { + let ref5 = fut5.await; + let ref4 = fut4.await; + let ref3 = fut3.await; + let ref6 = fut6.await; + assert_eq!(ref3.look(), 1); + assert_eq!(ref4.look(), 1); + assert_eq!(ref5.look(), 1); + assert_eq!(ref6.look(), 1); + } + + { + let mut ref7 = fut7.await; + assert_eq!(ref7.look(), 1); + assert_eq!(ref7.touch(), 2); + } + + { + let ref8 = fut8.await; + assert_eq!(ref8.look(), 2); + } + } + + #[test] + fn async_ref_cell_try_borrow() { + let cell = AsyncRefCell::<Thing>::default_rc(); + + { + let ref1 = cell.try_borrow().unwrap(); + assert_eq!(ref1.look(), 0); + assert!(cell.try_borrow_mut().is_none()); + } + + { + let mut ref2 = cell.try_borrow_mut().unwrap(); + assert_eq!(ref2.touch(), 1); + assert!(cell.try_borrow().is_none()); + assert!(cell.try_borrow_mut().is_none()); + } + + { + let ref3 = cell.try_borrow().unwrap(); + let ref4 = cell.try_borrow().unwrap(); + let ref5 = cell.try_borrow().unwrap(); + let ref6 = cell.try_borrow().unwrap(); + assert_eq!(ref3.look(), 1); + assert_eq!(ref4.look(), 1); + assert_eq!(ref5.look(), 1); + assert_eq!(ref6.look(), 1); + assert!(cell.try_borrow_mut().is_none()); + } + + { + let mut ref7 = cell.try_borrow_mut().unwrap(); + assert_eq!(ref7.look(), 1); + assert_eq!(ref7.touch(), 2); + assert!(cell.try_borrow().is_none()); + assert!(cell.try_borrow_mut().is_none()); + } + + { + let ref8 = cell.try_borrow().unwrap(); + assert_eq!(ref8.look(), 2); + assert!(cell.try_borrow_mut().is_none()); + assert!(cell.try_borrow().is_some()); + } + } + + #[derive(Default)] + struct ThreeThings { + pub thing1: AsyncRefCell<Thing>, + pub thing2: AsyncRefCell<Thing>, + pub thing3: AsyncRefCell<Thing>, + } + + #[tokio::test] + async fn rc_ref_map() { + let three_cells = Rc::new(ThreeThings::default()); + + let rc1 = RcRef::map(three_cells.clone(), |things| &things.thing1); + let rc2 = RcRef::map(three_cells.clone(), |things| &things.thing2); + let rc3 = RcRef::map(three_cells, |things| &things.thing3); + + let mut ref1 = rc1.borrow_mut().await; + let ref2 = rc2.borrow().await; + let mut ref3 = rc3.borrow_mut().await; + + assert_eq!(ref1.look(), 0); + assert_eq!(ref3.touch(), 1); + assert_eq!(ref1.touch(), 1); + assert_eq!(ref2.look(), 0); + assert_eq!(ref3.touch(), 2); + assert_eq!(ref1.look(), 1); + assert_eq!(ref1.touch(), 2); + assert_eq!(ref3.touch(), 3); + assert_eq!(ref1.touch(), 3); + } +} diff --git a/core/examples/http_bench_bin_ops.js b/core/examples/http_bench_bin_ops.js index 066d5bf58..a90be70c0 100644 --- a/core/examples/http_bench_bin_ops.js +++ b/core/examples/http_bench_bin_ops.js @@ -134,7 +134,6 @@ async function main() { for (;;) { const rid = await accept(listenerRid); - // Deno.core.print(`accepted ${rid}`); if (rid < 0) { Deno.core.print(`accept error ${rid}`); return; diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index 7335b8670..9af74d980 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -3,16 +3,21 @@ #[macro_use] extern crate log; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::AsyncRefFuture; use deno_core::BufVec; use deno_core::JsRuntime; use deno_core::Op; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; -use futures::future::poll_fn; use futures::future::FutureExt; use futures::future::TryFuture; use futures::future::TryFutureExt; use std::cell::RefCell; +use std::convert::TryFrom; use std::convert::TryInto; use std::env; use std::fmt::Debug; @@ -20,14 +25,10 @@ use std::io::Error; use std::io::ErrorKind; use std::mem::size_of; use std::net::SocketAddr; -use std::pin::Pin; use std::ptr; use std::rc::Rc; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::runtime; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; struct Logger; @@ -45,6 +46,64 @@ impl log::Log for Logger { fn flush(&self) {} } +// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in +// a cell, because it only supports one op (`accept`) which does not require +// a mutable reference to the listener. +struct TcpListener(AsyncRefCell<tokio::net::TcpListener>); + +impl Resource for TcpListener {} + +impl TcpListener { + /// Returns a future that yields a shared borrow of the TCP listener. + fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> { + RcRef::map(self, |r| &r.0).borrow() + } +} + +impl TryFrom<std::net::TcpListener> for TcpListener { + type Error = Error; + fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> { + tokio::net::TcpListener::try_from(l) + .map(AsyncRefCell::new) + .map(Self) + } +} + +struct TcpStream { + rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>, + wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>, +} + +impl Resource for TcpStream {} + +impl TcpStream { + /// Returns a future that yields an exclusive borrow of the read end of the + /// tcp stream. + fn rd_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + /// Returns a future that yields an exclusive borrow of the write end of the + /// tcp stream. + fn wr_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } +} + +impl From<tokio::net::TcpStream> for TcpStream { + fn from(s: tokio::net::TcpStream) -> Self { + let (rd, wr) = s.into_split(); + Self { + rd: rd.into(), + wr: wr.into(), + } + } +} + #[derive(Copy, Clone, Debug, PartialEq)] struct Record { promise_id: u32, @@ -94,8 +153,9 @@ fn op_listen( debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let rid = state.resource_table.add("tcpListener", Box::new(listener)); + std_listener.set_nonblocking(true)?; + let listener = TcpListener::try_from(std_listener)?; + let rid = state.resource_table_2.add(listener); Ok(rid) } @@ -106,7 +166,7 @@ fn op_close( ) -> Result<u32, Error> { debug!("close rid={}", rid); state - .resource_table + .resource_table_2 .close(rid) .map(|_| 0) .ok_or_else(bad_resource_id) @@ -119,56 +179,52 @@ async fn op_accept( ) -> Result<u32, Error> { debug!("accept rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; - - let listener = resource_table - .get_mut::<TcpListener>(rid) - .ok_or_else(bad_resource_id)?; - listener.poll_accept(cx).map_ok(|(stream, _addr)| { - resource_table.add("tcpStream", Box::new(stream)) - }) - }) - .await + let listener_rc = state + .borrow() + .resource_table_2 + .get::<TcpListener>(rid) + .ok_or_else(bad_resource_id)?; + let listener_ref = listener_rc.borrow().await; + + let stream: TcpStream = listener_ref.accept().await?.0.into(); + let rid = state.borrow_mut().resource_table_2.add(stream); + Ok(rid) } -fn op_read( +async fn op_read( state: Rc<RefCell<OpState>>, rid: u32, - bufs: BufVec, -) -> impl TryFuture<Ok = usize, Error = Error> { + mut bufs: BufVec, +) -> Result<usize, Error> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let mut buf = bufs[0].clone(); - debug!("read rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream).poll_read(cx, &mut buf) - }) + rd_stream_mut.read(&mut bufs[0]).await } -fn op_write( +async fn op_write( state: Rc<RefCell<OpState>>, rid: u32, bufs: BufVec, -) -> impl TryFuture<Ok = usize, Error = Error> { +) -> Result<usize, Error> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let buf = bufs[0].clone(); debug!("write rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream).poll_write(cx, &buf) - }) + wr_stream_mut.write(&bufs[0]).await } fn register_op_bin_sync<F>( @@ -247,8 +303,7 @@ fn main() { deno_core::v8_set_flags(env::args().collect()); let mut js_runtime = create_js_runtime(); - let mut runtime = runtime::Builder::new() - .basic_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 2cf3d09e3..77f5b9dbe 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -5,25 +5,25 @@ extern crate log; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::AsyncRefFuture; use deno_core::BufVec; use deno_core::JsRuntime; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; -use futures::future::poll_fn; -use futures::future::Future; use serde_json::Value; use std::cell::RefCell; +use std::convert::TryFrom; use std::convert::TryInto; use std::env; +use std::io::Error; use std::net::SocketAddr; -use std::pin::Pin; use std::rc::Rc; -use std::task::Poll; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::runtime; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; struct Logger; @@ -41,6 +41,64 @@ impl log::Log for Logger { fn flush(&self) {} } +// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in +// a cell, because it only supports one op (`accept`) which does not require +// a mutable reference to the listener. +struct TcpListener(AsyncRefCell<tokio::net::TcpListener>); + +impl Resource for TcpListener {} + +impl TcpListener { + /// Returns a future that yields a shared borrow of the TCP listener. + fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> { + RcRef::map(self, |r| &r.0).borrow() + } +} + +impl TryFrom<std::net::TcpListener> for TcpListener { + type Error = Error; + fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> { + tokio::net::TcpListener::try_from(l) + .map(AsyncRefCell::new) + .map(Self) + } +} + +struct TcpStream { + rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>, + wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>, +} + +impl Resource for TcpStream {} + +impl TcpStream { + /// Returns a future that yields an exclusive borrow of the read end of the + /// tcp stream. + fn rd_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + /// Returns a future that yields an exclusive borrow of the write end of the + /// tcp stream. + fn wr_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } +} + +impl From<tokio::net::TcpStream> for TcpStream { + fn from(s: tokio::net::TcpStream) -> Self { + let (rd, wr) = s.into_split(); + Self { + rd: rd.into(), + wr: wr.into(), + } + } +} + fn create_js_runtime() -> JsRuntime { let mut runtime = JsRuntime::new(Default::default()); runtime.register_op("listen", deno_core::json_op_sync(op_listen)); @@ -59,8 +117,9 @@ fn op_listen( debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let rid = state.resource_table.add("tcpListener", Box::new(listener)); + std_listener.set_nonblocking(true)?; + let listener = TcpListener::try_from(std_listener)?; + let rid = state.resource_table_2.add(listener); Ok(serde_json::json!({ "rid": rid })) } @@ -78,17 +137,17 @@ fn op_close( .unwrap(); debug!("close rid={}", rid); state - .resource_table + .resource_table_2 .close(rid) .map(|_| serde_json::json!(())) .ok_or_else(bad_resource_id) } -fn op_accept( +async fn op_accept( state: Rc<RefCell<OpState>>, args: Value, _bufs: BufVec, -) -> impl Future<Output = Result<Value, AnyError>> { +) -> Result<Value, AnyError> { let rid: u32 = args .get("rid") .unwrap() @@ -98,26 +157,24 @@ fn op_accept( .unwrap(); debug!("accept rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; - - let listener = resource_table - .get_mut::<TcpListener>(rid) - .ok_or_else(bad_resource_id)?; - listener.poll_accept(cx)?.map(|(stream, _addr)| { - let rid = resource_table.add("tcpStream", Box::new(stream)); - Ok(serde_json::json!({ "rid": rid })) - }) - }) + let listener_rc = state + .borrow() + .resource_table_2 + .get::<TcpListener>(rid) + .ok_or_else(bad_resource_id)?; + let listener_ref = listener_rc.borrow().await; + + let stream: TcpStream = listener_ref.accept().await?.0.into(); + let rid = state.borrow_mut().resource_table_2.add(stream); + Ok(serde_json::json!({ "rid": rid })) } -fn op_read( +async fn op_read( state: Rc<RefCell<OpState>>, args: Value, mut bufs: BufVec, -) -> impl Future<Output = Result<Value, AnyError>> { +) -> Result<Value, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid: u32 = args .get("rid") .unwrap() @@ -127,25 +184,23 @@ fn op_read( .unwrap(); debug!("read rid={}", rid); - poll_fn(move |cx| -> Poll<Result<Value, AnyError>> { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream) - .poll_read(cx, &mut bufs[0])? - .map(|nread| Ok(serde_json::json!({ "nread": nread }))) - }) + let nread = rd_stream_mut.read(&mut bufs[0]).await?; + Ok(serde_json::json!({ "nread": nread })) } -fn op_write( +async fn op_write( state: Rc<RefCell<OpState>>, args: Value, bufs: BufVec, -) -> impl Future<Output = Result<Value, AnyError>> { +) -> Result<Value, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid: u32 = args .get("rid") .unwrap() @@ -155,16 +210,15 @@ fn op_write( .unwrap(); debug!("write rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream) - .poll_write(cx, &bufs[0])? - .map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten }))) - }) + let nwritten = wr_stream_mut.write(&bufs[0]).await?; + Ok(serde_json::json!({ "nwritten": nwritten })) } fn main() { @@ -180,8 +234,7 @@ fn main() { deno_core::v8_set_flags(env::args().collect()); let mut js_runtime = create_js_runtime(); - let mut runtime = runtime::Builder::new() - .basic_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); diff --git a/core/lib.rs b/core/lib.rs index 48968fa97..34bd25bcb 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -5,6 +5,7 @@ extern crate lazy_static; #[macro_use] extern crate log; +mod async_cell; mod bindings; pub mod error; mod flags; @@ -15,6 +16,7 @@ mod normalize_path; mod ops; pub mod plugin_api; mod resources; +mod resources2; mod runtime; mod shared_queue; mod zero_copy_buf; @@ -26,6 +28,12 @@ pub use serde; pub use serde_json; pub use url; +pub use crate::async_cell::AsyncMut; +pub use crate::async_cell::AsyncMutFuture; +pub use crate::async_cell::AsyncRef; +pub use crate::async_cell::AsyncRefCell; +pub use crate::async_cell::AsyncRefFuture; +pub use crate::async_cell::RcRef; pub use crate::flags::v8_set_flags; pub use crate::module_specifier::ModuleResolutionError; pub use crate::module_specifier::ModuleSpecifier; @@ -47,6 +55,9 @@ pub use crate::ops::OpId; pub use crate::ops::OpState; pub use crate::ops::OpTable; pub use crate::resources::ResourceTable; +pub use crate::resources2::Resource; +pub use crate::resources2::ResourceId; +pub use crate::resources2::ResourceTable2; pub use crate::runtime::GetErrorClassFn; pub use crate::runtime::JsRuntime; pub use crate::runtime::RuntimeOptions; diff --git a/core/ops.rs b/core/ops.rs index ed74ce873..bf10d3d86 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -34,6 +34,7 @@ pub enum Op { /// Maintains the resources and ops inside a JS runtime. pub struct OpState { pub resource_table: crate::ResourceTable, + pub resource_table_2: crate::resources2::ResourceTable, pub op_table: OpTable, pub get_error_class_fn: crate::runtime::GetErrorClassFn, gotham_state: GothamState, @@ -45,10 +46,11 @@ impl Default for OpState { // pub(crate) fn new() -> OpState fn default() -> OpState { OpState { - resource_table: crate::ResourceTable::default(), + resource_table: Default::default(), + resource_table_2: Default::default(), op_table: OpTable::default(), get_error_class_fn: &|_| "Error", - gotham_state: GothamState::default(), + gotham_state: Default::default(), } } } diff --git a/core/resources.rs b/core/resources.rs index 25ea95f41..753fa9713 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -6,13 +6,10 @@ // Resources may or may not correspond to a real operating system file // descriptor (hence the different name). +use crate::resources2::ResourceId; use std::any::Any; use std::collections::HashMap; -/// ResourceId is Deno's version of a file descriptor. ResourceId is also referred -/// to as `rid` in the code base. -pub type ResourceId = u32; - /// These store Deno's file descriptors. These are not necessarily the operating /// system ones. type ResourceMap = HashMap<ResourceId, (String, Box<dyn Any>)>; diff --git a/core/resources2.rs b/core/resources2.rs new file mode 100644 index 000000000..52bd4f3e7 --- /dev/null +++ b/core/resources2.rs @@ -0,0 +1,140 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +// Think of Resources as File Descriptors. They are integers that are allocated +// by the privileged side of Deno which refer to various rust objects that need +// to be persisted between various ops. For example, network sockets are +// resources. Resources may or may not correspond to a real operating system +// file descriptor (hence the different name). + +use std::any::type_name; +use std::any::Any; +use std::any::TypeId; +use std::borrow::Cow; +use std::collections::HashMap; +use std::iter::Iterator; +use std::rc::Rc; + +/// All objects that can be store in the resource table should implement the +/// `Resource` trait. +pub trait Resource: Any + 'static { + /// Returns a string representation of the resource which is made available + /// to JavaScript code through `op_resources`. The default implementation + /// returns the Rust type name, but specific resource types may override this + /// trait method. + fn name(&self) -> Cow<str> { + type_name::<Self>().into() + } +} + +impl dyn Resource { + #[inline(always)] + fn is<T: Resource>(&self) -> bool { + self.type_id() == TypeId::of::<T>() + } + + #[inline(always)] + fn downcast_rc<'a, T: Resource>(self: &'a Rc<Self>) -> Option<&'a Rc<T>> { + if self.is::<T>() { + let ptr = self as *const Rc<_> as *const Rc<T>; + Some(unsafe { &*ptr }) + } else { + None + } + } +} + +/// A `ResourceId` is an integer value referencing a resource. It could be +/// considered to be the Deno equivalent of a `file descriptor` in POSIX like +/// operating systems. Elsewhere in the code base it is commonly abbreviated +/// to `rid`. +// TODO: use `u64` instead? +pub type ResourceId = u32; + +/// Temporary alias for `crate::resources2::ResourceTable`. +// TODO: remove this when the old `ResourceTable` is obsolete. +pub type ResourceTable2 = ResourceTable; + +/// Map-like data structure storing Deno's resources (equivalent to file +/// descriptors). +/// +/// Provides basic methods for element access. A resource can be of any type. +/// Different types of resources can be stored in the same map, and provided +/// with a name for description. +/// +/// Each resource is identified through a _resource ID (rid)_, which acts as +/// the key in the map. +#[derive(Default)] +pub struct ResourceTable { + index: HashMap<ResourceId, Rc<dyn Resource>>, + next_rid: ResourceId, +} + +impl ResourceTable { + /// Returns true if any resource with the given `rid` is exists. + pub fn has(&self, rid: ResourceId) -> bool { + self.index.contains_key(&rid) + } + + /// Returns a reference counted pointer to the resource of type `T` with the + /// given `rid`. If `rid` is not present or has a type different than `T`, + /// this function returns `None`. + pub fn get<T: Resource>(&self, rid: ResourceId) -> Option<Rc<T>> { + self + .index + .get(&rid) + .and_then(|resource| resource.downcast_rc::<T>()) + .map(Clone::clone) + } + + /// Inserts resource into the resource table, which takes ownership of it. + /// + /// The resource type is erased at runtime and must be statically known + /// when retrieving it through `get()`. + /// + /// Returns a unique resource ID, which acts as a key for this resource. + pub fn add<T: Resource>(&mut self, resource: T) -> ResourceId { + self.add_rc(Rc::new(resource)) + } + + /// Inserts a `Rc`-wrapped resource into the resource table. + /// + /// The resource type is erased at runtime and must be statically known + /// when retrieving it through `get()`. + /// + /// Returns a unique resource ID, which acts as a key for this resource. + pub fn add_rc<T: Resource>(&mut self, resource: Rc<T>) -> ResourceId { + let resource = resource as Rc<dyn Resource>; + let rid = self.next_rid; + let removed_resource = self.index.insert(rid, resource); + assert!(removed_resource.is_none()); + self.next_rid += 1; + rid + } + + /// Removes the resource with the given `rid` from the resource table. If the + /// only reference to this resource existed in the resource table, this will + /// cause the resource to be dropped. However, since resources are reference + /// counted, therefore pending ops are not automatically cancelled. + pub fn close(&mut self, rid: ResourceId) -> Option<()> { + self.index.remove(&rid).map(|_| ()) + } + + /// Returns an iterator that yields a `(id, name)` pair for every resource + /// that's currently in the resource table. This can be used for debugging + /// purposes or to implement the `op_resources` op. Note that the order in + /// which items appear is not specified. + /// + /// # Example + /// + /// ``` + /// # use deno_core::resources2::ResourceTable; + /// # let resource_table = ResourceTable::default(); + /// let resource_names = resource_table.names().collect::<Vec<_>>(); + /// ``` + pub fn names(&self) -> impl Iterator<Item = (ResourceId, Cow<str>)> { + self + .index + .iter() + .map(|(&id, resource)| (id, resource.name())) + } +} |