Mountain/IPC/DevLog/
EmitOTLPSpan.rs1
2use std::{
12 collections::hash_map::DefaultHasher,
13 hash::{Hash, Hasher},
14 sync::{
15 OnceLock,
16 atomic::{AtomicBool, Ordering},
17 },
18};
19
20use crate::{Binary::Build::PostHogPlugin::Constants, IPC::DevLog::NowNano};
21
/// Process-wide switch for the exporter. Starts `true`; `Fn` stores `false`
/// when the endpoint cannot be parsed, the TCP connect fails, or the
/// collector's reply does not start with a 2xx status line. `Fn` returns
/// early while it is `false`, and nothing in this file sets it back to `true`.
22static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);
23
/// Trace id shared by every span this process emits. Filled lazily, exactly
/// once, by `GetTraceId`.
24static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
25
26fn GetTraceId() -> &'static str {
27 OTLP_TRACE_ID.get_or_init(|| {
28 let mut H = DefaultHasher::new();
29 std::process::id().hash(&mut H);
30 NowNano::Fn().hash(&mut H);
31 format!("{:032x}", H.finish() as u128)
32 })
33}
34
35fn RandU64() -> u64 {
36 let mut H = DefaultHasher::new();
37
38 std::thread::current().id().hash(&mut H);
39
40 NowNano::Fn().hash(&mut H);
41
42 H.finish()
43}
44
45pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
46 if !cfg!(debug_assertions) {
47 return;
48 }
49
50 if matches!(Constants::TELEMETRY_CAPTURE, "false" | "0" | "off") {
51 return;
52 }
53
54 if matches!(Constants::OTLP_ENABLED, "false" | "0" | "off") {
55 return;
56 }
57
58 if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
59 return;
60 }
61
62 let SpanId = format!("{:016x}", RandU64());
63
64 let TraceId = GetTraceId().to_string();
65
66 let SpanName = Name.to_string();
67
68 let AttributesJson:Vec<String> = Attributes
69 .iter()
70 .map(|(K, V)| {
71 format!(
72 r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
73 K,
74 V.replace('\\', "\\\\").replace('"', "\\\"")
75 )
76 })
77 .collect();
78
79 let IsError = SpanName.contains("error");
80
81 let StatusCode = if IsError { 2 } else { 1 };
82
83 let Payload = format!(
84 concat!(
85 r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
86 r#"{{"key":"service.name","value":{{"stringValue":"land-editor-mountain"}}}},"#,
87 r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}}"#,
88 r#"]}},"scopeSpans":[{{"scope":{{"name":"mountain.ipc","version":"1.0.0"}},"#,
89 r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
90 r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
91 r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
92 ),
93 TraceId,
94 SpanId,
95 SpanName,
96 StartNano,
97 EndNano,
98 AttributesJson.join(","),
99 StatusCode,
100 );
101
102 let (HostAddress, PathSegment) = ParseEndpoint(Constants::OTLP_ENDPOINT);
106
107 std::thread::spawn(move || {
108 use std::{
109 io::{Read as IoRead, Write as IoWrite},
110 net::TcpStream,
111 time::Duration,
112 };
113
114 let Ok(SocketAddress) = HostAddress.parse() else {
115 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
116 return;
117 };
118 let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
119 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
120 return;
121 };
122 let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
123 let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
124
125 let HttpReq = format!(
126 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
127 close\r\n\r\n",
128 PathSegment,
129 HostAddress,
130 Payload.len()
131 );
132 if Stream.write_all(HttpReq.as_bytes()).is_err() {
133 return;
134 }
135 if Stream.write_all(Payload.as_bytes()).is_err() {
136 return;
137 }
138 let mut Buf = [0u8; 32];
139 let _ = Stream.read(&mut Buf);
140 if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
141 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
142 }
143 });
144}
145
/// Splits an endpoint URL into `(host:port, request path)`.
///
/// A leading `http://` or `https://` is dropped. Everything up to the first
/// `/` is returned as the host part; the remainder becomes the path, with any
/// run of leading slashes collapsed into one. When there is no path, or the
/// path is only slashes, `/v1/traces` is used.
fn ParseEndpoint(Endpoint:&str) -> (String, String) {
	const DEFAULT_PATH:&str = "/v1/traces";

	// Try the schemes in order and keep the input untouched if neither matches.
	let Remainder = ["http://", "https://"]
		.iter()
		.find_map(|Scheme| Endpoint.strip_prefix(*Scheme))
		.unwrap_or(Endpoint);

	// No slash at all: the whole remainder is the host part.
	let Some((Authority, Tail)) = Remainder.split_once('/') else {
		return (Remainder.to_owned(), DEFAULT_PATH.to_owned());
	};

	let Trimmed = Tail.trim_start_matches('/');

	let Route = if Trimmed.is_empty() { DEFAULT_PATH.to_owned() } else { format!("/{Trimmed}") };

	(Authority.to_owned(), Route)
}