diff --git a/Cargo.toml b/Cargo.toml index 3762f87..50b0e0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,13 @@ rkyv = { version = "0.7.45", features = ["validation"] } metrics = "0.24.1" [profile.release] -strip = "symbols" +# strip = "symbols" opt-level = 3 -# debug = 1 [profile.dist] inherits = "release" lto = "thin" + +[profile.profiling] +inherits = "release" +debug = true diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 8b3256e..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,18 +0,0 @@ -services: - jaeger: - container_name: jaeger - image: jaegertracing/all-in-one:latest - environment: - - COLLECTOR_OTLP_ENABLED=true - - ports: - - "4317:4317" - - "4318:4318" - - "5775:5775/udp" # For Thrift compact protocol - - "6831:6831/udp" # For Jaeger agent - - "6832:6832/udp" # For Thrift binary protocol - - "5778:5778" # For local agent admin - - "16686:16686" # Jaeger UI - - "14268:14268" # For collector - - "14250:14250" # For gRPC - - "9411:9411" # Zipkin compatibility diff --git a/ferrules-api/src/main.rs b/ferrules-api/src/main.rs index d7a0409..9c96f79 100644 --- a/ferrules-api/src/main.rs +++ b/ferrules-api/src/main.rs @@ -33,8 +33,8 @@ const MAX_SIZE_LIMIT: usize = 250 * 1024 * 1024; #[command(author, version, about, long_about = None)] struct Args { /// OpenTelemetry collector endpoint - #[arg(long, env = "OTLP_ENDPOINT", default_value = "http://localhost:4317")] - otlp_endpoint: String, + #[arg(long, env = "OTLP_ENDPOINT")] + otlp_endpoint: Option, /// Sentry DSN #[arg(long, env = "SENTRY_DSN")] @@ -108,6 +108,17 @@ struct Args { #[arg(long, short = 'O', help = "Ort graph optimization level")] graph_opt_level: Option, + + /// Enable profiling for layout model + #[arg(long, help = "Enable profiling for the layout model (saved as .json)")] + profile_layout: bool, + + /// Enable profiling for table transformer model + #[arg( + long, + help = "Enable profiling for the table transformer model (saved as .json)" + )] + profile_table: bool, } fn parse_ep_args(args: &Args) -> Vec { @@ -171,7 +182,7 @@ async fn main() { }; init_tracing( - Some(&args.otlp_endpoint), + args.otlp_endpoint.as_deref(), "ferrules-api".into(), false, use_sentry, @@ -179,7 +190,19 @@ async fn main() { .expect("can't setup tracing for API"); // Initialize Prometheus exporter - let builder = metrics_exporter_prometheus::PrometheusBuilder::new(); + let builder = metrics_exporter_prometheus::PrometheusBuilder::new() + .set_buckets_for_metric( + metrics_exporter_prometheus::Matcher::Suffix("_ms".to_string()), + &[ + 0.0, 1.0, 2.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, + 90.0, 100.0, 125.0, 150.0, 175.0, 200.0, 250.0, 300.0, 350.0, 400.0, 450.0, 500.0, + 600.0, 700.0, 800.0, 900.0, 1000.0, 1250.0, 1500.0, 1750.0, 2000.0, 2500.0, 3000.0, + 3500.0, 4000.0, 4500.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 15000.0, + 20000.0, 30000.0, 45000.0, 60000.0, 90000.0, 120000.0, 180000.0, 240000.0, + 300000.0, + ], + ) + .expect("failed to set buckets"); let handle = builder .install_recorder() .expect("failed to install Prometheus recorder"); @@ -189,6 +212,17 @@ async fn main() { intra_threads: args.intra_threads, inter_threads: args.inter_threads, opt_level: args.graph_opt_level.map(|v| v.try_into().unwrap()), + warmup: true, + profile_layout: if args.profile_layout { + Some(std::path::PathBuf::from("profile_layout_api")) + } else { + None + }, + profile_table: if args.profile_table { + Some(std::path::PathBuf::from("profile_table_api")) + } else { + None + }, }; // Initialize the layout model and queues let parser = FerrulesParser::new(ort_config); diff --git a/ferrules-cli/src/main.rs b/ferrules-cli/src/main.rs index 685488d..93c3ea3 100644 --- a/ferrules-cli/src/main.rs +++ b/ferrules-cli/src/main.rs @@ -143,6 +143,17 @@ struct Args { help = "Specify the directory to store debug output files" )] debug_dir: Option, + + /// Enable profiling for layout model + #[arg(long, help = "Enable profiling for the layout model (saved as .json)")] + profile_layout: bool, + + /// Enable profiling for table transformer model + #[arg( + long, + help = "Enable profiling for the table transformer model (saved as .json)" + )] + profile_table: bool, } fn parse_page_range(range_str: &str) -> anyhow::Result> { @@ -241,6 +252,17 @@ async fn main() { intra_threads: args.intra_threads, inter_threads: args.inter_threads, opt_level: args.graph_opt_level.map(|v| v.try_into().unwrap()), + warmup: false, + profile_layout: if args.profile_layout { + Some(PathBuf::from("profile_layout")) + } else { + None + }, + profile_table: if args.profile_table { + Some(PathBuf::from("profile_table")) + } else { + None + }, }; let page_range = match args.page_range { @@ -471,6 +493,20 @@ async fn main() { ], ); } + ferrules_core::error::FerrulesError::OcrError(e) => { + format_error( + "OCR Extraction Failed", + "Failed to extract text using OCR.", + vec![ + ("Error", e), + ("File", args.file_path.display().to_string()), + ( + "Suggestion", + "This might indicate an issue with Apple Vision or stitched image size".to_string(), + ), + ], + ); + } } std::process::exit(1); } diff --git a/ferrules-core/src/error.rs b/ferrules-core/src/error.rs index 2a4f183..6eb4561 100644 --- a/ferrules-core/src/error.rs +++ b/ferrules-core/src/error.rs @@ -29,4 +29,6 @@ pub enum FerrulesError { TableTransformerModelError(String), #[error("table parser error: {0}")] TableParserError(String), + #[error("ocr parser error: {0}")] + OcrError(String), } diff --git a/ferrules-core/src/layout/mod.rs b/ferrules-core/src/layout/mod.rs index ed43bbb..3532373 100644 --- a/ferrules-core/src/layout/mod.rs +++ b/ferrules-core/src/layout/mod.rs @@ -12,6 +12,8 @@ use crate::metrics::StepMetrics; pub mod model; +const CONCURRENT_LAYOUT_REQUESTS: usize = 16; + #[derive(Debug)] pub struct Metadata { pub(crate) response_tx: oneshot::Sender>, @@ -28,7 +30,7 @@ pub(crate) struct ParseLayoutRequest { #[derive(Debug)] pub(crate) struct ParseLayoutResponse { - pub(crate) page_id: PageID, + pub(crate) _page_id: PageID, pub(crate) layout_bbox: Vec, pub(crate) step_metrics: StepMetrics, } @@ -53,7 +55,7 @@ impl ParseLayoutQueue { self.queue .send((req, span)) .await - .map_err(|_| FerrulesError::LayoutParsingError) + .map_err(|_| FerrulesError::LayoutParsingError) // We keep LayoutParsingError for layout itself, but we can add more context later if needed. } } @@ -61,9 +63,9 @@ async fn start_layout_parser( layout_parser: Arc, mut input_rx: Receiver<(ParseLayoutRequest, Span)>, ) { - let s = Arc::new(Semaphore::new(layout_parser.config.intra_threads)); + let s = Arc::new(Semaphore::new(CONCURRENT_LAYOUT_REQUESTS)); while let Some((req, span)) = input_rx.recv().await { - let queue_time = req.metadata.queue_time.elapsed().as_millis(); + let queue_time = req.metadata.queue_time.elapsed().as_secs_f64() * 1000.0; let page_id = req.page_id; tracing::debug!("layout request queue time for page {page_id} took: {queue_time}ms"); let _guard = span.enter(); @@ -78,11 +80,11 @@ async fn handle_request( s: Arc, parser: Arc, req: ParseLayoutRequest, - layout_queue_time_ms: u128, + layout_queue_time_ms: f64, ) { let start_wait = Instant::now(); let _permit = s.acquire().await.unwrap(); - let idle_time_ms = start_wait.elapsed().as_millis(); + let idle_time_ms = start_wait.elapsed().as_secs_f64() * 1000.0; let ParseLayoutRequest { page_id, @@ -95,12 +97,12 @@ async fn handle_request( let layout_result = parser .parse_layout_async(&page_image, downscale_factor) .await; - let inference_duration = start.elapsed().as_millis(); + let inference_duration = start.elapsed().as_secs_f64() * 1000.0; drop(_permit); tracing::debug!("layout inference time for page {page_id} took: {inference_duration}ms"); let layout_result = layout_result.map(|l| ParseLayoutResponse { - page_id, + _page_id: page_id, layout_bbox: l, step_metrics: StepMetrics { queue_time_ms: layout_queue_time_ms, @@ -108,8 +110,9 @@ async fn handle_request( idle_time_ms, }, }); - metadata - .response_tx - .send(layout_result) - .expect("can't send parsed result over oneshot chan"); + if let Err(e) = layout_result.as_ref() { + tracing::error!("Layout parsing failed for page {page_id}: {:?}", e); + } + + let _ = metadata.response_tx.send(layout_result); } diff --git a/ferrules-core/src/layout/model.rs b/ferrules-core/src/layout/model.rs index 3cd8b09..e533a03 100644 --- a/ferrules-core/src/layout/model.rs +++ b/ferrules-core/src/layout/model.rs @@ -10,6 +10,7 @@ use ort::{ session::{builder::GraphOptimizationLevel, Session}, }; use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize}; +use std::sync::Mutex; use crate::entities::BBox; @@ -41,6 +42,29 @@ pub struct ORTConfig { pub intra_threads: usize, pub inter_threads: usize, pub opt_level: Option, + pub warmup: bool, + pub profile_layout: Option, + pub profile_table: Option, +} + +impl ORTConfig { + /// Returns a new vector of execution providers sorted by priority (accelerators first). + pub fn get_sorted_providers(&self) -> Vec { + let mut providers = self.execution_providers.clone(); + providers.sort_by(|a, b| { + let priority = |p: &OrtExecutionProvider| -> u8 { + match p { + OrtExecutionProvider::Trt(_) => 4, + OrtExecutionProvider::CUDA(_) => 3, + OrtExecutionProvider::CoreML { .. } => 2, + OrtExecutionProvider::CPU => 1, + } + }; + // Sort in descending order of priority (higher priority comes first) + priority(b).cmp(&priority(a)) + }); + providers + } } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -55,13 +79,16 @@ impl Default for ORTConfig { fn default() -> Self { let mut execution_providers = vec![OrtExecutionProvider::CPU]; if cfg!(target_os = "macos") { - execution_providers.push(OrtExecutionProvider::CoreML { ane_only: false }); + execution_providers.push(OrtExecutionProvider::CoreML { ane_only: true }); } Self { execution_providers, intra_threads: ORTLayoutParser::ORT_INTRATHREAD, inter_threads: ORTLayoutParser::ORT_INTERTHREAD, opt_level: Some(ORTGraphOptimizationLevel::Level1), + warmup: false, + profile_layout: None, + profile_table: None, } } } @@ -110,6 +137,7 @@ pub struct ORTLayoutParser { session: Session, output_name: String, pub config: ORTConfig, + buffer_pool: Mutex>>, } impl ORTLayoutParser { @@ -120,19 +148,21 @@ impl ORTLayoutParser { bbox_rescale_factor: f32, ) -> anyhow::Result> { let (img_width, img_height) = (page_img.width(), page_img.height()); - let input = self.preprocess(page_img); - let output_tensor = self.run_async(input).await?; + let mut input = self.acquire_buffer(); + self.preprocess_into(page_img, &mut input); + let output_tensor = self.run_async(&input).await?; + self.release_buffer(input); let mut bboxes = self.extract_bboxes(output_tensor, img_width, img_height, bbox_rescale_factor); nms(&mut bboxes, Self::IOU_THRESHOLD); Ok(bboxes) } - async fn run_async( + pub async fn run_async( &self, - input: ArrayBase, Dim<[usize; 4]>>, + input: &Array4, ) -> anyhow::Result, Dim<[usize; 3]>>> { - let outputs = &self.session.run_async(ort::inputs![input]?)?.await?; + let outputs = &self.session.run_async(ort::inputs![input.view()]?)?.await?; let output_tensor = outputs .get(&self.output_name) @@ -146,6 +176,28 @@ impl ORTLayoutParser { Ok(output_tensor) } + + #[tracing::instrument(skip_all)] + pub async fn run_batch_async( + &self, + input: Array4, + ) -> anyhow::Result> { + let batch_size = input.dim().0; + let outputs = &self.session.run_async(ort::inputs![input]?)?.await?; + + let output_tensor = outputs + .get(&self.output_name) + .context("can't get the value of first output")? + .try_extract_tensor::()?; + + // Adjust output shape to [batch_size, classes + bbox, candidate_boxes] + let output_tensor = output_tensor + .to_shape([batch_size, 15, 21504]) + .unwrap() + .to_owned(); + + Ok(output_tensor) + } } impl ORTLayoutParser { @@ -169,12 +221,11 @@ impl ORTLayoutParser { pub const ORT_INTRATHREAD: usize = 16; pub const ORT_INTERTHREAD: usize = 4; - pub fn new(mut config: ORTConfig) -> anyhow::Result { + pub fn new(config: ORTConfig) -> anyhow::Result { let mut execution_providers = Vec::new(); - // Sort providers by priority cpu -> cuda -> coreml - let providers = &mut config.execution_providers; - providers.sort(); + // Get providers sorted by priority: accelerators first + let providers = config.get_sorted_providers(); // Providers for provider in providers { @@ -182,20 +233,20 @@ impl ORTLayoutParser { OrtExecutionProvider::Trt(device_id) => { execution_providers.push( TensorRTExecutionProvider::default() - .with_device_id(*device_id) + .with_device_id(device_id) .build(), ); } OrtExecutionProvider::CUDA(device_id) => { execution_providers.push( CUDAExecutionProvider::default() - .with_device_id(*device_id) + .with_device_id(device_id) .build(), ); } OrtExecutionProvider::CoreML { ane_only } => { let provider = CoreMLExecutionProvider::default(); - let provider = if *ane_only { + let provider = if ane_only { provider.with_ane_only().build() } else { provider.build() @@ -215,12 +266,17 @@ impl ORTLayoutParser { None => GraphOptimizationLevel::Disable, }; - let session = Session::builder()? + let mut builder = Session::builder()? .with_execution_providers(execution_providers)? .with_optimization_level(opt_lvl)? .with_intra_threads(config.intra_threads)? - .with_inter_threads(config.inter_threads)? - .commit_from_memory(LAYOUT_MODEL_BYTES)?; + .with_inter_threads(config.inter_threads)?; + + if let Some(profile_path) = &config.profile_layout { + builder = builder.with_profiling(profile_path)?; + } + + let session = builder.commit_from_memory(LAYOUT_MODEL_BYTES)?; let output_name = session .outputs @@ -229,18 +285,40 @@ impl ORTLayoutParser { .context("can't find name output input")? .to_owned(); - Ok(Self { + let parser = Self { session, output_name, config, - }) + // TODO: use ticket mutex instead of buffer pool to access resources + buffer_pool: Mutex::new(Vec::with_capacity(32)), + }; + + if parser.config.warmup { + parser.warmup().context("Model warmup failed")?; + } + + Ok(parser) + } + + #[tracing::instrument(skip(self))] + fn warmup(&self) -> anyhow::Result<()> { + let input = Array4::zeros([ + 1, + 3, + Self::REQUIRED_HEIGHT as usize, + Self::REQUIRED_WIDTH as usize, + ]); + // We use the sync run method for warmup during initialization + let _ = self.run(&input)?; + tracing::info!("Layout model warmup complete"); + Ok(()) } pub fn run( &self, - input: ArrayBase, Dim<[usize; 4]>>, + input: &Array4, ) -> anyhow::Result, Dim<[usize; 3]>>> { - let outputs = &self.session.run(ort::inputs![input]?)?; + let outputs = &self.session.run(ort::inputs![input.view()]?)?; let output_tensor = outputs .get(&self.output_name) @@ -255,14 +333,34 @@ impl ORTLayoutParser { Ok(output_tensor) } + #[tracing::instrument(skip_all)] + pub fn run_batch(&self, input: Array4) -> anyhow::Result> { + let batch_size = input.dim().0; + let outputs = &self.session.run(ort::inputs![input]?)?; + + let output_tensor = outputs + .get(&self.output_name) + .context("can't get the value of first output")? + .try_extract_tensor::()?; + + let output_tensor = output_tensor + .to_shape([batch_size, 15, 21504]) + .unwrap() + .to_owned(); + + Ok(output_tensor) + } + pub fn parse_layout( &self, page_img: &DynamicImage, bbox_rescale_factor: f32, ) -> anyhow::Result> { let (img_width, img_height) = (page_img.width(), page_img.height()); - let input = self.preprocess(page_img); - let output_tensor = self.run(input)?; + let mut input = self.acquire_buffer(); + self.preprocess_into(page_img, &mut input); + let output_tensor = self.run(&input)?; + self.release_buffer(input); let mut bboxes = self.extract_bboxes(output_tensor, img_width, img_height, bbox_rescale_factor); nms(&mut bboxes, Self::IOU_THRESHOLD); @@ -293,6 +391,15 @@ impl ORTLayoutParser { .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)) .unwrap(); + if proba.is_nan() { + tracing::warn!( + "Found NaN probability for label {} at idx {}", + ID2LABEL[max_prob_idx], + bbox_id + ); + continue; + } + if proba < Self::CONF_THRESHOLD { continue; } @@ -338,7 +445,27 @@ impl ORTLayoutParser { (r, (w0 * r).round(), (h0 * r).round()) } - fn _preprocess_batch(&self, batch_imgs: &[DynamicImage]) -> Array4 { + fn acquire_buffer(&self) -> Array4 { + let mut pool = self.buffer_pool.lock().expect("buffer pool lock poisoned"); + pool.pop().unwrap_or_else(|| { + Array4::ones([ + 1, + 3, + Self::REQUIRED_HEIGHT as usize, + Self::REQUIRED_WIDTH as usize, + ]) + }) + } + + fn release_buffer(&self, buffer: Array4) { + let mut pool = self.buffer_pool.lock().expect("buffer pool lock poisoned"); + if pool.len() < 32 { + pool.push(buffer); + } + } + + #[tracing::instrument(skip_all)] + pub fn preprocess_batch(&self, batch_imgs: &[DynamicImage]) -> Array4 { let (w0, h0) = batch_imgs.first().unwrap().dimensions(); let (_, w_new, h_new) = self.scale_wh( w0 as f32, @@ -358,10 +485,11 @@ impl ORTLayoutParser { for (idx, img) in batch_imgs.iter().enumerate() { let resized_img = img.resize_exact(w_new as u32, h_new as u32, FilterType::Triangle); - for (x, y, pixel) in resized_img.pixels() { + let rgb = resized_img.to_rgb8(); + for (x, y, pixel) in rgb.enumerate_pixels() { let x = x as usize; let y = y as _; - let [r, g, b, _] = pixel.0; + let [r, g, b] = pixel.0; input_tensor[[idx, 0, y, x]] = r as f32 / 255.0; input_tensor[[idx, 1, y, x]] = g as f32 / 255.0; input_tensor[[idx, 2, y, x]] = b as f32 / 255.0; @@ -371,38 +499,45 @@ impl ORTLayoutParser { input_tensor } - fn preprocess(&self, img: &DynamicImage) -> Array4 { + #[tracing::instrument(skip_all)] + pub fn preprocess(&self, img: &DynamicImage) -> Array4 { + let mut input_tensor = self.acquire_buffer(); + self.preprocess_into(img, &mut input_tensor); + input_tensor + } + + #[tracing::instrument(skip_all)] + pub fn preprocess_into(&self, img: &DynamicImage, input_tensor: &mut Array4) { let (w0, h0) = img.dimensions(); let (_, w_new, h_new) = self.scale_wh( w0 as f32, h0 as f32, Self::REQUIRED_WIDTH as f32, Self::REQUIRED_HEIGHT as f32, - ); // f32 round + ); let resized_img = img.resize_exact(w_new as u32, h_new as u32, FilterType::Triangle); - // TODO: reuse this buffer between batches - let mut input_tensor = Array4::ones([ - 1, - 3, - Self::REQUIRED_HEIGHT as usize, - Self::REQUIRED_WIDTH as usize, - ]); + input_tensor.fill(144.0 / 255.0); - for (x, y, pixel) in resized_img.pixels() { + + let rgb = resized_img.to_rgb8(); + for (x, y, pixel) in rgb.enumerate_pixels() { let x = x as usize; let y = y as _; - let [r, g, b, _] = pixel.0; + let [r, g, b] = pixel.0; input_tensor[[0, 0, y, x]] = r as f32 / 255.0; input_tensor[[0, 1, y, x]] = g as f32 / 255.0; input_tensor[[0, 2, y, x]] = b as f32 / 255.0; } - input_tensor } } /// runs nms on without taking into account which class pub(crate) fn nms(raw_bboxes: &mut Vec, iou_threshold: f32) { - raw_bboxes.sort_by(|r1, r2| r2.proba.partial_cmp(&r1.proba).unwrap()); + raw_bboxes.sort_by(|r1, r2| { + r2.proba + .partial_cmp(&r1.proba) + .unwrap_or(std::cmp::Ordering::Equal) + }); let mut current_index = 0; for index in 0..raw_bboxes.len() { let mut drop = false; diff --git a/ferrules-core/src/lib.rs b/ferrules-core/src/lib.rs index dc6d2c1..7e851aa 100644 --- a/ferrules-core/src/lib.rs +++ b/ferrules-core/src/lib.rs @@ -30,9 +30,10 @@ //! // Configure hardware acceleration //! let ort_config = ORTConfig { //! execution_providers: vec![OrtExecutionProvider::CPU], -//! intra_threads: 16, +//! intra_threads: 4 //! inter_threads: 4, //! opt_level: None, +//! warmup: false, //! }; //! //! // Initialize parser diff --git a/ferrules-core/src/metrics.rs b/ferrules-core/src/metrics.rs index b602c41..a5b8382 100644 --- a/ferrules-core/src/metrics.rs +++ b/ferrules-core/src/metrics.rs @@ -1,22 +1,24 @@ use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize}; use serde::{Deserialize, Serialize}; +use crate::blocks::TableAlgorithm; + #[derive( Debug, Clone, Default, Serialize, Deserialize, Archive, RkyvDeserialize, RkyvSerialize, )] pub struct StepMetrics { - pub queue_time_ms: u128, - pub execution_time_ms: u128, - pub idle_time_ms: u128, // Time spent waiting for resources (e.g., semaphore) + pub queue_time_ms: f64, + pub execution_time_ms: f64, + pub idle_time_ms: f64, // Time spent waiting for resources (e.g., semaphore) } impl StepMetrics { - pub(crate) fn new(execution_time_ms: u128) -> Self { + pub(crate) fn new(execution_time_ms: f64) -> Self { Self { execution_time_ms, // Not applicable for native parsing it is the first step always - queue_time_ms: 0, - idle_time_ms: 0, + queue_time_ms: 0.0, + idle_time_ms: 0.0, } } } @@ -29,22 +31,36 @@ pub struct OCRMetrics { pub lines_count: usize, } +#[derive( + Debug, Clone, Default, Serialize, Deserialize, Archive, RkyvDeserialize, RkyvSerialize, +)] +pub struct TableMetrics { + pub step_metrics: StepMetrics, + pub algorithm: TableAlgorithm, +} + #[derive( Debug, Clone, Default, Serialize, Deserialize, Archive, RkyvDeserialize, RkyvSerialize, )] pub struct PageMetrics { pub page_id: usize, - pub total_duration_ms: u128, + pub total_duration_ms: f64, pub native_step: StepMetrics, pub layout_step: StepMetrics, - pub table_step: StepMetrics, + pub table_steps: Vec, pub ocr_step: Option, } impl PageMetrics { #[cfg(feature = "metrics")] pub fn record(&self) { - metrics::histogram!("page_processing_duration_ms").record(self.total_duration_ms as f64); + let ocr_label = if self.ocr_step.is_some() { + "true" + } else { + "false" + }; + metrics::histogram!("page_processing_duration_ms", "ocr" => ocr_label) + .record(self.total_duration_ms as f64); metrics::histogram!("layout_execution_time_ms") .record(self.layout_step.execution_time_ms as f64); @@ -54,10 +70,22 @@ impl PageMetrics { metrics::histogram!("native_execution_time_ms") .record(self.native_step.execution_time_ms as f64); - if self.table_step.execution_time_ms > 0 { + for table in &self.table_steps { + let algo_str = match table.algorithm { + TableAlgorithm::Lattice => "lattice", + TableAlgorithm::Stream => "stream", + TableAlgorithm::Vision => "vision", + TableAlgorithm::Unknown => "unknown", + }; + + metrics::histogram!("table_execution_time_ms", "method" => algo_str) + .record(table.step_metrics.execution_time_ms as f64); + metrics::histogram!("table_queue_time_ms", "method" => algo_str) + .record(table.step_metrics.queue_time_ms as f64); + + // Still record global table metrics if needed, or just let prometheus aggregate metrics::histogram!("table_execution_time_ms") - .record(self.table_step.execution_time_ms as f64); - metrics::histogram!("table_queue_time_ms").record(self.table_step.queue_time_ms as f64); + .record(table.step_metrics.execution_time_ms as f64); } if let Some(ocr) = &self.ocr_step { @@ -72,6 +100,7 @@ impl PageMetrics { pub fn record_span(&self, span: &tracing::Span) { span.record("layout_queue_time_ms", self.layout_step.queue_time_ms); + span.record("layout_idle_time_ms", self.layout_step.idle_time_ms); span.record( "layout_parse_duration_ms", self.layout_step.execution_time_ms, @@ -82,14 +111,26 @@ impl PageMetrics { self.native_step.execution_time_ms, ); if let Some(ocr_metrics) = &self.ocr_step { - span.record("ocr_queue_time_ms", ocr_metrics.step_metrics.queue_time_ms); + span.record("ocr_idle_time_ms", ocr_metrics.step_metrics.idle_time_ms); span.record( "ocr_parse_duration_ms", ocr_metrics.step_metrics.execution_time_ms, ); } - span.record("table_parse_duration_ms", self.table_step.execution_time_ms); - span.record("table_queue_time_ms", self.table_step.queue_time_ms); + + let total_table_duration: f64 = self + .table_steps + .iter() + .map(|t| t.step_metrics.execution_time_ms) + .sum(); + let total_table_queue: f64 = self + .table_steps + .iter() + .map(|t| t.step_metrics.queue_time_ms) + .sum(); + + span.record("table_parse_duration_ms", total_table_duration); + span.record("table_queue_time_ms", total_table_queue); } } @@ -97,6 +138,6 @@ impl PageMetrics { Debug, Clone, Default, Serialize, Deserialize, Archive, RkyvDeserialize, RkyvSerialize, )] pub struct ParsingMetrics { - pub total_duration_ms: u128, + pub total_duration_ms: f64, pub pages: Vec, } diff --git a/ferrules-core/src/ocr/mod.rs b/ferrules-core/src/ocr/mod.rs index 5d14a9e..0025093 100644 --- a/ferrules-core/src/ocr/mod.rs +++ b/ferrules-core/src/ocr/mod.rs @@ -1,26 +1,235 @@ use image::DynamicImage; -use lazy_static::lazy_static; use std::path::PathBuf; +use std::sync::Arc; use std::time::Instant; -use tokio::sync::Semaphore; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::{oneshot, Semaphore}; +use tracing::{Instrument, Span}; -lazy_static! { - /// Global semaphore to limit concurrent OCR tasks. - /// This prevents flooding the blocking thread pool and potential ANE/GPU stalls on macOS. - pub static ref OCR_SEMAPHORE: Semaphore = Semaphore::new(32); -} - -use crate::entities::{BBox, Line}; +use crate::entities::{BBox, Line, PageID}; use crate::error::FerrulesError; use crate::metrics::StepMetrics; +const CONCURRENT_OCR_REQUESTS: usize = 32; +const MAX_OCR_BATCH_SIZE: usize = 16; +const OCR_BATCH_TIMEOUT_MS: u64 = 100; + +#[derive(Debug)] +pub struct OCRMetadata { + pub(crate) response_tx: oneshot::Sender>, + pub(crate) queue_time: Instant, +} + +#[derive(Debug)] +pub(crate) struct ParseOCRRequest { + pub(crate) page_id: PageID, + pub(crate) page_image: Arc, + pub(crate) rescale_factor: f32, + pub(crate) metadata: OCRMetadata, +} + +#[derive(Debug)] +pub(crate) struct ParseOCRResponse { + pub(crate) ocr_lines: Vec, + pub(crate) step_metrics: StepMetrics, +} + +#[derive(Debug, Clone)] +pub struct OCRQueue { + queue: Sender<(ParseOCRRequest, Span)>, +} + +impl OCRQueue { + pub fn new(ocr_parser: Arc) -> Self { + let (queue_sender, queue_receiver) = mpsc::channel(128); // Larger buffer for OCR requests + + tokio::task::spawn(start_ocr_parser(ocr_parser, queue_receiver)); + Self { + queue: queue_sender, + } + } + + pub(crate) async fn push(&self, req: ParseOCRRequest) -> Result<(), FerrulesError> { + let span = Span::current(); + self.queue + .send((req, span)) + .await + .map_err(|e| FerrulesError::OcrError(format!("OCR queue send error: {}", e))) + } +} + +async fn start_ocr_parser( + ocr_parser: Arc, + mut input_rx: Receiver<(ParseOCRRequest, Span)>, +) { + let s = Arc::new(Semaphore::new(CONCURRENT_OCR_REQUESTS)); + while let Some((req, span)) = input_rx.recv().await { + let queue_time = req.metadata.queue_time.elapsed().as_secs_f64() * 1000.0; + let page_id = req.page_id; + tracing::debug!("ocr request queue time for page {page_id} took: {queue_time}ms"); + tokio::spawn( + handle_ocr_request(s.clone(), ocr_parser.clone(), req, queue_time).instrument(span), + ); + } +} + +async fn handle_ocr_request( + s: Arc, + parser: Arc, + req: ParseOCRRequest, + ocr_queue_time_ms: f64, +) { + let start_wait = Instant::now(); + let _permit = s.acquire().await.unwrap(); + let idle_time_ms = start_wait.elapsed().as_secs_f64() * 1000.0; + + let ParseOCRRequest { + page_id, + page_image, + rescale_factor, + metadata, + } = req; + + let start = Instant::now(); + let (tx, rx) = oneshot::channel(); + let _ = parser + .inference_tx + .send(OCRInferenceRequest { + image: page_image, + rescale_factor, + response_tx: tx, + }) + .await; + + let ocr_result = rx.await.unwrap_or(Err(FerrulesError::OcrError( + "OCR channel closed".to_string(), + ))); + let execution_time_ms = start.elapsed().as_secs_f64() * 1000.0; + drop(_permit); + + tracing::debug!("ocr inference time for page {page_id} took: {execution_time_ms}ms"); + + let response = ocr_result.map(|ocr_lines| ParseOCRResponse { + ocr_lines, + step_metrics: StepMetrics { + queue_time_ms: ocr_queue_time_ms, + execution_time_ms, + idle_time_ms, + }, + }); + + let _ = metadata.response_tx.send(response); +} + +struct OCRInferenceRequest { + image: Arc, + rescale_factor: f32, + response_tx: oneshot::Sender, FerrulesError>>, +} + +struct BatchOCRRunner { + rx: Receiver, +} + +impl BatchOCRRunner { + async fn run(mut self) { + let mut batch = Vec::with_capacity(MAX_OCR_BATCH_SIZE); + + loop { + let first_req = match self.rx.recv().await { + Some(req) => req, + None => break, + }; + batch.push(first_req); + + let deadline = tokio::time::Instant::now() + + std::time::Duration::from_millis(OCR_BATCH_TIMEOUT_MS); + + while batch.len() < MAX_OCR_BATCH_SIZE { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + break; + } + match tokio::time::timeout(remaining, self.rx.recv()).await { + Ok(Some(req)) => batch.push(req), + Ok(None) => break, + Err(_) => break, + } + } + + if batch.is_empty() { + continue; + } + + let batch_size = batch.len(); + tracing::debug!("Processing OCR batch of size {}", batch_size); + + let mut images = Vec::with_capacity(batch_size); + let mut restxs = Vec::with_capacity(batch_size); + + for req in batch.drain(..) { + images.push((req.image, req.rescale_factor)); + restxs.push(req.response_tx); + } + + let results = tokio::task::spawn_blocking(move || parse_images_ocr_batch(images)) + .await + .unwrap_or_else(|e| { + tracing::error!("OCR Batch Task Panicked: {:?}", e); + let mut errs = Vec::with_capacity(batch_size); + for _ in 0..batch_size { + errs.push(Err(anyhow::anyhow!("OCR Batch Panic: {:?}", e))); + } + errs + }); + + for (tx, res) in restxs.into_iter().zip(results) { + let _ = tx.send(res.map_err(|e| FerrulesError::OcrError(e.to_string()))); + } + } + } +} + +#[derive(Debug, Clone)] +pub struct OCRParser { + inference_tx: Sender, +} + +impl OCRParser { + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(256); + let runner = BatchOCRRunner { rx }; + tokio::spawn(runner.run()); + Self { inference_tx: tx } + } + + pub async fn parse( + &self, + image: &DynamicImage, + rescale_factor: f32, + ) -> Result, FerrulesError> { + let (tx, rx) = oneshot::channel(); + let _ = self + .inference_tx + .send(OCRInferenceRequest { + image: Arc::new(image.clone()), + rescale_factor, + response_tx: tx, + }) + .await; + rx.await.unwrap_or(Err(FerrulesError::OcrError( + "OCR channel closed".to_string(), + ))) + } +} + #[cfg(target_os = "linux")] -use ocr_linux::parse_image_ocr as parse_image_ocr_inner; +use ocr_linux::{parse_images_ocr_batch, parse_single_image_ocr}; #[cfg(target_os = "macos")] -use ocr_mac::parse_image_ocr as parse_image_ocr_inner; +use ocr_mac::{parse_images_ocr_batch, parse_single_image_ocr}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct OCRLines { pub text: String, pub confidence: f32, @@ -44,14 +253,14 @@ pub async fn parse_image_ocr( rescale_factor: f32, ) -> Result<(Vec, StepMetrics), FerrulesError> { let start = Instant::now(); - let ocr_result = parse_image_ocr_inner(image, rescale_factor) - .map_err(|_| FerrulesError::LayoutParsingError)?; // TODO: Add specific OCR error variant - let execution_time_ms = start.elapsed().as_millis(); + let ocr_result = parse_single_image_ocr(image, rescale_factor) + .map_err(|e| FerrulesError::OcrError(format!("OCR execution error: {}", e)))?; + let execution_time_ms = start.elapsed().as_secs_f64() * 1000.0; let step_metrics = StepMetrics { - queue_time_ms: 0, + queue_time_ms: 0.0, execution_time_ms, - idle_time_ms: 0, + idle_time_ms: 0.0, }; Ok((ocr_result, step_metrics)) @@ -63,7 +272,6 @@ mod ocr_mac { use objc2::ClassType; use objc2_foundation::{CGRect, NSArray, NSData, NSDictionary}; use objc2_vision::{VNImageRequestHandler, VNRecognizeTextRequest, VNRequest}; - use std::io::Cursor; const CONFIDENCE_THRESHOLD: f32 = 0f32; /// Convert vision coordinates to Bbox absolute coordinates @@ -80,15 +288,27 @@ mod ocr_mac { let bw = bbox.size.width as f32; let bh = bbox.size.height as f32; - let x0 = bx0 * img_width as f32; - let y1 = (1f32 - by0) * (img_height as f32); - let x1 = x0 + bw * (img_width as f32); - let y0 = y1 - bh * (img_height as f32); + let x0 = (bx0 * img_width as f32).clamp(0.0, img_width as f32); + let y1 = ((1f32 - by0) * (img_height as f32)).clamp(0.0, img_height as f32); + let x1 = (x0 + bw * (img_width as f32)).clamp(0.0, img_width as f32); + let y0 = (y1 - bh * (img_height as f32)).clamp(0.0, img_height as f32); - assert!(x0 < x1); - assert!(y0 < y1); - assert!(x1 < img_width as f32); - assert!(y1 < img_height as f32); + if x1 <= x0 || y1 <= y0 { + // Return a minimal valid box if it's too small, rather than crashing + tracing::warn!( + "Vision returned invalid or zero-size bbox: [{}, {}, {}, {}]", + x0, + y0, + x1, + y1 + ); + return BBox { + x0: x0 * downscale_factor, + y0: y0 * downscale_factor, + x1: (x0 + 1.0) * downscale_factor, + y1: (y0 + 1.0) * downscale_factor, + }; + } BBox { x0: x0 * downscale_factor, @@ -98,26 +318,142 @@ mod ocr_mac { } } - pub(super) fn parse_image_ocr( + pub(super) fn parse_images_ocr_batch( + inputs: Vec<(Arc, f32)>, + ) -> Vec>> { + if inputs.is_empty() { + return vec![]; + } + + if inputs.len() == 1 { + let (image, rescale_factor) = inputs.into_iter().next().unwrap(); + return vec![parse_single_image_ocr(&image, rescale_factor)]; + } + + // Stitching logic for true batching + let mut total_height = 0u32; + let mut max_width = 0u32; + for (img, _) in &inputs { + total_height += img.height(); + max_width = max_width.max(img.width()); + } + + let mut combined_image = image::ImageBuffer::new(max_width, total_height); + let mut offsets = Vec::with_capacity(inputs.len()); + let mut current_y = 0u32; + + for (img, _) in &inputs { + offsets.push(current_y); + image::imageops::overlay(&mut combined_image, img.as_ref(), 0, current_y as i64); + current_y += img.height(); + } + + let combined_image = DynamicImage::ImageRgba8(combined_image); + let mut buffer = std::io::Cursor::new(Vec::new()); + if let Err(e) = combined_image.write_to(&mut buffer, image::ImageFormat::Tiff) { + let mut errs = Vec::with_capacity(inputs.len()); + for _ in 0..inputs.len() { + errs.push(Err(anyhow::anyhow!(e.to_string()))); + } + return errs; + } + let raw_data = buffer.into_inner(); + + let mut final_results = vec![Vec::new(); inputs.len()]; + + unsafe { + let mut requests = Vec::with_capacity(inputs.len()); + for (i, (_, _)) in inputs.iter().enumerate() { + let request = VNRecognizeTextRequest::new(); + request.setRecognitionLevel(objc2_vision::VNRequestTextRecognitionLevel::Accurate); + request.setUsesLanguageCorrection(true); + + // Set Region Of Interest for this specific image in the strip + let y0 = offsets[i] as f64 / total_height as f64; + let h = inputs[i].0.height() as f64 / total_height as f64; + // Vision ROI is [x, y, w, h] in normalized coords (0,0 is bottom-left) + // Since we stitched top-to-bottom, we need to flip Y + let roi_y = (1.0 - y0 - h).clamp(0.0, 1.0); + let h = h.clamp(0.0, 1.0 - roi_y); + request.setRegionOfInterest(objc2_foundation::CGRect { + origin: objc2_foundation::CGPoint { x: 0.0, y: roi_y }, + size: objc2_foundation::CGSize { + width: 1.0, + height: h, + }, + }); + + requests.push(request); + } + + let handler = VNImageRequestHandler::initWithData_options( + VNImageRequestHandler::alloc(), + &NSData::with_bytes(&raw_data), + &NSDictionary::new(), + ); + + let v_requests: Vec<&VNRequest> = + requests.iter().map(|r| r.as_ref() as &VNRequest).collect(); + let ns_requests = NSArray::from_slice(&v_requests); + if let Err(e) = handler.performRequests_error(&ns_requests) { + let mut errs = Vec::with_capacity(inputs.len()); + for _ in 0..inputs.len() { + errs.push(Err(anyhow::anyhow!(e.to_string()))); + } + return errs; + } + + for (i, request) in requests.iter().enumerate() { + let rescale_factor = inputs[i].1; + let img_width = inputs[i].0.width(); + let img_height = inputs[i].0.height(); + + if let Some(result) = request.results() { + for recognized_text_region in result.to_vec() { + if (*recognized_text_region).confidence() > CONFIDENCE_THRESHOLD { + if let Some(rec_text) = recognized_text_region.topCandidates(1).first() + { + let bbox = (*recognized_text_region).boundingBox(); + // Note: bbox from Vision here is RELATIVE to ROI if we use ROI correctly? + // Actually, Vision bboxes are typically relative to the WHOLE image if ROI is set on request? + let bbox = + cgrect_to_bbox(&bbox, img_width, img_height, rescale_factor); + final_results[i].push(OCRLines { + text: rec_text.string().to_string(), + confidence: rec_text.confidence(), + bbox, + }) + } + } + } + } + } + } + + final_results.into_iter().map(Ok).collect() + } + + pub(super) fn parse_single_image_ocr( image: &DynamicImage, rescale_factor: f32, ) -> anyhow::Result> { let (img_width, img_height) = (image.width(), image.height()); - let mut buffer: Cursor> = Cursor::new(Vec::new()); - image.write_to(&mut buffer, image::ImageFormat::Png)?; + let mut buffer = std::io::Cursor::new(Vec::new()); + image.write_to(&mut buffer, image::ImageFormat::Tiff)?; + let raw_data = buffer.into_inner(); let mut ocr_result = Vec::new(); unsafe { let request = VNRecognizeTextRequest::new(); request.setRecognitionLevel(objc2_vision::VNRequestTextRecognitionLevel::Accurate); - // TODO set the languages array request.setUsesLanguageCorrection(true); let handler = VNImageRequestHandler::initWithData_options( VNImageRequestHandler::alloc(), - &NSData::with_bytes(buffer.get_ref()), + &NSData::with_bytes(&raw_data), &NSDictionary::new(), ); + let requests = NSArray::from_slice(&[request.as_ref() as &VNRequest]); handler.performRequests_error(&requests)?; @@ -146,8 +482,8 @@ mod ocr_mac { use image::ImageReader; use std::{path::Path, time::Instant}; - #[test] - fn test_ocr_apple_vision() { + #[tokio::test] + async fn test_ocr_apple_vision() { if Path::new("./test_data/double_cols.jpg").exists() { let image = ImageReader::open("./test_data/double_cols.jpg") .unwrap() @@ -155,7 +491,7 @@ mod ocr_mac { .unwrap(); let s = Instant::now(); - let ocr_result = parse_image_ocr(&image, 1f32); + let ocr_result = parse_image_ocr(&image, None, 1f32).await; assert!(ocr_result.is_ok()); println!( @@ -164,6 +500,102 @@ mod ocr_mac { ); } } + + #[test] + fn test_ocr_batching_perf() { + let image_path = if Path::new("./test_data/double_cols.jpg").exists() { + "./test_data/double_cols.jpg" + } else { + "../test_data/double_cols.jpg" + }; + let image = ImageReader::open(image_path).unwrap().decode().unwrap(); + let n = 5; + + // 1. Parallel handles + let s = Instant::now(); + let mut handles = vec![]; + for _ in 0..n { + let img = image.clone(); + handles.push(std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let _ = rt.block_on(parse_image_ocr(&img, None, 1.0)); + })); + } + for h in handles { + h.join().unwrap(); + } + let parallel_duration = s.elapsed(); + eprintln!( + "Parallel execution ({} threads, 1 request each) took: {:?}", + n, parallel_duration + ); + + // 2. Batch requests + let s = Instant::now(); + unsafe { + let mut buffer = std::io::Cursor::new(Vec::new()); + image + .write_to(&mut buffer, image::ImageFormat::Tiff) + .unwrap(); + let raw_data = buffer.into_inner(); + + let mut requests = vec![]; + for _ in 0..n { + let request = VNRecognizeTextRequest::new(); + request + .setRecognitionLevel(objc2_vision::VNRequestTextRecognitionLevel::Accurate); + request.setUsesLanguageCorrection(true); + requests.push(request); + } + + let handler = VNImageRequestHandler::initWithData_options( + VNImageRequestHandler::alloc(), + &NSData::with_bytes(&raw_data), + &NSDictionary::new(), + ); + + let v_requests: Vec<&VNRequest> = + requests.iter().map(|r| r.as_ref() as &VNRequest).collect(); + let ns_requests = NSArray::from_slice(&v_requests); + let _ = handler.performRequests_error(&ns_requests); + } + let batch_duration = s.elapsed(); + eprintln!( + "Batch execution (1 handler, {} requests) took: {:?}", + n, batch_duration + ); + } + + #[tokio::test] + async fn test_ocr_parser_batching() { + if Path::new("./test_data/double_cols.jpg").exists() { + let image = ImageReader::open("./test_data/double_cols.jpg") + .unwrap() + .decode() + .unwrap(); + let parser = OCRParser::new(); + let n = 3; + let mut set = tokio::task::JoinSet::new(); + + let start = Instant::now(); + for _i in 0..n { + let img = image.clone(); + let p = parser.clone(); + set.spawn(async move { p.parse(&img, 1.0).await }); + } + + let mut results = Vec::new(); + while let Some(res) = set.join_next().await { + results.push(res.unwrap().unwrap()); + } + let duration = start.elapsed(); + eprintln!("OCR Parser batching ({} requests) took: {:?}", n, duration); + assert_eq!(results.len(), n); + for res in results { + assert!(!res.is_empty()); + } + } + } } } @@ -172,7 +604,13 @@ mod ocr_linux { use super::*; - pub(super) fn parse_image_ocr( + pub(super) fn parse_images_ocr_batch( + _inputs: Vec<(Arc, f32)>, + ) -> Vec>> { + vec![Err(anyhow::anyhow!("not implemented yet"))] + } + + pub(super) fn parse_single_image_ocr( _image: &DynamicImage, _rescale_factor: f32, ) -> anyhow::Result> { diff --git a/ferrules-core/src/parse/document.rs b/ferrules-core/src/parse/document.rs index 71c0ae3..e26d655 100644 --- a/ferrules-core/src/parse/document.rs +++ b/ferrules-core/src/parse/document.rs @@ -21,6 +21,7 @@ use crate::{ ParseLayoutQueue, }, metrics::ParsingMetrics, + ocr::{OCRParser, OCRQueue}, parse::table::{ParseTableQueue, TableParser, TableTransformer}, }; @@ -58,6 +59,7 @@ async fn parse_task( parse_native_result: ParseNativePageResult, layout_queue: ParseLayoutQueue, table_queue: ParseTableQueue, + ocr_queue: OCRQueue, debug_dir: Option, callback: Option, ) -> Result @@ -71,6 +73,7 @@ where debug_dir, layout_queue.clone(), table_queue.clone(), + ocr_queue.clone(), ) .await; if let Some(callback) = callback { @@ -88,6 +91,7 @@ pub struct FerrulesParser { layout_queue: ParseLayoutQueue, native_queue: ParseNativeQueue, table_queue: ParseTableQueue, + ocr_queue: OCRQueue, } impl FerrulesParser { @@ -109,10 +113,13 @@ impl FerrulesParser { let transformer = TableTransformer::new(&layout_config).ok(); let table_parser = Arc::new(TableParser::new(transformer)); let table_queue = ParseTableQueue::new(table_parser); + let ocr_parser = Arc::new(OCRParser::new()); + let ocr_queue = OCRQueue::new(ocr_parser); Self { layout_queue, native_queue, table_queue, + ocr_queue, } } /// Parses a document into a structured format with optional page-level progress callback @@ -204,7 +211,7 @@ impl FerrulesParser { let duration = start_time.elapsed(); let parsing_metrics = ParsingMetrics { - total_duration_ms: duration.as_millis(), + total_duration_ms: duration.as_secs_f64() * 1000.0, pages: parsed_pages.iter().map(|p| p.metrics.clone()).collect(), }; @@ -292,6 +299,7 @@ impl FerrulesParser { parse_native_result, self.layout_queue.clone(), self.table_queue.clone(), + self.ocr_queue.clone(), tmp_dir, callback, ) diff --git a/ferrules-core/src/parse/native.rs b/ferrules-core/src/parse/native.rs index bd22b65..47222ea 100644 --- a/ferrules-core/src/parse/native.rs +++ b/ferrules-core/src/parse/native.rs @@ -65,6 +65,7 @@ pub struct ParseNativeRequest { pub required_raster_width: u32, pub required_raster_height: u32, pub sender_tx: Sender>, + pub queue_time: Instant, } impl ParseNativeRequest { pub fn new( @@ -83,13 +84,14 @@ impl ParseNativeRequest { required_raster_width: ORTLayoutParser::REQUIRED_WIDTH, required_raster_height: ORTLayoutParser::REQUIRED_HEIGHT, sender_tx, + queue_time: Instant::now(), } } } #[derive(Debug)] pub struct ParseNativeMetadata { - pub parse_native_duration_ms: u128, + pub parse_native_duration_ms: f64, } #[derive(Debug)] @@ -180,7 +182,7 @@ pub(crate) fn parse_page_native( let text_lines = parse_text_lines(text_spans); - let parse_native_duration_ms = start_time.elapsed().as_millis(); + let parse_native_duration_ms = start_time.elapsed().as_secs_f64() * 1000.0; tracing::debug!("pdfium parsing for page {page_id} took: {parse_native_duration_ms}ms"); Ok(ParseNativePageResult { page_id, @@ -265,6 +267,7 @@ fn handle_parse_native_req( required_raster_width, required_raster_height, sender_tx, + queue_time: _, } = req; let mut document = pdfium .load_pdf_from_byte_slice(&doc_data, password.as_deref()) @@ -298,6 +301,8 @@ pub fn start_native_parser(mut input_rx: Receiver<(ParseNativeRequest, Span)>) { Pdfium::bind_to_statically_linked_library().expect("can't load pdfiurm bindings"), ); while let Some((req, parent_span)) = input_rx.blocking_recv() { + let queue_duration = req.queue_time.elapsed(); + tracing::debug!(parent: &parent_span, "Native request dequeued after {:?} in queue", queue_duration); match handle_parse_native_req(&pdfium, req, parent_span) { Ok(_) => {} Err(e) => eprintln!("error parsing request natively : {:?}", e), diff --git a/ferrules-core/src/parse/page.rs b/ferrules-core/src/parse/page.rs index bbafd92..362ec38 100644 --- a/ferrules-core/src/parse/page.rs +++ b/ferrules-core/src/parse/page.rs @@ -16,8 +16,8 @@ use crate::{ layout::{ model::LayoutBBox, Metadata, ParseLayoutQueue, ParseLayoutRequest, ParseLayoutResponse, }, - metrics::{OCRMetrics, PageMetrics, StepMetrics}, - ocr::parse_image_ocr, + metrics::{OCRMetrics, PageMetrics, StepMetrics, TableMetrics}, + ocr::{OCRMetadata, OCRQueue, ParseOCRRequest}, parse::table::ParseTableQueue, }; @@ -69,6 +69,8 @@ async fn parse_page_text( native_text_lines: Vec, page_layout: &[LayoutBBox], page_image: Arc, + ocr_queue: OCRQueue, + page_id: PageID, downscale_factor: f32, ) -> Result<(Vec, Option, bool), FerrulesError> { let text_layout_box: Vec<&LayoutBBox> = @@ -76,29 +78,31 @@ async fn parse_page_text( let need_ocr = page_needs_ocr(&text_layout_box, &native_text_lines); let (ocr_result, ocr_metrics) = if need_ocr { - let page_image_clone = Arc::clone(&page_image); - let start_wait = Instant::now(); - let _permit = crate::ocr::OCR_SEMAPHORE.acquire().await.unwrap(); - let wait_duration = start_wait.elapsed(); - - let start_ocr = Instant::now(); - // We pass None for debug_dir for now - let res = parse_image_ocr(&page_image_clone, None, downscale_factor).await; - - let ocr_duration = start_ocr.elapsed(); - drop(_permit); - tracing::debug!( - "OCR semaphore wait: {}ms, OCR execution: {}ms", - wait_duration.as_millis(), - ocr_duration.as_millis() - ); - match res { - Ok((lines, mut metrics)) => { - metrics.idle_time_ms = wait_duration.as_millis(); - (Some(lines), Some(metrics)) - } - Err(_) => (None, None), - } + let (tx, rx) = tokio::sync::oneshot::channel(); + let req = ParseOCRRequest { + page_id, + page_image: Arc::clone(&page_image), + rescale_factor: downscale_factor, + metadata: OCRMetadata { + response_tx: tx, + queue_time: Instant::now(), + }, + }; + ocr_queue.push(req).await?; + tracing::debug!("OCR request pushed to queue for page {}", page_id); + + let res = rx + .await + .map_err(|e| { + tracing::error!("OCR channel receive error: {:?}", e); + FerrulesError::OcrError(format!("OCR channel error: {}", e)) + })? + .map_err(|e| { + tracing::error!("OCR execution error: {:?}", e); + e + })?; + + (Some(res.ocr_lines), Some(res.step_metrics)) } else { (None, None) }; @@ -121,9 +125,11 @@ async fn parse_page_text( skip_all, fields( layout_queue_time_ms, - layout_parse_duration_ms, + layout_idle_time_ms, layout_parse_duration_ms, parse_native_duration_ms, + ocr_idle_time_ms, + ocr_parse_duration_ms, table_queue_time_ms, table_parse_duration_ms, ) @@ -133,6 +139,7 @@ pub async fn parse_page_full( debug_dir: Option, layout_queue: ParseLayoutQueue, table_queue: ParseTableQueue, + ocr_queue: OCRQueue, ) -> Result { let start_time = Instant::now(); let span = tracing::Span::current(); @@ -158,22 +165,32 @@ pub async fn parse_page_full( }, }; layout_queue.push(layout_req).await?; + tracing::debug!("Layout request pushed to queue"); let ParseLayoutResponse { - page_id: _, + _page_id: _, // TODO: remove page_id from ParseLayoutResponse layout_bbox: page_layout, step_metrics: layout_step_metrics, } = layout_rx .await // TODO: better unwrapping - .map_err(|_| FerrulesError::LayoutParsingError)? - .map_err(|_| FerrulesError::LayoutParsingError)?; + .map_err(|e| { + tracing::error!("Layout channel receive error: {:?}", e); + FerrulesError::LayoutParsingError + })? + .map_err(|e| { + tracing::error!("Layout model execution error: {:?}", e); + FerrulesError::LayoutParsingError + })?; + tracing::debug!("Layout response received"); let native_lines_captured = text_lines.clone(); let (text_lines_processed, ocr_step_metrics_inner, need_ocr) = parse_page_text( text_lines, &page_layout, Arc::clone(&page_image), + ocr_queue, + page_id, downscale_factor, ) .await?; @@ -190,8 +207,6 @@ pub async fn parse_page_full( // Table parsing let mut set = JoinSet::new(); - let mut total_table_parse_duration = 0; - let mut total_table_queue_time = 0; for (idx, element) in elements.iter().enumerate() { if matches!(element.kind, ElementType::Table(_)) { let (tx, rx) = tokio::sync::oneshot::channel(); @@ -208,25 +223,24 @@ pub async fn parse_page_full( }, }; table_queue.push(req).await?; + tracing::debug!("Table request pushed to queue for table {}", idx); set.spawn(async move { (idx, rx.await) }); } } + let mut table_steps = Vec::new(); while let Some(res) = set.join_next().await { if let Ok((idx, Ok(Ok(resp)))) = res { if let ElementType::Table(ref mut table_opt) = elements[idx].kind { + let algorithm = resp.table_block.algorithm.clone(); *table_opt = Some(resp.table_block); - total_table_parse_duration += resp.step_metrics.execution_time_ms; - total_table_queue_time += resp.step_metrics.queue_time_ms; + table_steps.push(TableMetrics { + step_metrics: resp.step_metrics, + algorithm, + }); } } } - - let table_step_metrics = StepMetrics { - queue_time_ms: total_table_queue_time, - execution_time_ms: total_table_parse_duration, - idle_time_ms: 0, // Should also sum idle time if available, but let's stick to what we have or update loop - }; if let Some(tmp_dir) = debug_dir { debug_page( &tmp_dir, @@ -240,14 +254,14 @@ pub async fn parse_page_full( )? }; - let native_step = StepMetrics::new(parse_native_metadata.parse_native_duration_ms); + let native_step = StepMetrics::new(parse_native_metadata.parse_native_duration_ms as f64); let page_metrics = PageMetrics { page_id, - total_duration_ms: start_time.elapsed().as_millis(), + total_duration_ms: start_time.elapsed().as_secs_f64() * 1000.0, native_step, layout_step: layout_step_metrics, - table_step: table_step_metrics, + table_steps, ocr_step: ocr_step_metrics, }; diff --git a/ferrules-core/src/parse/table/mod.rs b/ferrules-core/src/parse/table/mod.rs index e150b05..5ba0840 100644 --- a/ferrules-core/src/parse/table/mod.rs +++ b/ferrules-core/src/parse/table/mod.rs @@ -74,7 +74,7 @@ async fn start_table_parser( // TODO: make this configurable let s = Arc::new(Semaphore::new(TABLE_PARSER_CONCURRENCY)); while let Some((req, span)) = input_rx.recv().await { - let queue_time = req.metadata.queue_time.elapsed().as_millis(); + let queue_time = req.metadata.queue_time.elapsed().as_secs_f64() * 1000.0; let page_id = req.page_id; tracing::debug!("table request queue time for page {page_id} took: {queue_time}ms"); tokio::spawn( @@ -87,11 +87,11 @@ async fn handle_table_request( s: Arc, _parser: Arc, req: ParseTableRequest, - table_queue_time_ms: u128, + table_queue_time_ms: f64, ) { let start_wait = Instant::now(); let _permit = s.acquire().await.unwrap(); - let idle_time_ms = start_wait.elapsed().as_millis(); + let idle_time_ms = start_wait.elapsed().as_secs_f64() * 1000.0; let ParseTableRequest { page_id, @@ -120,7 +120,7 @@ async fn handle_table_request( downscale_factor, ) .await; - let inference_duration = start.elapsed().as_millis(); + let inference_duration = start.elapsed().as_secs_f64() * 1000.0; tracing::debug!("table inference time for page {page_id} took: {inference_duration}ms"); drop(_permit); diff --git a/ferrules-core/src/parse/table/table_transformer.rs b/ferrules-core/src/parse/table/table_transformer.rs index 10c7eec..da3f869 100644 --- a/ferrules-core/src/parse/table/table_transformer.rs +++ b/ferrules-core/src/parse/table/table_transformer.rs @@ -134,7 +134,11 @@ impl BatchInferenceRunner { Err(e) => { tracing::error!("Failed to spawn blocking for f16 conversion: {}", e); for req in batch.drain(..) { - let _ = req.response_tx.send(Err(FerrulesError::LayoutParsingError)); + let _ = req.response_tx.send(Err( + FerrulesError::TableTransformerModelError( + "Failed to spawn blocking for f16 conversion".to_string(), + ), + )); } continue; } @@ -208,26 +212,29 @@ impl TableTransformer { pub fn new(config: &crate::layout::model::ORTConfig) -> Result { let mut execution_providers = Vec::new(); + // Get providers sorted by priority: accelerators first + let providers = config.get_sorted_providers(); + // Providers - for provider in &config.execution_providers { + for provider in providers { match provider { crate::layout::model::OrtExecutionProvider::Trt(device_id) => { execution_providers.push( TensorRTExecutionProvider::default() - .with_device_id(*device_id) + .with_device_id(device_id) .build(), ); } crate::layout::model::OrtExecutionProvider::CUDA(device_id) => { execution_providers.push( CUDAExecutionProvider::default() - .with_device_id(*device_id) + .with_device_id(device_id) .build(), ); } crate::layout::model::OrtExecutionProvider::CoreML { ane_only } => { let provider = CoreMLExecutionProvider::default(); - let provider = if *ane_only { + let provider = if ane_only { provider.with_ane_only().build() } else { provider.build() @@ -253,7 +260,7 @@ impl TableTransformer { None => GraphOptimizationLevel::Disable, }; - let session = Session::builder() + let mut builder = Session::builder() .map_err(|e| FerrulesError::TableTransformerModelError(e.to_string()))? .with_execution_providers(execution_providers) .map_err(|e| FerrulesError::TableTransformerModelError(e.to_string()))? @@ -262,7 +269,15 @@ impl TableTransformer { .with_intra_threads(config.intra_threads) .map_err(|e| FerrulesError::TableTransformerModelError(e.to_string()))? .with_inter_threads(config.inter_threads) - .map_err(|e| FerrulesError::TableTransformerModelError(e.to_string()))? + .map_err(|e| FerrulesError::TableTransformerModelError(e.to_string()))?; + + if let Some(profile_path) = &config.profile_table { + builder = builder + .with_profiling(profile_path) + .map_err(|e| FerrulesError::TableTransformerModelError(e.to_string()))?; + } + + let session = builder .commit_from_memory(TABLE_MODEL_BYTES) .map_err(|e| FerrulesError::TableTransformerModelError(e.to_string()))?; @@ -308,9 +323,19 @@ impl TableTransformer { response_tx: tx, }) .await - .map_err(|_| FerrulesError::LayoutParsingError)?; - - rx.await.map_err(|_| FerrulesError::LayoutParsingError)? + .map_err(|e| { + FerrulesError::TableTransformerModelError(format!( + "Table transformer queue send error: {}", + e + )) + })?; + + rx.await.map_err(|e| { + FerrulesError::TableTransformerModelError(format!( + "Table transformer channel closed: {}", + e + )) + })? } /// Decode the DETR-style output from the Table Transformer. diff --git a/infra/signoz/common/clickhouse/cluster.xml b/infra/signoz/common/clickhouse/cluster.xml new file mode 100644 index 0000000..8b475ff --- /dev/null +++ b/infra/signoz/common/clickhouse/cluster.xml @@ -0,0 +1,75 @@ + + + + + + zookeeper-1 + 2181 + + + + + + + + + + + + + + + + clickhouse + 9000 + + + + + + + + diff --git a/infra/signoz/common/clickhouse/config.xml b/infra/signoz/common/clickhouse/config.xml new file mode 100644 index 0000000..1965ac3 --- /dev/null +++ b/infra/signoz/common/clickhouse/config.xml @@ -0,0 +1,1142 @@ + + + + + + information + + json + + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + + 1000M + 10 + + + + + + + + + + + + + + + + + + 8123 + + + 9000 + + + 9004 + + + 9005 + + + + + + + + + + + + 9009 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 4096 + + + 3 + + + + + false + + + /path/to/ssl_cert_file + /path/to/ssl_key_file + + + false + + + /path/to/ssl_ca_cert_file + + + none + + + 0 + + + -1 + -1 + + + false + + + + + + + + + + + none + true + true + sslv2,sslv3 + true + + + + true + true + sslv2,sslv3 + true + + + + RejectCertificateHandler + + + + + + + + + 100 + + + 0 + + + + 10000 + + + + + + 0.9 + + + 4194304 + + + 0 + + + + + + 8589934592 + + + 5368709120 + + + + 1000 + + + 134217728 + + + 10000 + + + /var/lib/clickhouse/ + + + /var/lib/clickhouse/tmp/ + + + + ` + + + + + + /var/lib/clickhouse/user_files/ + + + + + + + + + + + + + users.xml + + + + /var/lib/clickhouse/access/ + + + + + + + default + + + + + + + + + + + + default + + + + + + + + + true + + + false + + ' | sed -e 's|.*>\(.*\)<.*|\1|') + wget https://github.com/ClickHouse/clickhouse-jdbc-bridge/releases/download/v$PKG_VER/clickhouse-jdbc-bridge_$PKG_VER-1_all.deb + apt install --no-install-recommends -f ./clickhouse-jdbc-bridge_$PKG_VER-1_all.deb + clickhouse-jdbc-bridge & + + * [CentOS/RHEL] + export MVN_URL=https://repo1.maven.org/maven2/ru/yandex/clickhouse/clickhouse-jdbc-bridge + export PKG_VER=$(curl -sL $MVN_URL/maven-metadata.xml | grep '' | sed -e 's|.*>\(.*\)<.*|\1|') + wget https://github.com/ClickHouse/clickhouse-jdbc-bridge/releases/download/v$PKG_VER/clickhouse-jdbc-bridge-$PKG_VER-1.noarch.rpm + yum localinstall -y clickhouse-jdbc-bridge-$PKG_VER-1.noarch.rpm + clickhouse-jdbc-bridge & + + Please refer to https://github.com/ClickHouse/clickhouse-jdbc-bridge#usage for more information. + ]]> + + + + + + + + + + + + + + + 01 + example01-01-1 + + + + + + 3600 + + + + 3600 + + + 60 + + + + + + + + + + /metrics + 9363 + + true + true + true + true + + + + + + system + query_log
+ + toYYYYMM(event_date) + + + + + + 7500 +
+ + + + system + trace_log
+ + toYYYYMM(event_date) + 7500 +
+ + + + system + query_thread_log
+ toYYYYMM(event_date) + 7500 +
+ + + + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+ + + + system + part_log
+ toYYYYMM(event_date) + 7500 +
+ + + + + + system + metric_log
+ 7500 + 1000 +
+ + + + system + asynchronous_metric_log
+ + 7000 +
+ + + + + + engine MergeTree + partition by toYYYYMM(finish_date) + order by (finish_date, finish_time_us, trace_id) + + system + opentelemetry_span_log
+ 7500 +
+ + + + + system + crash_log
+ + + 1000 +
+ + + + + + + system + processors_profile_log
+ + toYYYYMM(event_date) + 7500 +
+ + + + + + + + + *_dictionary.xml + + + *function.xml + /var/lib/clickhouse/user_scripts/ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + /clickhouse/task_queue/ddl + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + click_cost + any + + 0 + 3600 + + + 86400 + 60 + + + + max + + 0 + 60 + + + 3600 + 300 + + + 86400 + 3600 + + + + + + /var/lib/clickhouse/format_schemas/ + + + + + hide encrypt/decrypt arguments + ((?:aes_)?(?:encrypt|decrypt)(?:_mysql)?)\s*\(\s*(?:'(?:\\'|.)+'|.*?)\s*\) + + \1(???) + + + + + + + + + + false + + false + + + https://6f33034cfe684dd7a3ab9875e57b1c8d@o388870.ingest.sentry.io/5226277 + + + + + + + + + + + 268435456 + true + +
diff --git a/infra/signoz/common/clickhouse/custom-function.xml b/infra/signoz/common/clickhouse/custom-function.xml new file mode 100644 index 0000000..b2b3f91 --- /dev/null +++ b/infra/signoz/common/clickhouse/custom-function.xml @@ -0,0 +1,21 @@ + + + executable + histogramQuantile + Float64 + + Array(Float64) + buckets + + + Array(Float64) + counts + + + Float64 + quantile + + CSV + ./histogramQuantile + + diff --git a/infra/signoz/common/clickhouse/users.xml b/infra/signoz/common/clickhouse/users.xml new file mode 100644 index 0000000..f185620 --- /dev/null +++ b/infra/signoz/common/clickhouse/users.xml @@ -0,0 +1,123 @@ + + + + + + + + + + 10000000000 + + + random + + + + + 1 + + + + + + + + + + + + + ::/0 + + + + default + + + default + + + + + + + + + + + + + + 3600 + + + 0 + 0 + 0 + 0 + 0 + + + + diff --git a/infra/signoz/common/signoz/otel-collector-opamp-config.yaml b/infra/signoz/common/signoz/otel-collector-opamp-config.yaml new file mode 100644 index 0000000..7267607 --- /dev/null +++ b/infra/signoz/common/signoz/otel-collector-opamp-config.yaml @@ -0,0 +1 @@ +server_endpoint: ws://signoz:4320/v1/opamp diff --git a/infra/signoz/common/signoz/prometheus.yml b/infra/signoz/common/signoz/prometheus.yml new file mode 100644 index 0000000..683e5e1 --- /dev/null +++ b/infra/signoz/common/signoz/prometheus.yml @@ -0,0 +1,25 @@ +# my global config +global: + scrape_interval: 5s # Set the scrape interval to every 15 seconds. Default is every 1 minute. + evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute. + # scrape_timeout is set to the global default (10s). + +# Alertmanager configuration +alerting: + alertmanagers: + - static_configs: + - targets: + - alertmanager:9093 + +# Load rules once and periodically evaluate them according to the global 'evaluation_interval'. +rule_files: [] + # - "first_rules.yml" + # - "second_rules.yml" + # - 'alerts.yml' + +# A scrape configuration containing exactly one endpoint to scrape: +# Here it's Prometheus itself. +scrape_configs: [] + +remote_read: + - url: tcp://clickhouse:9000/signoz_metrics diff --git a/infra/signoz/docker-compose.yml b/infra/signoz/docker-compose.yml new file mode 100644 index 0000000..811cd3c --- /dev/null +++ b/infra/signoz/docker-compose.yml @@ -0,0 +1,208 @@ +version: "3" +x-common: &common + networks: + - signoz-net + extra_hosts: + - "host.docker.internal:host-gateway" + restart: unless-stopped + logging: + options: + max-size: 50m + max-file: "3" +x-clickhouse-defaults: &clickhouse-defaults + !!merge <<: *common + image: clickhouse/clickhouse-server:25.5.6 + tty: true + labels: + signoz.io/scrape: "true" + signoz.io/port: "9363" + signoz.io/path: "/metrics" + depends_on: + init-clickhouse: + condition: service_completed_successfully + zookeeper-1: + condition: service_healthy + healthcheck: + test: + - CMD + - wget + - --spider + - -q + - 0.0.0.0:8123/ping + interval: 30s + timeout: 5s + retries: 3 + ulimits: + nproc: 65535 + nofile: + soft: 262144 + hard: 262144 + environment: + - CLICKHOUSE_SKIP_USER_SETUP=1 +x-zookeeper-defaults: &zookeeper-defaults + !!merge <<: *common + image: signoz/zookeeper:3.7.1 + user: root + labels: + signoz.io/scrape: "true" + signoz.io/port: "9141" + signoz.io/path: "/metrics" + healthcheck: + test: + - CMD-SHELL + - curl -s -m 2 http://localhost:8080/commands/ruok | grep error | grep null + interval: 30s + timeout: 5s + retries: 3 +x-db-depend: &db-depend + !!merge <<: *common + depends_on: + clickhouse: + condition: service_healthy + schema-migrator-sync: + condition: service_completed_successfully +services: + init-clickhouse: + !!merge <<: *common + image: clickhouse/clickhouse-server:25.5.6 + container_name: signoz-init-clickhouse + command: + - bash + - -c + - | + version="v0.0.1" + node_os=$$(uname -s | tr '[:upper:]' '[:lower:]') + node_arch=$$(uname -m | sed s/aarch64/arm64/ | sed s/x86_64/amd64/) + echo "Fetching histogram-binary for $${node_os}/$${node_arch}" + cd /tmp + wget -O histogram-quantile.tar.gz "https://github.com/SigNoz/signoz/releases/download/histogram-quantile%2F$${version}/histogram-quantile_$${node_os}_$${node_arch}.tar.gz" + tar -xvzf histogram-quantile.tar.gz + mv histogram-quantile /var/lib/clickhouse/user_scripts/histogramQuantile + restart: on-failure + volumes: + - ./common/clickhouse/user_scripts:/var/lib/clickhouse/user_scripts/ + zookeeper-1: + !!merge <<: *zookeeper-defaults + container_name: signoz-zookeeper-1 + # ports: + # - "2181:2181" + # - "2888:2888" + # - "3888:3888" + volumes: + - zookeeper-1:/bitnami/zookeeper + environment: + - ZOO_SERVER_ID=1 + - ALLOW_ANONYMOUS_LOGIN=yes + - ZOO_AUTOPURGE_INTERVAL=1 + - ZOO_ENABLE_PROMETHEUS_METRICS=yes + - ZOO_PROMETHEUS_METRICS_PORT_NUMBER=9141 + clickhouse: + !!merge <<: *clickhouse-defaults + container_name: signoz-clickhouse + # ports: + # - "9000:9000" + # - "8123:8123" + # - "9181:9181" + volumes: + - ./common/clickhouse/config.xml:/etc/clickhouse-server/config.xml + - ./common/clickhouse/users.xml:/etc/clickhouse-server/users.xml + - ./common/clickhouse/custom-function.xml:/etc/clickhouse-server/custom-function.xml + - ./common/clickhouse/user_scripts:/var/lib/clickhouse/user_scripts/ + - ./common/clickhouse/cluster.xml:/etc/clickhouse-server/config.d/cluster.xml + - clickhouse:/var/lib/clickhouse/ + # - ./common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml + signoz: + !!merge <<: *db-depend + image: signoz/signoz:${VERSION:-v0.111.0} + container_name: signoz + command: + - --config=/root/config/prometheus.yml + ports: + - "8080:8080" # signoz port + # - "6060:6060" # pprof port + volumes: + - ./common/signoz/prometheus.yml:/root/config/prometheus.yml + - ./common/dashboards:/root/config/dashboards + - sqlite:/var/lib/signoz/ + environment: + - SIGNOZ_ALERTMANAGER_PROVIDER=signoz + - SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN=tcp://clickhouse:9000 + - SIGNOZ_SQLSTORE_SQLITE_PATH=/var/lib/signoz/signoz.db + - DASHBOARDS_PATH=/root/config/dashboards + - STORAGE=clickhouse + - GODEBUG=netdns=go + - TELEMETRY_ENABLED=true + - DEPLOYMENT_TYPE=docker-standalone-amd + - DOT_METRICS_ENABLED=true + healthcheck: + test: + - CMD + - wget + - --spider + - -q + - localhost:8080/api/v1/health + interval: 30s + timeout: 5s + retries: 3 + otel-collector: + !!merge <<: *db-depend + image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.142.0} + container_name: signoz-otel-collector + command: ["--config=/etc/otel-collector-config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml + - ./common/signoz/otel-collector-opamp-config.yaml:/etc/manager-config.yaml + - /:/hostfs:ro + - /var/run/docker.sock:/var/run/docker.sock + environment: + - OTEL_RESOURCE_ATTRIBUTES=host.name=signoz-host,os.type=linux + - LOW_CARDINAL_EXCEPTION_GROUPING=false + ports: + # - "1777:1777" # pprof extension + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP HTTP receiver + - "2255:2255" # TCP receiver for logspout + depends_on: + signoz: + condition: service_healthy + logspout: + !!merge <<: *common + image: "gliderlabs/logspout:v3.2.14" + container_name: signoz-logspout + volumes: + - /etc/hostname:/etc/host_hostname:ro + - /var/run/docker.sock:/var/run/docker.sock + command: syslog+tcp://otel-collector:2255 + depends_on: + - otel-collector + schema-migrator-sync: + !!merge <<: *common + image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.142.0} + container_name: schema-migrator-sync + command: + - sync + - --dsn=tcp://clickhouse:9000 + - --up= + depends_on: + clickhouse: + condition: service_healthy + restart: on-failure + schema-migrator-async: + !!merge <<: *db-depend + image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.142.0} + container_name: schema-migrator-async + command: + - async + - --dsn=tcp://clickhouse:9000 + - --up= + restart: on-failure +networks: + signoz-net: + name: signoz-net +volumes: + clickhouse: + name: signoz-clickhouse + sqlite: + name: signoz-sqlite + zookeeper-1: + name: signoz-zookeeper-1 diff --git a/infra/signoz/ferrules_dashboard.json b/infra/signoz/ferrules_dashboard.json new file mode 100644 index 0000000..6c5f0e0 --- /dev/null +++ b/infra/signoz/ferrules_dashboard.json @@ -0,0 +1,276 @@ +{ + "title": "Ferrules Performance Dashboard", + "description": "Dashboard for Ferrules API performance metrics including OCR, Table, Layout, and Page processing times.", + "tags": [ + "ferrules", + "pdf-parsing" + ], + "layout": [ + { + "i": "page_duration", + "x": 0, + "y": 0, + "w": 12, + "h": 6 + }, + { + "i": "layout_execution", + "x": 0, + "y": 6, + "w": 6, + "h": 6 + }, + { + "i": "layout_queue", + "x": 6, + "y": 6, + "w": 6, + "h": 6 + }, + { + "i": "ocr_execution", + "x": 0, + "y": 12, + "w": 6, + "h": 6 + }, + { + "i": "ocr_idle", + "x": 6, + "y": 12, + "w": 6, + "h": 6 + }, + { + "i": "table_execution", + "x": 0, + "y": 18, + "w": 6, + "h": 6 + }, + { + "i": "table_queue", + "x": 6, + "y": 18, + "w": 6, + "h": 6 + }, + { + "i": "native_execution", + "x": 0, + "y": 24, + "w": 12, + "h": 6 + } + ], + "widgets": [ + { + "id": "page_duration", + "title": "Page Processing Duration (ms)", + "description": "Total time to process a single page (by OCR usage)", + "panelTypes": "graph", + "query": { + "queryType": "promql", + "promql": [ + { + "name": "A", + "disabled": false, + "query": "histogram_quantile(0.99, sum(rate({__name__=\"page_processing_duration_ms.bucket\"}[1m])) by (le, ocr))", + "legend": "OCR={{ocr}} P99" + }, + { + "name": "B", + "disabled": false, + "query": "histogram_quantile(0.50, sum(rate({__name__=\"page_processing_duration_ms.bucket\"}[1m])) by (le, ocr))", + "legend": "OCR={{ocr}} P50" + } + ] + } + }, + { + "id": "layout_execution", + "title": "Layout Execution Time (ms)", + "description": "Time spent executing layout analysis inference", + "panelTypes": "graph", + "query": { + "queryType": "promql", + "promql": [ + { + "name": "A", + "disabled": false, + "query": "histogram_quantile(0.99, sum(rate({__name__=\"layout_execution_time_ms.bucket\"}[1m])) by (le))", + "legend": "P99" + }, + { + "name": "B", + "disabled": false, + "query": "histogram_quantile(0.50, sum(rate({__name__=\"layout_execution_time_ms.bucket\"}[1m])) by (le))", + "legend": "P50" + }, + { + "name": "C", + "disabled": false, + "query": "sum(rate({__name__=\"layout_execution_time_ms.sum\"}[1m])) / sum(rate({__name__=\"layout_execution_time_ms.count\"}[1m]))", + "legend": "Avg" + } + ] + } + }, + { + "id": "layout_queue", + "title": "Layout Queue Time (ms)", + "description": "Time spent waiting in the layout parser queue", + "panelTypes": "graph", + "query": { + "queryType": "promql", + "promql": [ + { + "name": "A", + "disabled": false, + "query": "histogram_quantile(0.99, sum(rate({__name__=\"layout_queue_time_ms.bucket\"}[1m])) by (le))", + "legend": "P99" + }, + { + "name": "B", + "disabled": false, + "query": "histogram_quantile(0.50, sum(rate({__name__=\"layout_queue_time_ms.bucket\"}[1m])) by (le))", + "legend": "P50" + } + ] + } + }, + { + "id": "ocr_execution", + "title": "OCR Execution Time (ms)", + "description": "Time spent executing OCR engine", + "panelTypes": "graph", + "query": { + "queryType": "promql", + "promql": [ + { + "name": "A", + "disabled": false, + "query": "histogram_quantile(0.99, sum(rate({__name__=\"ocr_execution_time_ms.bucket\"}[1m])) by (le))", + "legend": "P99" + }, + { + "name": "B", + "disabled": false, + "query": "histogram_quantile(0.50, sum(rate({__name__=\"ocr_execution_time_ms.bucket\"}[1m])) by (le))", + "legend": "P50" + }, + { + "name": "C", + "disabled": false, + "query": "sum(rate({__name__=\"ocr_execution_time_ms.sum\"}[1m])) / sum(rate({__name__=\"ocr_execution_time_ms.count\"}[1m]))", + "legend": "Avg" + } + ] + } + }, + { + "id": "ocr_idle", + "title": "OCR Idle Time (ms)", + "description": "Time OCR workers spent waiting for semaphore/resources", + "panelTypes": "graph", + "query": { + "queryType": "promql", + "promql": [ + { + "name": "A", + "disabled": false, + "query": "histogram_quantile(0.99, sum(rate({__name__=\"ocr_idle_time_ms.bucket\"}[1m])) by (le))", + "legend": "P99" + }, + { + "name": "C", + "disabled": false, + "query": "sum(rate({__name__=\"ocr_idle_time_ms.sum\"}[1m])) / sum(rate({__name__=\"ocr_idle_time_ms.count\"}[1m]))", + "legend": "Avg" + } + ] + } + }, + { + "id": "table_execution", + "title": "Table Extraction Time (ms)", + "description": "Time spent extracting tables (by algorithm)", + "panelTypes": "graph", + "query": { + "queryType": "promql", + "promql": [ + { + "name": "A", + "disabled": false, + "query": "histogram_quantile(0.99, sum(rate({__name__=\"table_execution_time_ms.bucket\"}[1m])) by (le, method))", + "legend": "{{method}} P99" + }, + { + "name": "B", + "disabled": false, + "query": "histogram_quantile(0.50, sum(rate({__name__=\"table_execution_time_ms.bucket\"}[1m])) by (le, method))", + "legend": "{{method}} P50" + }, + { + "name": "C", + "disabled": false, + "query": "sum(rate({__name__=\"table_execution_time_ms.sum\"}[1m])) by (method) / sum(rate({__name__=\"table_execution_time_ms.count\"}[1m])) by (method)", + "legend": "{{method}} Avg" + } + ] + } + }, + { + "id": "table_queue", + "title": "Table Queue Time (ms)", + "description": "Time spent waiting for table extraction in queue", + "panelTypes": "graph", + "query": { + "queryType": "promql", + "promql": [ + { + "name": "A", + "disabled": false, + "query": "histogram_quantile(0.99, sum(rate({__name__=\"table_queue_time_ms.bucket\"}[1m])) by (le, method))", + "legend": "{{method}} P99" + }, + { + "name": "B", + "disabled": false, + "query": "histogram_quantile(0.50, sum(rate({__name__=\"table_queue_time_ms.bucket\"}[1m])) by (le, method))", + "legend": "{{method}} P50" + } + ] + } + }, + { + "id": "native_execution", + "title": "Native Execution Time (ms)", + "description": "Time spent in native PDF parsing step", + "panelTypes": "graph", + "query": { + "queryType": "promql", + "promql": [ + { + "name": "A", + "disabled": false, + "query": "histogram_quantile(0.99, sum(rate({__name__=\"native_execution_time_ms.bucket\"}[1m])) by (le))", + "legend": "P99" + }, + { + "name": "B", + "disabled": false, + "query": "histogram_quantile(0.50, sum(rate({__name__=\"native_execution_time_ms.bucket\"}[1m])) by (le))", + "legend": "P50" + }, + { + "name": "C", + "disabled": false, + "query": "sum(rate({__name__=\"native_execution_time_ms.sum\"}[1m])) / sum(rate({__name__=\"native_execution_time_ms.count\"}[1m]))", + "legend": "Avg" + } + ] + } + } + ] +} \ No newline at end of file diff --git a/infra/signoz/otel-collector-config.yaml b/infra/signoz/otel-collector-config.yaml new file mode 100644 index 0000000..6885e8c --- /dev/null +++ b/infra/signoz/otel-collector-config.yaml @@ -0,0 +1,177 @@ +connectors: + signozmeter: + metrics_flush_interval: 1h + dimensions: + - name: service.name + - name: deployment.environment + - name: host.name +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + prometheus: + config: + global: + scrape_interval: 60s + scrape_configs: + - job_name: otel-collector + static_configs: + - targets: + - localhost:8888 + labels: + job_name: otel-collector + - job_name: ferrules-api + scrape_interval: 10s + metrics_path: /metrics + scheme: http + static_configs: + - targets: + - host.docker.internal:3002 + labels: + job_name: ferrules-api + - job_name: docker-container + docker_sd_configs: + - host: unix:///var/run/docker.sock + relabel_configs: + - action: keep + regex: true + source_labels: + - __meta_docker_container_label_signoz_io_scrape + - regex: true + source_labels: + - __meta_docker_container_label_signoz_io_path + target_label: __metrics_path__ + - regex: (.+) + source_labels: + - __meta_docker_container_label_signoz_io_path + target_label: __metrics_path__ + - separator: ":" + source_labels: + - __meta_docker_network_ip + - __meta_docker_container_label_signoz_io_port + target_label: __address__ + - regex: '/(.*)' + replacement: '$1' + source_labels: + - __meta_docker_container_name + target_label: container_name + - regex: __meta_docker_container_label_signoz_io_(.+) + action: labelmap + replacement: $1 + hostmetrics: + collection_interval: 30s + root_path: /hostfs + scrapers: + cpu: {} + load: {} + memory: {} + disk: {} + filesystem: {} + network: {} + tcplog/docker: + listen_address: "0.0.0.0:2255" + operators: + - type: regex_parser + regex: '^<([0-9]+)>[0-9]+ (?P[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?([zZ]|([\+-])([01]\d|2[0-3]):?([0-5]\d)?)?) (?P\S+) (?P\S+) [0-9]+ - -( (?P.*))?' + timestamp: + parse_from: attributes.timestamp + layout: '%Y-%m-%dT%H:%M:%S.%LZ' + - type: move + from: attributes["body"] + to: body + - type: remove + field: attributes.timestamp + - type: filter + id: signoz_logs_filter + expr: 'attributes.container_name matches "^signoz|(signoz-(|otel-collector|clickhouse|zookeeper))|(infra-(logspout|otel-agent)-.*)"' +processors: + batch: + send_batch_size: 10000 + send_batch_max_size: 11000 + timeout: 10s + batch/meter: + send_batch_max_size: 25000 + send_batch_size: 20000 + timeout: 1s + resourcedetection: + # Using OTEL_RESOURCE_ATTRIBUTES envvar, env detector adds custom labels. + detectors: [env, system] + timeout: 2s + signozspanmetrics/delta: + metrics_exporter: signozclickhousemetrics + metrics_flush_interval: 60s + latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 50ms, 100ms, 250ms, 500ms, 1000ms, 1400ms, 2000ms, 5s, 10s, 20s, 40s, 60s ] + dimensions_cache_size: 100000 + aggregation_temporality: AGGREGATION_TEMPORALITY_DELTA + enable_exp_histogram: true + dimensions: + - name: service.namespace + default: default + - name: deployment.environment + default: default + # This is added to ensure the uniqueness of the timeseries + # Otherwise, identical timeseries produced by multiple replicas of + # collectors result in incorrect APM metrics + - name: signoz.collector.id + - name: service.version + - name: browser.platform + - name: browser.mobile + - name: k8s.cluster.name + - name: k8s.node.name + - name: k8s.namespace.name + - name: host.name + - name: host.type + - name: container.name +extensions: + health_check: + endpoint: 0.0.0.0:13133 + pprof: + endpoint: 0.0.0.0:1777 +exporters: + clickhousetraces: + datasource: tcp://clickhouse:9000/signoz_traces + low_cardinal_exception_grouping: ${env:LOW_CARDINAL_EXCEPTION_GROUPING} + use_new_schema: true + signozclickhousemetrics: + dsn: tcp://clickhouse:9000/signoz_metrics + clickhouselogsexporter: + dsn: tcp://clickhouse:9000/signoz_logs + timeout: 10s + use_new_schema: true + signozclickhousemeter: + dsn: tcp://clickhouse:9000/signoz_meter + timeout: 45s + sending_queue: + enabled: false +service: + telemetry: + logs: + level: debug + encoding: json + extensions: + - health_check + - pprof + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [clickhousetraces, signozmeter] + metrics: + receivers: [otlp, hostmetrics] + processors: [batch] + exporters: [signozclickhousemetrics, signozmeter] + metrics/prometheus: + receivers: [prometheus] + processors: [batch] + exporters: [signozclickhousemetrics, signozmeter] + logs: + receivers: [otlp, tcplog/docker] + processors: [batch] + exporters: [clickhouselogsexporter, signozmeter] + metrics/meter: + receivers: [signozmeter] + processors: [batch/meter] + exporters: [signozclickhousemeter]