diff options
Diffstat (limited to 'runtime/ops')
-rw-r--r-- | runtime/ops/os/mod.rs | 4 | ||||
-rw-r--r-- | runtime/ops/otel.rs | 136 |
2 files changed, 98 insertions, 42 deletions
diff --git a/runtime/ops/os/mod.rs b/runtime/ops/os/mod.rs index 790962f38..b10a2939e 100644 --- a/runtime/ops/os/mod.rs +++ b/runtime/ops/os/mod.rs @@ -186,10 +186,8 @@ fn op_get_exit_code(state: &mut OpState) -> i32 { #[op2(fast)] fn op_exit(state: &mut OpState) { - crate::ops::otel::otel_drop_state(state); - let code = state.borrow::<ExitCode>().get(); - std::process::exit(code) + crate::exit(code) } #[op2] diff --git a/runtime/ops/otel.rs b/runtime/ops/otel.rs index 6a4750acc..b32764d7f 100644 --- a/runtime/ops/otel.rs +++ b/runtime/ops/otel.rs @@ -13,6 +13,9 @@ use deno_core::op2; use deno_core::v8; use deno_core::OpState; use once_cell::sync::Lazy; +use once_cell::sync::OnceCell; +use opentelemetry::logs::AnyValue; +use opentelemetry::logs::LogRecord as LogRecordTrait; use opentelemetry::logs::Severity; use opentelemetry::trace::SpanContext; use opentelemetry::trace::SpanId; @@ -58,15 +61,15 @@ type LogProcessor = BatchLogProcessor<OtelSharedRuntime>; deno_core::extension!( deno_otel, - ops = [op_otel_log, op_otel_span_start, op_otel_span_continue, op_otel_span_attribute, op_otel_span_attribute2, op_otel_span_attribute3, op_otel_span_flush], - options = { - otel_config: Option<OtelConfig>, // `None` means OpenTelemetry is disabled. - }, - state = |state, options| { - if let Some(otel_config) = options.otel_config { - otel_create_globals(otel_config, state).unwrap(); - } - } + ops = [ + op_otel_log, + op_otel_span_start, + op_otel_span_continue, + op_otel_span_attribute, + op_otel_span_attribute2, + op_otel_span_attribute3, + op_otel_span_flush, + ], ); #[derive(Debug, Clone, Serialize, Deserialize)] @@ -74,6 +77,7 @@ pub struct OtelConfig { pub runtime_name: Cow<'static, str>, pub runtime_version: Cow<'static, str>, pub console: OtelConsoleConfig, + pub deterministic: bool, } #[derive(Debug, Clone, Copy, Serialize, Deserialize)] @@ -90,6 +94,7 @@ impl Default for OtelConfig { runtime_name: Cow::Borrowed(env!("CARGO_PKG_NAME")), runtime_version: Cow::Borrowed(env!("CARGO_PKG_VERSION")), console: OtelConsoleConfig::Capture, + deterministic: false, } } } @@ -295,10 +300,10 @@ mod hyper_client { } } -fn otel_create_globals( - config: OtelConfig, - op_state: &mut OpState, -) -> anyhow::Result<()> { +static OTEL_PROCESSORS: OnceCell<(SpanProcessor, LogProcessor)> = + OnceCell::new(); + +pub fn init(config: OtelConfig) -> anyhow::Result<()> { // Parse the `OTEL_EXPORTER_OTLP_PROTOCOL` variable. The opentelemetry_* // crates don't do this automatically. // TODO(piscisaureus): enable GRPC support. @@ -318,7 +323,7 @@ fn otel_create_globals( return Err(anyhow!( "Failed to read env var OTEL_EXPORTER_OTLP_PROTOCOL: {}", err - )) + )); } }; @@ -372,7 +377,6 @@ fn otel_create_globals( let mut span_processor = BatchSpanProcessor::builder(span_exporter, OtelSharedRuntime).build(); span_processor.set_resource(&resource); - op_state.put::<SpanProcessor>(span_processor); let log_exporter = HttpExporterBuilder::default() .with_http_client(client) @@ -381,36 +385,92 @@ fn otel_create_globals( let log_processor = BatchLogProcessor::builder(log_exporter, OtelSharedRuntime).build(); log_processor.set_resource(&resource); - op_state.put::<LogProcessor>(log_processor); + + OTEL_PROCESSORS + .set((span_processor, log_processor)) + .map_err(|_| anyhow!("failed to init otel"))?; Ok(()) } /// This function is called by the runtime whenever it is about to call -/// `os::process::exit()`, to ensure that all OpenTelemetry logs are properly +/// `process::exit()`, to ensure that all OpenTelemetry logs are properly /// flushed before the process terminates. -pub fn otel_drop_state(state: &mut OpState) { - if let Some(processor) = state.try_take::<SpanProcessor>() { - let _ = processor.force_flush(); - drop(processor); +pub fn flush() { + if let Some((span_processor, log_processor)) = OTEL_PROCESSORS.get() { + let _ = span_processor.force_flush(); + let _ = log_processor.force_flush(); } - if let Some(processor) = state.try_take::<LogProcessor>() { - let _ = processor.force_flush(); - drop(processor); +} + +pub fn handle_log(record: &log::Record) { + use log::Level; + + let Some((_, log_processor)) = OTEL_PROCESSORS.get() else { + return; + }; + + let mut log_record = LogRecord::default(); + + log_record.set_observed_timestamp(SystemTime::now()); + log_record.set_severity_number(match record.level() { + Level::Error => Severity::Error, + Level::Warn => Severity::Warn, + Level::Info => Severity::Info, + Level::Debug => Severity::Debug, + Level::Trace => Severity::Trace, + }); + log_record.set_severity_text(record.level().as_str()); + log_record.set_body(record.args().to_string().into()); + log_record.set_target(record.metadata().target().to_string()); + + struct Visitor<'s>(&'s mut LogRecord); + + impl<'s, 'kvs> log::kv::VisitSource<'kvs> for Visitor<'s> { + fn visit_pair( + &mut self, + key: log::kv::Key<'kvs>, + value: log::kv::Value<'kvs>, + ) -> Result<(), log::kv::Error> { + #[allow(clippy::manual_map)] + let value = if let Some(v) = value.to_bool() { + Some(AnyValue::Boolean(v)) + } else if let Some(v) = value.to_borrowed_str() { + Some(AnyValue::String(v.to_owned().into())) + } else if let Some(v) = value.to_f64() { + Some(AnyValue::Double(v)) + } else if let Some(v) = value.to_i64() { + Some(AnyValue::Int(v)) + } else { + None + }; + + if let Some(value) = value { + let key = Key::from(key.as_str().to_owned()); + self.0.add_attribute(key, value); + } + + Ok(()) + } } + + let _ = record.key_values().visit(&mut Visitor(&mut log_record)); + + log_processor.emit( + &mut log_record, + &InstrumentationScope::builder("deno").build(), + ); } #[op2(fast)] fn op_otel_log( - state: &mut OpState, #[string] message: String, #[smi] level: i32, #[string] trace_id: &str, #[string] span_id: &str, #[smi] trace_flags: u8, ) { - let Some(logger) = state.try_borrow::<LogProcessor>() else { - log::error!("op_otel_log: OpenTelemetry Logger not available"); + let Some((_, log_processor)) = OTEL_PROCESSORS.get() else { return; }; @@ -425,23 +485,21 @@ fn op_otel_log( let mut log_record = LogRecord::default(); - log_record.observed_timestamp = Some(SystemTime::now()); - log_record.body = Some(message.into()); - log_record.severity_number = Some(severity); - log_record.severity_text = Some(severity.name()); + log_record.set_observed_timestamp(SystemTime::now()); + log_record.set_body(message.into()); + log_record.set_severity_number(severity); + log_record.set_severity_text(severity.name()); if let (Ok(trace_id), Ok(span_id)) = (TraceId::from_hex(trace_id), SpanId::from_hex(span_id)) { - let span_context = SpanContext::new( + log_record.set_trace_context( trace_id, span_id, - TraceFlags::new(trace_flags), - false, - Default::default(), + Some(TraceFlags::new(trace_flags)), ); - log_record.trace_context = Some((&span_context).into()); } - logger.emit( + + log_processor.emit( &mut log_record, &InstrumentationScope::builder("deno").build(), ); @@ -463,7 +521,7 @@ fn op_otel_span_start<'s>( end_time: f64, ) -> Result<(), anyhow::Error> { if let Some(temporary_span) = state.try_take::<TemporarySpan>() { - let Some(span_processor) = state.try_borrow::<SpanProcessor>() else { + let Some((span_processor, _)) = OTEL_PROCESSORS.get() else { return Ok(()); }; span_processor.on_end(temporary_span.0); @@ -678,7 +736,7 @@ fn op_otel_span_flush(state: &mut OpState) { return; }; - let Some(span_processor) = state.try_borrow::<SpanProcessor>() else { + let Some((span_processor, _)) = OTEL_PROCESSORS.get() else { return; }; |