From 98878bd81231a631c494b6767576097f945eb813 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Thu, 14 Jan 2021 20:32:27 -0800 Subject: refactor: IO resource types, fix concurrent read/write and graceful close (#9118) Fixes: 9032. --- runtime/ops/net.rs | 45 ++------------------------------------------- 1 file changed, 2 insertions(+), 43 deletions(-) (limited to 'runtime/ops/net.rs') diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs index e3864b38a..7e80bb86b 100644 --- a/runtime/ops/net.rs +++ b/runtime/ops/net.rs @@ -5,7 +5,6 @@ use crate::permissions::Permissions; use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr_sync; use deno_core::error::bad_resource; -use deno_core::error::bad_resource_id; use deno_core::error::custom_error; use deno_core::error::generic_error; use deno_core::error::type_error; @@ -27,7 +26,6 @@ use std::borrow::Cow; use std::cell::RefCell; use std::net::SocketAddr; use std::rc::Rc; -use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; @@ -42,14 +40,13 @@ use trust_dns_resolver::AsyncResolver; #[cfg(unix)] use super::net_unix; #[cfg(unix)] -use crate::ops::io::StreamResource; +use crate::ops::io::UnixStreamResource; #[cfg(unix)] use std::path::Path; pub fn init(rt: &mut deno_core::JsRuntime) { super::reg_json_async(rt, "op_accept", op_accept); super::reg_json_async(rt, "op_connect", op_connect); - super::reg_json_async(rt, "op_shutdown", op_shutdown); super::reg_json_sync(rt, "op_listen", op_listen); super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive); super::reg_json_async(rt, "op_datagram_send", op_datagram_send); @@ -318,7 +315,7 @@ async fn op_connect( let remote_addr = unix_stream.peer_addr()?; let mut state_ = state.borrow_mut(); - let resource = StreamResource::unix_stream(unix_stream); + let resource = UnixStreamResource::new(unix_stream.into_split()); let rid = state_.resource_table.add(resource); Ok(json!({ "rid": rid, @@ -336,44 +333,6 @@ async fn op_connect( } } -#[derive(Deserialize)] -struct ShutdownArgs { - rid: i32, -} - -async fn op_shutdown( - state: Rc>, - args: Value, - _zero_copy: BufVec, -) -> Result { - let args: ShutdownArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - - let resource = state - .borrow() - .resource_table - .get_any(rid) - .ok_or_else(bad_resource_id)?; - if let Some(stream) = resource.downcast_rc::() { - let mut wr = stream.wr_borrow_mut().await; - wr.shutdown().await?; - return Ok(json!({})); - } - - #[cfg(unix)] - if let Some(stream) = resource.downcast_rc::() { - if stream.unix_stream.is_some() { - let mut wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap()) - .borrow_mut() - .await; - wr.shutdown().await?; - return Ok(json!({})); - } - } - - Err(bad_resource_id()) -} - struct TcpListenerResource { listener: AsyncRefCell, cancel: CancelHandle, -- cgit v1.2.3