From 79a974229ad533bc8880a5ce067b4310002c9572 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 16 Apr 2019 17:53:43 -0400 Subject: Move deno_core_http_bench into examples dir (#2127) --- Cargo.lock | 1 + core/BUILD.gn | 4 +- core/Cargo.toml | 8 ++ core/examples/http_bench.js | 143 ++++++++++++++++++++ core/examples/http_bench.rs | 315 ++++++++++++++++++++++++++++++++++++++++++++ core/http_bench.js | 143 -------------------- core/http_bench.rs | 315 -------------------------------------------- 7 files changed, 469 insertions(+), 460 deletions(-) create mode 100644 core/examples/http_bench.js create mode 100644 core/examples/http_bench.rs delete mode 100644 core/http_bench.js delete mode 100644 core/http_bench.rs diff --git a/Cargo.lock b/Cargo.lock index a9c6a2298..c2ece7528 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,6 +202,7 @@ dependencies = [ "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/BUILD.gn b/core/BUILD.gn index e74c26cca..821d5a46b 100644 --- a/core/BUILD.gn +++ b/core/BUILD.gn @@ -54,7 +54,7 @@ if (is_win) { } rust_executable("deno_core_http_bench") { - source_root = "http_bench.rs" + source_root = "examples/http_bench.rs" deps = [ ":deno_core_deps", ] @@ -62,7 +62,7 @@ rust_executable("deno_core_http_bench") { } rust_test("deno_core_http_bench_test") { - source_root = "http_bench.rs" + source_root = "examples/http_bench.rs" deps = [ ":deno_core_deps", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index e00b91689..32cd6fe79 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -19,3 +19,11 @@ lazy_static = "1.3.0" libc = "0.2.51" log = "0.4.6" serde_json = "1.0.39" + +[[example]] +name = "deno_core_http_bench" +path = "examples/http_bench.rs" + +# tokio is only used for deno_core_http_bench +[dev_dependencies] +tokio = "0.1.18" diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js new file mode 100644 index 000000000..8eb764b55 --- /dev/null +++ b/core/examples/http_bench.js @@ -0,0 +1,143 @@ +// This is not a real HTTP server. We read blindly one time into 'requestBuf', +// then write this fixed 'responseBuf'. The point of this benchmark is to +// exercise the event loop in a simple yet semi-realistic way. +const OP_LISTEN = 1; +const OP_ACCEPT = 2; +const OP_READ = 3; +const OP_WRITE = 4; +const OP_CLOSE = 5; +const requestBuf = new Uint8Array(64 * 1024); +const responseBuf = new Uint8Array( + "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" + .split("") + .map(c => c.charCodeAt(0)) +); +const promiseMap = new Map(); +let nextPromiseId = 1; + +function assert(cond) { + if (!cond) { + throw Error("assert"); + } +} + +function createResolvable() { + let methods; + const promise = new Promise((resolve, reject) => { + methods = { resolve, reject }; + }); + return Object.assign(promise, methods); +} + +const scratch32 = new Int32Array(4); +const scratchBytes = new Uint8Array( + scratch32.buffer, + scratch32.byteOffset, + scratch32.byteLength +); +assert(scratchBytes.byteLength === 4 * 4); + +function send(promiseId, opId, arg, zeroCopy = null) { + scratch32[0] = promiseId; + scratch32[1] = opId; + scratch32[2] = arg; + scratch32[3] = -1; + return Deno.core.dispatch(scratchBytes, zeroCopy); +} + +/** Returns Promise */ +function sendAsync(opId, arg, zeroCopy = null) { + const promiseId = nextPromiseId++; + const p = createResolvable(); + promiseMap.set(promiseId, p); + send(promiseId, opId, arg, zeroCopy); + return p; +} + +function recordFromBuf(buf) { + assert(buf.byteLength === 16); + const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); + return { + promiseId: buf32[0], + opId: buf32[1], + arg: buf32[2], + result: buf32[3] + }; +} + +/** Returns i32 number */ +function sendSync(opId, arg) { + const buf = send(0, opId, arg); + const record = recordFromBuf(buf); + return record.result; +} + +function handleAsyncMsgFromRust(buf) { + const record = recordFromBuf(buf); + const { promiseId, result } = record; + const p = promiseMap.get(promiseId); + promiseMap.delete(promiseId); + p.resolve(result); +} + +/** Listens on 0.0.0.0:4500, returns rid. */ +function listen() { + return sendSync(OP_LISTEN, -1); +} + +/** Accepts a connection, returns rid. */ +async function accept(rid) { + return await sendAsync(OP_ACCEPT, rid); +} + +/** + * Reads a packet from the rid, presumably an http request. data is ignored. + * Returns bytes read. + */ +async function read(rid, data) { + return await sendAsync(OP_READ, rid, data); +} + +/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ +async function write(rid, data) { + return await sendAsync(OP_WRITE, rid, data); +} + +function close(rid) { + return sendSync(OP_CLOSE, rid); +} + +async function serve(rid) { + while (true) { + const nread = await read(rid, requestBuf); + if (nread <= 0) { + break; + } + + const nwritten = await write(rid, responseBuf); + if (nwritten < 0) { + break; + } + } + close(rid); +} + +async function main() { + Deno.core.setAsyncHandler(handleAsyncMsgFromRust); + + Deno.core.print("http_bench.js start\n"); + + const listenerRid = listen(); + Deno.core.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`); + while (true) { + const rid = await accept(listenerRid); + // Deno.core.print(`accepted ${rid}`); + if (rid < 0) { + Deno.core.print(`accept error ${rid}`); + return; + } + serve(rid); + } +} + +main(); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs new file mode 100644 index 000000000..e4598a7b3 --- /dev/null +++ b/core/examples/http_bench.rs @@ -0,0 +1,315 @@ +/// To run this benchmark: +/// +/// > DENO_BUILD_MODE=release ./tools/build.py && \ +/// ./target/release/deno_core_http_bench --multi-thread +extern crate deno; +extern crate futures; +extern crate libc; +extern crate tokio; + +#[macro_use] +extern crate log; +#[macro_use] +extern crate lazy_static; + +use deno::*; +use futures::future::lazy; +use std::collections::HashMap; +use std::env; +use std::net::SocketAddr; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Mutex; +use tokio::prelude::*; + +static LOGGER: Logger = Logger; +struct Logger; +impl log::Log for Logger { + fn enabled(&self, metadata: &log::Metadata) -> bool { + metadata.level() <= log::max_level() + } + fn log(&self, record: &log::Record) { + if self.enabled(record.metadata()) { + println!("{} - {}", record.level(), record.args()); + } + } + fn flush(&self) {} +} + +const OP_LISTEN: i32 = 1; +const OP_ACCEPT: i32 = 2; +const OP_READ: i32 = 3; +const OP_WRITE: i32 = 4; +const OP_CLOSE: i32 = 5; + +#[derive(Clone, Debug, PartialEq)] +pub struct Record { + pub promise_id: i32, + pub op_id: i32, + pub arg: i32, + pub result: i32, +} + +impl Into for Record { + fn into(self) -> Buf { + let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result] + .into_boxed_slice(); + let ptr = Box::into_raw(buf32) as *mut [u8; 16]; + unsafe { Box::from_raw(ptr) } + } +} + +impl From<&[u8]> for Record { + fn from(s: &[u8]) -> Record { + let ptr = s.as_ptr() as *const i32; + let ints = unsafe { std::slice::from_raw_parts(ptr, 4) }; + Record { + promise_id: ints[0], + op_id: ints[1], + arg: ints[2], + result: ints[3], + } + } +} + +impl From for Record { + fn from(buf: Buf) -> Record { + assert_eq!(buf.len(), 4 * 4); + //let byte_len = buf.len(); + let ptr = Box::into_raw(buf) as *mut [i32; 4]; + let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) }; + assert_eq!(ints.len(), 4); + Record { + promise_id: ints[0], + op_id: ints[1], + arg: ints[2], + result: ints[3], + } + } +} + +#[test] +fn test_record_from() { + let r = Record { + promise_id: 1, + op_id: 2, + arg: 3, + result: 4, + }; + let expected = r.clone(); + let buf: Buf = r.into(); + #[cfg(target_endian = "little")] + assert_eq!( + buf, + vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice() + ); + let actual = Record::from(buf); + assert_eq!(actual, expected); + // TODO test From<&[u8]> for Record +} + +pub type HttpBenchOp = dyn Future + Send; + +struct HttpBench(); + +impl Dispatch for HttpBench { + fn dispatch( + &mut self, + control: &[u8], + zero_copy_buf: deno_buf, + ) -> (bool, Box) { + let record = Record::from(control); + let is_sync = record.promise_id == 0; + let http_bench_op = match record.op_id { + OP_LISTEN => { + assert!(is_sync); + op_listen() + } + OP_CLOSE => { + assert!(is_sync); + let rid = record.arg; + op_close(rid) + } + OP_ACCEPT => { + assert!(!is_sync); + let listener_rid = record.arg; + op_accept(listener_rid) + } + OP_READ => { + assert!(!is_sync); + let rid = record.arg; + op_read(rid, zero_copy_buf) + } + OP_WRITE => { + assert!(!is_sync); + let rid = record.arg; + op_write(rid, zero_copy_buf) + } + _ => panic!("bad op {}", record.op_id), + }; + let mut record_a = record.clone(); + let mut record_b = record.clone(); + + let op = Box::new( + http_bench_op + .and_then(move |result| { + record_a.result = result; + Ok(record_a) + }).or_else(|err| -> Result { + eprintln!("unexpected err {}", err); + record_b.result = -1; + Ok(record_b) + }).then(|result| -> Result { + let record = result.unwrap(); + Ok(record.into()) + }), + ); + (is_sync, op) + } +} + +fn main() { + let main_future = lazy(move || { + // TODO currently isolate.execute() must be run inside tokio, hence the + // lazy(). It would be nice to not have that contraint. Probably requires + // using v8::MicrotasksPolicy::kExplicit + + let js_source = include_str!("http_bench.js"); + + let startup_data = StartupData::Script(Script { + source: js_source, + filename: "http_bench.js", + }); + + let isolate = deno::Isolate::new(startup_data, HttpBench()); + + isolate.then(|r| { + js_check(r); + Ok(()) + }) + }); + + let args: Vec = env::args().collect(); + let args = deno::v8_set_flags(args); + + log::set_logger(&LOGGER).unwrap(); + log::set_max_level(if args.iter().any(|a| a == "-D") { + log::LevelFilter::Debug + } else { + log::LevelFilter::Warn + }); + + if args.iter().any(|a| a == "--multi-thread") { + println!("multi-thread"); + tokio::run(main_future); + } else { + println!("single-thread"); + tokio::runtime::current_thread::run(main_future); + } +} + +enum Repr { + TcpListener(tokio::net::TcpListener), + TcpStream(tokio::net::TcpStream), +} + +type ResourceTable = HashMap; +lazy_static! { + static ref RESOURCE_TABLE: Mutex = Mutex::new(HashMap::new()); + static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3); +} + +fn new_rid() -> i32 { + let rid = NEXT_RID.fetch_add(1, Ordering::SeqCst); + rid as i32 +} + +fn op_accept(listener_rid: i32) -> Box { + debug!("accept {}", listener_rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&listener_rid); + match maybe_repr { + Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(), + _ => panic!("bad rid {}", listener_rid), + } + }).and_then(move |(stream, addr)| { + debug!("accept success {}", addr); + let rid = new_rid(); + + let mut guard = RESOURCE_TABLE.lock().unwrap(); + guard.insert(rid, Repr::TcpStream(stream)); + + Ok(rid as i32) + }), + ) +} + +fn op_listen() -> Box { + debug!("listen"); + + Box::new(lazy(move || { + let addr = "127.0.0.1:4544".parse::().unwrap(); + let listener = tokio::net::TcpListener::bind(&addr).unwrap(); + let rid = new_rid(); + + let mut guard = RESOURCE_TABLE.lock().unwrap(); + guard.insert(rid, Repr::TcpListener(listener)); + futures::future::ok(rid) + })) +} + +fn op_close(rid: i32) -> Box { + debug!("close"); + Box::new(lazy(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let r = table.remove(&rid); + let result = if r.is_some() { 0 } else { -1 }; + futures::future::ok(result) + })) +} + +fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box { + debug!("read rid={}", rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::TcpStream(ref mut stream)) => { + stream.poll_read(&mut zero_copy_buf) + } + _ => panic!("bad rid"), + } + }).and_then(move |nread| { + debug!("read success {}", nread); + Ok(nread as i32) + }), + ) +} + +fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box { + debug!("write rid={}", rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::TcpStream(ref mut stream)) => { + stream.poll_write(&zero_copy_buf) + } + _ => panic!("bad rid"), + } + }).and_then(move |nwritten| { + debug!("write success {}", nwritten); + Ok(nwritten as i32) + }), + ) +} + +fn js_check(r: Result<(), JSError>) { + if let Err(e) = r { + panic!(e.to_string()); + } +} diff --git a/core/http_bench.js b/core/http_bench.js deleted file mode 100644 index 8eb764b55..000000000 --- a/core/http_bench.js +++ /dev/null @@ -1,143 +0,0 @@ -// This is not a real HTTP server. We read blindly one time into 'requestBuf', -// then write this fixed 'responseBuf'. The point of this benchmark is to -// exercise the event loop in a simple yet semi-realistic way. -const OP_LISTEN = 1; -const OP_ACCEPT = 2; -const OP_READ = 3; -const OP_WRITE = 4; -const OP_CLOSE = 5; -const requestBuf = new Uint8Array(64 * 1024); -const responseBuf = new Uint8Array( - "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" - .split("") - .map(c => c.charCodeAt(0)) -); -const promiseMap = new Map(); -let nextPromiseId = 1; - -function assert(cond) { - if (!cond) { - throw Error("assert"); - } -} - -function createResolvable() { - let methods; - const promise = new Promise((resolve, reject) => { - methods = { resolve, reject }; - }); - return Object.assign(promise, methods); -} - -const scratch32 = new Int32Array(4); -const scratchBytes = new Uint8Array( - scratch32.buffer, - scratch32.byteOffset, - scratch32.byteLength -); -assert(scratchBytes.byteLength === 4 * 4); - -function send(promiseId, opId, arg, zeroCopy = null) { - scratch32[0] = promiseId; - scratch32[1] = opId; - scratch32[2] = arg; - scratch32[3] = -1; - return Deno.core.dispatch(scratchBytes, zeroCopy); -} - -/** Returns Promise */ -function sendAsync(opId, arg, zeroCopy = null) { - const promiseId = nextPromiseId++; - const p = createResolvable(); - promiseMap.set(promiseId, p); - send(promiseId, opId, arg, zeroCopy); - return p; -} - -function recordFromBuf(buf) { - assert(buf.byteLength === 16); - const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); - return { - promiseId: buf32[0], - opId: buf32[1], - arg: buf32[2], - result: buf32[3] - }; -} - -/** Returns i32 number */ -function sendSync(opId, arg) { - const buf = send(0, opId, arg); - const record = recordFromBuf(buf); - return record.result; -} - -function handleAsyncMsgFromRust(buf) { - const record = recordFromBuf(buf); - const { promiseId, result } = record; - const p = promiseMap.get(promiseId); - promiseMap.delete(promiseId); - p.resolve(result); -} - -/** Listens on 0.0.0.0:4500, returns rid. */ -function listen() { - return sendSync(OP_LISTEN, -1); -} - -/** Accepts a connection, returns rid. */ -async function accept(rid) { - return await sendAsync(OP_ACCEPT, rid); -} - -/** - * Reads a packet from the rid, presumably an http request. data is ignored. - * Returns bytes read. - */ -async function read(rid, data) { - return await sendAsync(OP_READ, rid, data); -} - -/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ -async function write(rid, data) { - return await sendAsync(OP_WRITE, rid, data); -} - -function close(rid) { - return sendSync(OP_CLOSE, rid); -} - -async function serve(rid) { - while (true) { - const nread = await read(rid, requestBuf); - if (nread <= 0) { - break; - } - - const nwritten = await write(rid, responseBuf); - if (nwritten < 0) { - break; - } - } - close(rid); -} - -async function main() { - Deno.core.setAsyncHandler(handleAsyncMsgFromRust); - - Deno.core.print("http_bench.js start\n"); - - const listenerRid = listen(); - Deno.core.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`); - while (true) { - const rid = await accept(listenerRid); - // Deno.core.print(`accepted ${rid}`); - if (rid < 0) { - Deno.core.print(`accept error ${rid}`); - return; - } - serve(rid); - } -} - -main(); diff --git a/core/http_bench.rs b/core/http_bench.rs deleted file mode 100644 index e4598a7b3..000000000 --- a/core/http_bench.rs +++ /dev/null @@ -1,315 +0,0 @@ -/// To run this benchmark: -/// -/// > DENO_BUILD_MODE=release ./tools/build.py && \ -/// ./target/release/deno_core_http_bench --multi-thread -extern crate deno; -extern crate futures; -extern crate libc; -extern crate tokio; - -#[macro_use] -extern crate log; -#[macro_use] -extern crate lazy_static; - -use deno::*; -use futures::future::lazy; -use std::collections::HashMap; -use std::env; -use std::net::SocketAddr; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::sync::Mutex; -use tokio::prelude::*; - -static LOGGER: Logger = Logger; -struct Logger; -impl log::Log for Logger { - fn enabled(&self, metadata: &log::Metadata) -> bool { - metadata.level() <= log::max_level() - } - fn log(&self, record: &log::Record) { - if self.enabled(record.metadata()) { - println!("{} - {}", record.level(), record.args()); - } - } - fn flush(&self) {} -} - -const OP_LISTEN: i32 = 1; -const OP_ACCEPT: i32 = 2; -const OP_READ: i32 = 3; -const OP_WRITE: i32 = 4; -const OP_CLOSE: i32 = 5; - -#[derive(Clone, Debug, PartialEq)] -pub struct Record { - pub promise_id: i32, - pub op_id: i32, - pub arg: i32, - pub result: i32, -} - -impl Into for Record { - fn into(self) -> Buf { - let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result] - .into_boxed_slice(); - let ptr = Box::into_raw(buf32) as *mut [u8; 16]; - unsafe { Box::from_raw(ptr) } - } -} - -impl From<&[u8]> for Record { - fn from(s: &[u8]) -> Record { - let ptr = s.as_ptr() as *const i32; - let ints = unsafe { std::slice::from_raw_parts(ptr, 4) }; - Record { - promise_id: ints[0], - op_id: ints[1], - arg: ints[2], - result: ints[3], - } - } -} - -impl From for Record { - fn from(buf: Buf) -> Record { - assert_eq!(buf.len(), 4 * 4); - //let byte_len = buf.len(); - let ptr = Box::into_raw(buf) as *mut [i32; 4]; - let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) }; - assert_eq!(ints.len(), 4); - Record { - promise_id: ints[0], - op_id: ints[1], - arg: ints[2], - result: ints[3], - } - } -} - -#[test] -fn test_record_from() { - let r = Record { - promise_id: 1, - op_id: 2, - arg: 3, - result: 4, - }; - let expected = r.clone(); - let buf: Buf = r.into(); - #[cfg(target_endian = "little")] - assert_eq!( - buf, - vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice() - ); - let actual = Record::from(buf); - assert_eq!(actual, expected); - // TODO test From<&[u8]> for Record -} - -pub type HttpBenchOp = dyn Future + Send; - -struct HttpBench(); - -impl Dispatch for HttpBench { - fn dispatch( - &mut self, - control: &[u8], - zero_copy_buf: deno_buf, - ) -> (bool, Box) { - let record = Record::from(control); - let is_sync = record.promise_id == 0; - let http_bench_op = match record.op_id { - OP_LISTEN => { - assert!(is_sync); - op_listen() - } - OP_CLOSE => { - assert!(is_sync); - let rid = record.arg; - op_close(rid) - } - OP_ACCEPT => { - assert!(!is_sync); - let listener_rid = record.arg; - op_accept(listener_rid) - } - OP_READ => { - assert!(!is_sync); - let rid = record.arg; - op_read(rid, zero_copy_buf) - } - OP_WRITE => { - assert!(!is_sync); - let rid = record.arg; - op_write(rid, zero_copy_buf) - } - _ => panic!("bad op {}", record.op_id), - }; - let mut record_a = record.clone(); - let mut record_b = record.clone(); - - let op = Box::new( - http_bench_op - .and_then(move |result| { - record_a.result = result; - Ok(record_a) - }).or_else(|err| -> Result { - eprintln!("unexpected err {}", err); - record_b.result = -1; - Ok(record_b) - }).then(|result| -> Result { - let record = result.unwrap(); - Ok(record.into()) - }), - ); - (is_sync, op) - } -} - -fn main() { - let main_future = lazy(move || { - // TODO currently isolate.execute() must be run inside tokio, hence the - // lazy(). It would be nice to not have that contraint. Probably requires - // using v8::MicrotasksPolicy::kExplicit - - let js_source = include_str!("http_bench.js"); - - let startup_data = StartupData::Script(Script { - source: js_source, - filename: "http_bench.js", - }); - - let isolate = deno::Isolate::new(startup_data, HttpBench()); - - isolate.then(|r| { - js_check(r); - Ok(()) - }) - }); - - let args: Vec = env::args().collect(); - let args = deno::v8_set_flags(args); - - log::set_logger(&LOGGER).unwrap(); - log::set_max_level(if args.iter().any(|a| a == "-D") { - log::LevelFilter::Debug - } else { - log::LevelFilter::Warn - }); - - if args.iter().any(|a| a == "--multi-thread") { - println!("multi-thread"); - tokio::run(main_future); - } else { - println!("single-thread"); - tokio::runtime::current_thread::run(main_future); - } -} - -enum Repr { - TcpListener(tokio::net::TcpListener), - TcpStream(tokio::net::TcpStream), -} - -type ResourceTable = HashMap; -lazy_static! { - static ref RESOURCE_TABLE: Mutex = Mutex::new(HashMap::new()); - static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3); -} - -fn new_rid() -> i32 { - let rid = NEXT_RID.fetch_add(1, Ordering::SeqCst); - rid as i32 -} - -fn op_accept(listener_rid: i32) -> Box { - debug!("accept {}", listener_rid); - Box::new( - futures::future::poll_fn(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&listener_rid); - match maybe_repr { - Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(), - _ => panic!("bad rid {}", listener_rid), - } - }).and_then(move |(stream, addr)| { - debug!("accept success {}", addr); - let rid = new_rid(); - - let mut guard = RESOURCE_TABLE.lock().unwrap(); - guard.insert(rid, Repr::TcpStream(stream)); - - Ok(rid as i32) - }), - ) -} - -fn op_listen() -> Box { - debug!("listen"); - - Box::new(lazy(move || { - let addr = "127.0.0.1:4544".parse::().unwrap(); - let listener = tokio::net::TcpListener::bind(&addr).unwrap(); - let rid = new_rid(); - - let mut guard = RESOURCE_TABLE.lock().unwrap(); - guard.insert(rid, Repr::TcpListener(listener)); - futures::future::ok(rid) - })) -} - -fn op_close(rid: i32) -> Box { - debug!("close"); - Box::new(lazy(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let r = table.remove(&rid); - let result = if r.is_some() { 0 } else { -1 }; - futures::future::ok(result) - })) -} - -fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box { - debug!("read rid={}", rid); - Box::new( - futures::future::poll_fn(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&rid); - match maybe_repr { - Some(Repr::TcpStream(ref mut stream)) => { - stream.poll_read(&mut zero_copy_buf) - } - _ => panic!("bad rid"), - } - }).and_then(move |nread| { - debug!("read success {}", nread); - Ok(nread as i32) - }), - ) -} - -fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box { - debug!("write rid={}", rid); - Box::new( - futures::future::poll_fn(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&rid); - match maybe_repr { - Some(Repr::TcpStream(ref mut stream)) => { - stream.poll_write(&zero_copy_buf) - } - _ => panic!("bad rid"), - } - }).and_then(move |nwritten| { - debug!("write success {}", nwritten); - Ok(nwritten as i32) - }), - ) -} - -fn js_check(r: Result<(), JSError>) { - if let Err(e) = r { - panic!(e.to_string()); - } -} -- cgit v1.2.3