From 593c3aa857bec2992aba7b7ec588dc031e379572 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 12 May 2020 11:09:28 -0400 Subject: Clean up core/shared_queue.js (#5237) --- core/core.js | 203 ++++++++++++++++++++++++++++++++++++++++++++ core/core_test.js | 87 +++++++++++++++++++ core/isolate.rs | 13 +-- core/shared_queue.js | 211 ---------------------------------------------- core/shared_queue_test.js | 87 ------------------- 5 files changed, 293 insertions(+), 308 deletions(-) create mode 100644 core/core.js create mode 100644 core/core_test.js delete mode 100644 core/shared_queue.js delete mode 100644 core/shared_queue_test.js diff --git a/core/core.js b/core/core.js new file mode 100644 index 000000000..4c6f708bb --- /dev/null +++ b/core/core.js @@ -0,0 +1,203 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +/* +SharedQueue Binary Layout ++-------------------------------+-------------------------------+ +| NUM_RECORDS (32) | ++---------------------------------------------------------------+ +| NUM_SHIFTED_OFF (32) | ++---------------------------------------------------------------+ +| HEAD (32) | ++---------------------------------------------------------------+ +| OFFSETS (32) | ++---------------------------------------------------------------+ +| RECORD_ENDS (*MAX_RECORDS) ... ++---------------------------------------------------------------+ +| RECORDS (*MAX_RECORDS) ... ++---------------------------------------------------------------+ + */ + +/* eslint-disable @typescript-eslint/no-use-before-define */ + +((window) => { + const MAX_RECORDS = 100; + const INDEX_NUM_RECORDS = 0; + const INDEX_NUM_SHIFTED_OFF = 1; + const INDEX_HEAD = 2; + const INDEX_OFFSETS = 3; + const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS; + const HEAD_INIT = 4 * INDEX_RECORDS; + + // Available on start due to bindings. + const core = window.Deno.core; + const { recv, send } = core; + + let sharedBytes; + let shared32; + + let asyncHandlers; + + let initialized = false; + + function maybeInit() { + if (!initialized) { + init(); + initialized = true; + } + } + + function init() { + const shared = core.shared; + assert(shared.byteLength > 0); + assert(sharedBytes == null); + assert(shared32 == null); + sharedBytes = new Uint8Array(shared); + shared32 = new Int32Array(shared); + asyncHandlers = []; + // Callers should not call core.recv, use setAsyncHandler. + recv(handleAsyncMsgFromRust); + } + + function ops() { + // op id 0 is a special value to retrieve the map of registered ops. + const opsMapBytes = send(0, new Uint8Array([]), null); + const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); + return JSON.parse(opsMapJson); + } + + function assert(cond) { + if (!cond) { + throw Error("assert"); + } + } + + function reset() { + maybeInit(); + shared32[INDEX_NUM_RECORDS] = 0; + shared32[INDEX_NUM_SHIFTED_OFF] = 0; + shared32[INDEX_HEAD] = HEAD_INIT; + } + + function head() { + maybeInit(); + return shared32[INDEX_HEAD]; + } + + function numRecords() { + return shared32[INDEX_NUM_RECORDS]; + } + + function size() { + return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF]; + } + + function setMeta(index, end, opId) { + shared32[INDEX_OFFSETS + 2 * index] = end; + shared32[INDEX_OFFSETS + 2 * index + 1] = opId; + } + + function getMeta(index) { + if (index < numRecords()) { + const buf = shared32[INDEX_OFFSETS + 2 * index]; + const opId = shared32[INDEX_OFFSETS + 2 * index + 1]; + return [opId, buf]; + } else { + return null; + } + } + + function getOffset(index) { + if (index < numRecords()) { + if (index == 0) { + return HEAD_INIT; + } else { + const prevEnd = shared32[INDEX_OFFSETS + 2 * (index - 1)]; + return (prevEnd + 3) & ~3; + } + } else { + return null; + } + } + + function push(opId, buf) { + const off = head(); + const end = off + buf.byteLength; + const alignedEnd = (end + 3) & ~3; + const index = numRecords(); + if (alignedEnd > shared32.byteLength || index >= MAX_RECORDS) { + // console.log("shared_queue.js push fail"); + return false; + } + setMeta(index, end, opId); + assert(alignedEnd % 4 === 0); + assert(end - off == buf.byteLength); + sharedBytes.set(buf, off); + shared32[INDEX_NUM_RECORDS] += 1; + shared32[INDEX_HEAD] = alignedEnd; + return true; + } + + /// Returns null if empty. + function shift() { + const i = shared32[INDEX_NUM_SHIFTED_OFF]; + if (size() == 0) { + assert(i == 0); + return null; + } + + const off = getOffset(i); + const [opId, end] = getMeta(i); + + if (size() > 1) { + shared32[INDEX_NUM_SHIFTED_OFF] += 1; + } else { + reset(); + } + + assert(off != null); + assert(end != null); + const buf = sharedBytes.subarray(off, end); + return [opId, buf]; + } + + function setAsyncHandler(opId, cb) { + maybeInit(); + assert(opId != null); + asyncHandlers[opId] = cb; + } + + function handleAsyncMsgFromRust(opId, buf) { + if (buf) { + // This is the overflow_response case of deno::Isolate::poll(). + asyncHandlers[opId](buf); + } else { + while (true) { + const opIdBuf = shift(); + if (opIdBuf == null) { + break; + } + assert(asyncHandlers[opIdBuf[0]] != null); + asyncHandlers[opIdBuf[0]](opIdBuf[1]); + } + } + } + + function dispatch(opId, control, zeroCopy = null) { + return send(opId, control, zeroCopy); + } + + Object.assign(window.Deno.core, { + setAsyncHandler, + dispatch, + ops, + // sharedQueue is private but exposed for testing. + sharedQueue: { + MAX_RECORDS, + head, + numRecords, + size, + push, + reset, + shift, + }, + }); +})(this); diff --git a/core/core_test.js b/core/core_test.js new file mode 100644 index 000000000..7821f93a6 --- /dev/null +++ b/core/core_test.js @@ -0,0 +1,87 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +function assert(cond) { + if (!cond) { + throw Error("assert"); + } +} + +// Check overflow (corresponds to full_records test in rust) +function fullRecords(q) { + q.reset(); + const oneByte = new Uint8Array([42]); + for (let i = 0; i < q.MAX_RECORDS; i++) { + assert(q.push(1, oneByte)); + } + assert(!q.push(1, oneByte)); + const [opId, r] = q.shift(); + assert(opId == 1); + assert(r.byteLength == 1); + assert(r[0] == 42); + // Even if we shift one off, we still cannot push a new record. + assert(!q.push(1, oneByte)); +} + +function main() { + const q = Deno.core.sharedQueue; + + const h = q.head(); + assert(h > 0); + + // This record's len is not divisble by + // 4 so after pushing it to the queue, + // next record offset should be aligned to 4. + let r = new Uint8Array([1, 2, 3, 4, 5]); + const len = r.byteLength + h; + assert(q.push(1, r)); + // Record should be aligned to 4 bytes + assert(q.head() == len + 3); + + r = new Uint8Array([6, 7]); + assert(q.push(1, r)); + + r = new Uint8Array([8, 9, 10, 11]); + assert(q.push(1, r)); + assert(q.numRecords() == 3); + assert(q.size() == 3); + + let opId; + [opId, r] = q.shift(); + assert(r.byteLength == 5); + assert(r[0] == 1); + assert(r[1] == 2); + assert(r[2] == 3); + assert(r[3] == 4); + assert(r[4] == 5); + assert(q.numRecords() == 3); + assert(q.size() == 2); + + [opId, r] = q.shift(); + assert(r.byteLength == 2); + assert(r[0] == 6); + assert(r[1] == 7); + assert(q.numRecords() == 3); + assert(q.size() == 1); + + [opId, r] = q.shift(); + assert(opId == 1); + assert(r.byteLength == 4); + assert(r[0] == 8); + assert(r[1] == 9); + assert(r[2] == 10); + assert(r[3] == 11); + assert(q.numRecords() == 0); + assert(q.size() == 0); + + assert(q.shift() == null); + assert(q.shift() == null); + assert(q.numRecords() == 0); + assert(q.size() == 0); + + fullRecords(q); + + Deno.core.print("shared_queue_test.js ok\n"); + q.reset(); +} + +main(); diff --git a/core/isolate.rs b/core/isolate.rs index e3db18663..112af3045 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -348,9 +348,7 @@ impl CoreIsolate { pub(crate) fn shared_init(&mut self) { if self.needs_init { self.needs_init = false; - js_check( - self.execute("shared_queue.js", include_str!("shared_queue.js")), - ); + js_check(self.execute("core.js", include_str!("core.js"))); // Maybe execute the startup script. if let Some(s) = self.startup_script.take() { self.execute(&s.filename, &s.source).unwrap() @@ -1126,15 +1124,10 @@ pub mod tests { } #[test] - fn test_js() { + fn core_test_js() { run_in_task(|mut cx| { let (mut isolate, _dispatch_count) = setup(Mode::Async); - js_check( - isolate.execute( - "shared_queue_test.js", - include_str!("shared_queue_test.js"), - ), - ); + js_check(isolate.execute("core_test.js", include_str!("core_test.js"))); if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { unreachable!(); } diff --git a/core/shared_queue.js b/core/shared_queue.js deleted file mode 100644 index 1750740d6..000000000 --- a/core/shared_queue.js +++ /dev/null @@ -1,211 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -/* -SharedQueue Binary Layout -+-------------------------------+-------------------------------+ -| NUM_RECORDS (32) | -+---------------------------------------------------------------+ -| NUM_SHIFTED_OFF (32) | -+---------------------------------------------------------------+ -| HEAD (32) | -+---------------------------------------------------------------+ -| OFFSETS (32) | -+---------------------------------------------------------------+ -| RECORD_ENDS (*MAX_RECORDS) ... -+---------------------------------------------------------------+ -| RECORDS (*MAX_RECORDS) ... -+---------------------------------------------------------------+ - */ - -/* eslint-disable @typescript-eslint/no-use-before-define */ - -((window) => { - const GLOBAL_NAMESPACE = "Deno"; - const CORE_NAMESPACE = "core"; - const MAX_RECORDS = 100; - const INDEX_NUM_RECORDS = 0; - const INDEX_NUM_SHIFTED_OFF = 1; - const INDEX_HEAD = 2; - const INDEX_OFFSETS = 3; - const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS; - const HEAD_INIT = 4 * INDEX_RECORDS; - - // Available on start due to bindings. - const Deno = window[GLOBAL_NAMESPACE]; - const core = Deno[CORE_NAMESPACE]; - // Warning: DO NOT use window.Deno after this point. - // It is possible that the Deno namespace has been deleted. - // Use the above local Deno and core variable instead. - - let sharedBytes; - let shared32; - - let asyncHandlers; - - let initialized = false; - - function maybeInit() { - if (!initialized) { - init(); - initialized = true; - } - } - - function init() { - const shared = Deno.core.shared; - assert(shared.byteLength > 0); - assert(sharedBytes == null); - assert(shared32 == null); - sharedBytes = new Uint8Array(shared); - shared32 = new Int32Array(shared); - asyncHandlers = []; - // Callers should not call Deno.core.recv, use setAsyncHandler. - Deno.core.recv(handleAsyncMsgFromRust); - } - - function ops() { - // op id 0 is a special value to retrieve the map of registered ops. - const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null); - const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); - return JSON.parse(opsMapJson); - } - - function assert(cond) { - if (!cond) { - throw Error("assert"); - } - } - - function reset() { - maybeInit(); - shared32[INDEX_NUM_RECORDS] = 0; - shared32[INDEX_NUM_SHIFTED_OFF] = 0; - shared32[INDEX_HEAD] = HEAD_INIT; - } - - function head() { - maybeInit(); - return shared32[INDEX_HEAD]; - } - - function numRecords() { - return shared32[INDEX_NUM_RECORDS]; - } - - function size() { - return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF]; - } - - function setMeta(index, end, opId) { - shared32[INDEX_OFFSETS + 2 * index] = end; - shared32[INDEX_OFFSETS + 2 * index + 1] = opId; - } - - function getMeta(index) { - if (index < numRecords()) { - const buf = shared32[INDEX_OFFSETS + 2 * index]; - const opId = shared32[INDEX_OFFSETS + 2 * index + 1]; - return [opId, buf]; - } else { - return null; - } - } - - function getOffset(index) { - if (index < numRecords()) { - if (index == 0) { - return HEAD_INIT; - } else { - const prevEnd = shared32[INDEX_OFFSETS + 2 * (index - 1)]; - return (prevEnd + 3) & ~3; - } - } else { - return null; - } - } - - function push(opId, buf) { - const off = head(); - const end = off + buf.byteLength; - const alignedEnd = (end + 3) & ~3; - const index = numRecords(); - if (alignedEnd > shared32.byteLength || index >= MAX_RECORDS) { - // console.log("shared_queue.js push fail"); - return false; - } - setMeta(index, end, opId); - assert(alignedEnd % 4 === 0); - assert(end - off == buf.byteLength); - sharedBytes.set(buf, off); - shared32[INDEX_NUM_RECORDS] += 1; - shared32[INDEX_HEAD] = alignedEnd; - return true; - } - - /// Returns null if empty. - function shift() { - const i = shared32[INDEX_NUM_SHIFTED_OFF]; - if (size() == 0) { - assert(i == 0); - return null; - } - - const off = getOffset(i); - const [opId, end] = getMeta(i); - - if (size() > 1) { - shared32[INDEX_NUM_SHIFTED_OFF] += 1; - } else { - reset(); - } - - assert(off != null); - assert(end != null); - const buf = sharedBytes.subarray(off, end); - return [opId, buf]; - } - - function setAsyncHandler(opId, cb) { - maybeInit(); - assert(opId != null); - asyncHandlers[opId] = cb; - } - - function handleAsyncMsgFromRust(opId, buf) { - if (buf) { - // This is the overflow_response case of deno::Isolate::poll(). - asyncHandlers[opId](buf); - } else { - while (true) { - const opIdBuf = shift(); - if (opIdBuf == null) { - break; - } - assert(asyncHandlers[opIdBuf[0]] != null); - asyncHandlers[opIdBuf[0]](opIdBuf[1]); - } - } - } - - function dispatch(opId, control, zeroCopy = null) { - return Deno.core.send(opId, control, zeroCopy); - } - - const denoCore = { - setAsyncHandler, - dispatch, - sharedQueue: { - MAX_RECORDS, - head, - numRecords, - size, - push, - reset, - shift, - }, - ops, - }; - - assert(window[GLOBAL_NAMESPACE] != null); - assert(window[GLOBAL_NAMESPACE][CORE_NAMESPACE] != null); - Object.assign(core, denoCore); -})(this); diff --git a/core/shared_queue_test.js b/core/shared_queue_test.js deleted file mode 100644 index 7821f93a6..000000000 --- a/core/shared_queue_test.js +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -function assert(cond) { - if (!cond) { - throw Error("assert"); - } -} - -// Check overflow (corresponds to full_records test in rust) -function fullRecords(q) { - q.reset(); - const oneByte = new Uint8Array([42]); - for (let i = 0; i < q.MAX_RECORDS; i++) { - assert(q.push(1, oneByte)); - } - assert(!q.push(1, oneByte)); - const [opId, r] = q.shift(); - assert(opId == 1); - assert(r.byteLength == 1); - assert(r[0] == 42); - // Even if we shift one off, we still cannot push a new record. - assert(!q.push(1, oneByte)); -} - -function main() { - const q = Deno.core.sharedQueue; - - const h = q.head(); - assert(h > 0); - - // This record's len is not divisble by - // 4 so after pushing it to the queue, - // next record offset should be aligned to 4. - let r = new Uint8Array([1, 2, 3, 4, 5]); - const len = r.byteLength + h; - assert(q.push(1, r)); - // Record should be aligned to 4 bytes - assert(q.head() == len + 3); - - r = new Uint8Array([6, 7]); - assert(q.push(1, r)); - - r = new Uint8Array([8, 9, 10, 11]); - assert(q.push(1, r)); - assert(q.numRecords() == 3); - assert(q.size() == 3); - - let opId; - [opId, r] = q.shift(); - assert(r.byteLength == 5); - assert(r[0] == 1); - assert(r[1] == 2); - assert(r[2] == 3); - assert(r[3] == 4); - assert(r[4] == 5); - assert(q.numRecords() == 3); - assert(q.size() == 2); - - [opId, r] = q.shift(); - assert(r.byteLength == 2); - assert(r[0] == 6); - assert(r[1] == 7); - assert(q.numRecords() == 3); - assert(q.size() == 1); - - [opId, r] = q.shift(); - assert(opId == 1); - assert(r.byteLength == 4); - assert(r[0] == 8); - assert(r[1] == 9); - assert(r[2] == 10); - assert(r[3] == 11); - assert(q.numRecords() == 0); - assert(q.size() == 0); - - assert(q.shift() == null); - assert(q.shift() == null); - assert(q.numRecords() == 0); - assert(q.size() == 0); - - fullRecords(q); - - Deno.core.print("shared_queue_test.js ok\n"); - q.reset(); -} - -main(); -- cgit v1.2.3