summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/dispatch_bin_test.ts (renamed from cli/tests/unit/dispatch_buffer_test.ts)8
-rw-r--r--cli/tests/unit/dispatch_json_test.ts4
-rw-r--r--cli/tests/unit/unit_tests.ts2
-rw-r--r--core/core.js266
-rw-r--r--core/examples/http_bench_bin_ops.js92
-rw-r--r--core/examples/http_bench_bin_ops.rs150
-rw-r--r--core/lib.rs9
-rw-r--r--core/ops.rs122
-rw-r--r--core/ops_bin.rs (renamed from runtime/ops/ops_buffer.rs)22
-rw-r--r--core/ops_json.rs134
-rw-r--r--runtime/js/10_dispatch_buffer.js150
-rw-r--r--runtime/js/11_timers.js3
-rw-r--r--runtime/js/12_io.js10
-rw-r--r--runtime/ops/io.rs8
-rw-r--r--runtime/ops/mod.rs20
-rw-r--r--runtime/ops/timers.rs2
16 files changed, 403 insertions, 599 deletions
diff --git a/cli/tests/unit/dispatch_buffer_test.ts b/cli/tests/unit/dispatch_bin_test.ts
index 0e213fe3b..b2d96f3b3 100644
--- a/cli/tests/unit/dispatch_buffer_test.ts
+++ b/cli/tests/unit/dispatch_bin_test.ts
@@ -8,9 +8,9 @@ import {
const readErrorStackPattern = new RegExp(
`^.*
- at handleError \\(.*10_dispatch_buffer\\.js:.*\\)
- at bufferOpParseResult \\(.*10_dispatch_buffer\\.js:.*\\)
- at Array.<anonymous> \\(.*10_dispatch_buffer\\.js:.*\\).*$`,
+ at handleError \\(.*core\\.js:.*\\)
+ at binOpParseResult \\(.*core\\.js:.*\\)
+ at asyncHandle \\(.*core\\.js:.*\\).*$`,
"ms",
);
@@ -33,7 +33,7 @@ declare global {
}
}
-unitTest(function bufferOpsHeaderTooShort(): void {
+unitTest(function binOpsHeaderTooShort(): void {
for (const op of ["op_read_sync", "op_read_async"]) {
const readOpId = Deno.core.ops()[op];
const res = Deno.core.send(
diff --git a/cli/tests/unit/dispatch_json_test.ts b/cli/tests/unit/dispatch_json_test.ts
index c283e20c9..3cb9506dd 100644
--- a/cli/tests/unit/dispatch_json_test.ts
+++ b/cli/tests/unit/dispatch_json_test.ts
@@ -19,7 +19,7 @@ unitTest(function malformedJsonControlBuffer(): void {
assertMatch(resObj.err.message, /\bexpected value\b/);
});
-unitTest(function invalidPromiseId(): void {
+unitTest(function invalidRequestId(): void {
const opId = Deno.core.ops()["op_open_async"];
const reqBuf = new Uint8Array([0, 0, 0, 0, 0, 0, 0]);
const resBuf = Deno.core.send(opId, reqBuf);
@@ -28,5 +28,5 @@ unitTest(function invalidPromiseId(): void {
console.error(resText);
assertStrictEquals(resObj.ok, undefined);
assertStrictEquals(resObj.err.className, "TypeError");
- assertMatch(resObj.err.message, /\bpromiseId\b/);
+ assertMatch(resObj.err.message, /\brequestId\b/);
});
diff --git a/cli/tests/unit/unit_tests.ts b/cli/tests/unit/unit_tests.ts
index 6277abdfe..d80403366 100644
--- a/cli/tests/unit/unit_tests.ts
+++ b/cli/tests/unit/unit_tests.ts
@@ -15,7 +15,7 @@ import "./console_test.ts";
import "./copy_file_test.ts";
import "./custom_event_test.ts";
import "./dir_test.ts";
-import "./dispatch_buffer_test.ts";
+import "./dispatch_bin_test.ts";
import "./dispatch_json_test.ts";
import "./error_stack_test.ts";
import "./event_test.ts";
diff --git a/core/core.js b/core/core.js
index f44bf253e..2de8e1fff 100644
--- a/core/core.js
+++ b/core/core.js
@@ -30,13 +30,22 @@ SharedQueue Binary Layout
const core = window.Deno.core;
const { recv, send } = core;
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////////// Dispatch /////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const dispatch = send;
+ const dispatchByName = (opName, control, ...zeroCopy) =>
+ dispatch(opsCache[opName], control, ...zeroCopy);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ //////////////////////////////////// Shared array buffer ///////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
let sharedBytes;
let shared32;
- let asyncHandlers;
-
let opsCache = {};
- const errorMap = {};
function init() {
const shared = core.shared;
@@ -45,6 +54,7 @@ SharedQueue Binary Layout
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
+
asyncHandlers = [];
// Callers should not call core.recv, use setAsyncHandler.
recv(handleAsyncMsgFromRust);
@@ -150,15 +160,43 @@ SharedQueue Binary Layout
return [opId, buf];
}
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////// Error handling //////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const errorMap = {};
+
+ function registerErrorClass(errorName, className, args) {
+ if (typeof errorMap[errorName] !== "undefined") {
+ throw new TypeError(`Error class for "${errorName}" already registered`);
+ }
+ errorMap[errorName] = [className, args ?? []];
+ }
+
+ function handleError(className, message) {
+ if (typeof errorMap[className] === "undefined") {
+ return new Error(
+ `Unregistered error class: "${className}"\n` +
+ ` ${message}\n` +
+ ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
+ );
+ }
+
+ const [ErrorClass, args] = errorMap[className];
+ return new ErrorClass(message, ...args);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////// Async handling //////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ let asyncHandlers = [];
+
function setAsyncHandler(opId, cb) {
assert(opId != null);
asyncHandlers[opId] = cb;
}
- function setAsyncHandlerByName(opName, cb) {
- setAsyncHandler(opsCache[opName], cb);
- }
-
function handleAsyncMsgFromRust() {
while (true) {
const opIdBuf = shift();
@@ -166,108 +204,196 @@ SharedQueue Binary Layout
break;
}
assert(asyncHandlers[opIdBuf[0]] != null);
- asyncHandlers[opIdBuf[0]](opIdBuf[1]);
+ asyncHandlers[opIdBuf[0]](opIdBuf[1], true);
}
for (let i = 0; i < arguments.length; i += 2) {
- asyncHandlers[arguments[i]](arguments[i + 1]);
+ asyncHandlers[arguments[i]](arguments[i + 1], false);
}
}
- function dispatch(opName, control, ...zeroCopy) {
- return send(opsCache[opName], control, ...zeroCopy);
- }
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////// General sync & async ops handling ////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
- function registerErrorClass(errorName, className, args) {
- if (typeof errorMap[errorName] !== "undefined") {
- throw new TypeError(`Error class for "${errorName}" already registered`);
+ let nextRequestId = 1;
+ const promiseTable = {};
+
+ function asyncHandle(u8Array, isCopyNeeded, opResultParser) {
+ const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded);
+ if (error !== null) {
+ promiseTable[requestId][1](error);
+ } else {
+ promiseTable[requestId][0](result);
}
- errorMap[errorName] = [className, args ?? []];
+ delete promiseTable[requestId];
}
- function getErrorClassAndArgs(errorName) {
- return errorMap[errorName] ?? [undefined, []];
+ function opAsync(opName, opRequestBuilder, opResultParser) {
+ const opId = opsCache[opName];
+ // Make sure requests of this type are handled by the asyncHandler
+ // The asyncHandler's role is to call the "promiseTable[requestId]" function
+ if (typeof asyncHandlers[opId] === "undefined") {
+ asyncHandlers[opId] = (buffer, isCopyNeeded) =>
+ asyncHandle(buffer, isCopyNeeded, opResultParser);
+ }
+
+ const requestId = nextRequestId++;
+
+ // Create and store promise
+ const promise = new Promise((resolve, reject) => {
+ promiseTable[requestId] = [resolve, reject];
+ });
+
+ // Synchronously dispatch async request
+ core.dispatch(opId, ...opRequestBuilder(requestId));
+
+ // Wait for async response
+ return promise;
}
- // Returns Uint8Array
- function encodeJson(args) {
- const s = JSON.stringify(args);
- return core.encode(s);
+ function opSync(opName, opRequestBuilder, opResultParser) {
+ const opId = opsCache[opName];
+ const u8Array = core.dispatch(opId, ...opRequestBuilder());
+
+ const [_, result, error] = opResultParser(u8Array, false);
+ if (error !== null) throw error;
+ return result;
}
- function decodeJson(ui8) {
- const s = core.decode(ui8);
- return JSON.parse(s);
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////// Bin ops handling /////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const binRequestHeaderByteLength = 8 + 4;
+ const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength);
+ const scratchView = new DataView(scratchBuffer);
+
+ function binOpBuildRequest(requestId, argument, zeroCopy) {
+ scratchView.setBigUint64(0, BigInt(requestId), true);
+ scratchView.setUint32(8, argument, true);
+ return [scratchView, ...zeroCopy];
}
- let nextPromiseId = 1;
- const promiseTable = {};
+ function binOpParseResult(u8Array, isCopyNeeded) {
+ // Decode header value from u8Array
+ const headerByteLength = 8 + 2 * 4;
+ assert(u8Array.byteLength >= headerByteLength);
+ assert(u8Array.byteLength % 4 == 0);
+ const view = new DataView(
+ u8Array.buffer,
+ u8Array.byteOffset + u8Array.byteLength - headerByteLength,
+ headerByteLength,
+ );
+
+ const requestId = Number(view.getBigUint64(0, true));
+ const status = view.getUint32(8, true);
+ const result = view.getUint32(12, true);
+
+ // Error handling
+ if (status !== 0) {
+ const className = core.decode(u8Array.subarray(0, result));
+ const message = core.decode(u8Array.subarray(result, -headerByteLength))
+ .trim();
+
+ return [requestId, null, handleError(className, message)];
+ }
- function processResponse(res) {
- if ("ok" in res) {
- return res.ok;
+ if (u8Array.byteLength === headerByteLength) {
+ return [requestId, result, null];
}
- const [ErrorClass, args] = getErrorClassAndArgs(res.err.className);
- if (!ErrorClass) {
- throw new Error(
- `Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
- );
+
+ // Rest of response buffer is passed as reference or as a copy
+ let respBuffer = null;
+ if (isCopyNeeded) {
+ // Copy part of the response array (if sent through shared array buf)
+ respBuffer = u8Array.slice(0, result);
+ } else {
+ // Create view on existing array (if sent through overflow)
+ respBuffer = u8Array.subarray(0, result);
}
- throw new ErrorClass(res.err.message, ...args);
+
+ return [requestId, respBuffer, null];
}
- async function jsonOpAsync(opName, args = null, ...zeroCopy) {
- setAsyncHandler(opsCache[opName], jsonOpAsyncHandler);
-
- const promiseId = nextPromiseId++;
- const reqBuf = core.encode("\0".repeat(8) + JSON.stringify(args));
- new DataView(reqBuf.buffer).setBigUint64(0, BigInt(promiseId));
- dispatch(opName, reqBuf, ...zeroCopy);
- let resolve, reject;
- const promise = new Promise((resolve_, reject_) => {
- resolve = resolve_;
- reject = reject_;
- });
- promise.resolve = resolve;
- promise.reject = reject;
- promiseTable[promiseId] = promise;
- return processResponse(await promise);
+ function binOpAsync(opName, argument = 0, ...zeroCopy) {
+ return opAsync(
+ opName,
+ (requestId) => binOpBuildRequest(requestId, argument, zeroCopy),
+ binOpParseResult,
+ );
+ }
+
+ function binOpSync(opName, argument = 0, ...zeroCopy) {
+ return opSync(
+ opName,
+ () => binOpBuildRequest(0, argument, zeroCopy),
+ binOpParseResult,
+ );
}
- function jsonOpSync(opName, args = null, ...zeroCopy) {
- const argsBuf = encodeJson(args);
- const res = dispatch(opName, argsBuf, ...zeroCopy);
- return processResponse(decodeJson(res));
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////// Json ops handling ////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const jsonRequestHeaderLength = 8;
+
+ function jsonOpBuildRequest(requestId, argument, zeroCopy) {
+ const u8Array = core.encode(
+ "\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument),
+ );
+ new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true);
+ return [u8Array, ...zeroCopy];
}
- function jsonOpAsyncHandler(buf) {
- // Json Op.
- const res = decodeJson(buf);
- const promise = promiseTable[res.promiseId];
- delete promiseTable[res.promiseId];
- promise.resolve(res);
+ function jsonOpParseResult(u8Array, _) {
+ const data = JSON.parse(core.decode(u8Array));
+
+ if ("err" in data) {
+ return [
+ data.requestId,
+ null,
+ handleError(data.err.className, data.err.message),
+ ];
+ }
+
+ return [data.requestId, data.ok, null];
+ }
+
+ function jsonOpAsync(opName, argument = null, ...zeroCopy) {
+ return opAsync(
+ opName,
+ (requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy),
+ jsonOpParseResult,
+ );
+ }
+
+ function jsonOpSync(opName, argument = null, ...zeroCopy) {
+ return opSync(
+ opName,
+ () => [core.encode(JSON.stringify(argument)), ...zeroCopy],
+ jsonOpParseResult,
+ );
}
function resources() {
return jsonOpSync("op_resources");
}
-
function close(rid) {
- jsonOpSync("op_close", { rid });
+ return jsonOpSync("op_close", { rid });
}
Object.assign(window.Deno.core, {
jsonOpAsync,
jsonOpSync,
- setAsyncHandler,
- setAsyncHandlerByName,
- dispatch: send,
- dispatchByName: dispatch,
+ binOpAsync,
+ binOpSync,
+ dispatch,
+ dispatchByName,
ops,
close,
resources,
registerErrorClass,
- getErrorClassAndArgs,
sharedQueueInit: init,
// sharedQueue is private but exposed for testing.
sharedQueue: {
@@ -279,5 +405,7 @@ SharedQueue Binary Layout
reset,
shift,
},
+ // setAsyncHandler is private but exposed for testing.
+ setAsyncHandler,
});
})(this);
diff --git a/core/examples/http_bench_bin_ops.js b/core/examples/http_bench_bin_ops.js
index f20366494..18f98419f 100644
--- a/core/examples/http_bench_bin_ops.js
+++ b/core/examples/http_bench_bin_ops.js
@@ -8,85 +8,15 @@ const responseBuf = new Uint8Array(
.split("")
.map((c) => c.charCodeAt(0)),
);
-const promiseMap = new Map();
-let nextPromiseId = 1;
-
-function assert(cond) {
- if (!cond) {
- throw Error("assert");
- }
-}
-
-function createResolvable() {
- let resolve;
- let reject;
- const promise = new Promise((res, rej) => {
- resolve = res;
- reject = rej;
- });
- promise.resolve = resolve;
- promise.reject = reject;
- return promise;
-}
-
-const scratch32 = new Int32Array(3);
-const scratchBytes = new Uint8Array(
- scratch32.buffer,
- scratch32.byteOffset,
- scratch32.byteLength,
-);
-assert(scratchBytes.byteLength === 3 * 4);
-
-function send(promiseId, opId, rid, ...zeroCopy) {
- scratch32[0] = promiseId;
- scratch32[1] = rid;
- scratch32[2] = -1;
- return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy);
-}
-
-/** Returns Promise<number> */
-function sendAsync(opId, rid, ...zeroCopy) {
- const promiseId = nextPromiseId++;
- const p = createResolvable();
- const buf = send(promiseId, opId, rid, ...zeroCopy);
- if (buf) {
- const record = recordFromBuf(buf);
- // Sync result.
- p.resolve(record.result);
- } else {
- // Async result.
- promiseMap.set(promiseId, p);
- }
- return p;
-}
-
-/** Returns i32 number */
-function sendSync(opId, rid) {
- const buf = send(0, opId, rid);
- const record = recordFromBuf(buf);
- return record[2];
-}
-
-function recordFromBuf(buf) {
- assert(buf.byteLength === 3 * 4);
- return new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
-}
-
-function handleAsyncMsgFromRust(buf) {
- const record = recordFromBuf(buf);
- const p = promiseMap.get(record[0]);
- promiseMap.delete(record[0]);
- p.resolve(record[2]);
-}
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
- return sendSync(ops["listen"], -1);
+ return Deno.core.binOpSync("listen");
}
/** Accepts a connection, returns rid. */
function accept(rid) {
- return sendAsync(ops["accept"], rid);
+ return Deno.core.binOpAsync("accept", rid);
}
/**
@@ -94,16 +24,16 @@ function accept(rid) {
* Returns bytes read.
*/
function read(rid, data) {
- return sendAsync(ops["read"], rid, data);
+ return Deno.core.binOpAsync("read", rid, data);
}
/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
function write(rid, data) {
- return sendAsync(ops["write"], rid, data);
+ return Deno.core.binOpAsync("write", rid, data);
}
function close(rid) {
- return sendSync(ops["close"], rid);
+ Deno.core.binOpSync("close", rid);
}
async function serve(rid) {
@@ -121,16 +51,14 @@ async function serve(rid) {
close(rid);
}
-let ops;
-
async function main() {
- ops = Deno.core.ops();
- for (const opName in ops) {
- Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust);
- }
+ Deno.core.ops();
+ Deno.core.registerErrorClass("Error", Error);
const listenerRid = listen();
- Deno.core.print(`http_bench_bin_ops listening on http://127.0.0.1:4544/\n`);
+ Deno.core.print(
+ `http_bench_bin_ops listening on http://127.0.0.1:4544/\n`,
+ );
for (;;) {
const rid = await accept(listenerRid);
diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs
index bc4ca4dce..1f649b235 100644
--- a/core/examples/http_bench_bin_ops.rs
+++ b/core/examples/http_bench_bin_ops.rs
@@ -3,30 +3,23 @@
#[macro_use]
extern crate log;
+use deno_core::error::bad_resource_id;
+use deno_core::error::AnyError;
use deno_core::AsyncRefCell;
use deno_core::BufVec;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
-use deno_core::Op;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
-use futures::future::FutureExt;
-use futures::future::TryFuture;
-use futures::future::TryFutureExt;
use std::cell::RefCell;
use std::convert::TryFrom;
-use std::convert::TryInto;
use std::env;
-use std::fmt::Debug;
use std::io::Error;
-use std::io::ErrorKind;
-use std::mem::size_of;
use std::net::SocketAddr;
-use std::ptr;
use std::rc::Rc;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@@ -120,52 +113,21 @@ impl From<tokio::net::TcpStream> for TcpStream {
}
}
-#[derive(Copy, Clone, Debug, PartialEq)]
-struct Record {
- promise_id: u32,
- rid: ResourceId,
- result: i32,
-}
-
-type RecordBuf = [u8; size_of::<Record>()];
-
-impl From<&[u8]> for Record {
- fn from(buf: &[u8]) -> Self {
- assert_eq!(buf.len(), size_of::<RecordBuf>());
- unsafe { *(buf as *const _ as *const RecordBuf) }.into()
- }
-}
-
-impl From<RecordBuf> for Record {
- fn from(buf: RecordBuf) -> Self {
- unsafe {
- #[allow(clippy::cast_ptr_alignment)]
- ptr::read_unaligned(&buf as *const _ as *const Self)
- }
- }
-}
-
-impl From<Record> for RecordBuf {
- fn from(record: Record) -> Self {
- unsafe { ptr::read(&record as *const _ as *const Self) }
- }
-}
-
fn create_js_runtime() -> JsRuntime {
- let mut js_runtime = JsRuntime::new(Default::default());
- register_op_bin_sync(&mut js_runtime, "listen", op_listen);
- register_op_bin_sync(&mut js_runtime, "close", op_close);
- register_op_bin_async(&mut js_runtime, "accept", op_accept);
- register_op_bin_async(&mut js_runtime, "read", op_read);
- register_op_bin_async(&mut js_runtime, "write", op_write);
- js_runtime
+ let mut runtime = JsRuntime::new(Default::default());
+ runtime.register_op("listen", deno_core::bin_op_sync(op_listen));
+ runtime.register_op("close", deno_core::bin_op_sync(op_close));
+ runtime.register_op("accept", deno_core::bin_op_async(op_accept));
+ runtime.register_op("read", deno_core::bin_op_async(op_read));
+ runtime.register_op("write", deno_core::bin_op_async(op_write));
+ runtime
}
fn op_listen(
state: &mut OpState,
_rid: ResourceId,
_bufs: &mut [ZeroCopyBuf],
-) -> Result<u32, Error> {
+) -> Result<u32, AnyError> {
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?;
@@ -179,7 +141,7 @@ fn op_close(
state: &mut OpState,
rid: ResourceId,
_bufs: &mut [ZeroCopyBuf],
-) -> Result<u32, Error> {
+) -> Result<u32, AnyError> {
debug!("close rid={}", rid);
state
.resource_table
@@ -192,7 +154,7 @@ async fn op_accept(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_bufs: BufVec,
-) -> Result<u32, Error> {
+) -> Result<u32, AnyError> {
debug!("accept rid={}", rid);
let listener = state
@@ -209,7 +171,7 @@ async fn op_read(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
mut bufs: BufVec,
-) -> Result<usize, Error> {
+) -> Result<u32, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
debug!("read rid={}", rid);
@@ -218,14 +180,15 @@ async fn op_read(
.resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- stream.read(&mut bufs[0]).await
+ let nread = stream.read(&mut bufs[0]).await?;
+ Ok(nread as u32)
}
async fn op_write(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
bufs: BufVec,
-) -> Result<usize, Error> {
+) -> Result<u32, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
debug!("write rid={}", rid);
@@ -234,70 +197,8 @@ async fn op_write(
.resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- stream.write(&bufs[0]).await
-}
-
-fn register_op_bin_sync<F>(
- js_runtime: &mut JsRuntime,
- name: &'static str,
- op_fn: F,
-) where
- F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<u32, Error> + 'static,
-{
- let base_op_fn = move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op {
- let record = Record::from(bufs[0].as_ref());
- let is_sync = record.promise_id == 0;
- assert!(is_sync);
-
- let zero_copy_bufs = &mut bufs[1..];
- let result: i32 =
- match op_fn(&mut state.borrow_mut(), record.rid, zero_copy_bufs) {
- Ok(r) => r as i32,
- Err(_) => -1,
- };
- let buf = RecordBuf::from(Record { result, ..record })[..].into();
- Op::Sync(buf)
- };
-
- js_runtime.register_op(name, base_op_fn);
-}
-
-fn register_op_bin_async<F, R>(
- js_runtime: &mut JsRuntime,
- name: &'static str,
- op_fn: F,
-) where
- F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + Copy + 'static,
- R: TryFuture,
- R::Ok: TryInto<i32>,
- <R::Ok as TryInto<i32>>::Error: Debug,
-{
- let base_op_fn = move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
- let mut bufs_iter = bufs.into_iter();
- let record_buf = bufs_iter.next().unwrap();
- let zero_copy_bufs = bufs_iter.collect::<BufVec>();
-
- let record = Record::from(record_buf.as_ref());
- let is_sync = record.promise_id == 0;
- assert!(!is_sync);
-
- let fut = async move {
- let op = op_fn(state, record.rid, zero_copy_bufs);
- let result = op
- .map_ok(|r| r.try_into().expect("op result does not fit in i32"))
- .unwrap_or_else(|_| -1)
- .await;
- RecordBuf::from(Record { result, ..record })[..].into()
- };
-
- Op::Async(fut.boxed_local())
- };
-
- js_runtime.register_op(name, base_op_fn);
-}
-
-fn bad_resource_id() -> Error {
- Error::new(ErrorKind::NotFound, "bad resource id")
+ let nwritten = stream.write(&bufs[0]).await?;
+ Ok(nwritten as u32)
}
fn main() {
@@ -329,18 +230,3 @@ fn main() {
};
runtime.block_on(future).unwrap();
}
-
-#[test]
-fn test_record_from() {
- let expected = Record {
- promise_id: 1,
- rid: 3,
- result: 4,
- };
- let buf = RecordBuf::from(expected);
- if cfg!(target_endian = "little") {
- assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]);
- }
- let actual = Record::from(buf);
- assert_eq!(actual, expected);
-}
diff --git a/core/lib.rs b/core/lib.rs
index deea9d281..c65ed7aac 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -15,6 +15,8 @@ mod module_specifier;
mod modules;
mod normalize_path;
mod ops;
+mod ops_bin;
+mod ops_json;
pub mod plugin_api;
mod resources;
mod runtime;
@@ -58,8 +60,6 @@ pub use crate::modules::ModuleSourceFuture;
pub use crate::modules::NoopModuleLoader;
pub use crate::modules::RecursiveModuleLoad;
pub use crate::normalize_path::normalize_path;
-pub use crate::ops::json_op_async;
-pub use crate::ops::json_op_sync;
pub use crate::ops::op_close;
pub use crate::ops::op_resources;
pub use crate::ops::Op;
@@ -68,6 +68,11 @@ pub use crate::ops::OpFn;
pub use crate::ops::OpId;
pub use crate::ops::OpState;
pub use crate::ops::OpTable;
+pub use crate::ops_bin::bin_op_async;
+pub use crate::ops_bin::bin_op_sync;
+pub use crate::ops_bin::ValueOrVector;
+pub use crate::ops_json::json_op_async;
+pub use crate::ops_json::json_op_sync;
pub use crate::resources::Resource;
pub use crate::resources::ResourceId;
pub use crate::resources::ResourceTable;
diff --git a/core/ops.rs b/core/ops.rs
index eceab7feb..212a713ad 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -10,13 +10,10 @@ use crate::BufVec;
use crate::ZeroCopyBuf;
use futures::Future;
use indexmap::IndexMap;
-use serde::de::DeserializeOwned;
-use serde::Serialize;
use serde_json::json;
use serde_json::Value;
use std::cell::RefCell;
use std::collections::HashMap;
-use std::convert::TryInto;
use std::iter::once;
use std::ops::Deref;
use std::ops::DerefMut;
@@ -117,125 +114,6 @@ impl Default for OpTable {
}
}
-/// Creates an op that passes data synchronously using JSON.
-///
-/// The provided function `op_fn` has the following parameters:
-/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op.
-/// * `V`: the deserializable value that is passed to the Rust function.
-/// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used.
-///
-/// `op_fn` returns a serializable value, which is directly returned to JavaScript.
-///
-/// When registering an op like this...
-/// ```ignore
-/// let mut runtime = JsRuntime::new(...);
-/// runtime.register_op("hello", deno_core::json_op_sync(Self::hello_op));
-/// ```
-///
-/// ...it can be invoked from JS using the provided name, for example:
-/// ```js
-/// Deno.core.ops();
-/// let result = Deno.core.jsonOpSync("function_name", args);
-/// ```
-///
-/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
-/// A more complete example is available in the examples directory.
-pub fn json_op_sync<F, V, R>(op_fn: F) -> Box<OpFn>
-where
- F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
- V: DeserializeOwned,
- R: Serialize,
-{
- Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op {
- let result = serde_json::from_slice(&bufs[0])
- .map_err(AnyError::from)
- .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..]));
- let buf =
- json_serialize_op_result(None, result, state.borrow().get_error_class_fn);
- Op::Sync(buf)
- })
-}
-
-/// Creates an op that passes data asynchronously using JSON.
-///
-/// The provided function `op_fn` has the following parameters:
-/// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op.
-/// * `V`: the deserializable value that is passed to the Rust function.
-/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used.
-///
-/// `op_fn` returns a future, whose output is a serializable value. This value will be asynchronously
-/// returned to JavaScript.
-///
-/// When registering an op like this...
-/// ```ignore
-/// let mut runtime = JsRuntime::new(...);
-/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op));
-/// ```
-///
-/// ...it can be invoked from JS using the provided name, for example:
-/// ```js
-/// Deno.core.ops();
-/// let future = Deno.core.jsonOpAsync("function_name", args);
-/// ```
-///
-/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
-/// A more complete example is available in the examples directory.
-pub fn json_op_async<F, V, R, RV>(op_fn: F) -> Box<OpFn>
-where
- F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static,
- V: DeserializeOwned,
- R: Future<Output = Result<RV, AnyError>> + 'static,
- RV: Serialize,
-{
- let try_dispatch_op =
- move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> {
- let promise_id = bufs[0]
- .get(0..8)
- .map(|b| u64::from_be_bytes(b.try_into().unwrap()))
- .ok_or_else(|| type_error("missing or invalid `promiseId`"))?;
- let args = serde_json::from_slice(&bufs[0][8..])?;
- let bufs = bufs[1..].into();
- use crate::futures::FutureExt;
- let fut = op_fn(state.clone(), args, bufs).map(move |result| {
- json_serialize_op_result(
- Some(promise_id),
- result,
- state.borrow().get_error_class_fn,
- )
- });
- Ok(Op::Async(Box::pin(fut)))
- };
-
- Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
- match try_dispatch_op(state.clone(), bufs) {
- Ok(op) => op,
- Err(err) => Op::Sync(json_serialize_op_result(
- None,
- Err::<(), AnyError>(err),
- state.borrow().get_error_class_fn,
- )),
- }
- })
-}
-
-fn json_serialize_op_result<R: Serialize>(
- promise_id: Option<u64>,
- result: Result<R, AnyError>,
- get_error_class_fn: crate::runtime::GetErrorClassFn,
-) -> Box<[u8]> {
- let value = match result {
- Ok(v) => serde_json::json!({ "ok": v, "promiseId": promise_id }),
- Err(err) => serde_json::json!({
- "promiseId": promise_id ,
- "err": {
- "className": (get_error_class_fn)(&err),
- "message": err.to_string(),
- }
- }),
- };
- serde_json::to_vec(&value).unwrap().into_boxed_slice()
-}
-
/// Return map of resources with id as key
/// and string representation as value.
///
diff --git a/runtime/ops/ops_buffer.rs b/core/ops_bin.rs
index 6998144cf..053150bfd 100644
--- a/runtime/ops/ops_buffer.rs
+++ b/core/ops_bin.rs
@@ -1,12 +1,12 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use deno_core::error::AnyError;
-use deno_core::futures::future::FutureExt;
-use deno_core::BufVec;
-use deno_core::Op;
-use deno_core::OpFn;
-use deno_core::OpState;
-use deno_core::ZeroCopyBuf;
+use crate::error::AnyError;
+use crate::futures::future::FutureExt;
+use crate::BufVec;
+use crate::Op;
+use crate::OpFn;
+use crate::OpState;
+use crate::ZeroCopyBuf;
use std::boxed::Box;
use std::cell::RefCell;
use std::convert::TryInto;
@@ -88,18 +88,18 @@ fn gen_padding_32bit(len: usize) -> &'static [u8] {
/// When registering an op like this...
/// ```ignore
/// let mut runtime = JsRuntime::new(...);
-/// runtime.register_op("hello", deno_core::buffer_op_sync(Self::hello_op));
+/// runtime.register_op("hello", deno_core::bin_op_sync(Self::hello_op));
/// ```
///
/// ...it can be invoked from JS using the provided name, for example:
/// ```js
/// Deno.core.ops();
-/// let result = Deno.core.bufferOpSync("function_name", args);
+/// let result = Deno.core.binOpSync("function_name", args);
/// ```
///
/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
/// A more complete example is available in the examples directory.
-pub fn buffer_op_sync<F, R>(op_fn: F) -> Box<OpFn>
+pub fn bin_op_sync<F, R>(op_fn: F) -> Box<OpFn>
where
F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
R: ValueOrVector,
@@ -202,7 +202,7 @@ where
///
/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
/// A more complete example is available in the examples directory.
-pub fn buffer_op_async<F, R, RV>(op_fn: F) -> Box<OpFn>
+pub fn bin_op_async<F, R, RV>(op_fn: F) -> Box<OpFn>
where
F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + 'static,
R: Future<Output = Result<RV, AnyError>> + 'static,
diff --git a/core/ops_json.rs b/core/ops_json.rs
new file mode 100644
index 000000000..0ef91ed33
--- /dev/null
+++ b/core/ops_json.rs
@@ -0,0 +1,134 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::error::type_error;
+use crate::error::AnyError;
+use crate::BufVec;
+use crate::Op;
+use crate::OpFn;
+use crate::OpState;
+use crate::ZeroCopyBuf;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::cell::RefCell;
+use std::convert::TryInto;
+use std::future::Future;
+use std::rc::Rc;
+
+fn json_serialize_op_result<R: Serialize>(
+ request_id: Option<u64>,
+ result: Result<R, AnyError>,
+ get_error_class_fn: crate::runtime::GetErrorClassFn,
+) -> Box<[u8]> {
+ let value = match result {
+ Ok(v) => serde_json::json!({ "ok": v, "requestId": request_id }),
+ Err(err) => serde_json::json!({
+ "requestId": request_id,
+ "err": {
+ "className": (get_error_class_fn)(&err),
+ "message": err.to_string(),
+ }
+ }),
+ };
+ serde_json::to_vec(&value).unwrap().into_boxed_slice()
+}
+
+/// Creates an op that passes data synchronously using JSON.
+///
+/// The provided function `op_fn` has the following parameters:
+/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op.
+/// * `V`: the deserializable value that is passed to the Rust function.
+/// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used.
+///
+/// `op_fn` returns a serializable value, which is directly returned to JavaScript.
+///
+/// When registering an op like this...
+/// ```ignore
+/// let mut runtime = JsRuntime::new(...);
+/// runtime.register_op("hello", deno_core::json_op_sync(Self::hello_op));
+/// ```
+///
+/// ...it can be invoked from JS using the provided name, for example:
+/// ```js
+/// Deno.core.ops();
+/// let result = Deno.core.jsonOpSync("function_name", args);
+/// ```
+///
+/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
+/// A more complete example is available in the examples directory.
+pub fn json_op_sync<F, V, R>(op_fn: F) -> Box<OpFn>
+where
+ F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
+ V: DeserializeOwned,
+ R: Serialize,
+{
+ Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op {
+ let result = serde_json::from_slice(&bufs[0])
+ .map_err(AnyError::from)
+ .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..]));
+ let buf =
+ json_serialize_op_result(None, result, state.borrow().get_error_class_fn);
+ Op::Sync(buf)
+ })
+}
+
+/// Creates an op that passes data asynchronously using JSON.
+///
+/// The provided function `op_fn` has the following parameters:
+/// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op.
+/// * `V`: the deserializable value that is passed to the Rust function.
+/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used.
+///
+/// `op_fn` returns a future, whose output is a serializable value. This value will be asynchronously
+/// returned to JavaScript.
+///
+/// When registering an op like this...
+/// ```ignore
+/// let mut runtime = JsRuntime::new(...);
+/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op));
+/// ```
+///
+/// ...it can be invoked from JS using the provided name, for example:
+/// ```js
+/// Deno.core.ops();
+/// let future = Deno.core.jsonOpAsync("function_name", args);
+/// ```
+///
+/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
+/// A more complete example is available in the examples directory.
+pub fn json_op_async<F, V, R, RV>(op_fn: F) -> Box<OpFn>
+where
+ F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static,
+ V: DeserializeOwned,
+ R: Future<Output = Result<RV, AnyError>> + 'static,
+ RV: Serialize,
+{
+ let try_dispatch_op =
+ move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> {
+ let request_id = bufs[0]
+ .get(0..8)
+ .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
+ .ok_or_else(|| type_error("missing or invalid `requestId`"))?;
+ let args = serde_json::from_slice(&bufs[0][8..])?;
+ let bufs = bufs[1..].into();
+ use crate::futures::FutureExt;
+ let fut = op_fn(state.clone(), args, bufs).map(move |result| {
+ json_serialize_op_result(
+ Some(request_id),
+ result,
+ state.borrow().get_error_class_fn,
+ )
+ });
+ Ok(Op::Async(Box::pin(fut)))
+ };
+
+ Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
+ match try_dispatch_op(state.clone(), bufs) {
+ Ok(op) => op,
+ Err(err) => Op::Sync(json_serialize_op_result(
+ None,
+ Err::<(), AnyError>(err),
+ state.borrow().get_error_class_fn,
+ )),
+ }
+ })
+}
diff --git a/runtime/js/10_dispatch_buffer.js b/runtime/js/10_dispatch_buffer.js
deleted file mode 100644
index 091fce504..000000000
--- a/runtime/js/10_dispatch_buffer.js
+++ /dev/null
@@ -1,150 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-"use strict";
-
-((window) => {
- const core = window.Deno.core;
-
- function assert(cond) {
- if (!cond) {
- throw Error("assert");
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////// General async handling //////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- // General Async response handling
- let nextRequestId = 1;
- const promiseTable = {};
-
- function opAsync(opName, opRequestBuilder, opResultParser) {
- // Make sure requests of this type are handled by the asyncHandler
- // The asyncHandler's role is to call the "promiseTable[requestId]" function
- core.setAsyncHandlerByName(opName, (bufUi8, _) => {
- const [requestId, result, error] = opResultParser(bufUi8, true);
- if (error !== null) {
- promiseTable[requestId][1](error);
- } else {
- promiseTable[requestId][0](result);
- }
- delete promiseTable[requestId];
- });
-
- const requestId = nextRequestId++;
-
- // Create and store promise
- const promise = new Promise((resolve, reject) => {
- promiseTable[requestId] = [resolve, reject];
- });
-
- // Synchronously dispatch async request
- core.dispatchByName(opName, ...opRequestBuilder(requestId));
-
- // Wait for async response
- return promise;
- }
-
- function opSync(opName, opRequestBuilder, opResultParser) {
- const rawResult = core.dispatchByName(opName, ...opRequestBuilder());
-
- const [_, result, error] = opResultParser(rawResult, false);
- if (error !== null) throw error;
- return result;
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- /////////////////////////////////// Error handling /////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- function handleError(className, message) {
- const [ErrorClass, args] = core.getErrorClassAndArgs(className);
- if (!ErrorClass) {
- return new Error(
- `Unregistered error class: "${className}"\n` +
- ` ${message}\n` +
- ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
- );
- }
- return new ErrorClass(message, ...args);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- ///////////////////////////////// Buffer ops handling //////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- const scratchBytes = new ArrayBuffer(3 * 4);
- const scratchView = new DataView(
- scratchBytes,
- scratchBytes.byteOffset,
- scratchBytes.byteLength,
- );
-
- function bufferOpBuildRequest(requestId, argument, zeroCopy) {
- scratchView.setBigUint64(0, BigInt(requestId), true);
- scratchView.setUint32(8, argument, true);
- return [scratchView, ...zeroCopy];
- }
-
- function bufferOpParseResult(bufUi8, isCopyNeeded) {
- // Decode header value from ui8 buffer
- const headerByteLength = 4 * 4;
- assert(bufUi8.byteLength >= headerByteLength);
- assert(bufUi8.byteLength % 4 == 0);
- const view = new DataView(
- bufUi8.buffer,
- bufUi8.byteOffset + bufUi8.byteLength - headerByteLength,
- headerByteLength,
- );
-
- const requestId = Number(view.getBigUint64(0, true));
- const status = view.getUint32(8, true);
- const result = view.getUint32(12, true);
-
- // Error handling
- if (status !== 0) {
- const className = core.decode(bufUi8.subarray(0, result));
- const message = core.decode(bufUi8.subarray(result, -headerByteLength))
- .trim();
-
- return [requestId, null, handleError(className, message)];
- }
-
- if (bufUi8.byteLength === headerByteLength) {
- return [requestId, result, null];
- }
-
- // Rest of response buffer is passed as reference or as a copy
- let respBuffer = null;
- if (isCopyNeeded) {
- // Copy part of the response array (if sent through shared array buf)
- respBuffer = bufUi8.slice(0, result);
- } else {
- // Create view on existing array (if sent through overflow)
- respBuffer = bufUi8.subarray(0, result);
- }
-
- return [requestId, respBuffer, null];
- }
-
- function bufferOpAsync(opName, argument = 0, ...zeroCopy) {
- return opAsync(
- opName,
- (requestId) => bufferOpBuildRequest(requestId, argument, zeroCopy),
- bufferOpParseResult,
- );
- }
-
- function bufferOpSync(opName, argument = 0, ...zeroCopy) {
- return opSync(
- opName,
- () => bufferOpBuildRequest(0, argument, zeroCopy),
- bufferOpParseResult,
- );
- }
-
- window.__bootstrap.dispatchBuffer = {
- bufferOpSync,
- bufferOpAsync,
- };
-})(this);
diff --git a/runtime/js/11_timers.js b/runtime/js/11_timers.js
index f07622388..7a0307c06 100644
--- a/runtime/js/11_timers.js
+++ b/runtime/js/11_timers.js
@@ -4,7 +4,6 @@
((window) => {
const assert = window.__bootstrap.util.assert;
const core = window.Deno.core;
- const { bufferOpSync } = window.__bootstrap.dispatchBuffer;
function opStopGlobalTimer() {
core.jsonOpSync("op_global_timer_stop");
@@ -20,7 +19,7 @@
const nowBytes = new Uint8Array(8);
function opNow() {
- bufferOpSync("op_now", 0, nowBytes);
+ core.binOpSync("op_now", 0, nowBytes);
return new DataView(nowBytes.buffer).getFloat64();
}
diff --git a/runtime/js/12_io.js b/runtime/js/12_io.js
index 09e87f990..fe815c7ed 100644
--- a/runtime/js/12_io.js
+++ b/runtime/js/12_io.js
@@ -6,8 +6,8 @@
"use strict";
((window) => {
+ const core = window.Deno.core;
const DEFAULT_BUFFER_SIZE = 32 * 1024;
- const { bufferOpSync, bufferOpAsync } = window.__bootstrap.dispatchBuffer;
// Seek whence values.
// https://golang.org/pkg/io/#pkg-constants
const SeekMode = {
@@ -81,7 +81,7 @@
return 0;
}
- const nread = bufferOpSync("op_read_sync", rid, buffer);
+ const nread = core.binOpSync("op_read_sync", rid, buffer);
if (nread < 0) {
throw new Error("read error");
}
@@ -97,7 +97,7 @@
return 0;
}
- const nread = await bufferOpAsync("op_read_async", rid, buffer);
+ const nread = await core.binOpAsync("op_read_async", rid, buffer);
if (nread < 0) {
throw new Error("read error");
}
@@ -106,7 +106,7 @@
}
function writeSync(rid, data) {
- const result = bufferOpSync("op_write_sync", rid, data);
+ const result = core.binOpSync("op_write_sync", rid, data);
if (result < 0) {
throw new Error("write error");
}
@@ -115,7 +115,7 @@
}
async function write(rid, data) {
- const result = await bufferOpAsync("op_write_async", rid, data);
+ const result = await core.binOpAsync("op_write_async", rid, data);
if (result < 0) {
throw new Error("write error");
}
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index e1520b2c5..1260452b6 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -99,11 +99,11 @@ lazy_static! {
}
pub fn init(rt: &mut JsRuntime) {
- super::reg_buffer_async(rt, "op_read_async", op_read_async);
- super::reg_buffer_async(rt, "op_write_async", op_write_async);
+ super::reg_bin_async(rt, "op_read_async", op_read_async);
+ super::reg_bin_async(rt, "op_write_async", op_write_async);
- super::reg_buffer_sync(rt, "op_read_sync", op_read_sync);
- super::reg_buffer_sync(rt, "op_write_sync", op_write_sync);
+ super::reg_bin_sync(rt, "op_read_sync", op_read_sync);
+ super::reg_bin_sync(rt, "op_write_sync", op_write_sync);
super::reg_json_async(rt, "op_shutdown", op_shutdown);
}
diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs
index e082c5d3a..2e94d99f5 100644
--- a/runtime/ops/mod.rs
+++ b/runtime/ops/mod.rs
@@ -8,7 +8,6 @@ pub mod io;
pub mod net;
#[cfg(unix)]
mod net_unix;
-mod ops_buffer;
pub mod os;
pub mod permissions;
pub mod plugin;
@@ -25,6 +24,8 @@ pub mod websocket;
pub mod worker_host;
use crate::metrics::metrics_op;
+use deno_core::bin_op_async;
+use deno_core::bin_op_sync;
use deno_core::error::AnyError;
use deno_core::json_op_async;
use deno_core::json_op_sync;
@@ -33,10 +34,8 @@ use deno_core::serde::Serialize;
use deno_core::BufVec;
use deno_core::JsRuntime;
use deno_core::OpState;
+use deno_core::ValueOrVector;
use deno_core::ZeroCopyBuf;
-use ops_buffer::buffer_op_async;
-use ops_buffer::buffer_op_sync;
-use ops_buffer::ValueOrVector;
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
@@ -63,24 +62,21 @@ where
rt.register_op(name, metrics_op(name, json_op_sync(op_fn)));
}
-pub fn reg_buffer_async<F, R, RV>(
- rt: &mut JsRuntime,
- name: &'static str,
- op_fn: F,
-) where
+pub fn reg_bin_async<F, R, RV>(rt: &mut JsRuntime, name: &'static str, op_fn: F)
+where
F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + 'static,
R: Future<Output = Result<RV, AnyError>> + 'static,
RV: ValueOrVector,
{
- rt.register_op(name, metrics_op(name, buffer_op_async(op_fn)));
+ rt.register_op(name, metrics_op(name, bin_op_async(op_fn)));
}
-pub fn reg_buffer_sync<F, R>(rt: &mut JsRuntime, name: &'static str, op_fn: F)
+pub fn reg_bin_sync<F, R>(rt: &mut JsRuntime, name: &'static str, op_fn: F)
where
F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
R: ValueOrVector,
{
- rt.register_op(name, metrics_op(name, buffer_op_sync(op_fn)));
+ rt.register_op(name, metrics_op(name, bin_op_sync(op_fn)));
}
/// `UnstableChecker` is a struct so it can be placed inside `GothamState`;
diff --git a/runtime/ops/timers.rs b/runtime/ops/timers.rs
index 445b7366c..4395b4885 100644
--- a/runtime/ops/timers.rs
+++ b/runtime/ops/timers.rs
@@ -77,7 +77,7 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_sync(rt, "op_global_timer_stop", op_global_timer_stop);
super::reg_json_sync(rt, "op_global_timer_start", op_global_timer_start);
super::reg_json_async(rt, "op_global_timer", op_global_timer);
- super::reg_buffer_sync(rt, "op_now", op_now);
+ super::reg_bin_sync(rt, "op_now", op_now);
super::reg_json_sync(rt, "op_sleep_sync", op_sleep_sync);
}