Mountain/IPC/DevLog/
EmitOTLPSpan.rs1#![allow(non_snake_case)]
2
3use std::{
13 collections::hash_map::DefaultHasher,
14 hash::{Hash, Hasher},
15 sync::{
16 OnceLock,
17 atomic::{AtomicBool, Ordering},
18 },
19};
20
21use crate::{Binary::Build::PostHogPlugin::Constants, IPC::DevLog::NowNano};
22
23static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);
24static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
25
26fn GetTraceId() -> &'static str {
27 OTLP_TRACE_ID.get_or_init(|| {
28 let mut H = DefaultHasher::new();
29 std::process::id().hash(&mut H);
30 NowNano::Fn().hash(&mut H);
31 format!("{:032x}", H.finish() as u128)
32 })
33}
34
35fn RandU64() -> u64 {
36 let mut H = DefaultHasher::new();
37 std::thread::current().id().hash(&mut H);
38 NowNano::Fn().hash(&mut H);
39 H.finish()
40}
41
42pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
43 if !cfg!(debug_assertions) {
44 return;
45 }
46 if matches!(Constants::TELEMETRY_CAPTURE, "false" | "0" | "off") {
47 return;
48 }
49 if matches!(Constants::OTLP_ENABLED, "false" | "0" | "off") {
50 return;
51 }
52 if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
53 return;
54 }
55
56 let SpanId = format!("{:016x}", RandU64());
57 let TraceId = GetTraceId().to_string();
58 let SpanName = Name.to_string();
59
60 let AttributesJson:Vec<String> = Attributes
61 .iter()
62 .map(|(K, V)| {
63 format!(
64 r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
65 K,
66 V.replace('\\', "\\\\").replace('"', "\\\"")
67 )
68 })
69 .collect();
70
71 let IsError = SpanName.contains("error");
72 let StatusCode = if IsError { 2 } else { 1 };
73 let Payload = format!(
74 concat!(
75 r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
76 r#"{{"key":"service.name","value":{{"stringValue":"land-editor-mountain"}}}},"#,
77 r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}}"#,
78 r#"]}},"scopeSpans":[{{"scope":{{"name":"mountain.ipc","version":"1.0.0"}},"#,
79 r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
80 r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
81 r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
82 ),
83 TraceId,
84 SpanId,
85 SpanName,
86 StartNano,
87 EndNano,
88 AttributesJson.join(","),
89 StatusCode,
90 );
91
92 let (HostAddress, PathSegment) = ParseEndpoint(Constants::OTLP_ENDPOINT);
96
97 std::thread::spawn(move || {
98 use std::{
99 io::{Read as IoRead, Write as IoWrite},
100 net::TcpStream,
101 time::Duration,
102 };
103
104 let Ok(SocketAddress) = HostAddress.parse() else {
105 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
106 return;
107 };
108 let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
109 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
110 return;
111 };
112 let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
113 let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
114
115 let HttpReq = format!(
116 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
117 close\r\n\r\n",
118 PathSegment,
119 HostAddress,
120 Payload.len()
121 );
122 if Stream.write_all(HttpReq.as_bytes()).is_err() {
123 return;
124 }
125 if Stream.write_all(Payload.as_bytes()).is_err() {
126 return;
127 }
128 let mut Buf = [0u8; 32];
129 let _ = Stream.read(&mut Buf);
130 if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
131 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
132 }
133 });
134}
135
/// Splits an endpoint URL into `(host:port, request path)`.
///
/// A leading `http://` or `https://` is removed (at most one of them).
/// Everything up to the first `/` becomes the host part; the rest, with any
/// run of leading slashes collapsed into a single one, becomes the path.
/// When there is no path, or the path is only slashes, `/v1/traces` is used.
///
/// No validation is performed: the host part is returned as written, and an
/// unrecognised scheme is left in place and split at its first `/`.
fn ParseEndpoint(Endpoint:&str) -> (String, String) {
	const DEFAULT_PATH:&str = "/v1/traces";

	// `http://` is tried before `https://`; the first match wins.
	let Remainder = ["http://", "https://"]
		.iter()
		.find_map(|Scheme| Endpoint.strip_prefix(*Scheme))
		.unwrap_or(Endpoint);

	if let Some(Slash) = Remainder.find('/') {
		let Authority = &Remainder[..Slash];
		let Tail = Remainder[Slash + 1..].trim_start_matches('/');
		let Route = if Tail.is_empty() { DEFAULT_PATH.to_string() } else { format!("/{}", Tail) };
		return (Authority.to_string(), Route);
	}

	(Remainder.to_string(), DEFAULT_PATH.to_string())
}