// rlg/engine.rs
1// engine.rs
2// Copyright © 2024-2026 RustLogs (RLG). All rights reserved.
3// SPDX-License-Identifier: Apache-2.0
4// SPDX-License-Identifier: MIT
5
6//! Near-lock-free ingestion engine backed by a bounded ring buffer.
7//!
8//! The global [`ENGINE`] accepts [`LogEvent`]s via [`LockFreeEngine::ingest()`]
9//! using only atomic operations. A dedicated background thread drains events
10//! in batches of 64 and writes them through [`PlatformSink`](crate::sink::PlatformSink).
11//!
12//! **The Mutex is never locked on the hot path.** It exists solely for
13//! `shutdown()` to join the flusher thread.
14
15use crate::log_level::LogLevel;
16#[cfg(not(miri))]
17use crate::sink::PlatformSink;
18use crate::tui::TuiMetrics;
19#[cfg(not(miri))]
20use crate::tui::spawn_tui_thread;
21use crossbeam_queue::ArrayQueue;
22use std::fmt;
23use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
24use std::sync::{Arc, LazyLock, Mutex};
25use std::thread;
26#[cfg(not(miri))]
27use std::time::Duration;
28
/// Capacity of the lock-free ring buffer (number of log events).
///
/// When the buffer is full, `ingest()` evicts the oldest events to make room
/// for new ones rather than blocking the producer.
const RING_BUFFER_CAPACITY: usize = 65_536;

/// Maximum number of events drained per flusher wake-up cycle.
///
/// Bounds how much work the flusher does before re-checking the shutdown
/// flag and parking again. Not compiled under Miri, which skips the flusher
/// thread entirely.
#[cfg(not(miri))]
const MAX_DRAIN_BATCH_SIZE: usize = 64;
35
/// A structured log event passed through the ring buffer.
///
/// The caller pays only for a `Log` move (~128-byte memcpy).
/// Serialization happens on the flusher thread.
#[derive(Debug, Clone)]
pub struct LogEvent {
    /// Severity level of this event.
    pub level: LogLevel,
    /// Numeric severity for fast level-gating comparisons.
    /// Cached alongside `level` so `ingest()` can gate without converting.
    pub level_num: u8,
    /// Structured log data. Formatted on the flusher thread, not here.
    pub log: crate::log::Log,
}
49
/// The near-lock-free ingestion engine.
///
/// Owns the ring buffer, flusher thread, and TUI metrics counters.
/// Access the global instance via [`ENGINE`].
///
/// All hot-path state is atomic or lock-free; the only `Mutex` guards the
/// flusher's `JoinHandle` and is touched exclusively by `shutdown()`.
pub struct LockFreeEngine {
    /// Bounded ring buffer (lock-free push/pop via `crossbeam`).
    queue: Arc<ArrayQueue<LogEvent>>,
    /// Signals the flusher thread to drain and exit.
    shutdown_flag: Arc<AtomicBool>,
    /// Atomic counters consumed by the opt-in TUI dashboard.
    metrics: Arc<TuiMetrics>,
    /// Minimum severity level. Events below this are dropped at `ingest()`.
    filter_level: AtomicU8,
    /// Flusher thread handle for lock-free `unpark()`. No Mutex involved.
    /// `None` under Miri, where no flusher thread is spawned.
    flusher_thread_handle: Option<thread::Thread>,
    /// `JoinHandle` for `shutdown()` only. **Never locked on the hot path.**
    flusher_join: Mutex<Option<thread::JoinHandle<()>>>,
}
68
69impl fmt::Debug for LockFreeEngine {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("LockFreeEngine")
72 .field("queue", &self.queue)
73 .field("shutdown_flag", &self.shutdown_flag)
74 .field("metrics", &self.metrics)
75 .field("filter_level", &self.filter_level)
76 .field(
77 "flusher_thread_handle",
78 &self
79 .flusher_thread_handle
80 .as_ref()
81 .map(thread::Thread::id),
82 )
83 .finish_non_exhaustive()
84 }
85}
86
/// Global engine instance, lazily initialized on first access.
///
/// The first access constructs the engine with [`RING_BUFFER_CAPACITY`]
/// slots and spawns the background flusher thread (see
/// [`LockFreeEngine::new`]).
pub static ENGINE: LazyLock<LockFreeEngine> =
    LazyLock::new(|| LockFreeEngine::new(RING_BUFFER_CAPACITY));
90
91impl LockFreeEngine {
92 /// Create a new engine with the given buffer capacity and spawn the flusher.
93 ///
94 /// # Panics
95 ///
96 /// Panics if the OS cannot spawn the background flusher thread.
97 #[must_use]
98 pub fn new(capacity: usize) -> Self {
99 let queue = Arc::new(ArrayQueue::new(capacity));
100 let shutdown_flag = Arc::new(AtomicBool::new(false));
101 let metrics = Arc::new(TuiMetrics::default());
102 let filter_level = AtomicU8::new(0); // Default to ALL
103
104 // Under MIRI, skip spawning background threads to avoid
105 // "main thread terminated without waiting" errors.
106 #[cfg(not(miri))]
107 let flusher_handle = {
108 let flusher_queue = queue.clone();
109 let flusher_shutdown = shutdown_flag.clone();
110
111 // Spawn lightweight OS thread (Runtime Agnostic)
112 let handle = thread::Builder::new()
113 .name("rlg-flusher".into())
114 .spawn(move || {
115 use std::io::Write;
116 let mut sink = PlatformSink::native();
117 let mut fmt_buf = Vec::with_capacity(512);
118
119 loop {
120 let mut batch: [Option<LogEvent>;
121 MAX_DRAIN_BATCH_SIZE] =
122 std::array::from_fn(|_| None);
123 let mut count = 0;
124 while count < MAX_DRAIN_BATCH_SIZE {
125 match flusher_queue.pop() {
126 Some(event) => {
127 batch[count] = Some(event);
128 count += 1;
129 }
130 None => break,
131 }
132 }
133 for event in batch.iter().flatten() {
134 fmt_buf.clear();
135 let _ = writeln!(fmt_buf, "{}", &event.log);
136 sink.emit(event.level.as_str(), &fmt_buf);
137 }
138
139 if flusher_shutdown.load(Ordering::Relaxed)
140 && flusher_queue.is_empty()
141 {
142 break;
143 }
144
145 // Park briefly as fallback; real wakeup comes from unpark() in ingest().
146 thread::park_timeout(Duration::from_millis(5));
147 }
148 })
149 .expect(
150 "Failed to spawn rlg-flusher background thread",
151 );
152
153 // Spawn the TUI dashboard thread if RLG_TUI=1
154 if std::env::var("RLG_TUI")
155 .map(|v| v == "1")
156 .unwrap_or(false)
157 {
158 spawn_tui_thread(
159 metrics.clone(),
160 shutdown_flag.clone(),
161 );
162 }
163
164 Some(handle)
165 };
166
167 #[cfg(miri)]
168 let flusher_handle: Option<thread::JoinHandle<()>> = None;
169
170 let flusher_thread_handle =
171 flusher_handle.as_ref().map(|h| h.thread().clone());
172
173 Self {
174 queue,
175 shutdown_flag,
176 metrics,
177 filter_level,
178 flusher_thread_handle,
179 flusher_join: Mutex::new(flusher_handle),
180 }
181 }
182
183 /// Appends an event to the ring buffer.
184 ///
185 /// If the buffer is full, the oldest event is evicted to make room.
186 /// Dropped events are tracked via `TuiMetrics::dropped_events`.
187 pub fn ingest(&self, event: LogEvent) {
188 if event.level_num < self.filter_level.load(Ordering::Acquire) {
189 return;
190 }
191
192 self.metrics.inc_events();
193 self.metrics.inc_level(event.level);
194
195 if event.level_num >= LogLevel::ERROR.to_numeric() {
196 self.metrics.inc_errors();
197 }
198
199 // If the buffer is full, evict and retry with bounded retries.
200 if let Err(rejected) = self.queue.push(event) {
201 self.metrics.inc_dropped();
202 let mut to_push = rejected;
203 for _ in 0..3 {
204 let _ = self.queue.pop();
205 match self.queue.push(to_push) {
206 Ok(()) => break,
207 Err(e) => to_push = e,
208 }
209 }
210 }
211
212 // Wake the flusher thread — no Mutex on the hot path.
213 if let Some(thread) = &self.flusher_thread_handle {
214 thread.unpark();
215 }
216 }
217
    /// Sets the global log level filter.
    ///
    /// Events whose `level_num` is below `level` are discarded by `ingest()`
    /// before touching the queue or metrics. The `Release` store pairs with
    /// the `Acquire` load in [`ingest`](Self::ingest).
    pub fn set_filter(&self, level: u8) {
        self.filter_level.store(level, Ordering::Release);
    }
222
    /// Returns the current global log level filter.
    ///
    /// A `Relaxed` load is sufficient here: the value is advisory
    /// (display/introspection); `ingest()` performs its own `Acquire` load.
    #[must_use]
    pub fn filter_level(&self) -> u8 {
        self.filter_level.load(Ordering::Relaxed)
    }
228
    /// Increments the format counter in the TUI metrics.
    ///
    /// Thin delegation to the shared [`TuiMetrics`] counters.
    pub fn inc_format(&self, format: crate::log_format::LogFormat) {
        self.metrics.inc_format(format);
    }
233
    /// Increments the active span count in the TUI metrics.
    ///
    /// Counterpart of [`dec_spans`](Self::dec_spans).
    pub fn inc_spans(&self) {
        self.metrics.inc_spans();
    }
238
    /// Decrements the active span count in the TUI metrics.
    ///
    /// Counterpart of [`inc_spans`](Self::inc_spans).
    pub fn dec_spans(&self) {
        self.metrics.dec_spans();
    }
243
    /// Returns the current number of active spans.
    ///
    /// `Relaxed` load — a momentary snapshot for display; no ordering with
    /// other operations is required.
    #[must_use]
    pub fn active_spans(&self) -> usize {
        self.metrics.active_spans.load(Ordering::Relaxed)
    }
249
    /// Applies configuration settings to the engine.
    ///
    /// Currently this only sets the log level filter from
    /// `config.log_level`. File sink construction and rotation are handled
    /// by the flusher thread at startup via
    /// [`PlatformSink::from_config`](crate::sink::PlatformSink::from_config).
    pub fn apply_config(&self, config: &crate::config::Config) {
        self.set_filter(config.log_level.to_numeric());
    }
258
259 /// Safely halts the background thread, flushing pending logs.
260 ///
261 /// Signals the flusher thread to stop and waits for it to finish
262 /// draining any remaining events from the queue.
263 pub fn shutdown(&self) {
264 self.shutdown_flag.store(true, Ordering::SeqCst);
265 // Wake the flusher so it can drain and exit.
266 if let Some(thread) = &self.flusher_thread_handle {
267 thread.unpark();
268 }
269 if let Ok(mut guard) = self.flusher_join.lock()
270 && let Some(handle) = guard.take()
271 {
272 let _ = handle.join();
273 }
274 }
275}
276
/// Zero-Allocation Serializer Helper
///
/// Stateless namespace for appending numeric values to byte buffers via
/// `itoa`/`ryu` without intermediate `String` allocations.
#[derive(Debug, Clone, Copy)]
pub struct FastSerializer;
280
281impl FastSerializer {
282 /// Appends a u64 integer to a buffer using `itoa` without allocating a String.
283 pub fn append_u64(buf: &mut Vec<u8>, val: u64) {
284 let mut buffer = itoa::Buffer::new();
285 buf.extend_from_slice(buffer.format(val).as_bytes());
286 }
287
288 /// Appends an f64 float to a buffer using `ryu` without allocating a String.
289 pub fn append_f64(buf: &mut Vec<u8>, val: f64) {
290 let mut buffer = ryu::Buffer::new();
291 buf.extend_from_slice(buffer.format(val).as_bytes());
292 }
293}