summaryrefslogtreecommitdiff
path: root/cli/ops.rs
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2019-04-28 21:31:10 +0200
committerBert Belder <bertbelder@gmail.com>2019-05-01 21:11:09 +0200
commit41c7e96f1a81ea416ebb3ba45f2815e0202d6b75 (patch)
tree5fcb15b8664d7579cf4db76d16754c8efa7c4667 /cli/ops.rs
parentabdb98a2516a9d6ec313805dffbc2107d38f8ed4 (diff)
Refactor zero-copy buffers for performance and to prevent memory leaks
* In order to prevent ArrayBuffers from getting garbage collected by V8, we used to store a v8::Persistent<ArrayBuffer> in a map. This patch introduces a custom ArrayBuffer allocator which doesn't use Persistent handles, but instead stores a pointer to the actual ArrayBuffer data alongside with a reference count. Since creating Persistent handles has quite a bit of overhead, this change significantly increases performance. Various HTTP server benchmarks report about 5-10% more requests per second than before. * Previously the Persistent handle that prevented garbage collection had to be released manually, and this wasn't always done, which was causing memory leaks. This has been resolved by introducing a new `PinnedBuf` type in both Rust and C++ that automatically re-enables garbage collection when it goes out of scope. * Zero-copy buffers are now correctly wrapped in an Option if there is a possibility that they're not present. This clears up a correctness issue where we were creating zero-length slices from a null pointer, which is against the rules.
Diffstat (limited to 'cli/ops.rs')
-rw-r--r--cli/ops.rs213
1 files changed, 106 insertions, 107 deletions
diff --git a/cli/ops.rs b/cli/ops.rs
index d8a0c5cfa..5463bac4d 100644
--- a/cli/ops.rs
+++ b/cli/ops.rs
@@ -23,11 +23,11 @@ use crate::tokio_write;
use crate::version;
use crate::worker::root_specifier_to_url;
use crate::worker::Worker;
-use deno::deno_buf;
use deno::js_check;
use deno::Buf;
use deno::JSError;
use deno::Op;
+use deno::PinnedBuf;
use flatbuffers::FlatBufferBuilder;
use futures;
use futures::Async;
@@ -64,7 +64,7 @@ pub type OpWithError = dyn Future<Item = Buf, Error = DenoError> + Send;
// TODO Ideally we wouldn't have to box the OpWithError being returned.
// The box is just to make it easier to get a prototype refactor working.
type OpCreator =
- fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf)
+ fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: Option<PinnedBuf>)
-> Box<OpWithError>;
pub type OpSelector = fn(inner_type: msg::Any) -> Option<OpCreator>;
@@ -81,11 +81,11 @@ fn empty_buf() -> Buf {
pub fn dispatch_all(
state: &ThreadSafeState,
control: &[u8],
- zero_copy: deno_buf,
+ zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
) -> (bool, Box<Op>) {
let bytes_sent_control = control.len();
- let bytes_sent_zero_copy = zero_copy.len();
+ let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
let base = msg::get_root_as_base(&control);
let is_sync = base.sync();
let inner_type = base.inner_type();
@@ -226,9 +226,9 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
fn op_now(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let seconds = state.start_time.elapsed().as_secs();
let mut subsec_nanos = state.start_time.elapsed().subsec_nanos();
let reduced_time_precision = 2_000_000; // 2ms in nanoseconds
@@ -262,7 +262,7 @@ fn op_now(
fn op_is_tty(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- _data: deno_buf,
+ _data: Option<PinnedBuf>,
) -> Box<OpWithError> {
let builder = &mut FlatBufferBuilder::new();
let inner = msg::IsTTYRes::create(
@@ -287,7 +287,7 @@ fn op_is_tty(
fn op_exit(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- _data: deno_buf,
+ _data: Option<PinnedBuf>,
) -> Box<OpWithError> {
let inner = base.inner_as_exit().unwrap();
std::process::exit(inner.code())
@@ -296,9 +296,9 @@ fn op_exit(
fn op_start(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let mut builder = FlatBufferBuilder::new();
let state = state;
@@ -351,9 +351,9 @@ fn op_start(
fn op_format_error(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_format_error().unwrap();
let orig_error = String::from(inner.error().unwrap());
@@ -410,9 +410,9 @@ pub fn odd_future(err: DenoError) -> Box<OpWithError> {
fn op_fetch_module_meta_data(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_fetch_module_meta_data().unwrap();
let cmd_id = base.cmd_id();
let specifier = inner.specifier().unwrap();
@@ -452,9 +452,9 @@ fn op_fetch_module_meta_data(
fn op_compiler_config(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_compiler_config().unwrap();
let cmd_id = base.cmd_id();
let compiler_type = inner.compiler_type().unwrap();
@@ -486,9 +486,9 @@ fn op_compiler_config(
fn op_chdir(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_chdir().unwrap();
let directory = inner.directory().unwrap();
Box::new(futures::future::result(|| -> OpResult {
@@ -500,10 +500,10 @@ fn op_chdir(
fn op_global_timer_stop(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
assert!(base.sync());
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let state = state;
let mut t = state.global_timer.lock().unwrap();
t.cancel();
@@ -513,10 +513,10 @@ fn op_global_timer_stop(
fn op_global_timer(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
assert!(!base.sync());
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_global_timer().unwrap();
let val = inner.timeout();
@@ -546,9 +546,9 @@ fn op_global_timer(
fn op_set_env(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_set_env().unwrap();
let key = inner.key().unwrap();
let value = inner.value().unwrap();
@@ -562,9 +562,9 @@ fn op_set_env(
fn op_env(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
if let Err(e) = state.check_env() {
@@ -594,9 +594,9 @@ fn op_env(
fn op_permissions(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
let inner = msg::PermissionsRes::create(
@@ -624,9 +624,9 @@ fn op_permissions(
fn op_revoke_permission(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_permission_revoke().unwrap();
let permission = inner.permission().unwrap();
let result = match permission {
@@ -647,7 +647,7 @@ fn op_revoke_permission(
fn op_fetch(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
let inner = base.inner_as_fetch().unwrap();
let cmd_id = base.cmd_id();
@@ -656,10 +656,9 @@ fn op_fetch(
assert!(header.is_request());
let url = header.url().unwrap();
- let body = if data.is_empty() {
- hyper::Body::empty()
- } else {
- hyper::Body::from(Vec::from(&*data))
+ let body = match data {
+ None => hyper::Body::empty(),
+ Some(buf) => hyper::Body::from(Vec::from(&*buf)),
};
let maybe_req = msg_util::deserialize_request(header, body);
@@ -734,9 +733,9 @@ where
fn op_make_temp_dir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let base = Box::new(*base);
let inner = base.inner_as_make_temp_dir().unwrap();
let cmd_id = base.cmd_id();
@@ -783,9 +782,9 @@ fn op_make_temp_dir(
fn op_mkdir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_mkdir().unwrap();
let path = String::from(inner.path().unwrap());
let recursive = inner.recursive();
@@ -805,9 +804,9 @@ fn op_mkdir(
fn op_chmod(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_chmod().unwrap();
let _mode = inner.mode();
let path = String::from(inner.path().unwrap());
@@ -844,9 +843,9 @@ fn op_chmod(
fn op_open(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_open().unwrap();
let filename_str = inner.filename().unwrap();
@@ -934,9 +933,9 @@ fn op_open(
fn op_close(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_close().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
@@ -951,9 +950,9 @@ fn op_close(
fn op_kill(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_kill().unwrap();
let pid = inner.pid();
let signo = inner.signo();
@@ -966,9 +965,9 @@ fn op_kill(
fn op_shutdown(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_shutdown().unwrap();
let rid = inner.rid();
let how = inner.how();
@@ -992,7 +991,7 @@ fn op_shutdown(
fn op_read(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
let cmd_id = base.cmd_id();
let inner = base.inner_as_read().unwrap();
@@ -1001,7 +1000,7 @@ fn op_read(
match resources::lookup(rid) {
None => odd_future(errors::bad_resource()),
Some(resource) => {
- let op = tokio::io::read(resource, data)
+ let op = tokio::io::read(resource, data.unwrap())
.map_err(DenoError::from)
.and_then(move |(_resource, _buf, nread)| {
let builder = &mut FlatBufferBuilder::new();
@@ -1030,7 +1029,7 @@ fn op_read(
fn op_write(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
let cmd_id = base.cmd_id();
let inner = base.inner_as_write().unwrap();
@@ -1039,7 +1038,7 @@ fn op_write(
match resources::lookup(rid) {
None => odd_future(errors::bad_resource()),
Some(resource) => {
- let op = tokio_write::write(resource, data)
+ let op = tokio_write::write(resource, data.unwrap())
.map_err(DenoError::from)
.and_then(move |(_resource, _buf, nwritten)| {
let builder = &mut FlatBufferBuilder::new();
@@ -1067,9 +1066,9 @@ fn op_write(
fn op_seek(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let _cmd_id = base.cmd_id();
let inner = base.inner_as_seek().unwrap();
let rid = inner.rid();
@@ -1089,9 +1088,9 @@ fn op_seek(
fn op_remove(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_remove().unwrap();
let path_ = inner.path().unwrap();
let path = PathBuf::from(path_);
@@ -1118,9 +1117,9 @@ fn op_remove(
fn op_copy_file(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_copy_file().unwrap();
let from_ = inner.from().unwrap();
let from = PathBuf::from(from_);
@@ -1174,9 +1173,9 @@ fn get_mode(_perm: &fs::Permissions) -> u32 {
fn op_cwd(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
Box::new(futures::future::result(|| -> OpResult {
let path = std::env::current_dir()?;
@@ -1200,9 +1199,9 @@ fn op_cwd(
fn op_stat(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_stat().unwrap();
let cmd_id = base.cmd_id();
let filename_ = inner.filename().unwrap();
@@ -1252,9 +1251,9 @@ fn op_stat(
fn op_read_dir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_read_dir().unwrap();
let cmd_id = base.cmd_id();
let path = String::from(inner.path().unwrap());
@@ -1313,9 +1312,9 @@ fn op_read_dir(
fn op_rename(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_rename().unwrap();
let oldpath = PathBuf::from(inner.oldpath().unwrap());
let newpath_ = inner.newpath().unwrap();
@@ -1333,9 +1332,9 @@ fn op_rename(
fn op_link(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_link().unwrap();
let oldname = PathBuf::from(inner.oldname().unwrap());
let newname_ = inner.newname().unwrap();
@@ -1355,9 +1354,9 @@ fn op_link(
fn op_symlink(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_symlink().unwrap();
let oldname = PathBuf::from(inner.oldname().unwrap());
let newname_ = inner.newname().unwrap();
@@ -1384,9 +1383,9 @@ fn op_symlink(
fn op_read_link(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_readlink().unwrap();
let cmd_id = base.cmd_id();
let name_ = inner.name().unwrap();
@@ -1422,9 +1421,9 @@ fn op_read_link(
fn op_repl_start(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_repl_start().unwrap();
let cmd_id = base.cmd_id();
let history_file = String::from(inner.history_file().unwrap());
@@ -1453,9 +1452,9 @@ fn op_repl_start(
fn op_repl_readline(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_repl_readline().unwrap();
let cmd_id = base.cmd_id();
let rid = inner.rid();
@@ -1489,9 +1488,9 @@ fn op_repl_readline(
fn op_truncate(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_truncate().unwrap();
let filename = String::from(inner.name().unwrap());
@@ -1512,9 +1511,9 @@ fn op_truncate(
fn op_utime(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_utime().unwrap();
let filename = String::from(inner.filename().unwrap());
@@ -1535,9 +1534,9 @@ fn op_utime(
fn op_listen(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
if let Err(e) = state.check_net("listen") {
return odd_future(e);
}
@@ -1597,9 +1596,9 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
fn op_accept(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
if let Err(e) = state.check_net("accept") {
return odd_future(e);
}
@@ -1623,9 +1622,9 @@ fn op_accept(
fn op_dial(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
if let Err(e) = state.check_net("dial") {
return odd_future(e);
}
@@ -1649,9 +1648,9 @@ fn op_dial(
fn op_metrics(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
@@ -1673,9 +1672,9 @@ fn op_metrics(
fn op_resources(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
@@ -1725,7 +1724,7 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio {
fn op_run(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
assert!(base.sync());
let cmd_id = base.cmd_id();
@@ -1734,7 +1733,7 @@ fn op_run(
return odd_future(e);
}
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let inner = base.inner_as_run().unwrap();
let args = inner.args().unwrap();
let env = inner.env().unwrap();
@@ -1798,9 +1797,9 @@ fn op_run(
fn op_run_status(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_run_status().unwrap();
let rid = inner.rid();
@@ -1871,9 +1870,9 @@ impl Future for GetMessageFuture {
fn op_worker_get_message(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let op = GetMessageFuture {
@@ -1906,11 +1905,11 @@ fn op_worker_get_message(
fn op_worker_post_message(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
let cmd_id = base.cmd_id();
- let d = Vec::from(data.as_ref()).into_boxed_slice();
+ let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let tx = {
let wc = state.worker_channels.lock().unwrap();
@@ -1936,9 +1935,9 @@ fn op_worker_post_message(
fn op_create_worker(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_create_worker().unwrap();
let specifier = inner.specifier().unwrap();
@@ -1995,9 +1994,9 @@ fn op_create_worker(
fn op_host_get_worker_closed(
state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_worker_closed().unwrap();
let rid = inner.rid();
@@ -2026,9 +2025,9 @@ fn op_host_get_worker_closed(
fn op_host_get_message(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
- assert_eq!(data.len(), 0);
+ assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_message().unwrap();
let rid = inner.rid();
@@ -2060,13 +2059,13 @@ fn op_host_get_message(
fn op_host_post_message(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
- data: deno_buf,
+ data: Option<PinnedBuf>,
) -> Box<OpWithError> {
let cmd_id = base.cmd_id();
let inner = base.inner_as_host_post_message().unwrap();
let rid = inner.rid();
- let d = Vec::from(data.as_ref()).into_boxed_slice();
+ let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let op = resources::post_message_to_worker(rid, d);
let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));