summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-08-03 22:36:32 +0200
committerGitHub <noreply@github.com>2023-08-03 20:36:32 +0000
commit433ecc9047c1701a6b384fc94593c1f41f9f6bf6 (patch)
tree1e26e0b91a9f247678e347b133773ec1ad0b913a
parent7f8bf2537db0ae596a2c1baabd4011a190666ca6 (diff)
refactor: rewrite http_next ops to use op2 macro (#19934)
Ref #19915 --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
-rw-r--r--ext/http/00_serve.js4
-rw-r--r--ext/http/http_next.rs106
2 files changed, 63 insertions, 47 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index b51dfee07..af4353e0e 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -726,7 +726,7 @@ function serveHttpOn(context, callback) {
try {
// Attempt to pull as many requests out of the queue as possible before awaiting. This API is
// a synchronous, non-blocking API that returns u32::MAX if anything goes wrong.
- while ((req = op_http_try_wait(rid)) !== 0xffffffff) {
+ while ((req = op_http_try_wait(rid)) !== -1) {
PromisePrototypeCatch(callback(req), promiseErrorHandler);
}
currentPromise = op_http_wait(rid);
@@ -741,7 +741,7 @@ function serveHttpOn(context, callback) {
}
throw new Deno.errors.Http(error);
}
- if (req === 0xffffffff) {
+ if (req === -1) {
break;
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 7cf088e30..3c3724924 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -117,10 +117,11 @@ impl<
{
}
-#[op]
+#[op2(fast)]
+#[smi]
pub fn op_http_upgrade_raw(
state: &mut OpState,
- slab_id: SlabId,
+ #[smi] slab_id: SlabId,
) -> Result<ResourceId, AnyError> {
// Stage 1: extract the upgrade future
let upgrade = slab_get(slab_id).upgrade()?;
@@ -184,11 +185,12 @@ pub fn op_http_upgrade_raw(
)
}
-#[op]
+#[op2(async)]
+#[smi]
pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>,
- slab_id: SlabId,
- headers: Vec<(ByteString, ByteString)>,
+ #[smi] slab_id: SlabId,
+ #[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<ResourceId, AnyError> {
let mut http = slab_get(slab_id);
// Stage 1: set the response to 101 Switching Protocols and send it
@@ -290,10 +292,11 @@ where
array_value.into()
}
-#[op]
+#[op2]
+#[serde]
pub fn op_http_get_request_header(
- slab_id: SlabId,
- name: String,
+ #[smi] slab_id: SlabId,
+ #[string] name: String,
) -> Option<ByteString> {
let http = slab_get(slab_id);
let value = http.request_parts().headers.get(name);
@@ -382,11 +385,11 @@ pub fn op_http_read_request_body(
state.resource_table.add_rc(body_resource)
}
-#[op(fast)]
+#[op2]
pub fn op_http_set_response_header(
- slab_id: SlabId,
- name: ByteString,
- value: ByteString,
+ #[smi] slab_id: SlabId,
+ #[serde] name: ByteString,
+ #[serde] value: ByteString,
) {
let mut http = slab_get(slab_id);
let resp_headers = http.response().headers_mut();
@@ -397,24 +400,22 @@ pub fn op_http_set_response_header(
resp_headers.append(name, value);
}
-#[op(v8)]
-fn op_http_set_response_headers(
+#[op2]
+pub fn op_http_set_response_headers(
scope: &mut v8::HandleScope,
- slab_id: SlabId,
- headers: serde_v8::Value,
+ #[smi] slab_id: SlabId,
+ headers: v8::Local<v8::Array>,
) {
let mut http = slab_get(slab_id);
// TODO(mmastrac): Invalid headers should be handled?
let resp_headers = http.response().headers_mut();
- let arr = v8::Local::<v8::Array>::try_from(headers.v8_value).unwrap();
-
- let len = arr.length();
+ let len = headers.length();
let header_len = len * 2;
resp_headers.reserve(header_len.try_into().unwrap());
for i in 0..len {
- let item = arr.get_index(scope, i).unwrap();
+ let item = headers.get_index(scope, i).unwrap();
let pair = v8::Local::<v8::Array>::try_from(item).unwrap();
let name = pair.get_index(scope, 0).unwrap();
let value = pair.get_index(scope, 1).unwrap();
@@ -429,10 +430,10 @@ fn op_http_set_response_headers(
}
}
-#[op]
+#[op2]
pub fn op_http_set_response_trailers(
- slab_id: SlabId,
- trailers: Vec<(ByteString, ByteString)>,
+ #[smi] slab_id: SlabId,
+ #[serde] trailers: Vec<(ByteString, ByteString)>,
) {
let mut http = slab_get(slab_id);
let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len());
@@ -580,11 +581,11 @@ fn set_response(
response.body_mut().initialize(response_fn(compression))
}
-#[op(fast)]
+#[op2(fast)]
pub fn op_http_set_response_body_resource(
state: &mut OpState,
- slab_id: SlabId,
- stream_rid: ResourceId,
+ #[smi] slab_id: SlabId,
+ #[smi] stream_rid: ResourceId,
auto_close: bool,
) -> Result<(), AnyError> {
// If the stream is auto_close, we will hold the last ref to it until the response is complete.
@@ -605,10 +606,11 @@ pub fn op_http_set_response_body_resource(
Ok(())
}
-#[op(fast)]
+#[op2(fast)]
+#[smi]
pub fn op_http_set_response_body_stream(
state: &mut OpState,
- slab_id: SlabId,
+ #[smi] slab_id: SlabId,
) -> Result<ResourceId, AnyError> {
// TODO(mmastrac): what should this channel size be?
let (tx, rx) = tokio::sync::mpsc::channel(1);
@@ -619,8 +621,11 @@ pub fn op_http_set_response_body_stream(
Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
}
-#[op(fast)]
-pub fn op_http_set_response_body_text(slab_id: SlabId, text: String) {
+#[op2(fast)]
+pub fn op_http_set_response_body_text(
+ #[smi] slab_id: SlabId,
+ #[string] text: String,
+) {
if !text.is_empty() {
set_response(slab_id, Some(text.len()), |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
@@ -628,8 +633,11 @@ pub fn op_http_set_response_body_text(slab_id: SlabId, text: String) {
}
}
-#[op(fast)]
-pub fn op_http_set_response_body_bytes(slab_id: SlabId, buffer: &[u8]) {
+#[op2(fast)]
+pub fn op_http_set_response_body_bytes(
+ #[smi] slab_id: SlabId,
+ #[buffer] buffer: &[u8],
+) {
if !buffer.is_empty() {
set_response(slab_id, Some(buffer.len()), |compression| {
ResponseBytesInner::from_slice(compression, buffer)
@@ -637,11 +645,11 @@ pub fn op_http_set_response_body_bytes(slab_id: SlabId, buffer: &[u8]) {
};
}
-#[op]
+#[op2(async)]
pub async fn op_http_track(
state: Rc<RefCell<OpState>>,
- slab_id: SlabId,
- server_rid: ResourceId,
+ #[smi] slab_id: SlabId,
+ #[smi] server_rid: ResourceId,
) -> Result<(), AnyError> {
let http = slab_get(slab_id);
let handle = http.body_promise();
@@ -834,10 +842,11 @@ impl Drop for HttpJoinHandle {
}
}
-#[op(v8)]
+#[op2]
+#[serde]
pub fn op_http_serve<HTTP>(
state: Rc<RefCell<OpState>>,
- listener_rid: ResourceId,
+ #[smi] listener_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError>
where
HTTP: HttpPropertyExtractor,
@@ -886,10 +895,11 @@ where
))
}
-#[op(v8)]
+#[op2]
+#[serde]
pub fn op_http_serve_on<HTTP>(
state: Rc<RefCell<OpState>>,
- connection_rid: ResourceId,
+ #[smi] connection_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError>
where
HTTP: HttpPropertyExtractor,
@@ -930,8 +940,9 @@ where
/// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything
/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error.
-#[op(fast)]
-pub fn op_http_try_wait(state: &mut OpState, rid: ResourceId) -> SlabId {
+#[op2(fast)]
+#[smi]
+pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
// The resource needs to exist.
let Ok(join_handle) = state
.resource_table
@@ -952,10 +963,11 @@ pub fn op_http_try_wait(state: &mut OpState, rid: ResourceId) -> SlabId {
id
}
-#[op]
+#[op2(async)]
+#[smi]
pub async fn op_http_wait(
state: Rc<RefCell<OpState>>,
- rid: ResourceId,
+ #[smi] rid: ResourceId,
) -> Result<SlabId, AnyError> {
// We will get the join handle initially, as we might be consuming requests still
let join_handle = state
@@ -1088,11 +1100,15 @@ impl Resource for UpgradeStream {
}
}
-#[op(fast)]
-pub fn op_can_write_vectored(state: &mut OpState, rid: ResourceId) -> bool {
+#[op2(fast)]
+pub fn op_can_write_vectored(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+) -> bool {
state.resource_table.get::<UpgradeStream>(rid).is_ok()
}
+// TODO(bartlomieju): op2 doesn't want to handle `usize` in the return type
#[op]
pub async fn op_raw_write_vectored(
state: Rc<RefCell<OpState>>,