From 29e3f4cd3a42415d73b371f87a6efc787331de86 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Fri, 21 Aug 2020 17:14:47 +0200 Subject: Split core http benchmark into 'bin_ops' and 'json_ops' variants (#7147) --- core/examples/http_bench.js | 73 --------- core/examples/http_bench.rs | 178 ---------------------- core/examples/http_bench_bin_ops.js | 145 ++++++++++++++++++ core/examples/http_bench_bin_ops.rs | 284 +++++++++++++++++++++++++++++++++++ core/examples/http_bench_json_ops.js | 73 +++++++++ core/examples/http_bench_json_ops.rs | 178 ++++++++++++++++++++++ 6 files changed, 680 insertions(+), 251 deletions(-) delete mode 100644 core/examples/http_bench.js delete mode 100644 core/examples/http_bench.rs create mode 100644 core/examples/http_bench_bin_ops.js create mode 100644 core/examples/http_bench_bin_ops.rs create mode 100644 core/examples/http_bench_json_ops.js create mode 100644 core/examples/http_bench_json_ops.rs (limited to 'core/examples') diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js deleted file mode 100644 index de6b5a9a4..000000000 --- a/core/examples/http_bench.js +++ /dev/null @@ -1,73 +0,0 @@ -// This is not a real HTTP server. We read blindly one time into 'requestBuf', -// then write this fixed 'responseBuf'. The point of this benchmark is to -// exercise the event loop in a simple yet semi-realistic way. -const requestBuf = new Uint8Array(64 * 1024); -const responseBuf = new Uint8Array( - "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" - .split("") - .map((c) => c.charCodeAt(0)), -); - -/** Listens on 0.0.0.0:4500, returns rid. */ -function listen() { - const { rid } = Deno.core.jsonOpSync("listen", {}); - return rid; -} - -/** Accepts a connection, returns rid. */ -async function accept(serverRid) { - const { rid } = await Deno.core.jsonOpAsync("accept", { rid: serverRid }); - return rid; -} - -/** - * Reads a packet from the rid, presumably an http request. data is ignored. - * Returns bytes read. - */ -async function read(rid, data) { - const { nread } = await Deno.core.jsonOpAsync("read", { rid }, data); - return nread; -} - -/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ -async function write(rid, data) { - const { nwritten } = await Deno.core.jsonOpAsync("write", { rid }, data); - return nwritten; -} - -function close(rid) { - Deno.core.jsonOpSync("close", { rid }); -} - -async function serve(rid) { - while (true) { - const nread = await read(rid, requestBuf); - if (nread <= 0) { - break; - } - - const nwritten = await write(rid, responseBuf); - if (nwritten < 0) { - break; - } - } - close(rid); -} - -async function main() { - Deno.core.ops(); - - const listenerRid = listen(); - Deno.core.print(`listening http://127.0.0.1:4544/ rid=${listenerRid}\n`); - - for (;;) { - const rid = await accept(listenerRid); - if (rid < 0) { - Deno.core.print(`accept error ${rid}`); - return; - } - serve(rid); - } -} - -main(); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs deleted file mode 100644 index bf9d8d137..000000000 --- a/core/examples/http_bench.rs +++ /dev/null @@ -1,178 +0,0 @@ -#[macro_use] -extern crate log; - -use deno_core::serde_json; -use deno_core::CoreIsolate; -use deno_core::CoreIsolateState; -use deno_core::ErrBox; -use deno_core::Script; -use deno_core::StartupData; -use deno_core::ZeroCopyBuf; -use futures::future::poll_fn; -use futures::future::Future; -use std::env; -use std::io::Error; -use std::io::ErrorKind; -use std::net::SocketAddr; -use std::pin::Pin; -use std::task::Poll; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::net::TcpListener; -use tokio::net::TcpStream; - -struct Logger; - -impl log::Log for Logger { - fn enabled(&self, metadata: &log::Metadata) -> bool { - metadata.level() <= log::max_level() - } - - fn log(&self, record: &log::Record) { - if self.enabled(record.metadata()) { - println!("{} - {}", record.level(), record.args()); - } - } - - fn flush(&self) {} -} - -pub fn isolate_new() -> CoreIsolate { - let startup_data = StartupData::Script(Script { - source: include_str!("http_bench.js"), - filename: "http_bench.js", - }); - - let mut isolate = CoreIsolate::new(startup_data, false); - - isolate.register_op_json_sync("listen", op_listen); - isolate.register_op_json_async("accept", op_accept); - isolate.register_op_json_async("read", op_read); - isolate.register_op_json_async("write", op_write); - isolate.register_op_json_sync("close", op_close); - - isolate -} - -fn op_close( - state: &mut CoreIsolateState, - args: serde_json::Value, - _buf: &mut [ZeroCopyBuf], -) -> Result { - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; - debug!("close rid={}", rid); - - let resource_table = &mut state.resource_table.borrow_mut(); - resource_table - .close(rid) - .map(|_| serde_json::json!(())) - .ok_or_else(bad_resource) -} - -fn op_listen( - state: &mut CoreIsolateState, - _args: serde_json::Value, - _buf: &mut [ZeroCopyBuf], -) -> Result { - debug!("listen"); - let addr = "127.0.0.1:4544".parse::().unwrap(); - let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let resource_table = &mut state.resource_table.borrow_mut(); - let rid = resource_table.add("tcpListener", Box::new(listener)); - Ok(serde_json::json!({ "rid": rid })) -} - -fn op_accept( - state: &mut CoreIsolateState, - args: serde_json::Value, - _buf: &mut [ZeroCopyBuf], -) -> impl Future> { - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; - debug!("accept rid={}", rid); - - let resource_table = state.resource_table.clone(); - poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); - let listener = resource_table - .get_mut::(rid) - .ok_or_else(bad_resource)?; - listener.poll_accept(cx)?.map(|(stream, _addr)| { - let rid = resource_table.add("tcpStream", Box::new(stream)); - Ok(serde_json::json!({ "rid": rid })) - }) - }) -} - -fn op_read( - state: &mut CoreIsolateState, - args: serde_json::Value, - bufs: &mut [ZeroCopyBuf], -) -> impl Future> { - assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; - debug!("read rid={}", rid); - - let mut buf = bufs[0].clone(); - let resource_table = state.resource_table.clone(); - - poll_fn(move |cx| -> Poll> { - let resource_table = &mut resource_table.borrow_mut(); - let stream = resource_table - .get_mut::(rid) - .ok_or_else(bad_resource)?; - Pin::new(stream) - .poll_read(cx, &mut buf)? - .map(|nread| Ok(serde_json::json!({ "nread": nread }))) - }) -} - -fn op_write( - state: &mut CoreIsolateState, - args: serde_json::Value, - bufs: &mut [ZeroCopyBuf], -) -> impl Future> { - assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; - debug!("write rid={}", rid); - - let buf = bufs[0].clone(); - let resource_table = state.resource_table.clone(); - - poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); - let stream = resource_table - .get_mut::(rid) - .ok_or_else(bad_resource)?; - Pin::new(stream) - .poll_write(cx, &buf)? - .map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten }))) - }) -} - -fn bad_resource() -> ErrBox { - Error::new(ErrorKind::NotFound, "bad resource id").into() -} - -fn main() { - log::set_logger(&Logger).unwrap(); - log::set_max_level( - env::args() - .find(|a| a == "-D") - .map(|_| log::LevelFilter::Debug) - .unwrap_or(log::LevelFilter::Warn), - ); - - // NOTE: `--help` arg will display V8 help and exit - deno_core::v8_set_flags(env::args().collect()); - - let isolate = isolate_new(); - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_all() - .build() - .unwrap(); - deno_core::js_check(runtime.block_on(isolate)); -} diff --git a/core/examples/http_bench_bin_ops.js b/core/examples/http_bench_bin_ops.js new file mode 100644 index 000000000..b4d7add22 --- /dev/null +++ b/core/examples/http_bench_bin_ops.js @@ -0,0 +1,145 @@ +// This is not a real HTTP server. We read blindly one time into 'requestBuf', +// then write this fixed 'responseBuf'. The point of this benchmark is to +// exercise the event loop in a simple yet semi-realistic way. +const requestBuf = new Uint8Array(64 * 1024); +const responseBuf = new Uint8Array( + "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" + .split("") + .map((c) => c.charCodeAt(0)), +); +const promiseMap = new Map(); +let nextPromiseId = 1; + +function assert(cond) { + if (!cond) { + throw Error("assert"); + } +} + +function createResolvable() { + let resolve; + let reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + promise.resolve = resolve; + promise.reject = reject; + return promise; +} + +const scratch32 = new Int32Array(3); +const scratchBytes = new Uint8Array( + scratch32.buffer, + scratch32.byteOffset, + scratch32.byteLength, +); +assert(scratchBytes.byteLength === 3 * 4); + +function send(promiseId, opId, rid, ...zeroCopy) { + scratch32[0] = promiseId; + scratch32[1] = rid; + scratch32[2] = -1; + return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy); +} + +/** Returns Promise */ +function sendAsync(opId, rid, ...zeroCopy) { + const promiseId = nextPromiseId++; + const p = createResolvable(); + const buf = send(promiseId, opId, rid, ...zeroCopy); + if (buf) { + const record = recordFromBuf(buf); + // Sync result. + p.resolve(record.result); + } else { + // Async result. + promiseMap.set(promiseId, p); + } + return p; +} + +/** Returns i32 number */ +function sendSync(opId, rid) { + const buf = send(0, opId, rid); + const record = recordFromBuf(buf); + return record[2]; +} + +function recordFromBuf(buf) { + assert(buf.byteLength === 3 * 4); + return new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); +} + +function handleAsyncMsgFromRust(buf) { + const record = recordFromBuf(buf); + const p = promiseMap.get(record[0]); + promiseMap.delete(record[0]); + p.resolve(record[2]); +} + +/** Listens on 0.0.0.0:4500, returns rid. */ +function listen() { + return sendSync(ops["listen"], -1); +} + +/** Accepts a connection, returns rid. */ +function accept(rid) { + return sendAsync(ops["accept"], rid); +} + +/** + * Reads a packet from the rid, presumably an http request. data is ignored. + * Returns bytes read. + */ +function read(rid, data) { + return sendAsync(ops["read"], rid, data); +} + +/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ +function write(rid, data) { + return sendAsync(ops["write"], rid, data); +} + +function close(rid) { + return sendSync(ops["close"], rid); +} + +async function serve(rid) { + while (true) { + const nread = await read(rid, requestBuf); + if (nread <= 0) { + break; + } + + const nwritten = await write(rid, responseBuf); + if (nwritten < 0) { + break; + } + } + close(rid); +} + +let ops; + +async function main() { + ops = Deno.core.ops(); + for (const opName in ops) { + Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust); + } + + const listenerRid = listen(); + Deno.core.print(`http_bench_bin_ops listening on http://127.0.0.1:4544/\n`); + + for (;;) { + const rid = await accept(listenerRid); + // Deno.core.print(`accepted ${rid}`); + if (rid < 0) { + Deno.core.print(`accept error ${rid}`); + return; + } + serve(rid); + } +} + +main(); diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs new file mode 100644 index 000000000..366779e8c --- /dev/null +++ b/core/examples/http_bench_bin_ops.rs @@ -0,0 +1,284 @@ +#[macro_use] +extern crate log; + +use deno_core::CoreIsolate; +use deno_core::CoreIsolateState; +use deno_core::Op; +use deno_core::ResourceTable; +use deno_core::Script; +use deno_core::StartupData; +use deno_core::ZeroCopyBuf; +use futures::future::poll_fn; +use futures::future::FutureExt; +use futures::future::TryFuture; +use futures::future::TryFutureExt; +use std::cell::RefCell; +use std::convert::TryInto; +use std::env; +use std::fmt::Debug; +use std::io::Error; +use std::io::ErrorKind; +use std::mem::size_of; +use std::net::SocketAddr; +use std::pin::Pin; +use std::ptr; +use std::rc::Rc; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::net::TcpListener; +use tokio::net::TcpStream; + +struct Logger; + +impl log::Log for Logger { + fn enabled(&self, metadata: &log::Metadata) -> bool { + metadata.level() <= log::max_level() + } + + fn log(&self, record: &log::Record) { + if self.enabled(record.metadata()) { + println!("{} - {}", record.level(), record.args()); + } + } + + fn flush(&self) {} +} + +#[derive(Copy, Clone, Debug, PartialEq)] +struct Record { + pub promise_id: u32, + pub rid: u32, + pub result: i32, +} + +type RecordBuf = [u8; size_of::()]; + +impl From<&[u8]> for Record { + fn from(buf: &[u8]) -> Self { + assert_eq!(buf.len(), size_of::()); + unsafe { *(buf as *const _ as *const RecordBuf) }.into() + } +} + +impl From for Record { + fn from(buf: RecordBuf) -> Self { + unsafe { + #[allow(clippy::cast_ptr_alignment)] + ptr::read_unaligned(&buf as *const _ as *const Self) + } + } +} + +impl From for RecordBuf { + fn from(record: Record) -> Self { + unsafe { ptr::read(&record as *const _ as *const Self) } + } +} + +pub fn isolate_new() -> CoreIsolate { + let startup_data = StartupData::Script(Script { + source: include_str!("http_bench_bin_ops.js"), + filename: "http_bench_bin_ops.js", + }); + + let mut isolate = CoreIsolate::new(startup_data, false); + + fn register_sync_op( + isolate: &mut CoreIsolate, + name: &'static str, + handler: F, + ) where + F: 'static + + Fn( + Rc>, + u32, + &mut [ZeroCopyBuf], + ) -> Result, + { + let core_handler = move |state: &mut CoreIsolateState, + zero_copy_bufs: &mut [ZeroCopyBuf]| + -> Op { + assert!(!zero_copy_bufs.is_empty()); + let record = Record::from(zero_copy_bufs[0].as_ref()); + let is_sync = record.promise_id == 0; + assert!(is_sync); + + let resource_table = state.resource_table.clone(); + let result: i32 = + match handler(resource_table, record.rid, &mut zero_copy_bufs[1..]) { + Ok(r) => r as i32, + Err(_) => -1, + }; + let buf = RecordBuf::from(Record { result, ..record })[..].into(); + Op::Sync(buf) + }; + + isolate.register_op(name, core_handler); + } + + fn register_async_op( + isolate: &mut CoreIsolate, + name: &'static str, + handler: impl Fn(Rc>, u32, &mut [ZeroCopyBuf]) -> F + + Copy + + 'static, + ) where + F: TryFuture, + F::Ok: TryInto, + >::Error: Debug, + { + let core_handler = move |state: &mut CoreIsolateState, + zero_copy_bufs: &mut [ZeroCopyBuf]| + -> Op { + assert!(!zero_copy_bufs.is_empty()); + let record = Record::from(zero_copy_bufs[0].as_ref()); + let is_sync = record.promise_id == 0; + assert!(!is_sync); + + let mut zero_copy = zero_copy_bufs[1..].to_vec(); + let resource_table = state.resource_table.clone(); + let fut = async move { + let op = handler(resource_table, record.rid, &mut zero_copy); + let result = op + .map_ok(|r| r.try_into().expect("op result does not fit in i32")) + .unwrap_or_else(|_| -1) + .await; + RecordBuf::from(Record { result, ..record })[..].into() + }; + + Op::Async(fut.boxed_local()) + }; + + isolate.register_op(name, core_handler); + } + + register_sync_op(&mut isolate, "listen", op_listen); + register_async_op(&mut isolate, "accept", op_accept); + register_async_op(&mut isolate, "read", op_read); + register_async_op(&mut isolate, "write", op_write); + register_sync_op(&mut isolate, "close", op_close); + + isolate +} + +fn op_close( + resource_table: Rc>, + rid: u32, + _buf: &mut [ZeroCopyBuf], +) -> Result { + debug!("close rid={}", rid); + let resource_table = &mut resource_table.borrow_mut(); + resource_table + .close(rid) + .map(|_| 0) + .ok_or_else(bad_resource) +} + +fn op_listen( + resource_table: Rc>, + _rid: u32, + _buf: &mut [ZeroCopyBuf], +) -> Result { + debug!("listen"); + let addr = "127.0.0.1:4544".parse::().unwrap(); + let std_listener = std::net::TcpListener::bind(&addr)?; + let listener = TcpListener::from_std(std_listener)?; + let resource_table = &mut resource_table.borrow_mut(); + let rid = resource_table.add("tcpListener", Box::new(listener)); + Ok(rid) +} + +fn op_accept( + resource_table: Rc>, + rid: u32, + _buf: &mut [ZeroCopyBuf], +) -> impl TryFuture { + debug!("accept rid={}", rid); + + poll_fn(move |cx| { + let resource_table = &mut resource_table.borrow_mut(); + let listener = resource_table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + listener.poll_accept(cx).map_ok(|(stream, _addr)| { + resource_table.add("tcpStream", Box::new(stream)) + }) + }) +} + +fn op_read( + resource_table: Rc>, + rid: u32, + bufs: &mut [ZeroCopyBuf], +) -> impl TryFuture { + assert_eq!(bufs.len(), 1, "Invalid number of arguments"); + let mut buf = bufs[0].clone(); + + debug!("read rid={}", rid); + + poll_fn(move |cx| { + let resource_table = &mut resource_table.borrow_mut(); + let stream = resource_table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + Pin::new(stream).poll_read(cx, &mut buf) + }) +} + +fn op_write( + resource_table: Rc>, + rid: u32, + bufs: &mut [ZeroCopyBuf], +) -> impl TryFuture { + assert_eq!(bufs.len(), 1, "Invalid number of arguments"); + let buf = bufs[0].clone(); + debug!("write rid={}", rid); + + poll_fn(move |cx| { + let resource_table = &mut resource_table.borrow_mut(); + let stream = resource_table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + Pin::new(stream).poll_write(cx, &buf) + }) +} + +fn bad_resource() -> Error { + Error::new(ErrorKind::NotFound, "bad resource id") +} + +fn main() { + log::set_logger(&Logger).unwrap(); + log::set_max_level( + env::args() + .find(|a| a == "-D") + .map(|_| log::LevelFilter::Debug) + .unwrap_or(log::LevelFilter::Warn), + ); + + // NOTE: `--help` arg will display V8 help and exit + deno_core::v8_set_flags(env::args().collect()); + + let isolate = isolate_new(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + runtime.block_on(isolate).expect("unexpected isolate error"); +} + +#[test] +fn test_record_from() { + let expected = Record { + promise_id: 1, + rid: 3, + result: 4, + }; + let buf = RecordBuf::from(expected); + if cfg!(target_endian = "little") { + assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]); + } + let actual = Record::from(buf); + assert_eq!(actual, expected); +} diff --git a/core/examples/http_bench_json_ops.js b/core/examples/http_bench_json_ops.js new file mode 100644 index 000000000..865b04d7e --- /dev/null +++ b/core/examples/http_bench_json_ops.js @@ -0,0 +1,73 @@ +// This is not a real HTTP server. We read blindly one time into 'requestBuf', +// then write this fixed 'responseBuf'. The point of this benchmark is to +// exercise the event loop in a simple yet semi-realistic way. +const requestBuf = new Uint8Array(64 * 1024); +const responseBuf = new Uint8Array( + "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" + .split("") + .map((c) => c.charCodeAt(0)), +); + +/** Listens on 0.0.0.0:4500, returns rid. */ +function listen() { + const { rid } = Deno.core.jsonOpSync("listen", {}); + return rid; +} + +/** Accepts a connection, returns rid. */ +async function accept(serverRid) { + const { rid } = await Deno.core.jsonOpAsync("accept", { rid: serverRid }); + return rid; +} + +/** + * Reads a packet from the rid, presumably an http request. data is ignored. + * Returns bytes read. + */ +async function read(rid, data) { + const { nread } = await Deno.core.jsonOpAsync("read", { rid }, data); + return nread; +} + +/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ +async function write(rid, data) { + const { nwritten } = await Deno.core.jsonOpAsync("write", { rid }, data); + return nwritten; +} + +function close(rid) { + Deno.core.jsonOpSync("close", { rid }); +} + +async function serve(rid) { + while (true) { + const nread = await read(rid, requestBuf); + if (nread <= 0) { + break; + } + + const nwritten = await write(rid, responseBuf); + if (nwritten < 0) { + break; + } + } + close(rid); +} + +async function main() { + Deno.core.ops(); + + const listenerRid = listen(); + Deno.core.print(`http_bench_json_ops listening on http://127.0.0.1:4544/\n`); + + for (;;) { + const rid = await accept(listenerRid); + if (rid < 0) { + Deno.core.print(`accept error ${rid}`); + return; + } + serve(rid); + } +} + +main(); diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs new file mode 100644 index 000000000..f0fc5f94c --- /dev/null +++ b/core/examples/http_bench_json_ops.rs @@ -0,0 +1,178 @@ +#[macro_use] +extern crate log; + +use deno_core::serde_json; +use deno_core::CoreIsolate; +use deno_core::CoreIsolateState; +use deno_core::ErrBox; +use deno_core::Script; +use deno_core::StartupData; +use deno_core::ZeroCopyBuf; +use futures::future::poll_fn; +use futures::future::Future; +use std::env; +use std::io::Error; +use std::io::ErrorKind; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::Poll; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::net::TcpListener; +use tokio::net::TcpStream; + +struct Logger; + +impl log::Log for Logger { + fn enabled(&self, metadata: &log::Metadata) -> bool { + metadata.level() <= log::max_level() + } + + fn log(&self, record: &log::Record) { + if self.enabled(record.metadata()) { + println!("{} - {}", record.level(), record.args()); + } + } + + fn flush(&self) {} +} + +pub fn isolate_new() -> CoreIsolate { + let startup_data = StartupData::Script(Script { + source: include_str!("http_bench_json_ops.js"), + filename: "http_bench_json_ops.js", + }); + + let mut isolate = CoreIsolate::new(startup_data, false); + + isolate.register_op_json_sync("listen", op_listen); + isolate.register_op_json_async("accept", op_accept); + isolate.register_op_json_async("read", op_read); + isolate.register_op_json_async("write", op_write); + isolate.register_op_json_sync("close", op_close); + + isolate +} + +fn op_close( + state: &mut CoreIsolateState, + args: serde_json::Value, + _buf: &mut [ZeroCopyBuf], +) -> Result { + let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + debug!("close rid={}", rid); + + let resource_table = &mut state.resource_table.borrow_mut(); + resource_table + .close(rid) + .map(|_| serde_json::json!(())) + .ok_or_else(bad_resource) +} + +fn op_listen( + state: &mut CoreIsolateState, + _args: serde_json::Value, + _buf: &mut [ZeroCopyBuf], +) -> Result { + debug!("listen"); + let addr = "127.0.0.1:4544".parse::().unwrap(); + let std_listener = std::net::TcpListener::bind(&addr)?; + let listener = TcpListener::from_std(std_listener)?; + let resource_table = &mut state.resource_table.borrow_mut(); + let rid = resource_table.add("tcpListener", Box::new(listener)); + Ok(serde_json::json!({ "rid": rid })) +} + +fn op_accept( + state: &mut CoreIsolateState, + args: serde_json::Value, + _buf: &mut [ZeroCopyBuf], +) -> impl Future> { + let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + debug!("accept rid={}", rid); + + let resource_table = state.resource_table.clone(); + poll_fn(move |cx| { + let resource_table = &mut resource_table.borrow_mut(); + let listener = resource_table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + listener.poll_accept(cx)?.map(|(stream, _addr)| { + let rid = resource_table.add("tcpStream", Box::new(stream)); + Ok(serde_json::json!({ "rid": rid })) + }) + }) +} + +fn op_read( + state: &mut CoreIsolateState, + args: serde_json::Value, + bufs: &mut [ZeroCopyBuf], +) -> impl Future> { + assert_eq!(bufs.len(), 1, "Invalid number of arguments"); + + let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + debug!("read rid={}", rid); + + let mut buf = bufs[0].clone(); + let resource_table = state.resource_table.clone(); + + poll_fn(move |cx| -> Poll> { + let resource_table = &mut resource_table.borrow_mut(); + let stream = resource_table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + Pin::new(stream) + .poll_read(cx, &mut buf)? + .map(|nread| Ok(serde_json::json!({ "nread": nread }))) + }) +} + +fn op_write( + state: &mut CoreIsolateState, + args: serde_json::Value, + bufs: &mut [ZeroCopyBuf], +) -> impl Future> { + assert_eq!(bufs.len(), 1, "Invalid number of arguments"); + + let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + debug!("write rid={}", rid); + + let buf = bufs[0].clone(); + let resource_table = state.resource_table.clone(); + + poll_fn(move |cx| { + let resource_table = &mut resource_table.borrow_mut(); + let stream = resource_table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + Pin::new(stream) + .poll_write(cx, &buf)? + .map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten }))) + }) +} + +fn bad_resource() -> ErrBox { + Error::new(ErrorKind::NotFound, "bad resource id").into() +} + +fn main() { + log::set_logger(&Logger).unwrap(); + log::set_max_level( + env::args() + .find(|a| a == "-D") + .map(|_| log::LevelFilter::Debug) + .unwrap_or(log::LevelFilter::Warn), + ); + + // NOTE: `--help` arg will display V8 help and exit + deno_core::v8_set_flags(env::args().collect()); + + let isolate = isolate_new(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + deno_core::js_check(runtime.block_on(isolate)); +} -- cgit v1.2.3