diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/Cargo.toml | 7 | ||||
-rw-r--r-- | runtime/js/90_deno_ns.js | 19 | ||||
-rw-r--r-- | runtime/js/99_main.js | 14 | ||||
-rw-r--r-- | runtime/js/telemetry.js | 395 | ||||
-rw-r--r-- | runtime/lib.rs | 16 | ||||
-rw-r--r-- | runtime/ops/mod.rs | 1 | ||||
-rw-r--r-- | runtime/ops/os/mod.rs | 2 | ||||
-rw-r--r-- | runtime/ops/otel.rs | 686 | ||||
-rw-r--r-- | runtime/shared.rs | 1 | ||||
-rw-r--r-- | runtime/snapshot.rs | 1 | ||||
-rw-r--r-- | runtime/web_worker.rs | 3 | ||||
-rw-r--r-- | runtime/worker.rs | 3 | ||||
-rw-r--r-- | runtime/worker_bootstrap.rs | 11 |
13 files changed, 1144 insertions, 15 deletions
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index ba236de14..b59cd14fa 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -100,6 +100,7 @@ deno_websocket.workspace = true deno_webstorage.workspace = true node_resolver = { workspace = true, features = ["sync"] } +async-trait.workspace = true color-print.workspace = true dlopen2.workspace = true encoding_rs.workspace = true @@ -114,7 +115,13 @@ log.workspace = true netif = "0.1.6" notify.workspace = true once_cell.workspace = true +opentelemetry.workspace = true +opentelemetry-http.workspace = true +opentelemetry-otlp.workspace = true +opentelemetry-semantic-conventions.workspace = true +opentelemetry_sdk.workspace = true percent-encoding.workspace = true +pin-project.workspace = true regex.workspace = true rustyline = { workspace = true, features = ["custom-bindings"] } same-file = "1.0.6" diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index fd2ac00f2..11f618ce2 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -29,6 +29,7 @@ import * as tty from "ext:runtime/40_tty.js"; import * as kv from "ext:deno_kv/01_db.ts"; import * as cron from "ext:deno_cron/01_cron.ts"; import * as webgpuSurface from "ext:deno_webgpu/02_surface.js"; +import * as telemetry from "ext:runtime/telemetry.js"; const denoNs = { Process: process.Process, @@ -134,7 +135,7 @@ const denoNs = { createHttpClient: httpClient.createHttpClient, }; -// NOTE(bartlomieju): keep IDs in sync with `cli/main.rs` +// NOTE(bartlomieju): keep IDs in sync with `runtime/lib.rs` const unstableIds = { broadcastChannel: 1, cron: 2, @@ -143,11 +144,12 @@ const unstableIds = { http: 5, kv: 6, net: 7, - process: 8, - temporal: 9, - unsafeProto: 10, - webgpu: 11, - workerOptions: 12, + otel: 8, + process: 9, + temporal: 10, + unsafeProto: 11, + webgpu: 12, + workerOptions: 13, }; const denoNsUnstableById = { __proto__: null }; @@ -181,4 +183,9 @@ denoNsUnstableById[unstableIds.webgpu] = { // denoNsUnstableById[unstableIds.workerOptions] = { __proto__: null } +denoNsUnstableById[unstableIds.otel] = { + tracing: telemetry.tracing, + metrics: telemetry.metrics, +}; + export { denoNs, denoNsUnstableById, unstableIds }; diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 6ddaa1335..2da5c5398 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -86,6 +86,8 @@ import { workerRuntimeGlobalProperties, } from "ext:runtime/98_global_scope_worker.js"; import { SymbolDispose, SymbolMetadata } from "ext:deno_web/00_infra.js"; +import { bootstrap as bootstrapOtel } from "ext:runtime/telemetry.js"; + // deno-lint-ignore prefer-primordials if (Symbol.metadata) { throw "V8 supports Symbol.metadata now, no need to shim it"; @@ -573,6 +575,7 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) { 10: serveHost, 11: serveIsMain, 12: serveWorkerCount, + 13: otelConfig, } = runtimeOptions; if (mode === executionModes.serve) { @@ -673,9 +676,10 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) { }); ObjectSetPrototypeOf(globalThis, Window.prototype); + bootstrapOtel(otelConfig); + if (inspectFlag) { - const consoleFromDeno = globalThis.console; - core.wrapConsole(consoleFromDeno, core.v8Console); + core.wrapConsole(globalThis.console, core.v8Console); } event.defineEventHandler(globalThis, "error"); @@ -855,6 +859,7 @@ function bootstrapWorkerRuntime( 5: hasNodeModulesDir, 6: argv0, 7: nodeDebug, + 13: otelConfig, } = runtimeOptions; performance.setTimeOrigin(); @@ -882,8 +887,9 @@ function bootstrapWorkerRuntime( } ObjectSetPrototypeOf(globalThis, DedicatedWorkerGlobalScope.prototype); - const consoleFromDeno = globalThis.console; - core.wrapConsole(consoleFromDeno, core.v8Console); + bootstrapOtel(otelConfig); + + core.wrapConsole(globalThis.console, core.v8Console); event.defineEventHandler(self, "message"); event.defineEventHandler(self, "error", undefined, true); diff --git a/runtime/js/telemetry.js b/runtime/js/telemetry.js new file mode 100644 index 000000000..e9eb51f7c --- /dev/null +++ b/runtime/js/telemetry.js @@ -0,0 +1,395 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { core, primordials } from "ext:core/mod.js"; +import { + op_otel_log, + op_otel_span_attribute, + op_otel_span_attribute2, + op_otel_span_attribute3, + op_otel_span_continue, + op_otel_span_flush, + op_otel_span_start, +} from "ext:core/ops"; +import { Console } from "ext:deno_console/01_console.js"; +import { performance } from "ext:deno_web/15_performance.js"; + +const { + SymbolDispose, + MathRandom, + Array, + ObjectEntries, + SafeMap, + ReflectApply, + SymbolFor, + Error, +} = primordials; +const { AsyncVariable, setAsyncContext } = core; + +const CURRENT = new AsyncVariable(); +let TRACING_ENABLED = false; + +const SPAN_ID_BYTES = 8; +const TRACE_ID_BYTES = 16; + +const TRACE_FLAG_SAMPLED = 1 << 0; + +const hexSliceLookupTable = (function () { + const alphabet = "0123456789abcdef"; + const table = new Array(256); + for (let i = 0; i < 16; ++i) { + const i16 = i * 16; + for (let j = 0; j < 16; ++j) { + table[i16 + j] = alphabet[i] + alphabet[j]; + } + } + return table; +})(); + +function generateId(bytes) { + let out = ""; + for (let i = 0; i < bytes / 4; i += 1) { + const r32 = (MathRandom() * 2 ** 32) >>> 0; + out += hexSliceLookupTable[(r32 >> 24) & 0xff]; + out += hexSliceLookupTable[(r32 >> 16) & 0xff]; + out += hexSliceLookupTable[(r32 >> 8) & 0xff]; + out += hexSliceLookupTable[r32 & 0xff]; + } + return out; +} + +function submit(span) { + if (!(span.traceFlags & TRACE_FLAG_SAMPLED)) return; + + op_otel_span_start( + span.traceId, + span.spanId, + span.parentSpanId ?? "", + span.kind, + span.name, + span.startTime, + span.endTime, + ); + + if (span.status !== null && span.status.code !== 0) { + op_otel_span_continue(span.code, span.message ?? ""); + } + + const attributes = ObjectEntries(span.attributes); + let i = 0; + while (i < attributes.length) { + if (i + 2 < attributes.length) { + op_otel_span_attribute3( + attributes.length, + attributes[i][0], + attributes[i][1], + attributes[i + 1][0], + attributes[i + 1][1], + attributes[i + 2][0], + attributes[i + 2][1], + ); + i += 3; + } else if (i + 1 < attributes.length) { + op_otel_span_attribute2( + attributes.length, + attributes[i][0], + attributes[i][1], + attributes[i + 1][0], + attributes[i + 1][1], + ); + i += 2; + } else { + op_otel_span_attribute( + attributes.length, + attributes[i][0], + attributes[i][1], + ); + i += 1; + } + } + + op_otel_span_flush(); +} + +const now = () => (performance.timeOrigin + performance.now()) / 1000; + +const INVALID_SPAN_ID = "0000000000000000"; +const INVALID_TRACE_ID = "00000000000000000000000000000000"; +const NO_ASYNC_CONTEXT = {}; + +class Span { + traceId; + spanId; + parentSpanId; + kind; + name; + startTime; + endTime; + status = null; + attributes = { __proto__: null }; + traceFlags = TRACE_FLAG_SAMPLED; + + enabled = TRACING_ENABLED; + #asyncContext = NO_ASYNC_CONTEXT; + + constructor(name, kind = "internal") { + if (!this.enabled) { + this.traceId = INVALID_TRACE_ID; + this.spanId = INVALID_SPAN_ID; + this.parentSpanId = INVALID_SPAN_ID; + return; + } + + this.startTime = now(); + + this.spanId = generateId(SPAN_ID_BYTES); + + let traceId; + let parentSpanId; + const parent = Span.current(); + if (parent) { + if (parent.spanId !== undefined) { + parentSpanId = parent.spanId; + traceId = parent.traceId; + } else { + const context = parent.spanContext(); + parentSpanId = context.spanId; + traceId = context.traceId; + } + } + if ( + traceId && traceId !== INVALID_TRACE_ID && parentSpanId && + parentSpanId !== INVALID_SPAN_ID + ) { + this.traceId = traceId; + this.parentSpanId = parentSpanId; + } else { + this.traceId = generateId(TRACE_ID_BYTES); + this.parentSpanId = INVALID_SPAN_ID; + } + + this.name = name; + + switch (kind) { + case "internal": + this.kind = 0; + break; + case "server": + this.kind = 1; + break; + case "client": + this.kind = 2; + break; + case "producer": + this.kind = 3; + break; + case "consumer": + this.kind = 4; + break; + default: + throw new Error(`Invalid span kind: ${kind}`); + } + + this.enter(); + } + + // helper function to match otel js api + spanContext() { + return { + traceId: this.traceId, + spanId: this.spanId, + traceFlags: this.traceFlags, + }; + } + + setAttribute(name, value) { + if (!this.enabled) return; + this.attributes[name] = value; + } + + enter() { + if (!this.enabled) return; + const context = (CURRENT.get() || ROOT_CONTEXT).setValue(SPAN_KEY, this); + this.#asyncContext = CURRENT.enter(context); + } + + exit() { + if (!this.enabled || this.#asyncContext === NO_ASYNC_CONTEXT) return; + setAsyncContext(this.#asyncContext); + this.#asyncContext = NO_ASYNC_CONTEXT; + } + + end() { + if (!this.enabled || this.endTime !== undefined) return; + this.exit(); + this.endTime = now(); + submit(this); + } + + [SymbolDispose]() { + this.end(); + } + + static current() { + return CURRENT.get()?.getValue(SPAN_KEY); + } +} + +function hrToSecs(hr) { + return ((hr[0] * 1e3 + hr[1] / 1e6) / 1000); +} + +// Exporter compatible with opentelemetry js library +class SpanExporter { + export(spans, resultCallback) { + try { + for (let i = 0; i < spans.length; i += 1) { + const span = spans[i]; + const context = span.spanContext(); + submit({ + spanId: context.spanId, + traceId: context.traceId, + traceFlags: context.traceFlags, + name: span.name, + kind: span.kind, + parentSpanId: span.parentSpanId, + startTime: hrToSecs(span.startTime), + endTime: hrToSecs(span.endTime), + status: span.status, + attributes: span.attributes, + }); + } + resultCallback({ code: 0 }); + } catch (error) { + resultCallback({ code: 1, error }); + } + } + + async shutdown() {} + + async forceFlush() {} +} + +// SPAN_KEY matches symbol in otel-js library +const SPAN_KEY = SymbolFor("OpenTelemetry Context Key SPAN"); + +// Context tracker compatible with otel-js api +class Context { + #data = new SafeMap(); + + constructor(data) { + this.#data = data ? new SafeMap(data) : new SafeMap(); + } + + getValue(key) { + return this.#data.get(key); + } + + setValue(key, value) { + const c = new Context(this.#data); + c.#data.set(key, value); + return c; + } + + deleteValue(key) { + const c = new Context(this.#data); + c.#data.delete(key); + return c; + } +} + +const ROOT_CONTEXT = new Context(); + +// Context manager for opentelemetry js library +class ContextManager { + active() { + return CURRENT.get() ?? ROOT_CONTEXT; + } + + with(context, fn, thisArg, ...args) { + const ctx = CURRENT.enter(context); + try { + return ReflectApply(fn, thisArg, args); + } finally { + setAsyncContext(ctx); + } + } + + bind(context, f) { + return (...args) => { + const ctx = CURRENT.enter(context); + try { + return ReflectApply(f, thisArg, args); + } finally { + setAsyncContext(ctx); + } + }; + } + + enable() { + return this; + } + + disable() { + return this; + } +} + +function otelLog(message, level) { + let traceId = ""; + let spanId = ""; + let traceFlags = 0; + const span = Span.current(); + if (span) { + if (span.spanId !== undefined) { + spanId = span.spanId; + traceId = span.traceId; + traceFlags = span.traceFlags; + } else { + const context = span.spanContext(); + spanId = context.spanId; + traceId = context.traceId; + traceFlags = context.traceFlags; + } + } + return op_otel_log(message, level, traceId, spanId, traceFlags); +} + +const otelConsoleConfig = { + ignore: 0, + capture: 1, + replace: 2, +}; + +export function bootstrap(config) { + if (config.length === 0) return; + const { 0: consoleConfig } = config; + + TRACING_ENABLED = true; + + switch (consoleConfig) { + case otelConsoleConfig.capture: + core.wrapConsole(globalThis.console, new Console(otelLog)); + break; + case otelConsoleConfig.replace: + ObjectDefineProperty( + globalThis, + "console", + core.propNonEnumerable(new Console(otelLog)), + ); + break; + default: + break; + } +} + +export const tracing = { + get enabled() { + return TRACING_ENABLED; + }, + Span, + SpanExporter, + ContextManager, +}; + +// TODO(devsnek): implement metrics +export const metrics = {}; diff --git a/runtime/lib.rs b/runtime/lib.rs index f0b1129ce..21b61e1c0 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -99,18 +99,24 @@ pub static UNSTABLE_GRANULAR_FLAGS: &[UnstableGranularFlag] = &[ show_in_help: true, id: 7, }, + UnstableGranularFlag { + name: "otel", + help_text: "Enable unstable OpenTelemetry features", + show_in_help: false, + id: 8, + }, // TODO(bartlomieju): consider removing it UnstableGranularFlag { name: ops::process::UNSTABLE_FEATURE_NAME, help_text: "Enable unstable process APIs", show_in_help: false, - id: 8, + id: 9, }, UnstableGranularFlag { name: "temporal", help_text: "Enable unstable Temporal API", show_in_help: true, - id: 9, + id: 10, }, UnstableGranularFlag { name: "unsafe-proto", @@ -118,19 +124,19 @@ pub static UNSTABLE_GRANULAR_FLAGS: &[UnstableGranularFlag] = &[ show_in_help: true, // This number is used directly in the JS code. Search // for "unstableIds" to see where it's used. - id: 10, + id: 11, }, UnstableGranularFlag { name: deno_webgpu::UNSTABLE_FEATURE_NAME, help_text: "Enable unstable `WebGPU` APIs", show_in_help: true, - id: 11, + id: 12, }, UnstableGranularFlag { name: ops::worker_host::UNSTABLE_FEATURE_NAME, help_text: "Enable unstable Web Worker APIs", show_in_help: true, - id: 12, + id: 13, }, ]; diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs index 67065b901..c2e402f33 100644 --- a/runtime/ops/mod.rs +++ b/runtime/ops/mod.rs @@ -4,6 +4,7 @@ pub mod bootstrap; pub mod fs_events; pub mod http; pub mod os; +pub mod otel; pub mod permissions; pub mod process; pub mod runtime; diff --git a/runtime/ops/os/mod.rs b/runtime/ops/os/mod.rs index 9bee9d823..790962f38 100644 --- a/runtime/ops/os/mod.rs +++ b/runtime/ops/os/mod.rs @@ -186,6 +186,8 @@ fn op_get_exit_code(state: &mut OpState) -> i32 { #[op2(fast)] fn op_exit(state: &mut OpState) { + crate::ops::otel::otel_drop_state(state); + let code = state.borrow::<ExitCode>().get(); std::process::exit(code) } diff --git a/runtime/ops/otel.rs b/runtime/ops/otel.rs new file mode 100644 index 000000000..6a4750acc --- /dev/null +++ b/runtime/ops/otel.rs @@ -0,0 +1,686 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +use crate::tokio_util::create_basic_runtime; +use deno_core::anyhow::anyhow; +use deno_core::anyhow::{self}; +use deno_core::futures::channel::mpsc; +use deno_core::futures::channel::mpsc::UnboundedSender; +use deno_core::futures::future::BoxFuture; +use deno_core::futures::stream; +use deno_core::futures::Stream; +use deno_core::futures::StreamExt; +use deno_core::op2; +use deno_core::v8; +use deno_core::OpState; +use once_cell::sync::Lazy; +use opentelemetry::logs::Severity; +use opentelemetry::trace::SpanContext; +use opentelemetry::trace::SpanId; +use opentelemetry::trace::SpanKind; +use opentelemetry::trace::Status as SpanStatus; +use opentelemetry::trace::TraceFlags; +use opentelemetry::trace::TraceId; +use opentelemetry::InstrumentationScope; +use opentelemetry::Key; +use opentelemetry::KeyValue; +use opentelemetry::StringValue; +use opentelemetry::Value; +use opentelemetry_otlp::HttpExporterBuilder; +use opentelemetry_otlp::Protocol; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::WithHttpConfig; +use opentelemetry_sdk::export::trace::SpanData; +use opentelemetry_sdk::logs::BatchLogProcessor; +use opentelemetry_sdk::logs::LogProcessor as LogProcessorTrait; +use opentelemetry_sdk::logs::LogRecord; +use opentelemetry_sdk::trace::BatchSpanProcessor; +use opentelemetry_sdk::trace::SpanProcessor as SpanProcessorTrait; +use opentelemetry_sdk::Resource; +use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_NAME; +use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_VERSION; +use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_LANGUAGE; +use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_NAME; +use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_VERSION; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::env; +use std::fmt::Debug; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::thread; +use std::time::Duration; +use std::time::SystemTime; + +type SpanProcessor = BatchSpanProcessor<OtelSharedRuntime>; +type LogProcessor = BatchLogProcessor<OtelSharedRuntime>; + +deno_core::extension!( + deno_otel, + ops = [op_otel_log, op_otel_span_start, op_otel_span_continue, op_otel_span_attribute, op_otel_span_attribute2, op_otel_span_attribute3, op_otel_span_flush], + options = { + otel_config: Option<OtelConfig>, // `None` means OpenTelemetry is disabled. + }, + state = |state, options| { + if let Some(otel_config) = options.otel_config { + otel_create_globals(otel_config, state).unwrap(); + } + } +); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OtelConfig { + pub runtime_name: Cow<'static, str>, + pub runtime_version: Cow<'static, str>, + pub console: OtelConsoleConfig, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[repr(u8)] +pub enum OtelConsoleConfig { + Ignore = 0, + Capture = 1, + Replace = 2, +} + +impl Default for OtelConfig { + fn default() -> Self { + Self { + runtime_name: Cow::Borrowed(env!("CARGO_PKG_NAME")), + runtime_version: Cow::Borrowed(env!("CARGO_PKG_VERSION")), + console: OtelConsoleConfig::Capture, + } + } +} + +static OTEL_SHARED_RUNTIME_SPAWN_TASK_TX: Lazy< + UnboundedSender<BoxFuture<'static, ()>>, +> = Lazy::new(otel_create_shared_runtime); + +fn otel_create_shared_runtime() -> UnboundedSender<BoxFuture<'static, ()>> { + let (spawn_task_tx, mut spawn_task_rx) = + mpsc::unbounded::<BoxFuture<'static, ()>>(); + + thread::spawn(move || { + let rt = create_basic_runtime(); + rt.block_on(async move { + while let Some(task) = spawn_task_rx.next().await { + tokio::spawn(task); + } + }); + }); + + spawn_task_tx +} + +#[derive(Clone, Copy)] +struct OtelSharedRuntime; + +impl hyper::rt::Executor<BoxFuture<'static, ()>> for OtelSharedRuntime { + fn execute(&self, fut: BoxFuture<'static, ()>) { + (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX) + .unbounded_send(fut) + .expect("failed to send task to shared OpenTelemetry runtime"); + } +} + +impl opentelemetry_sdk::runtime::Runtime for OtelSharedRuntime { + type Interval = Pin<Box<dyn Stream<Item = ()> + Send + 'static>>; + type Delay = Pin<Box<tokio::time::Sleep>>; + + fn interval(&self, period: Duration) -> Self::Interval { + stream::repeat(()) + .then(move |_| tokio::time::sleep(period)) + .boxed() + } + + fn spawn(&self, future: BoxFuture<'static, ()>) { + (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX) + .unbounded_send(future) + .expect("failed to send task to shared OpenTelemetry runtime"); + } + + fn delay(&self, duration: Duration) -> Self::Delay { + Box::pin(tokio::time::sleep(duration)) + } +} + +impl opentelemetry_sdk::runtime::RuntimeChannel for OtelSharedRuntime { + type Receiver<T: Debug + Send> = BatchMessageChannelReceiver<T>; + type Sender<T: Debug + Send> = BatchMessageChannelSender<T>; + + fn batch_message_channel<T: Debug + Send>( + &self, + capacity: usize, + ) -> (Self::Sender<T>, Self::Receiver<T>) { + let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<T>(capacity); + (batch_tx.into(), batch_rx.into()) + } +} + +#[derive(Debug)] +pub struct BatchMessageChannelSender<T: Send> { + sender: tokio::sync::mpsc::Sender<T>, +} + +impl<T: Send> From<tokio::sync::mpsc::Sender<T>> + for BatchMessageChannelSender<T> +{ + fn from(sender: tokio::sync::mpsc::Sender<T>) -> Self { + Self { sender } + } +} + +impl<T: Send> opentelemetry_sdk::runtime::TrySend + for BatchMessageChannelSender<T> +{ + type Message = T; + + fn try_send( + &self, + item: Self::Message, + ) -> Result<(), opentelemetry_sdk::runtime::TrySendError> { + self.sender.try_send(item).map_err(|err| match err { + tokio::sync::mpsc::error::TrySendError::Full(_) => { + opentelemetry_sdk::runtime::TrySendError::ChannelFull + } + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + opentelemetry_sdk::runtime::TrySendError::ChannelClosed + } + }) + } +} + +pub struct BatchMessageChannelReceiver<T> { + receiver: tokio::sync::mpsc::Receiver<T>, +} + +impl<T> From<tokio::sync::mpsc::Receiver<T>> + for BatchMessageChannelReceiver<T> +{ + fn from(receiver: tokio::sync::mpsc::Receiver<T>) -> Self { + Self { receiver } + } +} + +impl<T> Stream for BatchMessageChannelReceiver<T> { + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + self.receiver.poll_recv(cx) + } +} + +mod hyper_client { + use http_body_util::BodyExt; + use http_body_util::Full; + use hyper::body::Body as HttpBody; + use hyper::body::Frame; + use hyper_util::client::legacy::connect::HttpConnector; + use hyper_util::client::legacy::Client; + use opentelemetry_http::Bytes; + use opentelemetry_http::HttpError; + use opentelemetry_http::Request; + use opentelemetry_http::Response; + use opentelemetry_http::ResponseExt; + use std::fmt::Debug; + use std::pin::Pin; + use std::task::Poll; + use std::task::{self}; + + use super::OtelSharedRuntime; + + // same as opentelemetry_http::HyperClient except it uses OtelSharedRuntime + #[derive(Debug, Clone)] + pub struct HyperClient { + inner: Client<HttpConnector, Body>, + } + + impl HyperClient { + pub fn new() -> Self { + Self { + inner: Client::builder(OtelSharedRuntime).build(HttpConnector::new()), + } + } + } + + #[async_trait::async_trait] + impl opentelemetry_http::HttpClient for HyperClient { + async fn send( + &self, + request: Request<Vec<u8>>, + ) -> Result<Response<Bytes>, HttpError> { + let (parts, body) = request.into_parts(); + let request = Request::from_parts(parts, Body(Full::from(body))); + let mut response = self.inner.request(request).await?; + let headers = std::mem::take(response.headers_mut()); + + let mut http_response = Response::builder() + .status(response.status()) + .body(response.into_body().collect().await?.to_bytes())?; + *http_response.headers_mut() = headers; + + Ok(http_response.error_for_status()?) + } + } + + #[pin_project::pin_project] + pub struct Body(#[pin] Full<Bytes>); + + impl HttpBody for Body { + type Data = Bytes; + type Error = Box<dyn std::error::Error + Send + Sync + 'static>; + + #[inline] + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { + self.project().0.poll_frame(cx).map_err(Into::into) + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.0.is_end_stream() + } + + #[inline] + fn size_hint(&self) -> hyper::body::SizeHint { + self.0.size_hint() + } + } +} + +fn otel_create_globals( + config: OtelConfig, + op_state: &mut OpState, +) -> anyhow::Result<()> { + // Parse the `OTEL_EXPORTER_OTLP_PROTOCOL` variable. The opentelemetry_* + // crates don't do this automatically. + // TODO(piscisaureus): enable GRPC support. + let protocol = match env::var("OTEL_EXPORTER_OTLP_PROTOCOL").as_deref() { + Ok("http/protobuf") => Protocol::HttpBinary, + Ok("http/json") => Protocol::HttpJson, + Ok("") | Err(env::VarError::NotPresent) => { + return Ok(()); + } + Ok(protocol) => { + return Err(anyhow!( + "Env var OTEL_EXPORTER_OTLP_PROTOCOL specifies an unsupported protocol: {}", + protocol + )); + } + Err(err) => { + return Err(anyhow!( + "Failed to read env var OTEL_EXPORTER_OTLP_PROTOCOL: {}", + err + )) + } + }; + + // Define the resource attributes that will be attached to all log records. + // These attributes are sourced as follows (in order of precedence): + // * The `service.name` attribute from the `OTEL_SERVICE_NAME` env var. + // * Additional attributes from the `OTEL_RESOURCE_ATTRIBUTES` env var. + // * Default attribute values defined here. + // TODO(piscisaureus): add more default attributes (e.g. script path). + let mut resource = Resource::default(); + + // Add the runtime name and version to the resource attributes. Also override + // the `telemetry.sdk` attributes to include the Deno runtime. + resource = resource.merge(&Resource::new(vec![ + KeyValue::new(PROCESS_RUNTIME_NAME, config.runtime_name), + KeyValue::new(PROCESS_RUNTIME_VERSION, config.runtime_version.clone()), + KeyValue::new( + TELEMETRY_SDK_LANGUAGE, + format!( + "deno-{}", + resource.get(Key::new(TELEMETRY_SDK_LANGUAGE)).unwrap() + ), + ), + KeyValue::new( + TELEMETRY_SDK_NAME, + format!( + "deno-{}", + resource.get(Key::new(TELEMETRY_SDK_NAME)).unwrap() + ), + ), + KeyValue::new( + TELEMETRY_SDK_VERSION, + format!( + "{}-{}", + config.runtime_version, + resource.get(Key::new(TELEMETRY_SDK_VERSION)).unwrap() + ), + ), + ])); + + // The OTLP endpoint is automatically picked up from the + // `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable. Additional headers can + // be specified using `OTEL_EXPORTER_OTLP_HEADERS`. + + let client = hyper_client::HyperClient::new(); + + let span_exporter = HttpExporterBuilder::default() + .with_http_client(client.clone()) + .with_protocol(protocol) + .build_span_exporter()?; + let mut span_processor = + BatchSpanProcessor::builder(span_exporter, OtelSharedRuntime).build(); + span_processor.set_resource(&resource); + op_state.put::<SpanProcessor>(span_processor); + + let log_exporter = HttpExporterBuilder::default() + .with_http_client(client) + .with_protocol(protocol) + .build_log_exporter()?; + let log_processor = + BatchLogProcessor::builder(log_exporter, OtelSharedRuntime).build(); + log_processor.set_resource(&resource); + op_state.put::<LogProcessor>(log_processor); + + Ok(()) +} + +/// This function is called by the runtime whenever it is about to call +/// `os::process::exit()`, to ensure that all OpenTelemetry logs are properly +/// flushed before the process terminates. +pub fn otel_drop_state(state: &mut OpState) { + if let Some(processor) = state.try_take::<SpanProcessor>() { + let _ = processor.force_flush(); + drop(processor); + } + if let Some(processor) = state.try_take::<LogProcessor>() { + let _ = processor.force_flush(); + drop(processor); + } +} + +#[op2(fast)] +fn op_otel_log( + state: &mut OpState, + #[string] message: String, + #[smi] level: i32, + #[string] trace_id: &str, + #[string] span_id: &str, + #[smi] trace_flags: u8, +) { + let Some(logger) = state.try_borrow::<LogProcessor>() else { + log::error!("op_otel_log: OpenTelemetry Logger not available"); + return; + }; + + // Convert the integer log level that ext/console uses to the corresponding + // OpenTelemetry log severity. + let severity = match level { + ..=0 => Severity::Debug, + 1 => Severity::Info, + 2 => Severity::Warn, + 3.. => Severity::Error, + }; + + let mut log_record = LogRecord::default(); + + log_record.observed_timestamp = Some(SystemTime::now()); + log_record.body = Some(message.into()); + log_record.severity_number = Some(severity); + log_record.severity_text = Some(severity.name()); + if let (Ok(trace_id), Ok(span_id)) = + (TraceId::from_hex(trace_id), SpanId::from_hex(span_id)) + { + let span_context = SpanContext::new( + trace_id, + span_id, + TraceFlags::new(trace_flags), + false, + Default::default(), + ); + log_record.trace_context = Some((&span_context).into()); + } + logger.emit( + &mut log_record, + &InstrumentationScope::builder("deno").build(), + ); +} + +struct TemporarySpan(SpanData); + +#[allow(clippy::too_many_arguments)] +#[op2(fast)] +fn op_otel_span_start<'s>( + scope: &mut v8::HandleScope<'s>, + state: &mut OpState, + trace_id: v8::Local<'s, v8::Value>, + span_id: v8::Local<'s, v8::Value>, + parent_span_id: v8::Local<'s, v8::Value>, + #[smi] span_kind: u8, + name: v8::Local<'s, v8::Value>, + start_time: f64, + end_time: f64, +) -> Result<(), anyhow::Error> { + if let Some(temporary_span) = state.try_take::<TemporarySpan>() { + let Some(span_processor) = state.try_borrow::<SpanProcessor>() else { + return Ok(()); + }; + span_processor.on_end(temporary_span.0); + }; + + let trace_id = { + let x = v8::ValueView::new(scope, trace_id.try_cast()?); + match x.data() { + v8::ValueViewData::OneByte(bytes) => { + TraceId::from_hex(&String::from_utf8_lossy(bytes))? + } + _ => return Err(anyhow!("invalid trace_id")), + } + }; + + let span_id = { + let x = v8::ValueView::new(scope, span_id.try_cast()?); + match x.data() { + v8::ValueViewData::OneByte(bytes) => { + SpanId::from_hex(&String::from_utf8_lossy(bytes))? + } + _ => return Err(anyhow!("invalid span_id")), + } + }; + + let parent_span_id = { + let x = v8::ValueView::new(scope, parent_span_id.try_cast()?); + match x.data() { + v8::ValueViewData::OneByte(bytes) => { + let s = String::from_utf8_lossy(bytes); + if s.is_empty() { + SpanId::INVALID + } else { + SpanId::from_hex(&s)? + } + } + _ => return Err(anyhow!("invalid parent_span_id")), + } + }; + + let name = { + let x = v8::ValueView::new(scope, name.try_cast()?); + match x.data() { + v8::ValueViewData::OneByte(bytes) => { + String::from_utf8_lossy(bytes).into_owned() + } + v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes), + } + }; + + let temporary_span = TemporarySpan(SpanData { + span_context: SpanContext::new( + trace_id, + span_id, + TraceFlags::SAMPLED, + false, + Default::default(), + ), + parent_span_id, + span_kind: match span_kind { + 0 => SpanKind::Internal, + 1 => SpanKind::Server, + 2 => SpanKind::Client, + 3 => SpanKind::Producer, + 4 => SpanKind::Consumer, + _ => return Err(anyhow!("invalid span kind")), + }, + name: Cow::Owned(name), + start_time: SystemTime::UNIX_EPOCH + .checked_add(std::time::Duration::from_secs_f64(start_time)) + .ok_or_else(|| anyhow!("invalid start time"))?, + end_time: SystemTime::UNIX_EPOCH + .checked_add(std::time::Duration::from_secs_f64(end_time)) + .ok_or_else(|| anyhow!("invalid start time"))?, + attributes: Vec::new(), + dropped_attributes_count: 0, + events: Default::default(), + links: Default::default(), + status: SpanStatus::Unset, + instrumentation_scope: InstrumentationScope::builder("deno").build(), + }); + state.put(temporary_span); + + Ok(()) +} + +#[op2(fast)] +fn op_otel_span_continue( + state: &mut OpState, + #[smi] status: u8, + #[string] error_description: Cow<'_, str>, +) { + if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() { + temporary_span.0.status = match status { + 0 => SpanStatus::Unset, + 1 => SpanStatus::Ok, + 2 => SpanStatus::Error { + description: Cow::Owned(error_description.into_owned()), + }, + _ => return, + }; + } +} + +macro_rules! attr { + ($scope:ident, $temporary_span:ident, $name:ident, $value:ident) => { + let name = if let Ok(name) = $name.try_cast() { + let view = v8::ValueView::new($scope, name); + match view.data() { + v8::ValueViewData::OneByte(bytes) => { + Some(String::from_utf8_lossy(bytes).into_owned()) + } + v8::ValueViewData::TwoByte(bytes) => { + Some(String::from_utf16_lossy(bytes)) + } + } + } else { + None + }; + let value = if let Ok(string) = $value.try_cast::<v8::String>() { + Some(Value::String(StringValue::from({ + let x = v8::ValueView::new($scope, string); + match x.data() { + v8::ValueViewData::OneByte(bytes) => { + String::from_utf8_lossy(bytes).into_owned() + } + v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes), + } + }))) + } else if let Ok(number) = $value.try_cast::<v8::Number>() { + Some(Value::F64(number.value())) + } else if let Ok(boolean) = $value.try_cast::<v8::Boolean>() { + Some(Value::Bool(boolean.is_true())) + } else if let Ok(bigint) = $value.try_cast::<v8::BigInt>() { + let (i64_value, _lossless) = bigint.i64_value(); + Some(Value::I64(i64_value)) + } else { + None + }; + if let (Some(name), Some(value)) = (name, value) { + $temporary_span + .0 + .attributes + .push(KeyValue::new(name, value)); + } else { + $temporary_span.0.dropped_attributes_count += 1; + } + }; +} + +#[op2(fast)] +fn op_otel_span_attribute<'s>( + scope: &mut v8::HandleScope<'s>, + state: &mut OpState, + #[smi] capacity: u32, + key: v8::Local<'s, v8::Value>, + value: v8::Local<'s, v8::Value>, +) { + if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() { + temporary_span.0.attributes.reserve_exact( + (capacity as usize) - temporary_span.0.attributes.capacity(), + ); + attr!(scope, temporary_span, key, value); + } +} + +#[op2(fast)] +fn op_otel_span_attribute2<'s>( + scope: &mut v8::HandleScope<'s>, + state: &mut OpState, + #[smi] capacity: u32, + key1: v8::Local<'s, v8::Value>, + value1: v8::Local<'s, v8::Value>, + key2: v8::Local<'s, v8::Value>, + value2: v8::Local<'s, v8::Value>, +) { + if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() { + temporary_span.0.attributes.reserve_exact( + (capacity as usize) - temporary_span.0.attributes.capacity(), + ); + attr!(scope, temporary_span, key1, value1); + attr!(scope, temporary_span, key2, value2); + } +} + +#[allow(clippy::too_many_arguments)] +#[op2(fast)] +fn op_otel_span_attribute3<'s>( + scope: &mut v8::HandleScope<'s>, + state: &mut OpState, + #[smi] capacity: u32, + key1: v8::Local<'s, v8::Value>, + value1: v8::Local<'s, v8::Value>, + key2: v8::Local<'s, v8::Value>, + value2: v8::Local<'s, v8::Value>, + key3: v8::Local<'s, v8::Value>, + value3: v8::Local<'s, v8::Value>, +) { + if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() { + temporary_span.0.attributes.reserve_exact( + (capacity as usize) - temporary_span.0.attributes.capacity(), + ); + attr!(scope, temporary_span, key1, value1); + attr!(scope, temporary_span, key2, value2); + attr!(scope, temporary_span, key3, value3); + } +} + +#[op2(fast)] +fn op_otel_span_flush(state: &mut OpState) { + let Some(temporary_span) = state.try_take::<TemporarySpan>() else { + return; + }; + + let Some(span_processor) = state.try_borrow::<SpanProcessor>() else { + return; + }; + + span_processor.on_end(temporary_span.0); +} diff --git a/runtime/shared.rs b/runtime/shared.rs index f7d76f67a..c05f352f1 100644 --- a/runtime/shared.rs +++ b/runtime/shared.rs @@ -47,6 +47,7 @@ extension!(runtime, "40_signals.js", "40_tty.js", "41_prompt.js", + "telemetry.js", "90_deno_ns.js", "98_global_scope_shared.js", "98_global_scope_window.js", diff --git a/runtime/snapshot.rs b/runtime/snapshot.rs index 251ee5f41..bb9bf9166 100644 --- a/runtime/snapshot.rs +++ b/runtime/snapshot.rs @@ -312,6 +312,7 @@ pub fn create_runtime_snapshot( ), ops::fs_events::deno_fs_events::init_ops(), ops::os::deno_os::init_ops(Default::default()), + ops::otel::deno_otel::init_ops(None), ops::permissions::deno_permissions::init_ops(), ops::process::deno_process::init_ops(None), ops::signal::deno_signal::init_ops(), diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 61e5c7702..d81c82c50 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -505,6 +505,9 @@ impl WebWorker { ), ops::fs_events::deno_fs_events::init_ops_and_esm(), ops::os::deno_os_worker::init_ops_and_esm(), + ops::otel::deno_otel::init_ops_and_esm( + options.bootstrap.otel_config.clone(), + ), ops::permissions::deno_permissions::init_ops_and_esm(), ops::process::deno_process::init_ops_and_esm( services.npm_process_state_provider, diff --git a/runtime/worker.rs b/runtime/worker.rs index 88a61fa93..82df755fa 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -422,6 +422,9 @@ impl MainWorker { ), ops::fs_events::deno_fs_events::init_ops_and_esm(), ops::os::deno_os::init_ops_and_esm(exit_code.clone()), + ops::otel::deno_otel::init_ops_and_esm( + options.bootstrap.otel_config.clone(), + ), ops::permissions::deno_permissions::init_ops_and_esm(), ops::process::deno_process::init_ops_and_esm( services.npm_process_state_provider, diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs index 3f3c25c5e..dc989a1c0 100644 --- a/runtime/worker_bootstrap.rs +++ b/runtime/worker_bootstrap.rs @@ -1,5 +1,6 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +use crate::ops::otel::OtelConfig; use deno_core::v8; use deno_core::ModuleSpecifier; use serde::Serialize; @@ -118,6 +119,8 @@ pub struct BootstrapOptions { // Used by `deno serve` pub serve_port: Option<u16>, pub serve_host: Option<String>, + // OpenTelemetry output options. If `None`, OpenTelemetry is disabled. + pub otel_config: Option<OtelConfig>, } impl Default for BootstrapOptions { @@ -152,6 +155,7 @@ impl Default for BootstrapOptions { mode: WorkerExecutionMode::None, serve_port: Default::default(), serve_host: Default::default(), + otel_config: None, } } } @@ -193,6 +197,8 @@ struct BootstrapV8<'a>( Option<bool>, // serve worker count Option<usize>, + // OTEL config + Box<[u8]>, ); impl BootstrapOptions { @@ -219,6 +225,11 @@ impl BootstrapOptions { self.serve_host.as_deref(), serve_is_main, serve_worker_count, + if let Some(otel_config) = self.otel_config.as_ref() { + Box::new([otel_config.console as u8]) + } else { + Box::new([]) + }, ); bootstrap.serialize(ser).unwrap() |