summaryrefslogtreecommitdiff
path: root/runtime
diff options
context:
space:
mode:
authorsnek <snek@deno.com>2024-11-13 11:38:46 +0100
committerGitHub <noreply@github.com>2024-11-13 10:38:46 +0000
commitaa546189be730163ee5370029e4dfdb3b454ab96 (patch)
tree4407643e6908f82c9ac31d9ae5faf04b3ab8d413 /runtime
parent7becd83a3828b35331d0fcb82c64146e520f154b (diff)
feat: OpenTelemetry Tracing API and Exporting (#26710)
Initial import of OTEL code supporting tracing. Metrics soon to come. Implements APIs for https://jsr.io/@deno/otel so that code using OpenTelemetry.js just works tm. There is still a lot of work to do with configuration and adding built-in tracing to core APIs, which will come in followup PRs. --------- Co-authored-by: Luca Casonato <hello@lcas.dev>
Diffstat (limited to 'runtime')
-rw-r--r--runtime/Cargo.toml7
-rw-r--r--runtime/js/90_deno_ns.js19
-rw-r--r--runtime/js/99_main.js14
-rw-r--r--runtime/js/telemetry.js395
-rw-r--r--runtime/lib.rs16
-rw-r--r--runtime/ops/mod.rs1
-rw-r--r--runtime/ops/os/mod.rs2
-rw-r--r--runtime/ops/otel.rs686
-rw-r--r--runtime/shared.rs1
-rw-r--r--runtime/snapshot.rs1
-rw-r--r--runtime/web_worker.rs3
-rw-r--r--runtime/worker.rs3
-rw-r--r--runtime/worker_bootstrap.rs11
13 files changed, 1144 insertions, 15 deletions
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index ba236de14..b59cd14fa 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -100,6 +100,7 @@ deno_websocket.workspace = true
deno_webstorage.workspace = true
node_resolver = { workspace = true, features = ["sync"] }
+async-trait.workspace = true
color-print.workspace = true
dlopen2.workspace = true
encoding_rs.workspace = true
@@ -114,7 +115,13 @@ log.workspace = true
netif = "0.1.6"
notify.workspace = true
once_cell.workspace = true
+opentelemetry.workspace = true
+opentelemetry-http.workspace = true
+opentelemetry-otlp.workspace = true
+opentelemetry-semantic-conventions.workspace = true
+opentelemetry_sdk.workspace = true
percent-encoding.workspace = true
+pin-project.workspace = true
regex.workspace = true
rustyline = { workspace = true, features = ["custom-bindings"] }
same-file = "1.0.6"
diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js
index fd2ac00f2..11f618ce2 100644
--- a/runtime/js/90_deno_ns.js
+++ b/runtime/js/90_deno_ns.js
@@ -29,6 +29,7 @@ import * as tty from "ext:runtime/40_tty.js";
import * as kv from "ext:deno_kv/01_db.ts";
import * as cron from "ext:deno_cron/01_cron.ts";
import * as webgpuSurface from "ext:deno_webgpu/02_surface.js";
+import * as telemetry from "ext:runtime/telemetry.js";
const denoNs = {
Process: process.Process,
@@ -134,7 +135,7 @@ const denoNs = {
createHttpClient: httpClient.createHttpClient,
};
-// NOTE(bartlomieju): keep IDs in sync with `cli/main.rs`
+// NOTE(bartlomieju): keep IDs in sync with `runtime/lib.rs`
const unstableIds = {
broadcastChannel: 1,
cron: 2,
@@ -143,11 +144,12 @@ const unstableIds = {
http: 5,
kv: 6,
net: 7,
- process: 8,
- temporal: 9,
- unsafeProto: 10,
- webgpu: 11,
- workerOptions: 12,
+ otel: 8,
+ process: 9,
+ temporal: 10,
+ unsafeProto: 11,
+ webgpu: 12,
+ workerOptions: 13,
};
const denoNsUnstableById = { __proto__: null };
@@ -181,4 +183,9 @@ denoNsUnstableById[unstableIds.webgpu] = {
// denoNsUnstableById[unstableIds.workerOptions] = { __proto__: null }
+denoNsUnstableById[unstableIds.otel] = {
+ tracing: telemetry.tracing,
+ metrics: telemetry.metrics,
+};
+
export { denoNs, denoNsUnstableById, unstableIds };
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index 6ddaa1335..2da5c5398 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -86,6 +86,8 @@ import {
workerRuntimeGlobalProperties,
} from "ext:runtime/98_global_scope_worker.js";
import { SymbolDispose, SymbolMetadata } from "ext:deno_web/00_infra.js";
+import { bootstrap as bootstrapOtel } from "ext:runtime/telemetry.js";
+
// deno-lint-ignore prefer-primordials
if (Symbol.metadata) {
throw "V8 supports Symbol.metadata now, no need to shim it";
@@ -573,6 +575,7 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) {
10: serveHost,
11: serveIsMain,
12: serveWorkerCount,
+ 13: otelConfig,
} = runtimeOptions;
if (mode === executionModes.serve) {
@@ -673,9 +676,10 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) {
});
ObjectSetPrototypeOf(globalThis, Window.prototype);
+ bootstrapOtel(otelConfig);
+
if (inspectFlag) {
- const consoleFromDeno = globalThis.console;
- core.wrapConsole(consoleFromDeno, core.v8Console);
+ core.wrapConsole(globalThis.console, core.v8Console);
}
event.defineEventHandler(globalThis, "error");
@@ -855,6 +859,7 @@ function bootstrapWorkerRuntime(
5: hasNodeModulesDir,
6: argv0,
7: nodeDebug,
+ 13: otelConfig,
} = runtimeOptions;
performance.setTimeOrigin();
@@ -882,8 +887,9 @@ function bootstrapWorkerRuntime(
}
ObjectSetPrototypeOf(globalThis, DedicatedWorkerGlobalScope.prototype);
- const consoleFromDeno = globalThis.console;
- core.wrapConsole(consoleFromDeno, core.v8Console);
+ bootstrapOtel(otelConfig);
+
+ core.wrapConsole(globalThis.console, core.v8Console);
event.defineEventHandler(self, "message");
event.defineEventHandler(self, "error", undefined, true);
diff --git a/runtime/js/telemetry.js b/runtime/js/telemetry.js
new file mode 100644
index 000000000..e9eb51f7c
--- /dev/null
+++ b/runtime/js/telemetry.js
@@ -0,0 +1,395 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+import { core, primordials } from "ext:core/mod.js";
+import {
+ op_otel_log,
+ op_otel_span_attribute,
+ op_otel_span_attribute2,
+ op_otel_span_attribute3,
+ op_otel_span_continue,
+ op_otel_span_flush,
+ op_otel_span_start,
+} from "ext:core/ops";
+import { Console } from "ext:deno_console/01_console.js";
+import { performance } from "ext:deno_web/15_performance.js";
+
+const {
+ SymbolDispose,
+ MathRandom,
+ Array,
+ ObjectEntries,
+ SafeMap,
+ ReflectApply,
+ SymbolFor,
+ Error,
+} = primordials;
+const { AsyncVariable, setAsyncContext } = core;
+
+const CURRENT = new AsyncVariable();
+let TRACING_ENABLED = false;
+
+const SPAN_ID_BYTES = 8;
+const TRACE_ID_BYTES = 16;
+
+const TRACE_FLAG_SAMPLED = 1 << 0;
+
+const hexSliceLookupTable = (function () {
+ const alphabet = "0123456789abcdef";
+ const table = new Array(256);
+ for (let i = 0; i < 16; ++i) {
+ const i16 = i * 16;
+ for (let j = 0; j < 16; ++j) {
+ table[i16 + j] = alphabet[i] + alphabet[j];
+ }
+ }
+ return table;
+})();
+
+function generateId(bytes) {
+ let out = "";
+ for (let i = 0; i < bytes / 4; i += 1) {
+ const r32 = (MathRandom() * 2 ** 32) >>> 0;
+ out += hexSliceLookupTable[(r32 >> 24) & 0xff];
+ out += hexSliceLookupTable[(r32 >> 16) & 0xff];
+ out += hexSliceLookupTable[(r32 >> 8) & 0xff];
+ out += hexSliceLookupTable[r32 & 0xff];
+ }
+ return out;
+}
+
+function submit(span) {
+ if (!(span.traceFlags & TRACE_FLAG_SAMPLED)) return;
+
+ op_otel_span_start(
+ span.traceId,
+ span.spanId,
+ span.parentSpanId ?? "",
+ span.kind,
+ span.name,
+ span.startTime,
+ span.endTime,
+ );
+
+ if (span.status !== null && span.status.code !== 0) {
+ op_otel_span_continue(span.code, span.message ?? "");
+ }
+
+ const attributes = ObjectEntries(span.attributes);
+ let i = 0;
+ while (i < attributes.length) {
+ if (i + 2 < attributes.length) {
+ op_otel_span_attribute3(
+ attributes.length,
+ attributes[i][0],
+ attributes[i][1],
+ attributes[i + 1][0],
+ attributes[i + 1][1],
+ attributes[i + 2][0],
+ attributes[i + 2][1],
+ );
+ i += 3;
+ } else if (i + 1 < attributes.length) {
+ op_otel_span_attribute2(
+ attributes.length,
+ attributes[i][0],
+ attributes[i][1],
+ attributes[i + 1][0],
+ attributes[i + 1][1],
+ );
+ i += 2;
+ } else {
+ op_otel_span_attribute(
+ attributes.length,
+ attributes[i][0],
+ attributes[i][1],
+ );
+ i += 1;
+ }
+ }
+
+ op_otel_span_flush();
+}
+
+const now = () => (performance.timeOrigin + performance.now()) / 1000;
+
+const INVALID_SPAN_ID = "0000000000000000";
+const INVALID_TRACE_ID = "00000000000000000000000000000000";
+const NO_ASYNC_CONTEXT = {};
+
+class Span {
+ traceId;
+ spanId;
+ parentSpanId;
+ kind;
+ name;
+ startTime;
+ endTime;
+ status = null;
+ attributes = { __proto__: null };
+ traceFlags = TRACE_FLAG_SAMPLED;
+
+ enabled = TRACING_ENABLED;
+ #asyncContext = NO_ASYNC_CONTEXT;
+
+ constructor(name, kind = "internal") {
+ if (!this.enabled) {
+ this.traceId = INVALID_TRACE_ID;
+ this.spanId = INVALID_SPAN_ID;
+ this.parentSpanId = INVALID_SPAN_ID;
+ return;
+ }
+
+ this.startTime = now();
+
+ this.spanId = generateId(SPAN_ID_BYTES);
+
+ let traceId;
+ let parentSpanId;
+ const parent = Span.current();
+ if (parent) {
+ if (parent.spanId !== undefined) {
+ parentSpanId = parent.spanId;
+ traceId = parent.traceId;
+ } else {
+ const context = parent.spanContext();
+ parentSpanId = context.spanId;
+ traceId = context.traceId;
+ }
+ }
+ if (
+ traceId && traceId !== INVALID_TRACE_ID && parentSpanId &&
+ parentSpanId !== INVALID_SPAN_ID
+ ) {
+ this.traceId = traceId;
+ this.parentSpanId = parentSpanId;
+ } else {
+ this.traceId = generateId(TRACE_ID_BYTES);
+ this.parentSpanId = INVALID_SPAN_ID;
+ }
+
+ this.name = name;
+
+ switch (kind) {
+ case "internal":
+ this.kind = 0;
+ break;
+ case "server":
+ this.kind = 1;
+ break;
+ case "client":
+ this.kind = 2;
+ break;
+ case "producer":
+ this.kind = 3;
+ break;
+ case "consumer":
+ this.kind = 4;
+ break;
+ default:
+ throw new Error(`Invalid span kind: ${kind}`);
+ }
+
+ this.enter();
+ }
+
+ // helper function to match otel js api
+ spanContext() {
+ return {
+ traceId: this.traceId,
+ spanId: this.spanId,
+ traceFlags: this.traceFlags,
+ };
+ }
+
+ setAttribute(name, value) {
+ if (!this.enabled) return;
+ this.attributes[name] = value;
+ }
+
+ enter() {
+ if (!this.enabled) return;
+ const context = (CURRENT.get() || ROOT_CONTEXT).setValue(SPAN_KEY, this);
+ this.#asyncContext = CURRENT.enter(context);
+ }
+
+ exit() {
+ if (!this.enabled || this.#asyncContext === NO_ASYNC_CONTEXT) return;
+ setAsyncContext(this.#asyncContext);
+ this.#asyncContext = NO_ASYNC_CONTEXT;
+ }
+
+ end() {
+ if (!this.enabled || this.endTime !== undefined) return;
+ this.exit();
+ this.endTime = now();
+ submit(this);
+ }
+
+ [SymbolDispose]() {
+ this.end();
+ }
+
+ static current() {
+ return CURRENT.get()?.getValue(SPAN_KEY);
+ }
+}
+
+function hrToSecs(hr) {
+ return ((hr[0] * 1e3 + hr[1] / 1e6) / 1000);
+}
+
+// Exporter compatible with opentelemetry js library
+class SpanExporter {
+ export(spans, resultCallback) {
+ try {
+ for (let i = 0; i < spans.length; i += 1) {
+ const span = spans[i];
+ const context = span.spanContext();
+ submit({
+ spanId: context.spanId,
+ traceId: context.traceId,
+ traceFlags: context.traceFlags,
+ name: span.name,
+ kind: span.kind,
+ parentSpanId: span.parentSpanId,
+ startTime: hrToSecs(span.startTime),
+ endTime: hrToSecs(span.endTime),
+ status: span.status,
+ attributes: span.attributes,
+ });
+ }
+ resultCallback({ code: 0 });
+ } catch (error) {
+ resultCallback({ code: 1, error });
+ }
+ }
+
+ async shutdown() {}
+
+ async forceFlush() {}
+}
+
+// SPAN_KEY matches symbol in otel-js library
+const SPAN_KEY = SymbolFor("OpenTelemetry Context Key SPAN");
+
+// Context tracker compatible with otel-js api
+class Context {
+ #data = new SafeMap();
+
+ constructor(data) {
+ this.#data = data ? new SafeMap(data) : new SafeMap();
+ }
+
+ getValue(key) {
+ return this.#data.get(key);
+ }
+
+ setValue(key, value) {
+ const c = new Context(this.#data);
+ c.#data.set(key, value);
+ return c;
+ }
+
+ deleteValue(key) {
+ const c = new Context(this.#data);
+ c.#data.delete(key);
+ return c;
+ }
+}
+
+const ROOT_CONTEXT = new Context();
+
+// Context manager for opentelemetry js library
+class ContextManager {
+ active() {
+ return CURRENT.get() ?? ROOT_CONTEXT;
+ }
+
+ with(context, fn, thisArg, ...args) {
+ const ctx = CURRENT.enter(context);
+ try {
+ return ReflectApply(fn, thisArg, args);
+ } finally {
+ setAsyncContext(ctx);
+ }
+ }
+
+ bind(context, f) {
+ return (...args) => {
+ const ctx = CURRENT.enter(context);
+ try {
+ return ReflectApply(f, thisArg, args);
+ } finally {
+ setAsyncContext(ctx);
+ }
+ };
+ }
+
+ enable() {
+ return this;
+ }
+
+ disable() {
+ return this;
+ }
+}
+
+function otelLog(message, level) {
+ let traceId = "";
+ let spanId = "";
+ let traceFlags = 0;
+ const span = Span.current();
+ if (span) {
+ if (span.spanId !== undefined) {
+ spanId = span.spanId;
+ traceId = span.traceId;
+ traceFlags = span.traceFlags;
+ } else {
+ const context = span.spanContext();
+ spanId = context.spanId;
+ traceId = context.traceId;
+ traceFlags = context.traceFlags;
+ }
+ }
+ return op_otel_log(message, level, traceId, spanId, traceFlags);
+}
+
+const otelConsoleConfig = {
+ ignore: 0,
+ capture: 1,
+ replace: 2,
+};
+
+export function bootstrap(config) {
+ if (config.length === 0) return;
+ const { 0: consoleConfig } = config;
+
+ TRACING_ENABLED = true;
+
+ switch (consoleConfig) {
+ case otelConsoleConfig.capture:
+ core.wrapConsole(globalThis.console, new Console(otelLog));
+ break;
+ case otelConsoleConfig.replace:
+ ObjectDefineProperty(
+ globalThis,
+ "console",
+ core.propNonEnumerable(new Console(otelLog)),
+ );
+ break;
+ default:
+ break;
+ }
+}
+
+export const tracing = {
+ get enabled() {
+ return TRACING_ENABLED;
+ },
+ Span,
+ SpanExporter,
+ ContextManager,
+};
+
+// TODO(devsnek): implement metrics
+export const metrics = {};
diff --git a/runtime/lib.rs b/runtime/lib.rs
index f0b1129ce..21b61e1c0 100644
--- a/runtime/lib.rs
+++ b/runtime/lib.rs
@@ -99,18 +99,24 @@ pub static UNSTABLE_GRANULAR_FLAGS: &[UnstableGranularFlag] = &[
show_in_help: true,
id: 7,
},
+ UnstableGranularFlag {
+ name: "otel",
+ help_text: "Enable unstable OpenTelemetry features",
+ show_in_help: false,
+ id: 8,
+ },
// TODO(bartlomieju): consider removing it
UnstableGranularFlag {
name: ops::process::UNSTABLE_FEATURE_NAME,
help_text: "Enable unstable process APIs",
show_in_help: false,
- id: 8,
+ id: 9,
},
UnstableGranularFlag {
name: "temporal",
help_text: "Enable unstable Temporal API",
show_in_help: true,
- id: 9,
+ id: 10,
},
UnstableGranularFlag {
name: "unsafe-proto",
@@ -118,19 +124,19 @@ pub static UNSTABLE_GRANULAR_FLAGS: &[UnstableGranularFlag] = &[
show_in_help: true,
// This number is used directly in the JS code. Search
// for "unstableIds" to see where it's used.
- id: 10,
+ id: 11,
},
UnstableGranularFlag {
name: deno_webgpu::UNSTABLE_FEATURE_NAME,
help_text: "Enable unstable `WebGPU` APIs",
show_in_help: true,
- id: 11,
+ id: 12,
},
UnstableGranularFlag {
name: ops::worker_host::UNSTABLE_FEATURE_NAME,
help_text: "Enable unstable Web Worker APIs",
show_in_help: true,
- id: 12,
+ id: 13,
},
];
diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs
index 67065b901..c2e402f33 100644
--- a/runtime/ops/mod.rs
+++ b/runtime/ops/mod.rs
@@ -4,6 +4,7 @@ pub mod bootstrap;
pub mod fs_events;
pub mod http;
pub mod os;
+pub mod otel;
pub mod permissions;
pub mod process;
pub mod runtime;
diff --git a/runtime/ops/os/mod.rs b/runtime/ops/os/mod.rs
index 9bee9d823..790962f38 100644
--- a/runtime/ops/os/mod.rs
+++ b/runtime/ops/os/mod.rs
@@ -186,6 +186,8 @@ fn op_get_exit_code(state: &mut OpState) -> i32 {
#[op2(fast)]
fn op_exit(state: &mut OpState) {
+ crate::ops::otel::otel_drop_state(state);
+
let code = state.borrow::<ExitCode>().get();
std::process::exit(code)
}
diff --git a/runtime/ops/otel.rs b/runtime/ops/otel.rs
new file mode 100644
index 000000000..6a4750acc
--- /dev/null
+++ b/runtime/ops/otel.rs
@@ -0,0 +1,686 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use crate::tokio_util::create_basic_runtime;
+use deno_core::anyhow::anyhow;
+use deno_core::anyhow::{self};
+use deno_core::futures::channel::mpsc;
+use deno_core::futures::channel::mpsc::UnboundedSender;
+use deno_core::futures::future::BoxFuture;
+use deno_core::futures::stream;
+use deno_core::futures::Stream;
+use deno_core::futures::StreamExt;
+use deno_core::op2;
+use deno_core::v8;
+use deno_core::OpState;
+use once_cell::sync::Lazy;
+use opentelemetry::logs::Severity;
+use opentelemetry::trace::SpanContext;
+use opentelemetry::trace::SpanId;
+use opentelemetry::trace::SpanKind;
+use opentelemetry::trace::Status as SpanStatus;
+use opentelemetry::trace::TraceFlags;
+use opentelemetry::trace::TraceId;
+use opentelemetry::InstrumentationScope;
+use opentelemetry::Key;
+use opentelemetry::KeyValue;
+use opentelemetry::StringValue;
+use opentelemetry::Value;
+use opentelemetry_otlp::HttpExporterBuilder;
+use opentelemetry_otlp::Protocol;
+use opentelemetry_otlp::WithExportConfig;
+use opentelemetry_otlp::WithHttpConfig;
+use opentelemetry_sdk::export::trace::SpanData;
+use opentelemetry_sdk::logs::BatchLogProcessor;
+use opentelemetry_sdk::logs::LogProcessor as LogProcessorTrait;
+use opentelemetry_sdk::logs::LogRecord;
+use opentelemetry_sdk::trace::BatchSpanProcessor;
+use opentelemetry_sdk::trace::SpanProcessor as SpanProcessorTrait;
+use opentelemetry_sdk::Resource;
+use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_NAME;
+use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_VERSION;
+use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_LANGUAGE;
+use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_NAME;
+use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_VERSION;
+use serde::Deserialize;
+use serde::Serialize;
+use std::borrow::Cow;
+use std::env;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+use std::thread;
+use std::time::Duration;
+use std::time::SystemTime;
+
+type SpanProcessor = BatchSpanProcessor<OtelSharedRuntime>;
+type LogProcessor = BatchLogProcessor<OtelSharedRuntime>;
+
+deno_core::extension!(
+ deno_otel,
+ ops = [op_otel_log, op_otel_span_start, op_otel_span_continue, op_otel_span_attribute, op_otel_span_attribute2, op_otel_span_attribute3, op_otel_span_flush],
+ options = {
+ otel_config: Option<OtelConfig>, // `None` means OpenTelemetry is disabled.
+ },
+ state = |state, options| {
+ if let Some(otel_config) = options.otel_config {
+ otel_create_globals(otel_config, state).unwrap();
+ }
+ }
+);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OtelConfig {
+ pub runtime_name: Cow<'static, str>,
+ pub runtime_version: Cow<'static, str>,
+ pub console: OtelConsoleConfig,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[repr(u8)]
+pub enum OtelConsoleConfig {
+ Ignore = 0,
+ Capture = 1,
+ Replace = 2,
+}
+
+impl Default for OtelConfig {
+ fn default() -> Self {
+ Self {
+ runtime_name: Cow::Borrowed(env!("CARGO_PKG_NAME")),
+ runtime_version: Cow::Borrowed(env!("CARGO_PKG_VERSION")),
+ console: OtelConsoleConfig::Capture,
+ }
+ }
+}
+
+static OTEL_SHARED_RUNTIME_SPAWN_TASK_TX: Lazy<
+ UnboundedSender<BoxFuture<'static, ()>>,
+> = Lazy::new(otel_create_shared_runtime);
+
+fn otel_create_shared_runtime() -> UnboundedSender<BoxFuture<'static, ()>> {
+ let (spawn_task_tx, mut spawn_task_rx) =
+ mpsc::unbounded::<BoxFuture<'static, ()>>();
+
+ thread::spawn(move || {
+ let rt = create_basic_runtime();
+ rt.block_on(async move {
+ while let Some(task) = spawn_task_rx.next().await {
+ tokio::spawn(task);
+ }
+ });
+ });
+
+ spawn_task_tx
+}
+
+#[derive(Clone, Copy)]
+struct OtelSharedRuntime;
+
+impl hyper::rt::Executor<BoxFuture<'static, ()>> for OtelSharedRuntime {
+ fn execute(&self, fut: BoxFuture<'static, ()>) {
+ (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
+ .unbounded_send(fut)
+ .expect("failed to send task to shared OpenTelemetry runtime");
+ }
+}
+
+impl opentelemetry_sdk::runtime::Runtime for OtelSharedRuntime {
+ type Interval = Pin<Box<dyn Stream<Item = ()> + Send + 'static>>;
+ type Delay = Pin<Box<tokio::time::Sleep>>;
+
+ fn interval(&self, period: Duration) -> Self::Interval {
+ stream::repeat(())
+ .then(move |_| tokio::time::sleep(period))
+ .boxed()
+ }
+
+ fn spawn(&self, future: BoxFuture<'static, ()>) {
+ (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
+ .unbounded_send(future)
+ .expect("failed to send task to shared OpenTelemetry runtime");
+ }
+
+ fn delay(&self, duration: Duration) -> Self::Delay {
+ Box::pin(tokio::time::sleep(duration))
+ }
+}
+
+impl opentelemetry_sdk::runtime::RuntimeChannel for OtelSharedRuntime {
+ type Receiver<T: Debug + Send> = BatchMessageChannelReceiver<T>;
+ type Sender<T: Debug + Send> = BatchMessageChannelSender<T>;
+
+ fn batch_message_channel<T: Debug + Send>(
+ &self,
+ capacity: usize,
+ ) -> (Self::Sender<T>, Self::Receiver<T>) {
+ let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<T>(capacity);
+ (batch_tx.into(), batch_rx.into())
+ }
+}
+
+#[derive(Debug)]
+pub struct BatchMessageChannelSender<T: Send> {
+ sender: tokio::sync::mpsc::Sender<T>,
+}
+
+impl<T: Send> From<tokio::sync::mpsc::Sender<T>>
+ for BatchMessageChannelSender<T>
+{
+ fn from(sender: tokio::sync::mpsc::Sender<T>) -> Self {
+ Self { sender }
+ }
+}
+
+impl<T: Send> opentelemetry_sdk::runtime::TrySend
+ for BatchMessageChannelSender<T>
+{
+ type Message = T;
+
+ fn try_send(
+ &self,
+ item: Self::Message,
+ ) -> Result<(), opentelemetry_sdk::runtime::TrySendError> {
+ self.sender.try_send(item).map_err(|err| match err {
+ tokio::sync::mpsc::error::TrySendError::Full(_) => {
+ opentelemetry_sdk::runtime::TrySendError::ChannelFull
+ }
+ tokio::sync::mpsc::error::TrySendError::Closed(_) => {
+ opentelemetry_sdk::runtime::TrySendError::ChannelClosed
+ }
+ })
+ }
+}
+
+pub struct BatchMessageChannelReceiver<T> {
+ receiver: tokio::sync::mpsc::Receiver<T>,
+}
+
+impl<T> From<tokio::sync::mpsc::Receiver<T>>
+ for BatchMessageChannelReceiver<T>
+{
+ fn from(receiver: tokio::sync::mpsc::Receiver<T>) -> Self {
+ Self { receiver }
+ }
+}
+
+impl<T> Stream for BatchMessageChannelReceiver<T> {
+ type Item = T;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ self.receiver.poll_recv(cx)
+ }
+}
+
+mod hyper_client {
+ use http_body_util::BodyExt;
+ use http_body_util::Full;
+ use hyper::body::Body as HttpBody;
+ use hyper::body::Frame;
+ use hyper_util::client::legacy::connect::HttpConnector;
+ use hyper_util::client::legacy::Client;
+ use opentelemetry_http::Bytes;
+ use opentelemetry_http::HttpError;
+ use opentelemetry_http::Request;
+ use opentelemetry_http::Response;
+ use opentelemetry_http::ResponseExt;
+ use std::fmt::Debug;
+ use std::pin::Pin;
+ use std::task::Poll;
+ use std::task::{self};
+
+ use super::OtelSharedRuntime;
+
+ // same as opentelemetry_http::HyperClient except it uses OtelSharedRuntime
+ #[derive(Debug, Clone)]
+ pub struct HyperClient {
+ inner: Client<HttpConnector, Body>,
+ }
+
+ impl HyperClient {
+ pub fn new() -> Self {
+ Self {
+ inner: Client::builder(OtelSharedRuntime).build(HttpConnector::new()),
+ }
+ }
+ }
+
+ #[async_trait::async_trait]
+ impl opentelemetry_http::HttpClient for HyperClient {
+ async fn send(
+ &self,
+ request: Request<Vec<u8>>,
+ ) -> Result<Response<Bytes>, HttpError> {
+ let (parts, body) = request.into_parts();
+ let request = Request::from_parts(parts, Body(Full::from(body)));
+ let mut response = self.inner.request(request).await?;
+ let headers = std::mem::take(response.headers_mut());
+
+ let mut http_response = Response::builder()
+ .status(response.status())
+ .body(response.into_body().collect().await?.to_bytes())?;
+ *http_response.headers_mut() = headers;
+
+ Ok(http_response.error_for_status()?)
+ }
+ }
+
+ #[pin_project::pin_project]
+ pub struct Body(#[pin] Full<Bytes>);
+
+ impl HttpBody for Body {
+ type Data = Bytes;
+ type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
+
+ #[inline]
+ fn poll_frame(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ self.project().0.poll_frame(cx).map_err(Into::into)
+ }
+
+ #[inline]
+ fn is_end_stream(&self) -> bool {
+ self.0.is_end_stream()
+ }
+
+ #[inline]
+ fn size_hint(&self) -> hyper::body::SizeHint {
+ self.0.size_hint()
+ }
+ }
+}
+
+fn otel_create_globals(
+ config: OtelConfig,
+ op_state: &mut OpState,
+) -> anyhow::Result<()> {
+ // Parse the `OTEL_EXPORTER_OTLP_PROTOCOL` variable. The opentelemetry_*
+ // crates don't do this automatically.
+ // TODO(piscisaureus): enable GRPC support.
+ let protocol = match env::var("OTEL_EXPORTER_OTLP_PROTOCOL").as_deref() {
+ Ok("http/protobuf") => Protocol::HttpBinary,
+ Ok("http/json") => Protocol::HttpJson,
+ Ok("") | Err(env::VarError::NotPresent) => {
+ return Ok(());
+ }
+ Ok(protocol) => {
+ return Err(anyhow!(
+ "Env var OTEL_EXPORTER_OTLP_PROTOCOL specifies an unsupported protocol: {}",
+ protocol
+ ));
+ }
+ Err(err) => {
+ return Err(anyhow!(
+ "Failed to read env var OTEL_EXPORTER_OTLP_PROTOCOL: {}",
+ err
+ ))
+ }
+ };
+
+ // Define the resource attributes that will be attached to all log records.
+ // These attributes are sourced as follows (in order of precedence):
+ // * The `service.name` attribute from the `OTEL_SERVICE_NAME` env var.
+ // * Additional attributes from the `OTEL_RESOURCE_ATTRIBUTES` env var.
+ // * Default attribute values defined here.
+ // TODO(piscisaureus): add more default attributes (e.g. script path).
+ let mut resource = Resource::default();
+
+ // Add the runtime name and version to the resource attributes. Also override
+ // the `telemetry.sdk` attributes to include the Deno runtime.
+ resource = resource.merge(&Resource::new(vec![
+ KeyValue::new(PROCESS_RUNTIME_NAME, config.runtime_name),
+ KeyValue::new(PROCESS_RUNTIME_VERSION, config.runtime_version.clone()),
+ KeyValue::new(
+ TELEMETRY_SDK_LANGUAGE,
+ format!(
+ "deno-{}",
+ resource.get(Key::new(TELEMETRY_SDK_LANGUAGE)).unwrap()
+ ),
+ ),
+ KeyValue::new(
+ TELEMETRY_SDK_NAME,
+ format!(
+ "deno-{}",
+ resource.get(Key::new(TELEMETRY_SDK_NAME)).unwrap()
+ ),
+ ),
+ KeyValue::new(
+ TELEMETRY_SDK_VERSION,
+ format!(
+ "{}-{}",
+ config.runtime_version,
+ resource.get(Key::new(TELEMETRY_SDK_VERSION)).unwrap()
+ ),
+ ),
+ ]));
+
+ // The OTLP endpoint is automatically picked up from the
+ // `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable. Additional headers can
+ // be specified using `OTEL_EXPORTER_OTLP_HEADERS`.
+
+ let client = hyper_client::HyperClient::new();
+
+ let span_exporter = HttpExporterBuilder::default()
+ .with_http_client(client.clone())
+ .with_protocol(protocol)
+ .build_span_exporter()?;
+ let mut span_processor =
+ BatchSpanProcessor::builder(span_exporter, OtelSharedRuntime).build();
+ span_processor.set_resource(&resource);
+ op_state.put::<SpanProcessor>(span_processor);
+
+ let log_exporter = HttpExporterBuilder::default()
+ .with_http_client(client)
+ .with_protocol(protocol)
+ .build_log_exporter()?;
+ let log_processor =
+ BatchLogProcessor::builder(log_exporter, OtelSharedRuntime).build();
+ log_processor.set_resource(&resource);
+ op_state.put::<LogProcessor>(log_processor);
+
+ Ok(())
+}
+
+/// This function is called by the runtime whenever it is about to call
+/// `os::process::exit()`, to ensure that all OpenTelemetry logs are properly
+/// flushed before the process terminates.
+pub fn otel_drop_state(state: &mut OpState) {
+ if let Some(processor) = state.try_take::<SpanProcessor>() {
+ let _ = processor.force_flush();
+ drop(processor);
+ }
+ if let Some(processor) = state.try_take::<LogProcessor>() {
+ let _ = processor.force_flush();
+ drop(processor);
+ }
+}
+
+#[op2(fast)]
+fn op_otel_log(
+ state: &mut OpState,
+ #[string] message: String,
+ #[smi] level: i32,
+ #[string] trace_id: &str,
+ #[string] span_id: &str,
+ #[smi] trace_flags: u8,
+) {
+ let Some(logger) = state.try_borrow::<LogProcessor>() else {
+ log::error!("op_otel_log: OpenTelemetry Logger not available");
+ return;
+ };
+
+ // Convert the integer log level that ext/console uses to the corresponding
+ // OpenTelemetry log severity.
+ let severity = match level {
+ ..=0 => Severity::Debug,
+ 1 => Severity::Info,
+ 2 => Severity::Warn,
+ 3.. => Severity::Error,
+ };
+
+ let mut log_record = LogRecord::default();
+
+ log_record.observed_timestamp = Some(SystemTime::now());
+ log_record.body = Some(message.into());
+ log_record.severity_number = Some(severity);
+ log_record.severity_text = Some(severity.name());
+ if let (Ok(trace_id), Ok(span_id)) =
+ (TraceId::from_hex(trace_id), SpanId::from_hex(span_id))
+ {
+ let span_context = SpanContext::new(
+ trace_id,
+ span_id,
+ TraceFlags::new(trace_flags),
+ false,
+ Default::default(),
+ );
+ log_record.trace_context = Some((&span_context).into());
+ }
+ logger.emit(
+ &mut log_record,
+ &InstrumentationScope::builder("deno").build(),
+ );
+}
+
+struct TemporarySpan(SpanData);
+
+#[allow(clippy::too_many_arguments)]
+#[op2(fast)]
+fn op_otel_span_start<'s>(
+ scope: &mut v8::HandleScope<'s>,
+ state: &mut OpState,
+ trace_id: v8::Local<'s, v8::Value>,
+ span_id: v8::Local<'s, v8::Value>,
+ parent_span_id: v8::Local<'s, v8::Value>,
+ #[smi] span_kind: u8,
+ name: v8::Local<'s, v8::Value>,
+ start_time: f64,
+ end_time: f64,
+) -> Result<(), anyhow::Error> {
+ if let Some(temporary_span) = state.try_take::<TemporarySpan>() {
+ let Some(span_processor) = state.try_borrow::<SpanProcessor>() else {
+ return Ok(());
+ };
+ span_processor.on_end(temporary_span.0);
+ };
+
+ let trace_id = {
+ let x = v8::ValueView::new(scope, trace_id.try_cast()?);
+ match x.data() {
+ v8::ValueViewData::OneByte(bytes) => {
+ TraceId::from_hex(&String::from_utf8_lossy(bytes))?
+ }
+ _ => return Err(anyhow!("invalid trace_id")),
+ }
+ };
+
+ let span_id = {
+ let x = v8::ValueView::new(scope, span_id.try_cast()?);
+ match x.data() {
+ v8::ValueViewData::OneByte(bytes) => {
+ SpanId::from_hex(&String::from_utf8_lossy(bytes))?
+ }
+ _ => return Err(anyhow!("invalid span_id")),
+ }
+ };
+
+ let parent_span_id = {
+ let x = v8::ValueView::new(scope, parent_span_id.try_cast()?);
+ match x.data() {
+ v8::ValueViewData::OneByte(bytes) => {
+ let s = String::from_utf8_lossy(bytes);
+ if s.is_empty() {
+ SpanId::INVALID
+ } else {
+ SpanId::from_hex(&s)?
+ }
+ }
+ _ => return Err(anyhow!("invalid parent_span_id")),
+ }
+ };
+
+ let name = {
+ let x = v8::ValueView::new(scope, name.try_cast()?);
+ match x.data() {
+ v8::ValueViewData::OneByte(bytes) => {
+ String::from_utf8_lossy(bytes).into_owned()
+ }
+ v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes),
+ }
+ };
+
+ let temporary_span = TemporarySpan(SpanData {
+ span_context: SpanContext::new(
+ trace_id,
+ span_id,
+ TraceFlags::SAMPLED,
+ false,
+ Default::default(),
+ ),
+ parent_span_id,
+ span_kind: match span_kind {
+ 0 => SpanKind::Internal,
+ 1 => SpanKind::Server,
+ 2 => SpanKind::Client,
+ 3 => SpanKind::Producer,
+ 4 => SpanKind::Consumer,
+ _ => return Err(anyhow!("invalid span kind")),
+ },
+ name: Cow::Owned(name),
+ start_time: SystemTime::UNIX_EPOCH
+ .checked_add(std::time::Duration::from_secs_f64(start_time))
+ .ok_or_else(|| anyhow!("invalid start time"))?,
+ end_time: SystemTime::UNIX_EPOCH
+ .checked_add(std::time::Duration::from_secs_f64(end_time))
+ .ok_or_else(|| anyhow!("invalid start time"))?,
+ attributes: Vec::new(),
+ dropped_attributes_count: 0,
+ events: Default::default(),
+ links: Default::default(),
+ status: SpanStatus::Unset,
+ instrumentation_scope: InstrumentationScope::builder("deno").build(),
+ });
+ state.put(temporary_span);
+
+ Ok(())
+}
+
+#[op2(fast)]
+fn op_otel_span_continue(
+ state: &mut OpState,
+ #[smi] status: u8,
+ #[string] error_description: Cow<'_, str>,
+) {
+ if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() {
+ temporary_span.0.status = match status {
+ 0 => SpanStatus::Unset,
+ 1 => SpanStatus::Ok,
+ 2 => SpanStatus::Error {
+ description: Cow::Owned(error_description.into_owned()),
+ },
+ _ => return,
+ };
+ }
+}
+
+macro_rules! attr {
+ ($scope:ident, $temporary_span:ident, $name:ident, $value:ident) => {
+ let name = if let Ok(name) = $name.try_cast() {
+ let view = v8::ValueView::new($scope, name);
+ match view.data() {
+ v8::ValueViewData::OneByte(bytes) => {
+ Some(String::from_utf8_lossy(bytes).into_owned())
+ }
+ v8::ValueViewData::TwoByte(bytes) => {
+ Some(String::from_utf16_lossy(bytes))
+ }
+ }
+ } else {
+ None
+ };
+ let value = if let Ok(string) = $value.try_cast::<v8::String>() {
+ Some(Value::String(StringValue::from({
+ let x = v8::ValueView::new($scope, string);
+ match x.data() {
+ v8::ValueViewData::OneByte(bytes) => {
+ String::from_utf8_lossy(bytes).into_owned()
+ }
+ v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes),
+ }
+ })))
+ } else if let Ok(number) = $value.try_cast::<v8::Number>() {
+ Some(Value::F64(number.value()))
+ } else if let Ok(boolean) = $value.try_cast::<v8::Boolean>() {
+ Some(Value::Bool(boolean.is_true()))
+ } else if let Ok(bigint) = $value.try_cast::<v8::BigInt>() {
+ let (i64_value, _lossless) = bigint.i64_value();
+ Some(Value::I64(i64_value))
+ } else {
+ None
+ };
+ if let (Some(name), Some(value)) = (name, value) {
+ $temporary_span
+ .0
+ .attributes
+ .push(KeyValue::new(name, value));
+ } else {
+ $temporary_span.0.dropped_attributes_count += 1;
+ }
+ };
+}
+
+#[op2(fast)]
+fn op_otel_span_attribute<'s>(
+ scope: &mut v8::HandleScope<'s>,
+ state: &mut OpState,
+ #[smi] capacity: u32,
+ key: v8::Local<'s, v8::Value>,
+ value: v8::Local<'s, v8::Value>,
+) {
+ if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() {
+ temporary_span.0.attributes.reserve_exact(
+ (capacity as usize) - temporary_span.0.attributes.capacity(),
+ );
+ attr!(scope, temporary_span, key, value);
+ }
+}
+
+#[op2(fast)]
+fn op_otel_span_attribute2<'s>(
+ scope: &mut v8::HandleScope<'s>,
+ state: &mut OpState,
+ #[smi] capacity: u32,
+ key1: v8::Local<'s, v8::Value>,
+ value1: v8::Local<'s, v8::Value>,
+ key2: v8::Local<'s, v8::Value>,
+ value2: v8::Local<'s, v8::Value>,
+) {
+ if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() {
+ temporary_span.0.attributes.reserve_exact(
+ (capacity as usize) - temporary_span.0.attributes.capacity(),
+ );
+ attr!(scope, temporary_span, key1, value1);
+ attr!(scope, temporary_span, key2, value2);
+ }
+}
+
+#[allow(clippy::too_many_arguments)]
+#[op2(fast)]
+fn op_otel_span_attribute3<'s>(
+ scope: &mut v8::HandleScope<'s>,
+ state: &mut OpState,
+ #[smi] capacity: u32,
+ key1: v8::Local<'s, v8::Value>,
+ value1: v8::Local<'s, v8::Value>,
+ key2: v8::Local<'s, v8::Value>,
+ value2: v8::Local<'s, v8::Value>,
+ key3: v8::Local<'s, v8::Value>,
+ value3: v8::Local<'s, v8::Value>,
+) {
+ if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() {
+ temporary_span.0.attributes.reserve_exact(
+ (capacity as usize) - temporary_span.0.attributes.capacity(),
+ );
+ attr!(scope, temporary_span, key1, value1);
+ attr!(scope, temporary_span, key2, value2);
+ attr!(scope, temporary_span, key3, value3);
+ }
+}
+
+#[op2(fast)]
+fn op_otel_span_flush(state: &mut OpState) {
+ let Some(temporary_span) = state.try_take::<TemporarySpan>() else {
+ return;
+ };
+
+ let Some(span_processor) = state.try_borrow::<SpanProcessor>() else {
+ return;
+ };
+
+ span_processor.on_end(temporary_span.0);
+}
diff --git a/runtime/shared.rs b/runtime/shared.rs
index f7d76f67a..c05f352f1 100644
--- a/runtime/shared.rs
+++ b/runtime/shared.rs
@@ -47,6 +47,7 @@ extension!(runtime,
"40_signals.js",
"40_tty.js",
"41_prompt.js",
+ "telemetry.js",
"90_deno_ns.js",
"98_global_scope_shared.js",
"98_global_scope_window.js",
diff --git a/runtime/snapshot.rs b/runtime/snapshot.rs
index 251ee5f41..bb9bf9166 100644
--- a/runtime/snapshot.rs
+++ b/runtime/snapshot.rs
@@ -312,6 +312,7 @@ pub fn create_runtime_snapshot(
),
ops::fs_events::deno_fs_events::init_ops(),
ops::os::deno_os::init_ops(Default::default()),
+ ops::otel::deno_otel::init_ops(None),
ops::permissions::deno_permissions::init_ops(),
ops::process::deno_process::init_ops(None),
ops::signal::deno_signal::init_ops(),
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 61e5c7702..d81c82c50 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -505,6 +505,9 @@ impl WebWorker {
),
ops::fs_events::deno_fs_events::init_ops_and_esm(),
ops::os::deno_os_worker::init_ops_and_esm(),
+ ops::otel::deno_otel::init_ops_and_esm(
+ options.bootstrap.otel_config.clone(),
+ ),
ops::permissions::deno_permissions::init_ops_and_esm(),
ops::process::deno_process::init_ops_and_esm(
services.npm_process_state_provider,
diff --git a/runtime/worker.rs b/runtime/worker.rs
index 88a61fa93..82df755fa 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -422,6 +422,9 @@ impl MainWorker {
),
ops::fs_events::deno_fs_events::init_ops_and_esm(),
ops::os::deno_os::init_ops_and_esm(exit_code.clone()),
+ ops::otel::deno_otel::init_ops_and_esm(
+ options.bootstrap.otel_config.clone(),
+ ),
ops::permissions::deno_permissions::init_ops_and_esm(),
ops::process::deno_process::init_ops_and_esm(
services.npm_process_state_provider,
diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs
index 3f3c25c5e..dc989a1c0 100644
--- a/runtime/worker_bootstrap.rs
+++ b/runtime/worker_bootstrap.rs
@@ -1,5 +1,6 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+use crate::ops::otel::OtelConfig;
use deno_core::v8;
use deno_core::ModuleSpecifier;
use serde::Serialize;
@@ -118,6 +119,8 @@ pub struct BootstrapOptions {
// Used by `deno serve`
pub serve_port: Option<u16>,
pub serve_host: Option<String>,
+ // OpenTelemetry output options. If `None`, OpenTelemetry is disabled.
+ pub otel_config: Option<OtelConfig>,
}
impl Default for BootstrapOptions {
@@ -152,6 +155,7 @@ impl Default for BootstrapOptions {
mode: WorkerExecutionMode::None,
serve_port: Default::default(),
serve_host: Default::default(),
+ otel_config: None,
}
}
}
@@ -193,6 +197,8 @@ struct BootstrapV8<'a>(
Option<bool>,
// serve worker count
Option<usize>,
+ // OTEL config
+ Box<[u8]>,
);
impl BootstrapOptions {
@@ -219,6 +225,11 @@ impl BootstrapOptions {
self.serve_host.as_deref(),
serve_is_main,
serve_worker_count,
+ if let Some(otel_config) = self.otel_config.as_ref() {
+ Box::new([otel_config.console as u8])
+ } else {
+ Box::new([])
+ },
);
bootstrap.serialize(ser).unwrap()