Skip to main content

rlg/
engine.rs

1// engine.rs
2// Copyright © 2024-2026 RustLogs (RLG). All rights reserved.
3// SPDX-License-Identifier: Apache-2.0
4// SPDX-License-Identifier: MIT
5
6//! Near-lock-free ingestion engine backed by a bounded ring buffer.
7//!
8//! The global [`ENGINE`] accepts [`LogEvent`]s via [`LockFreeEngine::ingest()`]
9//! using only atomic operations. A dedicated background thread drains events
10//! in batches of 64 and writes them through [`PlatformSink`](crate::sink::PlatformSink).
11//!
12//! **The Mutex is never locked on the hot path.** It exists solely for
13//! `shutdown()` to join the flusher thread.
14
15use crate::log_level::LogLevel;
16#[cfg(not(miri))]
17use crate::sink::PlatformSink;
18use crate::tui::TuiMetrics;
19#[cfg(not(miri))]
20use crate::tui::spawn_tui_thread;
21use crossbeam_queue::ArrayQueue;
22use std::fmt;
23use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
24use std::sync::{Arc, LazyLock, Mutex};
25use std::thread;
26#[cfg(not(miri))]
27use std::time::Duration;
28
/// Number of [`LogEvent`] slots in the global engine's ring buffer (2^16).
const RING_BUFFER_CAPACITY: usize = 1 << 16;

/// Upper bound on how many events the flusher pops in one drain cycle.
#[cfg(not(miri))]
const MAX_DRAIN_BATCH_SIZE: usize = 64;
35
36/// A structured log event passed through the ring buffer.
37///
38/// The caller pays only for a `Log` move (~128-byte memcpy).
39/// Serialization happens on the flusher thread.
40#[derive(Debug, Clone)]
41pub struct LogEvent {
42    /// Severity level of this event.
43    pub level: LogLevel,
44    /// Numeric severity for fast level-gating comparisons.
45    pub level_num: u8,
46    /// Structured log data. Formatted on the flusher thread, not here.
47    pub log: crate::log::Log,
48}
49
50/// The near-lock-free ingestion engine.
51///
52/// Owns the ring buffer, flusher thread, and TUI metrics counters.
53/// Access the global instance via [`ENGINE`].
54pub struct LockFreeEngine {
55    /// Bounded ring buffer (lock-free push/pop via `crossbeam`).
56    queue: Arc<ArrayQueue<LogEvent>>,
57    /// Signals the flusher thread to drain and exit.
58    shutdown_flag: Arc<AtomicBool>,
59    /// Atomic counters consumed by the opt-in TUI dashboard.
60    metrics: Arc<TuiMetrics>,
61    /// Minimum severity level. Events below this are dropped at `ingest()`.
62    filter_level: AtomicU8,
63    /// Flusher thread handle for lock-free `unpark()`. No Mutex involved.
64    flusher_thread_handle: Option<thread::Thread>,
65    /// `JoinHandle` for `shutdown()` only. **Never locked on the hot path.**
66    flusher_join: Mutex<Option<thread::JoinHandle<()>>>,
67}
68
69impl fmt::Debug for LockFreeEngine {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("LockFreeEngine")
72            .field("queue", &self.queue)
73            .field("shutdown_flag", &self.shutdown_flag)
74            .field("metrics", &self.metrics)
75            .field("filter_level", &self.filter_level)
76            .field(
77                "flusher_thread_handle",
78                &self
79                    .flusher_thread_handle
80                    .as_ref()
81                    .map(thread::Thread::id),
82            )
83            .finish_non_exhaustive()
84    }
85}
86
87/// Global engine instance, lazily initialized on first access.
88pub static ENGINE: LazyLock<LockFreeEngine> =
89    LazyLock::new(|| LockFreeEngine::new(RING_BUFFER_CAPACITY));
90
91impl LockFreeEngine {
92    /// Create a new engine with the given buffer capacity and spawn the flusher.
93    ///
94    /// # Panics
95    ///
96    /// Panics if the OS cannot spawn the background flusher thread.
97    #[must_use]
98    pub fn new(capacity: usize) -> Self {
99        let queue = Arc::new(ArrayQueue::new(capacity));
100        let shutdown_flag = Arc::new(AtomicBool::new(false));
101        let metrics = Arc::new(TuiMetrics::default());
102        let filter_level = AtomicU8::new(0); // Default to ALL
103
104        // Under MIRI, skip spawning background threads to avoid
105        // "main thread terminated without waiting" errors.
106        #[cfg(not(miri))]
107        let flusher_handle = {
108            let flusher_queue = queue.clone();
109            let flusher_shutdown = shutdown_flag.clone();
110
111            // Spawn lightweight OS thread (Runtime Agnostic)
112            let handle = thread::Builder::new()
113                .name("rlg-flusher".into())
114                .spawn(move || {
115                    use std::io::Write;
116                    let mut sink = PlatformSink::native();
117                    let mut fmt_buf = Vec::with_capacity(512);
118
119                    loop {
120                        let mut batch: [Option<LogEvent>;
121                            MAX_DRAIN_BATCH_SIZE] =
122                            std::array::from_fn(|_| None);
123                        let mut count = 0;
124                        while count < MAX_DRAIN_BATCH_SIZE {
125                            match flusher_queue.pop() {
126                                Some(event) => {
127                                    batch[count] = Some(event);
128                                    count += 1;
129                                }
130                                None => break,
131                            }
132                        }
133                        for event in batch.iter().flatten() {
134                            fmt_buf.clear();
135                            let _ = writeln!(fmt_buf, "{}", &event.log);
136                            sink.emit(event.level.as_str(), &fmt_buf);
137                        }
138
139                        if flusher_shutdown.load(Ordering::Relaxed)
140                            && flusher_queue.is_empty()
141                        {
142                            break;
143                        }
144
145                        // Park briefly as fallback; real wakeup comes from unpark() in ingest().
146                        thread::park_timeout(Duration::from_millis(5));
147                    }
148                })
149                .expect(
150                    "Failed to spawn rlg-flusher background thread",
151                );
152
153            // Spawn the TUI dashboard thread if RLG_TUI=1
154            if std::env::var("RLG_TUI")
155                .map(|v| v == "1")
156                .unwrap_or(false)
157            {
158                spawn_tui_thread(
159                    metrics.clone(),
160                    shutdown_flag.clone(),
161                );
162            }
163
164            Some(handle)
165        };
166
167        #[cfg(miri)]
168        let flusher_handle: Option<thread::JoinHandle<()>> = None;
169
170        let flusher_thread_handle =
171            flusher_handle.as_ref().map(|h| h.thread().clone());
172
173        Self {
174            queue,
175            shutdown_flag,
176            metrics,
177            filter_level,
178            flusher_thread_handle,
179            flusher_join: Mutex::new(flusher_handle),
180        }
181    }
182
183    /// Appends an event to the ring buffer.
184    ///
185    /// If the buffer is full, the oldest event is evicted to make room.
186    /// Dropped events are tracked via `TuiMetrics::dropped_events`.
187    pub fn ingest(&self, event: LogEvent) {
188        if event.level_num < self.filter_level.load(Ordering::Acquire) {
189            return;
190        }
191
192        self.metrics.inc_events();
193        self.metrics.inc_level(event.level);
194
195        if event.level_num >= LogLevel::ERROR.to_numeric() {
196            self.metrics.inc_errors();
197        }
198
199        // If the buffer is full, evict and retry with bounded retries.
200        if let Err(rejected) = self.queue.push(event) {
201            self.metrics.inc_dropped();
202            let mut to_push = rejected;
203            for _ in 0..3 {
204                let _ = self.queue.pop();
205                match self.queue.push(to_push) {
206                    Ok(()) => break,
207                    Err(e) => to_push = e,
208                }
209            }
210        }
211
212        // Wake the flusher thread — no Mutex on the hot path.
213        if let Some(thread) = &self.flusher_thread_handle {
214            thread.unpark();
215        }
216    }
217
218    /// Sets the global log level filter.
219    pub fn set_filter(&self, level: u8) {
220        self.filter_level.store(level, Ordering::Release);
221    }
222
223    /// Returns the current global log level filter.
224    #[must_use]
225    pub fn filter_level(&self) -> u8 {
226        self.filter_level.load(Ordering::Relaxed)
227    }
228
229    /// Increments the format counter in the TUI metrics.
230    pub fn inc_format(&self, format: crate::log_format::LogFormat) {
231        self.metrics.inc_format(format);
232    }
233
234    /// Increments the active span count in the TUI metrics.
235    pub fn inc_spans(&self) {
236        self.metrics.inc_spans();
237    }
238
239    /// Decrements the active span count in the TUI metrics.
240    pub fn dec_spans(&self) {
241        self.metrics.dec_spans();
242    }
243
244    /// Returns the current number of active spans.
245    #[must_use]
246    pub fn active_spans(&self) -> usize {
247        self.metrics.active_spans.load(Ordering::Relaxed)
248    }
249
250    /// Applies configuration settings to the engine.
251    ///
252    /// Sets the log level filter from the config. File sink construction
253    /// and rotation are handled by the flusher thread at startup via
254    /// [`PlatformSink::from_config`](crate::sink::PlatformSink::from_config).
255    pub fn apply_config(&self, config: &crate::config::Config) {
256        self.set_filter(config.log_level.to_numeric());
257    }
258
259    /// Safely halts the background thread, flushing pending logs.
260    ///
261    /// Signals the flusher thread to stop and waits for it to finish
262    /// draining any remaining events from the queue.
263    pub fn shutdown(&self) {
264        self.shutdown_flag.store(true, Ordering::SeqCst);
265        // Wake the flusher so it can drain and exit.
266        if let Some(thread) = &self.flusher_thread_handle {
267            thread.unpark();
268        }
269        if let Ok(mut guard) = self.flusher_join.lock()
270            && let Some(handle) = guard.take()
271        {
272            let _ = handle.join();
273        }
274    }
275}
276
277/// Zero-Allocation Serializer Helper
278#[derive(Debug, Clone, Copy)]
279pub struct FastSerializer;
280
281impl FastSerializer {
282    /// Appends a u64 integer to a buffer using `itoa` without allocating a String.
283    pub fn append_u64(buf: &mut Vec<u8>, val: u64) {
284        let mut buffer = itoa::Buffer::new();
285        buf.extend_from_slice(buffer.format(val).as_bytes());
286    }
287
288    /// Appends an f64 float to a buffer using `ryu` without allocating a String.
289    pub fn append_f64(buf: &mut Vec<u8>, val: f64) {
290        let mut buffer = ryu::Buffer::new();
291        buf.extend_from_slice(buffer.format(val).as_bytes());
292    }
293}