Skip to main content

Mountain/IPC/DevLog/
EmitOTLPSpan.rs

1
2//! Fire-and-forget OTLP span exporter. Sends a single
3//! `resourceSpans` payload over plain HTTP to the collector at
4//! `OTLPEndpoint` (default `127.0.0.1:4318`, configurable via
5//! `.env.Land.PostHog`). Stops trying after the first failure
6//! (`OTLP_AVAILABLE` flips to `false`) so a missing collector
7//! doesn't tax every IPC call. Release builds are compiled out
8//! via `cfg!(debug_assertions)`. Honors the `Capture` master
9//! telemetry kill switch and the per-pipe `OTLPEnabled` toggle.
10
11use std::{
12	collections::hash_map::DefaultHasher,
13	hash::{Hash, Hasher},
14	sync::{
15		OnceLock,
16		atomic::{AtomicBool, Ordering},
17	},
18};
19
20use crate::{Binary::Build::PostHogPlugin::Constants, IPC::DevLog::NowNano};
21
22static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);
23
24static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
25
26fn GetTraceId() -> &'static str {
27	OTLP_TRACE_ID.get_or_init(|| {
28		let mut H = DefaultHasher::new();
29		std::process::id().hash(&mut H);
30		NowNano::Fn().hash(&mut H);
31		format!("{:032x}", H.finish() as u128)
32	})
33}
34
35fn RandU64() -> u64 {
36	let mut H = DefaultHasher::new();
37
38	std::thread::current().id().hash(&mut H);
39
40	NowNano::Fn().hash(&mut H);
41
42	H.finish()
43}
44
45pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
46	if !cfg!(debug_assertions) {
47		return;
48	}
49
50	if matches!(Constants::TELEMETRY_CAPTURE, "false" | "0" | "off") {
51		return;
52	}
53
54	if matches!(Constants::OTLP_ENABLED, "false" | "0" | "off") {
55		return;
56	}
57
58	if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
59		return;
60	}
61
62	let SpanId = format!("{:016x}", RandU64());
63
64	let TraceId = GetTraceId().to_string();
65
66	let SpanName = Name.to_string();
67
68	let AttributesJson:Vec<String> = Attributes
69		.iter()
70		.map(|(K, V)| {
71			format!(
72				r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
73				K,
74				V.replace('\\', "\\\\").replace('"', "\\\"")
75			)
76		})
77		.collect();
78
79	let IsError = SpanName.contains("error");
80
81	let StatusCode = if IsError { 2 } else { 1 };
82
83	let Payload = format!(
84		concat!(
85			r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
86			r#"{{"key":"service.name","value":{{"stringValue":"land-editor-mountain"}}}},"#,
87			r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}}"#,
88			r#"]}},"scopeSpans":[{{"scope":{{"name":"mountain.ipc","version":"1.0.0"}},"#,
89			r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
90			r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
91			r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
92		),
93		TraceId,
94		SpanId,
95		SpanName,
96		StartNano,
97		EndNano,
98		AttributesJson.join(","),
99		StatusCode,
100	);
101
102	// Resolve `OTLPEndpoint` (e.g. `http://127.0.0.1:4318`) → host:port + path.
103	// Strip scheme, split on `/` for the path component if any, default to
104	// `/v1/traces`.
105	let (HostAddress, PathSegment) = ParseEndpoint(Constants::OTLP_ENDPOINT);
106
107	std::thread::spawn(move || {
108		use std::{
109			io::{Read as IoRead, Write as IoWrite},
110			net::TcpStream,
111			time::Duration,
112		};
113
114		let Ok(SocketAddress) = HostAddress.parse() else {
115			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
116			return;
117		};
118		let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
119			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
120			return;
121		};
122		let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
123		let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
124
125		let HttpReq = format!(
126			"POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
127			 close\r\n\r\n",
128			PathSegment,
129			HostAddress,
130			Payload.len()
131		);
132		if Stream.write_all(HttpReq.as_bytes()).is_err() {
133			return;
134		}
135		if Stream.write_all(Payload.as_bytes()).is_err() {
136			return;
137		}
138		let mut Buf = [0u8; 32];
139		let _ = Stream.read(&mut Buf);
140		if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
141			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
142		}
143	});
144}
145
146/// Split `http://host:port/path` into `(host:port, /path)`. Defaults the
147/// path to `/v1/traces` when the endpoint has none. Returns owned `String`s
148/// so the spawned thread does not borrow the build-time `&'static str`.
149fn ParseEndpoint(Endpoint:&str) -> (String, String) {
150	let WithoutScheme = Endpoint
151		.strip_prefix("http://")
152		.or_else(|| Endpoint.strip_prefix("https://"))
153		.unwrap_or(Endpoint);
154
155	let (HostPort, Path) = match WithoutScheme.split_once('/') {
156		Some((HP, Rest)) => (HP.to_string(), format!("/{}", Rest.trim_start_matches('/'))),
157
158		None => (WithoutScheme.to_string(), "/v1/traces".to_string()),
159	};
160
161	let PathFinal = if Path == "/" { "/v1/traces".to_string() } else { Path };
162
163	(HostPort, PathFinal)
164}