From a77f1a7aa4e145d081c91c315fa61cdd4a5af65a Mon Sep 17 00:00:00 2001 From: Arthur Peters Date: Thu, 30 Apr 2026 10:59:49 -0500 Subject: [PATCH 1/3] Support capturing scheduling events into an OQueue and into data files (#214) This adds a scheduling event OQueue and support for capturing it. Unlike most OQueues this one must be enabled by a feature. See the documentation file. --- Cargo.lock | 1 + docs/src/ostd/data-capture.md | 15 ++++++ kernel/comps/time/Cargo.toml | 1 + kernel/comps/time/src/clocksource.rs | 3 +- kernel/src/event.rs | 48 +++++++++++++++++++ kernel/src/lib.rs | 54 +++++++++++++++++++++ ostd/Cargo.toml | 1 + ostd/src/lib.rs | 2 + ostd/src/task/scheduler/mod.rs | 70 ++++++++++++++++++++++++++++ 9 files changed, 194 insertions(+), 1 deletion(-) create mode 100644 docs/src/ostd/data-capture.md create mode 100644 kernel/src/event.rs diff --git a/Cargo.lock b/Cargo.lock index eacecd04e..17c0f1d2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -311,6 +311,7 @@ dependencies = [ "component", "log", "ostd", + "serde", "spin", ] diff --git a/docs/src/ostd/data-capture.md b/docs/src/ostd/data-capture.md new file mode 100644 index 000000000..2d87b2b67 --- /dev/null +++ b/docs/src/ostd/data-capture.md @@ -0,0 +1,15 @@ +# Data OQueues available in OSTD + +## Scheduler + +Scheduling events can placed on an OQueue. Unlike most OQueues, this is disabled by default, because +of the potential for unknown overhead in a very sensitive part of the system. It can be enabled with +the `capture_scheduling` feature. You can enable this feature with `--features +ostd/capture_scheduling` on the `cargo osdk` command line. (Or the +`FEATURES=ostd/capture_scheduling` environment variable for OSTDs own makefiles.) + +(NOTE: Once we are confident that the overhead is low enough, `capture_scheduling` will be enabled +by default.) + +To have the the Mariposa kernel capture the scheduling events to a file, add this *and* the kernel +command line argument `scheduler.capture_data=true`. \ No newline at end of file diff --git a/kernel/comps/time/Cargo.toml b/kernel/comps/time/Cargo.toml index ada49bac0..492ddd4aa 100644 --- a/kernel/comps/time/Cargo.toml +++ b/kernel/comps/time/Cargo.toml @@ -11,6 +11,7 @@ aster-util = { path = "../../libs/aster-util" } component = { path = "../../libs/comp-sys/component" } log = "0.4" spin = "0.9.4" +serde = { version = "1.0", default-features = false, features = ["derive", "alloc"]} [target.riscv64gc-unknown-none-elf.dependencies] chrono = { version = "0.4.38", default-features = false } diff --git a/kernel/comps/time/src/clocksource.rs b/kernel/comps/time/src/clocksource.rs index 61780bb0d..1edaa1a3a 100644 --- a/kernel/comps/time/src/clocksource.rs +++ b/kernel/comps/time/src/clocksource.rs @@ -15,6 +15,7 @@ use core::{cmp::max, ops::Add, time::Duration}; use aster_util::coeff::Coeff; use ostd::sync::{LocalIrqDisabled, RwLock}; +use serde::Serialize; use crate::NANOS_PER_SECOND; @@ -172,7 +173,7 @@ impl ClockSource { /// elapsed since a reference point (typically the system boot time). /// The [`Instant`] is expressed in seconds and the fractional part is /// expressed in nanoseconds. -#[derive(Debug, Default, Copy, Clone)] +#[derive(Debug, Default, Copy, Clone, Serialize)] pub struct Instant { secs: u64, nanos: u32, diff --git a/kernel/src/event.rs b/kernel/src/event.rs new file mode 100644 index 000000000..434dcb956 --- /dev/null +++ b/kernel/src/event.rs @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: MPL-2.0 + +use aster_time::Instant; +use ostd::task::Task; +use serde::Serialize; + +use crate::{process::posix_thread::AsPosixThread as _, thread::Tid}; + +#[derive(Debug, Clone, Copy, Serialize)] +pub enum TaskId { + KernelTask(usize), + PosixThread(Tid), + Unknown, +} + +impl TaskId { + pub fn new(task: &Task) -> Self { + if let Some(t) = task.as_posix_thread() { + Self::PosixThread(t.tid()) + } else { + Self::KernelTask(task.id().into()) + } + } +} + +#[derive(Debug, Clone, Copy, Serialize)] +pub struct EventContext { + pub task: TaskId, + pub timestamp: Instant, +} + +impl EventContext { + /// Creates a new EventContext from the current context + pub fn new() -> Self { + EventContext { + task: Task::current() + .map(|t| TaskId::new(&t)) + .unwrap_or(TaskId::Unknown), + timestamp: aster_time::read_monotonic_time().into(), + } + } +} + +impl Default for EventContext { + fn default() -> Self { + Self::new() + } +} diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index cb8b23a79..ba38db9a2 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -26,19 +26,32 @@ use aster_framebuffer::FRAMEBUFFER_CONSOLE; #[cfg(not(baseline_asterinas))] mod data_capture; +pub mod event; +use aster_time::Instant; #[cfg(not(baseline_asterinas))] pub use data_capture::{new_data_capture_file, new_legacy_data_capture_file}; use kcmdline::KCmdlineArg; +#[cfg(not(baseline_asterinas))] +use mariposa_data_capture::ObserverRegistration; use ostd::{ arch::qemu::{QemuExitCode, exit_qemu}, boot::boot_info, cpu::{CpuId, CpuSet}, + ignore_err, + task::scheduler::{SchedulingEvent, SchedulingEventKind}, +}; +#[cfg(not(baseline_asterinas))] +use ostd::{ + orpc::oqueue::{OQueueBase as _, ObservationQuery, registry::lookup_by_path}, + path, }; use process::{Process, spawn_init_process}; use sched::SchedPolicy; +use serde::Serialize; use spin::Once; use crate::{ + event::{EventContext, TaskId}, kcmdline::set_kernel_cmd_line, prelude::*, thread::kernel_thread::ThreadOptions, @@ -225,6 +238,47 @@ fn init_thread() { pmu.start(); } + #[cfg(not(baseline_asterinas))] + if karg + .get_module_arg_by_name::("scheduler", "capture_data") + .unwrap_or(false) + { + if let Some(oqueue) = lookup_by_path::(&path!(scheduler.events)) { + #[derive(Debug, Clone, Copy, Serialize)] + struct KernelSchedulingEvent { + timestamp: Instant, + task: TaskId, + kind: SchedulingEventKind, + } + + let capture_file = new_data_capture_file::( + mariposa_data_capture::FileDescriptor { + path: path!(scheduler.events), + length: 500 * 1024 * 1024, + }, + ); + + ignore_err!( + capture_file.register_observer(ObserverRegistration { + path: path!(scheduler.events), + observer: oqueue + .attach_strong_observer(ObservationQuery::new(|e: &SchedulingEvent| { + let context = EventContext::new(); + KernelSchedulingEvent { + timestamp: context.timestamp, + kind: e.kind, + task: TaskId::new(&e.task), + } + })) + .unwrap(), + }) + ); + ignore_err!(capture_file.start()); + } else { + error!("Could not find scheduler.events OQueue. Scheduler events will not be captured.") + } + } + // Wait till initproc become zombie. while !initproc.status().is_zombie() { ostd::task::halt_cpu(); diff --git a/ostd/Cargo.toml b/ostd/Cargo.toml index 133c11dca..6f516ceb1 100644 --- a/ostd/Cargo.toml +++ b/ostd/Cargo.toml @@ -66,6 +66,7 @@ fdt = { version = "0.1.5", features = ["pretty-printing"] } default = ["cvm_guest"] capture_stacks = [] track_mutex = [] +capture_scheduling = [] # The guest OS support for Confidential VMs (CVMs), e.g., Intel TDX cvm_guest = ["dep:tdx-guest", "dep:iced-x86"] diff --git a/ostd/src/lib.rs b/ostd/src/lib.rs index 8cfc75221..8f2f585a1 100644 --- a/ostd/src/lib.rs +++ b/ostd/src/lib.rs @@ -138,6 +138,8 @@ unsafe fn init() { bus::init(); + task::scheduler::init(); + arch::irq::enable_local(); invoke_ffi_init_funcs(); diff --git a/ostd/src/task/scheduler/mod.rs b/ostd/src/task/scheduler/mod.rs index 6106d0978..8c176fe42 100644 --- a/ostd/src/task/scheduler/mod.rs +++ b/ostd/src/task/scheduler/mod.rs @@ -10,9 +10,12 @@ pub mod info; use core::time::Duration; +use serde::Serialize; use spin::Once; use super::{Task, preempt::cpu_local, processor}; +#[cfg(feature = "capture_scheduling")] +use crate::orpc::oqueue::RefProducer; use crate::{ cpu::{CpuId, CpuSet, PinCurrentCpu}, prelude::*, @@ -20,6 +23,35 @@ use crate::{ timer, }; +/// Initialize scheduler globals. +/// +/// This should be called before the scheduler runs for the first time, but after the allocator is +/// fully initialized. +pub(crate) fn init() { + #[cfg(feature = "capture_scheduling")] + SCHEDULING_EVENT_PRODUCER.call_once(|| { + use crate::{ + orpc::oqueue::{OQueue, OQueueRef}, + path, + }; + + // TODO(arthurp): This calls the OQueue constructor before the scheduler is running. This is + // probably safe, but we should have documentation on when and why this is allowed. + let oqueue = OQueueRef::new(1024, path!(scheduler.events)); + oqueue.attach_ref_producer().unwrap() + }); +} + +#[cfg(feature = "capture_scheduling")] +static SCHEDULING_EVENT_PRODUCER: Once> = Once::new(); + +/// Get the producer handle for the scheduling event OQueue. This will panic if called before +/// [`init()`]. +#[cfg(feature = "capture_scheduling")] +fn get_scheduling_event_producer() -> &'static crate::orpc::oqueue::RefProducer { + SCHEDULING_EVENT_PRODUCER.get().unwrap() +} + /// Injects a scheduler implementation into framework. /// /// This function can only be called once and must be called during the initialization of kernel. @@ -37,6 +69,24 @@ pub fn inject_scheduler(scheduler: &'static dyn Scheduler) { static SCHEDULER: Once<&'static dyn Scheduler> = Once::new(); +/// An event either or scheduling or descheduling a task. +#[derive(Debug)] +pub struct SchedulingEvent { + /// The task + pub task: Arc, + /// The kind of event + pub kind: SchedulingEventKind, +} + +/// The kind of a [`SchedulingEvent`]. +#[derive(Debug, Clone, Copy, Serialize)] +pub enum SchedulingEventKind { + /// The task is about to start executing. + Schedule, + /// The task has stopped executing. + Deschedule, +} + /// A per-CPU task scheduler. pub trait Scheduler: Sync + Send { /// Enqueues a runnable task. @@ -270,6 +320,26 @@ where }; }; + // This redefines `next_task` with the same value it started with, but moves the value out + // temporarily. This avoids an atomic incr and decr. + #[cfg(feature = "capture_scheduling")] + let next_task = { + let producer = get_scheduling_event_producer(); + if let Some(t) = Task::current() { + producer.produce_ref(&SchedulingEvent { + task: t.cloned(), + kind: SchedulingEventKind::Deschedule, + }); + } + + let scheduling_event = SchedulingEvent { + task: next_task, + kind: SchedulingEventKind::Schedule, + }; + producer.produce_ref(&scheduling_event); + scheduling_event.task + }; + // `switch_to_task` will spin if it finds that the next task is still running on some CPU core, // which guarantees soundness regardless of the scheduler implementation. // From d3dab6fc8f98d00a8e20f3c7c6678f2dba583b4d Mon Sep 17 00:00:00 2001 From: Arthur Peters Date: Thu, 30 Apr 2026 14:28:13 -0500 Subject: [PATCH 2/3] Add path metadata and accessor to OQueues. (#220) This adds path metadata to OQueues in a way that it can be accessed via the OQueueRef. --- ostd/src/orpc/oqueue/implementation.rs | 12 ++++++++- ostd/src/orpc/oqueue/mod.rs | 35 +++++++++++++++++++++++--- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/ostd/src/orpc/oqueue/implementation.rs b/ostd/src/orpc/oqueue/implementation.rs index eeffe9acf..dc6cf0114 100644 --- a/ostd/src/orpc/oqueue/implementation.rs +++ b/ostd/src/orpc/oqueue/implementation.rs @@ -37,6 +37,7 @@ use crate::{ Cursor, InlineStrongObserver, OQueueError, ObservationQuery, ResourceUnavailableSnafu, single_thread_ring_buffer::RingBuffer, }, + path::Path, }, sync::{LocalIrqDisabled, SpinLock, WaitQueue, WakerKey}, }; @@ -65,6 +66,7 @@ pub(crate) struct OQueueImplementation { /// The size to use for the consumer and strong-observer ring-buffers. len: usize, supports_consume: bool, + path: Option, pub(super) put_wait_queue: WaitQueue, pub(super) read_wait_queue: WaitQueue, } @@ -74,7 +76,9 @@ impl OQueueImplementation { /// /// * `len` is the ring buffer length used for consumers and strong-observers. /// * `supports_consume` specifies the attachment it allows later. - pub(crate) fn new(mut len: usize, supports_consume: bool) -> Self { + /// * `paths` are the paths associated with this OQueue; pass an empty `Vec` for anonymous + /// queues. + pub(crate) fn new(mut len: usize, supports_consume: bool, path: Option) -> Self { if len < 2 { warn!( "Creating an OQueue with length {len} is automatically increased to 2. Ring buffers smaller than 2 are not supported." @@ -91,11 +95,17 @@ impl OQueueImplementation { }), len, supports_consume, + path, put_wait_queue: WaitQueue::new(), read_wait_queue: WaitQueue::new(), } } + /// Return the path associated with this OQueue, or `None` for anonymous queues. + pub(super) fn path(&self) -> Option<&Path> { + self.path.as_ref() + } + /// Detach a consumer. This will free the consumer ring buffer if there are no consumers left. pub(super) fn detach_consumer(self: &Arc) { let mut inner = self.inner.lock(); diff --git a/ostd/src/orpc/oqueue/mod.rs b/ostd/src/orpc/oqueue/mod.rs index e2db16275..02febcf26 100644 --- a/ostd/src/orpc/oqueue/mod.rs +++ b/ostd/src/orpc/oqueue/mod.rs @@ -139,6 +139,9 @@ pub trait OQueueBase { where U: Copy + Send + 'static; + /// Return the path of this OQueue, or `None` if it is anonymous. + fn path(&self) -> Option<&Path>; + /// Erase the kind of OQueue. This will not allow additional operations to succeed. It /// simply makes the checks dynamic. fn as_any_oqueue(&self) -> AnyOQueueRef; @@ -198,6 +201,10 @@ macro_rules! impl_oqueue_base_forward { self.$member.attach_inline_strong_observer(f) } + fn path(&self) -> Option<&Path> { + self.$member.path() + } + fn as_any_oqueue(&self) -> AnyOQueueRef { self.$member.as_any_oqueue() } @@ -286,7 +293,12 @@ impl ConsumableOQueueRef { /// Create a new OQueue with the specified buffer length and support for produce by value and /// consumers. pub fn new(len: usize, path: Path) -> Self { - let ret = Self::new_anonymous(len); + let inner = Arc::new(implementation::OQueueImplementation::new( + len, + true, + Some(path.clone()), + )); + let ret = Self { inner }; registry::register(&path, ret.as_any_oqueue()); ret } @@ -296,7 +308,7 @@ impl ConsumableOQueueRef { /// observation such as for ephemeral queues. pub fn new_anonymous(len: usize) -> Self { Self { - inner: Arc::new(implementation::OQueueImplementation::new(len, true)), + inner: Arc::new(implementation::OQueueImplementation::new(len, true, None)), } } } @@ -316,7 +328,13 @@ clone_without_t!(OQueueRef, : ?Sized + 'static); impl OQueueRef { /// Create a new observation OQueue with the specified buffer length. pub fn new(len: usize, path: Path) -> Self { - let ret = Self::new_anonymous(len); + let ret = Self { + inner: Arc::new(implementation::OQueueImplementation::new( + len, + false, + Some(path.clone()), + )), + }; registry::register(&path, ret.as_any_oqueue()); ret } @@ -325,7 +343,7 @@ impl OQueueRef { /// when the OQueue should never be discovered for observation such as for ephemeral queues. pub fn new_anonymous(len: usize) -> Self { Self { - inner: Arc::new(implementation::OQueueImplementation::new(len, false)), + inner: Arc::new(implementation::OQueueImplementation::new(len, false, None)), } } } @@ -1160,4 +1178,13 @@ mod test { generic_test::TestMessage, >::new(4, Path::test())); } + + #[ktest] + fn oqueue_path() { + let named = ConsumableOQueueRef::::new(4, Path::test()); + assert_eq!(named.path(), Some(&Path::test())); + + let anon = ConsumableOQueueRef::::new_anonymous(4); + assert_eq!(anon.path(), None); + } } From c85b94e0453f26905efad1b95daa30fe3469323b Mon Sep 17 00:00:00 2001 From: Arthur Peters Date: Tue, 14 Apr 2026 16:34:17 -0500 Subject: [PATCH 3/3] Add block I/O capture --- Cargo.lock | 1 + kernel/comps/block/Cargo.toml | 1 + kernel/comps/block/src/bio.rs | 3 +- kernel/src/lib.rs | 99 +++++++++++++++++++++++++++++++++++ 4 files changed, 103 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 17c0f1d2c..96d38b52d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,6 +101,7 @@ dependencies = [ "int-to-c-enum", "log", "ostd", + "serde", "spin", ] diff --git a/kernel/comps/block/Cargo.toml b/kernel/comps/block/Cargo.toml index a0357a340..af3e9cb1d 100644 --- a/kernel/comps/block/Cargo.toml +++ b/kernel/comps/block/Cargo.toml @@ -13,6 +13,7 @@ int-to-c-enum = { path = "../../libs/int-to-c-enum" } component = { path = "../../libs/comp-sys/component" } aster-time = { path = "../time" } log = "0.4" +serde = { version = "1.0", default-features = false, features = ["derive", "alloc"]} bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] } [lints] diff --git a/kernel/comps/block/src/bio.rs b/kernel/comps/block/src/bio.rs index aa546049b..326adb4b7 100644 --- a/kernel/comps/block/src/bio.rs +++ b/kernel/comps/block/src/bio.rs @@ -17,6 +17,7 @@ use ostd::{ }, sync::{LocalIrqDisabled, SpinLock, WaitQueue}, }; +use serde::Serialize; use spin::{Mutex, Once}; use super::{BlockDevice, id::Sid}; @@ -25,7 +26,7 @@ use crate::{BLOCK_SIZE, SECTOR_SIZE, prelude::*, request_queue::BioRequestSingle /// Trace data for block device I/O completion. /// /// This struct captures performance metrics when a block I/O request completes. -#[derive(Clone)] +#[derive(Clone, Copy, Serialize)] pub struct BlockDeviceCompletionStats { /// The latency of the I/O request (time from submission to completion). pub latency: Duration, diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index ba38db9a2..2d91b6d00 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -23,6 +23,7 @@ #![feature(closure_track_caller)] #![register_tool(component_access_control)] +use aster_block::bio::{BlockDeviceCompletionStats, SubmittedBio}; use aster_framebuffer::FRAMEBUFFER_CONSOLE; #[cfg(not(baseline_asterinas))] mod data_capture; @@ -38,6 +39,7 @@ use ostd::{ boot::boot_info, cpu::{CpuId, CpuSet}, ignore_err, + orpc::oqueue::registry::lookup_by_type, task::scheduler::{SchedulingEvent, SchedulingEventKind}, }; #[cfg(not(baseline_asterinas))] @@ -279,6 +281,103 @@ fn init_thread() { } } + if karg + .get_module_arg_by_name("io", "capture_block_io") + .unwrap_or(false) + { + { + // Setup submitted bio recording + let oqueues = lookup_by_type::(); + if !oqueues.is_empty() { + #[derive(Serialize, Clone, Copy)] + struct SubmittedBioEvent { + byte_range: (usize, usize), + timestamp: Option, + context: EventContext, + } + + let capture_file = new_data_capture_file::( + mariposa_data_capture::FileDescriptor { + path: path!(io.block.submitted), + length: 500 * 1024 * 1024, + }, + ); + + for oqueue in oqueues { + let Some(path) = oqueue.path().cloned() else { + log::warn!("Found anonymous SubmittedBio OQueue. Not capturing."); + continue; + }; + ignore_err!( + capture_file.register_observer(ObserverRegistration { + path, + observer: oqueue + .attach_strong_observer(ObservationQuery::new( + |e: &SubmittedBio| { + let sid_range = e.sid_range(); + let context = EventContext::new(); + SubmittedBioEvent { + byte_range: ( + sid_range.start.to_offset(), + sid_range.end.to_offset(), + ), + timestamp: e.submission_time().map(|t| t.into()), + context, + } + }, + )) + .unwrap(), + }) + ); + } + ignore_err!(capture_file.start()); + } + } + + { + // Setup submitted bio recording + let oqueues = lookup_by_type::(); + if !oqueues.is_empty() { + #[derive(Clone, Copy, Serialize)] + struct BlockDeviceCompletionEvent { + stats: BlockDeviceCompletionStats, + context: EventContext, + } + + let capture_file = new_data_capture_file::( + mariposa_data_capture::FileDescriptor { + path: path!(io.block.completion), + length: 500 * 1024 * 1024, + }, + ); + + for oqueue in oqueues { + let Some(path) = oqueue.path().cloned() else { + log::warn!( + "Found anonymous BlockDeviceCompletionStats OQueue. Not capturing." + ); + continue; + }; + ignore_err!( + capture_file.register_observer(ObserverRegistration { + path, + observer: oqueue + .attach_strong_observer(ObservationQuery::new(|stats| { + let context = EventContext::new(); + BlockDeviceCompletionEvent { + stats: *stats, + context, + } + })) + .unwrap(), + }) + ); + } + ignore_err!(capture_file.start()); + } + } + } + // Wait till initproc become zombie. while !initproc.status().is_zombie() { ostd::task::halt_cpu();