summaryrefslogtreecommitdiff
path: root/runtime/ops/otel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops/otel.rs')
-rw-r--r--runtime/ops/otel.rs136
1 files changed, 97 insertions, 39 deletions
diff --git a/runtime/ops/otel.rs b/runtime/ops/otel.rs
index 6a4750acc..b32764d7f 100644
--- a/runtime/ops/otel.rs
+++ b/runtime/ops/otel.rs
@@ -13,6 +13,9 @@ use deno_core::op2;
use deno_core::v8;
use deno_core::OpState;
use once_cell::sync::Lazy;
+use once_cell::sync::OnceCell;
+use opentelemetry::logs::AnyValue;
+use opentelemetry::logs::LogRecord as LogRecordTrait;
use opentelemetry::logs::Severity;
use opentelemetry::trace::SpanContext;
use opentelemetry::trace::SpanId;
@@ -58,15 +61,15 @@ type LogProcessor = BatchLogProcessor<OtelSharedRuntime>;
deno_core::extension!(
deno_otel,
- ops = [op_otel_log, op_otel_span_start, op_otel_span_continue, op_otel_span_attribute, op_otel_span_attribute2, op_otel_span_attribute3, op_otel_span_flush],
- options = {
- otel_config: Option<OtelConfig>, // `None` means OpenTelemetry is disabled.
- },
- state = |state, options| {
- if let Some(otel_config) = options.otel_config {
- otel_create_globals(otel_config, state).unwrap();
- }
- }
+ ops = [
+ op_otel_log,
+ op_otel_span_start,
+ op_otel_span_continue,
+ op_otel_span_attribute,
+ op_otel_span_attribute2,
+ op_otel_span_attribute3,
+ op_otel_span_flush,
+ ],
);
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -74,6 +77,7 @@ pub struct OtelConfig {
pub runtime_name: Cow<'static, str>,
pub runtime_version: Cow<'static, str>,
pub console: OtelConsoleConfig,
+ pub deterministic: bool,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
@@ -90,6 +94,7 @@ impl Default for OtelConfig {
runtime_name: Cow::Borrowed(env!("CARGO_PKG_NAME")),
runtime_version: Cow::Borrowed(env!("CARGO_PKG_VERSION")),
console: OtelConsoleConfig::Capture,
+ deterministic: false,
}
}
}
@@ -295,10 +300,10 @@ mod hyper_client {
}
}
-fn otel_create_globals(
- config: OtelConfig,
- op_state: &mut OpState,
-) -> anyhow::Result<()> {
+static OTEL_PROCESSORS: OnceCell<(SpanProcessor, LogProcessor)> =
+ OnceCell::new();
+
+pub fn init(config: OtelConfig) -> anyhow::Result<()> {
// Parse the `OTEL_EXPORTER_OTLP_PROTOCOL` variable. The opentelemetry_*
// crates don't do this automatically.
// TODO(piscisaureus): enable GRPC support.
@@ -318,7 +323,7 @@ fn otel_create_globals(
return Err(anyhow!(
"Failed to read env var OTEL_EXPORTER_OTLP_PROTOCOL: {}",
err
- ))
+ ));
}
};
@@ -372,7 +377,6 @@ fn otel_create_globals(
let mut span_processor =
BatchSpanProcessor::builder(span_exporter, OtelSharedRuntime).build();
span_processor.set_resource(&resource);
- op_state.put::<SpanProcessor>(span_processor);
let log_exporter = HttpExporterBuilder::default()
.with_http_client(client)
@@ -381,36 +385,92 @@ fn otel_create_globals(
let log_processor =
BatchLogProcessor::builder(log_exporter, OtelSharedRuntime).build();
log_processor.set_resource(&resource);
- op_state.put::<LogProcessor>(log_processor);
+
+ OTEL_PROCESSORS
+ .set((span_processor, log_processor))
+ .map_err(|_| anyhow!("failed to init otel"))?;
Ok(())
}
/// This function is called by the runtime whenever it is about to call
-/// `os::process::exit()`, to ensure that all OpenTelemetry logs are properly
+/// `process::exit()`, to ensure that all OpenTelemetry logs are properly
/// flushed before the process terminates.
-pub fn otel_drop_state(state: &mut OpState) {
- if let Some(processor) = state.try_take::<SpanProcessor>() {
- let _ = processor.force_flush();
- drop(processor);
+pub fn flush() {
+ if let Some((span_processor, log_processor)) = OTEL_PROCESSORS.get() {
+ let _ = span_processor.force_flush();
+ let _ = log_processor.force_flush();
}
- if let Some(processor) = state.try_take::<LogProcessor>() {
- let _ = processor.force_flush();
- drop(processor);
+}
+
+pub fn handle_log(record: &log::Record) {
+ use log::Level;
+
+ let Some((_, log_processor)) = OTEL_PROCESSORS.get() else {
+ return;
+ };
+
+ let mut log_record = LogRecord::default();
+
+ log_record.set_observed_timestamp(SystemTime::now());
+ log_record.set_severity_number(match record.level() {
+ Level::Error => Severity::Error,
+ Level::Warn => Severity::Warn,
+ Level::Info => Severity::Info,
+ Level::Debug => Severity::Debug,
+ Level::Trace => Severity::Trace,
+ });
+ log_record.set_severity_text(record.level().as_str());
+ log_record.set_body(record.args().to_string().into());
+ log_record.set_target(record.metadata().target().to_string());
+
+ struct Visitor<'s>(&'s mut LogRecord);
+
+ impl<'s, 'kvs> log::kv::VisitSource<'kvs> for Visitor<'s> {
+ fn visit_pair(
+ &mut self,
+ key: log::kv::Key<'kvs>,
+ value: log::kv::Value<'kvs>,
+ ) -> Result<(), log::kv::Error> {
+ #[allow(clippy::manual_map)]
+ let value = if let Some(v) = value.to_bool() {
+ Some(AnyValue::Boolean(v))
+ } else if let Some(v) = value.to_borrowed_str() {
+ Some(AnyValue::String(v.to_owned().into()))
+ } else if let Some(v) = value.to_f64() {
+ Some(AnyValue::Double(v))
+ } else if let Some(v) = value.to_i64() {
+ Some(AnyValue::Int(v))
+ } else {
+ None
+ };
+
+ if let Some(value) = value {
+ let key = Key::from(key.as_str().to_owned());
+ self.0.add_attribute(key, value);
+ }
+
+ Ok(())
+ }
}
+
+ let _ = record.key_values().visit(&mut Visitor(&mut log_record));
+
+ log_processor.emit(
+ &mut log_record,
+ &InstrumentationScope::builder("deno").build(),
+ );
}
#[op2(fast)]
fn op_otel_log(
- state: &mut OpState,
#[string] message: String,
#[smi] level: i32,
#[string] trace_id: &str,
#[string] span_id: &str,
#[smi] trace_flags: u8,
) {
- let Some(logger) = state.try_borrow::<LogProcessor>() else {
- log::error!("op_otel_log: OpenTelemetry Logger not available");
+ let Some((_, log_processor)) = OTEL_PROCESSORS.get() else {
return;
};
@@ -425,23 +485,21 @@ fn op_otel_log(
let mut log_record = LogRecord::default();
- log_record.observed_timestamp = Some(SystemTime::now());
- log_record.body = Some(message.into());
- log_record.severity_number = Some(severity);
- log_record.severity_text = Some(severity.name());
+ log_record.set_observed_timestamp(SystemTime::now());
+ log_record.set_body(message.into());
+ log_record.set_severity_number(severity);
+ log_record.set_severity_text(severity.name());
if let (Ok(trace_id), Ok(span_id)) =
(TraceId::from_hex(trace_id), SpanId::from_hex(span_id))
{
- let span_context = SpanContext::new(
+ log_record.set_trace_context(
trace_id,
span_id,
- TraceFlags::new(trace_flags),
- false,
- Default::default(),
+ Some(TraceFlags::new(trace_flags)),
);
- log_record.trace_context = Some((&span_context).into());
}
- logger.emit(
+
+ log_processor.emit(
&mut log_record,
&InstrumentationScope::builder("deno").build(),
);
@@ -463,7 +521,7 @@ fn op_otel_span_start<'s>(
end_time: f64,
) -> Result<(), anyhow::Error> {
if let Some(temporary_span) = state.try_take::<TemporarySpan>() {
- let Some(span_processor) = state.try_borrow::<SpanProcessor>() else {
+ let Some((span_processor, _)) = OTEL_PROCESSORS.get() else {
return Ok(());
};
span_processor.on_end(temporary_span.0);
@@ -678,7 +736,7 @@ fn op_otel_span_flush(state: &mut OpState) {
return;
};
- let Some(span_processor) = state.try_borrow::<SpanProcessor>() else {
+ let Some((span_processor, _)) = OTEL_PROCESSORS.get() else {
return;
};