diff --git a/.github/workflows/compat-loki.yaml b/.github/workflows/compat-loki.yaml index a1426af6..a3b25319 100644 --- a/.github/workflows/compat-loki.yaml +++ b/.github/workflows/compat-loki.yaml @@ -78,6 +78,10 @@ jobs: raise SystemExit(1) print(f"Loki compatibility score: {passed}/{total} ({pct_val:.1f}%)") PY + - name: Run Loki query semantics matrix + run: | + set -euo pipefail + go test -v -tags=e2e -run '^TestQuerySemantics' ./test/e2e-compat/ - name: Tear down pinned stack if: always() working-directory: test/e2e-compat @@ -144,6 +148,12 @@ jobs: raise SystemExit(1) print(f"Loki compatibility score: {passed}/{total} ({pct_val:.1f}%)") PY + - name: Run Loki query semantics matrix + env: + LOKI_IMAGE: grafana/loki:${{ matrix.loki_version }} + run: | + set -euo pipefail + go test -v -tags=e2e -run '^TestQuerySemantics' ./test/e2e-compat/ - name: Tear down matrix stack if: always() working-directory: test/e2e-compat diff --git a/CHANGELOG.md b/CHANGELOG.md index fbbaad50..92277ddd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,20 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -## [1.10.3] - 2026-04-20 +### Bug Fixes + +- compat/loki: preserve Loki semantics for bare parser-derived metric queries and `absent_over_time(...)` on the direct `query` and `query_range` paths so valid Loki operations keep their parser-derived label cardinality, unwrap behavior, and empty-series semantics instead of collapsing into proxy-specific aggregated fallback results. ### Changed - release/metadata: synchronized release metadata for v1.10.2. -### Bug Fixes - -- drilldown/metadata: retry relaxed metadata candidates after successful-empty strict scans for native streams, native field names, detected fields, and detected labels so Drilldown labels/fields stay populated when strict parser filters over-constrain the sample. 
-- translator/patterns: translate `pattern`/`extract` stages without named captures into line filters instead of emitting invalid VictoriaLogs `extract` pipes such as `extract "Metrics"`. - ### Tests -- drilldown/translator: update fallback coverage for empty strict scans and add regression cases for literal `extract`/`pattern` stage translation. +- compat/loki: make the query-semantics matrix and operation inventory required in CI, expand positive and negative Loki operation coverage across parser pipelines, unwrap range functions, boolean/set operators, and invalid log/metric combinations, and add unit coverage for the bare parser metric and `absent_over_time(...)` compatibility handlers plus cache-tier coverage for the current mainline helper cache paths. ## [1.10.2] - 2026-04-20 diff --git a/README.md b/README.md index ae7c9191..ab9bda36 100644 --- a/README.md +++ b/README.md @@ -13,9 +13,9 @@ [![VictoriaLogs Compatibility](https://github.com/ReliablyObserve/Loki-VL-proxy/actions/workflows/compat-vl.yaml/badge.svg?branch=main&event=push)](https://github.com/ReliablyObserve/Loki-VL-proxy/actions/workflows/compat-vl.yaml) [![Go Version](https://img.shields.io/github/go-mod/go-version/ReliablyObserve/Loki-VL-proxy)](https://go.dev/) [![Release](https://img.shields.io/github/v/release/ReliablyObserve/Loki-VL-proxy)](https://github.com/ReliablyObserve/Loki-VL-proxy/releases) -[![Lines of Code](https://img.shields.io/badge/go%20loc-79.3k-blue)](https://github.com/ReliablyObserve/Loki-VL-proxy) -[![Tests](https://img.shields.io/badge/tests-1811%20passed-brightgreen)](#tests) -[![Coverage](https://img.shields.io/badge/coverage-89.1%25-green)](#tests) +[![Lines of Code](https://img.shields.io/badge/go%20loc-79.2k-blue)](https://github.com/ReliablyObserve/Loki-VL-proxy) +[![Tests](https://img.shields.io/badge/tests-1809%20passed-brightgreen)](#tests) +[![Coverage](https://img.shields.io/badge/coverage-89.2%25-green)](#tests) [![LogQL 
Coverage](https://img.shields.io/badge/LogQL%20coverage-100%25-brightgreen)](#logql-compatibility) [![License](https://img.shields.io/github/license/ReliablyObserve/Loki-VL-proxy)](LICENSE) [![CodeQL](https://github.com/ReliablyObserve/Loki-VL-proxy/actions/workflows/codeql.yaml/badge.svg?branch=main&event=push)](https://github.com/ReliablyObserve/Loki-VL-proxy/actions/workflows/codeql.yaml) @@ -44,6 +44,9 @@ Related docs: [Architecture](docs/architecture.md), [Compatibility Matrix](docs/ - Loki-compatible read API for Grafana datasource, Explore, Drilldown, and API clients. - Strict tuple contracts: default 2-tuple, explicit 3-tuple only via `categorize-labels`. +- Manifest-driven Loki query semantics parity tests against a real Loki oracle for valid and invalid LogQL combinations, backed by an explicit Loki operation inventory that is machine-checked in CI. +- Required coverage now includes parser pipelines, bare `unwrap` range functions, `absent_over_time`, scalar `bool` comparisons, vector set operators, and invalid LogQL forms that must fail the same way as Loki. +- Required PR compatibility gate for Loki-facing query semantics through the pinned `compat-loki` workflow. - Grafana Logs Drilldown patterns support through `/loki/api/v1/patterns`, with explicit `-patterns-enabled` control when deployments need it disabled. - Loki-compatible patterns endpoint with optional restart-safe persistence. - Automatic pattern autodetection from successful `query`/`query_range` responses (`-patterns-autodetect-from-queries`) to keep Drilldown patterns warm behind the scenes. @@ -80,7 +83,7 @@ Related docs: [Compatibility Matrix](docs/compatibility-matrix.md), [Patterns](d - Layered CI security gates: `gitleaks`, `gosec`, `Trivy`, `actionlint`, `hadolint`, `OpenSSF Scorecard`, custom runtime regressions, OWASP ZAP, and curated `Nuclei`. 
- Proxy-specific security coverage for tenant isolation, cache boundaries, browser-origin enforcement on `/tail`, forwarded auth/header handling, and debug/admin exposure. -Related docs: [Security](docs/security.md), [Security Policy](SECURITY.md), [Testing](docs/testing.md) +Related docs: [Security](docs/security.md), [Security Policy](SECURITY.md), [Testing](docs/testing.md), [Compatibility Matrix](docs/compatibility-matrix.md) ## UI Gallery diff --git a/charts/loki-vl-proxy/Chart.yaml b/charts/loki-vl-proxy/Chart.yaml index ae4d6b0e..49bbe74b 100644 --- a/charts/loki-vl-proxy/Chart.yaml +++ b/charts/loki-vl-proxy/Chart.yaml @@ -2,8 +2,8 @@ apiVersion: v2 name: loki-vl-proxy description: HTTP proxy translating Loki API to VictoriaLogs type: application -version: 1.10.3 -appVersion: "1.10.3" +version: 1.10.2 +appVersion: "1.10.2" home: https://github.com/ReliablyObserve/Loki-VL-proxy sources: - https://github.com/ReliablyObserve/Loki-VL-proxy diff --git a/docs/compatibility-loki.md b/docs/compatibility-loki.md index 367c9b08..008b7963 100644 --- a/docs/compatibility-loki.md +++ b/docs/compatibility-loki.md @@ -7,11 +7,14 @@ This track measures how closely the proxy behaves like Loki on Loki-facing APIs - `/labels`, `/label//values`, `/series`, `/query`, `/query_range` - LogQL parser, filter, metric, and OTel label compatibility - Synthetic compatibility labels the proxy must expose to Loki clients, such as `service_name` +- Manifest-driven query semantics parity against real Loki in [query-semantics-matrix.json](../test/e2e-compat/query-semantics-matrix.json) +- Tracked operation inventory in [query-semantics-operations.json](../test/e2e-compat/query-semantics-operations.json) ## CI And Score - Workflow: `compat-loki.yaml` - Score test: `TestLokiTrackScore` +- Required PR gate: `loki-pinned`, which runs the `TestQuerySemantics*` inventory + matrix suite - Runtime matrix: real Loki images - Support window: current Loki minor family plus one minor family behind @@ 
-41,3 +44,86 @@ The Loki matrix is a moving window. When a new Loki minor becomes current, the m - `detected_level` grouped metric queries used by Grafana log volume panels - OTel dotted and underscore label parity through the underscore proxy - Series and label-value parity for labels synthesized by the proxy + +## Query Families In The Loki Semantics Matrix + +The Loki semantics matrix focuses on query combinations where the proxy should match Loki as closely as possible on: + +- HTTP status +- payload `status` +- `errorType` +- `resultType` +- line-count parity for log streams +- series-count parity for vectors and matrices +- exact metric-label-set parity for label-sensitive metric families such as bare parser metrics + +The tracked operation inventory in [query-semantics-operations.json](../test/e2e-compat/query-semantics-operations.json) is machine-checked in CI. Every matrix case must belong to at least one inventory operation, and every inventory operation must reference live matrix cases. 
+ +Covered valid families: + +| Family | Representative cases | +|---|---| +| Stream selectors | exact match, multi-label match, regex, negative match | +| Line filters | `\|=`, `!=`, `\|~`, `!~`, chained filter pipelines, negative-regex exclusions | +| Parser pipelines | `json`, `logfmt`, `regexp`, `pattern`, parser plus exact/regex/numeric field filter, `label_format` | +| Metric range queries | `count_over_time`, `rate`, `bytes_over_time`, `bytes_rate`, `absent_over_time`, grouped range aggregations, parser-inside-range filters, bare unwrap range functions | +| Aggregations | `sum by(...)`, `without(...)`, `topk(...)`, `bottomk(...)`, `sort(...)`, `sort_desc(...)` | +| Binary operations | scalar comparisons/math, `bool` comparisons, and vector-to-vector operations such as `/`, `and`, `or`, and `unless` over valid metric expressions | + +Detailed operation inventory: + +| Category | Operations currently enforced in CI | +|---|---| +| Selectors | exact selectors, multi-label regex selectors, negative-regex selectors | +| Line filters | `\|=`, `!=`, `\|~`, `!~`, mixed chained filters | +| Parsers | `json`, `logfmt`, `regexp`, `pattern` plus parsed/extracted field filters | +| Formatting | `line_format`, `label_format`, `keep`, `drop` | +| Metric functions | `count_over_time`, `rate`, `bytes_over_time`, `bytes_rate`, `absent_over_time`, `sum/avg/max/min/first/last/stddev/stdvar_over_time` with `unwrap`, `quantile_over_time` with `unwrap` | +| Aggregations | `sum by(...)`, `sum without(...)`, `topk`, `bottomk`, `sort`, `sort_desc` | +| Binary operators | scalar math/comparison, scalar `bool` comparison, vector arithmetic, `on(...)`, `group_left(...)`, logical `and`, `or`, `unless` | +| Invalid shapes | log-query aggregation misuse, missing metric range, malformed selector/parser syntax, invalid log/metric combinations | + +Explicit invalid families: + +| Family | Representative cases | +|---|---| +| Metric aggregation over a log query | `sum by(job) ({selector})` | +| 
Post-aggregation over a log query | `topk(2, {selector})`, `sort({selector})` | +| Missing range on a metric function | `rate({selector})` | +| Malformed selector / syntax | broken braces, parser syntax errors | +| Invalid binary shape | log-query to scalar/vector binary expressions | + +These are intentionally called out because they are easy to regress while changing translation, shaping, or query planning. + +## What Stays Outside Loki Parity + +Some important compatibility behavior is still tested, but it is not part of the strict Loki parity matrix: + +- synthetic `service_name` recovery when the backend only has structured metadata +- Drilldown helper endpoints like detected labels, detected fields, field values, volume, and patterns +- stale-on-error helper behavior under VictoriaLogs failures + +Those cases live in the proxy contract suite because Loki itself is not the source of truth for them. + +## Parity Rule + +Valid Loki behavior is not an allowed exclusion category. + +If a query shape works in real Loki and the proxy does not match it, that is treated as a parity bug and should be fixed or tracked with an explicit regression case. The bare parser-metric and bare `unwrap` metric shapes now live inside the required matrix for that reason. + +## Detailed Edge Cases Now Gated + +The required matrix is intentionally not limited to happy-path selectors. It now includes: + +- parser-derived metric labelsets, not just result counts +- bare `unwrap` range functions where Loki keeps parsed labels but not the unwrap target field itself +- `pattern` parser extraction semantics +- set-style binary operators such as `or` and `unless` +- `bool` comparison semantics on metric expressions +- invalid log/metric shape rejections that must fail with the same class of error as Loki + +When a new LogQL family is implemented or fixed in the proxy, the expectation is to add: + +1. a runtime matrix case in `query-semantics-matrix.json` +2. 
an inventory entry in `query-semantics-operations.json` +3. a local or unit regression if the fix needed proxy-side translation or shaping changes diff --git a/docs/compatibility-matrix.md b/docs/compatibility-matrix.md index 29a593da..94d6d2f4 100644 --- a/docs/compatibility-matrix.md +++ b/docs/compatibility-matrix.md @@ -16,6 +16,57 @@ The GitHub Actions compatibility workflows read their matrix lists directly from VictoriaLogs capability profiles (used by runtime version sensing in proxy code) are tracked in the same manifest under `stack.victorialogs.capability_profiles`. Grafana runtime and client-surface capability profiles are tracked under `stack.grafana_loki_datasource_contract.capability_profiles` and `stack.logs_drilldown_contract.capability_profiles`. +## Query Semantics Matrix + +Version coverage and query semantics are tracked separately on purpose. + +- [compatibility-matrix.json](../test/e2e-compat/compatibility-matrix.json) answers which runtime families we support in CI. +- [query-semantics-matrix.json](../test/e2e-compat/query-semantics-matrix.json) answers which Loki-facing query combinations must behave the same as real Loki. +- [query-semantics-operations.json](../test/e2e-compat/query-semantics-operations.json) answers which Loki operations are explicitly tracked by the required semantics gate. 
+ +The query semantics matrix is manifest-driven and runs against a live Docker Compose stack with: + +- real Loki as the oracle +- Loki-VL-proxy against VictoriaLogs as the system under test +- the required `compat-loki` GitHub Actions workflow as the enforcement gate on pull requests +- the `loki-pinned` PR job as the stable required runtime check, with wider Loki-family coverage in the scheduled matrix + +Each case declares: + +- query family such as selector, parser pipeline, metric aggregation, binary op, or invalid syntax +- endpoint shape: `query` or `query_range` +- expected outcome: `success`, `client_error`, or `server_error` +- expected `resultType` when the query is valid +- comparison mode such as exact line-count parity, exact series-count parity, or exact metric-label-set parity + +The operation inventory is machine-checked alongside the matrix so the repo does not quietly drift into “tested cases” without a visible coverage model. + +Representative covered combinations now include: + +- selector + regex / negative-regex combinations +- chained line filters including exclusion cases +- parser pipelines with exact, numeric, regex, and pattern field filters +- parser-inside-range metric queries and bare `unwrap` range functions +- `absent_over_time(...)` semantics for missing selectors +- scalar `bool` comparisons and vector set operators like `or` / `unless` +- instant post-aggregations like `topk`, `bottomk`, `sort`, and `sort_desc` +- invalid log-query aggregations that must fail the same way as Loki + +This keeps the repo explicit about what must match Loki exactly versus what is a proxy-only Grafana contract. + +## Proxy-Only Contract Matrix + +Some important behaviors are intentionally not judged against Loki because they only exist in the proxy compatibility layer. 
+ +Examples: + +- synthetic label translation such as `service_name` +- Grafana Drilldown helper endpoints like detected labels, detected fields, field values, volume, and patterns +- stale-on-error helper behavior +- bounded fallback and recovery behavior when VictoriaLogs discovery endpoints fail + +Those cases belong in the proxy contract suite, not the strict Loki semantics parity matrix. + ## Support Window Policy The matrix is intentionally not open-ended. For every upstream we support a moving window that advances as new releases land: diff --git a/docs/testing.md b/docs/testing.md index f321bba6..bf3553ef 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -18,6 +18,9 @@ docker compose up -d --build ../../scripts/ci/wait_e2e_stack.sh 180 go test -v -tags=e2e -timeout=180s ./test/e2e-compat/ +# Manifest-driven Loki query semantics parity + inventory +go test -v -tags=e2e -run '^TestQuerySemantics' ./test/e2e-compat/ + # Track-specific scores go test -v -tags=e2e -run '^TestLokiTrackScore$' ./test/e2e-compat/ go test -v -tags=e2e -run '^TestDrilldownTrackScore$' ./test/e2e-compat/ @@ -172,6 +175,8 @@ Recent PRs added targeted guards in areas that were previously flaky in live Gra | `test/e2e-compat/` | Docker-based Loki vs proxy comparison | | `test/e2e-compat/drilldown_compat_test.go` | Grafana Logs Drilldown resource contracts via Grafana datasource proxy | | `test/e2e-compat/explore_contract_test.go` | HTTP-level Explore contracts for line filters, parsers, direction, metric shape, `label_format`, invalid-query handling | +| `test/e2e-compat/query_semantics_matrix_test.go` | Manifest-driven Loki query semantics parity against the real Loki Compose oracle | +| `test/e2e-compat/query-semantics-matrix.json` | Single source of truth for valid/invalid query combinations and edge-case expectations | | `test/e2e-compat/grafana_surface_test.go` | Grafana datasource catalog, datasource health, proxy bootstrap/control-plane surface | | 
`test/e2e-compat/features_test.go` | Live Grafana-facing edge cases including multi-tenant `__tenant_id__`, long-lived tail sessions, and Drilldown level-filter regressions | | `test/e2e-ui/tests/url-state.spec.ts` | Pure URL/state builder tests for Explore and Logs Drilldown reloadable state | @@ -227,6 +232,13 @@ CI prefers the runner's existing Chrome/Chromium binary for these shards and fal ## E2E Compatibility Matrix +The repo now keeps two different matrix files in `test/e2e-compat`: + +- `compatibility-matrix.json` for runtime/version coverage +- `query-semantics-matrix.json` for manifest-driven Loki query/operator parity + +They solve different problems and should evolve independently. + The Docker-backed `test/e2e-compat` suite now runs as four functional PR shards instead of one monolithic job. Each shard builds the stack, waits on explicit HTTP readiness checks, and runs only its own test family. | Shard | Primary scope | @@ -243,6 +255,53 @@ The GitHub-hosted Docker jobs now also prebuild the proxy image once per job thr Compose-backed fleet cache smoke now runs on pull requests and post-merge `main` in CI (`e2e-fleet`), using the dedicated `TestFleetSmoke_*` suite. Tuple smoke contract canary also runs automatically in CI (`tuple-smoke`) by seeding e2e data then executing `scripts/smoke-test.sh`. 
+## Query Semantics Matrix + +`TestQuerySemanticsMatrix` reads [`query-semantics-matrix.json`](../test/e2e-compat/query-semantics-matrix.json) and executes each case against: + +- real Loki in the Compose stack +- Loki-VL-proxy backed by VictoriaLogs +- the required `compat-loki` CI workflow for pull-request enforcement + +`TestQuerySemanticsOperationsInventory` reads [`query-semantics-operations.json`](../test/e2e-compat/query-semantics-operations.json) and enforces: + +- every tracked Loki-facing operation references one or more live matrix cases +- every live matrix case is tracked by the operation inventory + +Each manifest entry declares: + +- query family +- endpoint shape: `query` or `query_range` +- expected outcome: `success`, `client_error`, or `server_error` +- expected `resultType` for valid queries +- exact comparison mode such as line-count parity, series-count parity, or exact metric-label-set parity + +This is where the repo makes tricky edge cases explicit instead of relying on scattered ad hoc tests. Representative cases include: + +- valid parser pipelines like `| json`, `| logfmt`, `| regexp`, and `| pattern` +- valid parser regex and numeric filters +- grouped metric queries such as `sum by(level)(count_over_time(...))` +- parser-inside-range metric queries such as `count_over_time({selector} | json | status >= 500 [5m])` +- byte-oriented parser metrics such as `bytes_over_time(...)` and `bytes_rate(...)` +- bare `unwrap` metrics such as `sum/avg/max/min/first/last/stddev/stdvar/quantile_over_time(... 
| unwrap field [5m])` +- `absent_over_time(...)` semantics for missing selectors +- instant post-aggregations such as `topk`, `bottomk`, `sort`, and `sort_desc` +- scalar and vector binary operations over valid metric expressions, including scalar `bool` comparison and vector `or` / `unless` +- invalid forms like `sum by(job) ({selector})` +- invalid forms like `topk(2, {selector})` and `sort({selector})` +- invalid forms like `rate({selector})` without a range + +Proxy-only Grafana helper behavior such as synthetic labels, Drilldown fields, stale-on-error helpers, and detected-label recovery stays outside this matrix and belongs in the dedicated proxy contract tests. + +Valid Loki behavior is not an accepted exclusion class. If a query works in real Loki and diverges in the proxy, it should be fixed and added to the required matrix or tracked immediately as a parity bug with a dedicated regression target. + +Detailed docs are part of the gate now. When the manifest grows into a new LogQL family, update: + +- `docs/compatibility-loki.md` +- `docs/compatibility-matrix.md` +- `docs/testing.md` +- `README.md` + ### `datasource` shard | Test | Purpose | @@ -380,7 +439,7 @@ Required-check note: That report is part of the required PR gate. It is still a smoke signal rather than a full benchmark lab run, but it now blocks obvious regressions in coverage, compatibility, and the tracked performance signals. -See [compatibility-matrix.md](compatibility-matrix.md), [compatibility-loki.md](compatibility-loki.md), [compatibility-drilldown.md](compatibility-drilldown.md), [compatibility-victorialogs.md](compatibility-victorialogs.md), and [compatibility-matrix.json](../test/e2e-compat/compatibility-matrix.json). 
+See [compatibility-matrix.md](compatibility-matrix.md), [compatibility-loki.md](compatibility-loki.md), [compatibility-drilldown.md](compatibility-drilldown.md), [compatibility-victorialogs.md](compatibility-victorialogs.md), [compatibility-matrix.json](../test/e2e-compat/compatibility-matrix.json), and [query-semantics-matrix.json](../test/e2e-compat/query-semantics-matrix.json). ## Running Specific Tests diff --git a/internal/cache/cache_coverage_test.go b/internal/cache/cache_coverage_test.go index 7db0c7bd..97226729 100644 --- a/internal/cache/cache_coverage_test.go +++ b/internal/cache/cache_coverage_test.go @@ -106,6 +106,71 @@ func TestGetSharedWithTTL_PromotesFreshDiskHitIntoL1(t *testing.T) { } } +func TestGetSharedWithTTL_ReturnsFreshL1Hit(t *testing.T) { + c := NewWithMaxBytes(10*time.Second, 100, 1024*1024) + defer c.Close() + + c.SetWithTTL("l1-only", []byte("payload"), 5*time.Second) + + value, ttl, tier, ok := c.GetSharedWithTTL("l1-only") + if !ok { + t.Fatal("expected shared L1 hit") + } + if string(value) != "payload" { + t.Fatalf("unexpected payload %q", value) + } + if tier != "l1_memory" { + t.Fatalf("expected l1_memory tier, got %q", tier) + } + if ttl <= 0 { + t.Fatalf("expected positive TTL, got %v", ttl) + } + if stats := c.Stats(); stats.L1.Requests != 1 || stats.L1.Hits != 1 { + t.Fatalf("unexpected L1 stats after shared hit: %+v", stats) + } +} + +func TestGetSharedWithTTL_MissRecordsBackendFallthrough(t *testing.T) { + c := NewWithMaxBytes(10*time.Second, 100, 1024*1024) + defer c.Close() + + if value, ttl, tier, ok := c.GetSharedWithTTL("missing"); ok || value != nil || ttl != 0 || tier != "" { + t.Fatalf("expected shared miss, got ok=%v value=%q ttl=%v tier=%q", ok, string(value), ttl, tier) + } + stats := c.Stats() + if stats.L1.Requests != 1 || stats.L1.Misses != 1 { + t.Fatalf("unexpected L1 miss stats: %+v", stats) + } + if stats.BackendFallthrough != 1 { + t.Fatalf("expected one backend fallthrough, got %+v", stats) + } +} + 
+func TestSetLocalAndDiskWithTTL_PersistsWithoutNeedingPeerTier(t *testing.T) { + dc, err := NewDiskCache(DiskCacheConfig{ + Path: tempDBPath(t), + Compression: false, + }) + if err != nil { + t.Fatal(err) + } + defer func() { _ = dc.Close() }() + + c := NewWithMaxBytes(10*time.Second, 100, 1024*1024) + defer c.Close() + c.SetL2(dc) + + c.SetLocalAndDiskWithTTL("local-disk", []byte("payload"), 5*time.Second) + dc.Flush() + + if value, ttl, ok := c.GetWithTTL("local-disk"); !ok || string(value) != "payload" || ttl <= 0 { + t.Fatalf("expected local L1 payload, got ok=%v value=%q ttl=%v", ok, string(value), ttl) + } + if value, ttl, ok := dc.GetWithTTL("local-disk"); !ok || string(value) != "payload" || ttl <= 0 { + t.Fatalf("expected local disk payload, got ok=%v value=%q ttl=%v", ok, string(value), ttl) + } +} + func TestGetRecoverableStaleWithTTL_ReturnsDiskStaleValue(t *testing.T) { dc, err := NewDiskCache(DiskCacheConfig{ Path: tempDBPath(t), @@ -142,6 +207,31 @@ func TestGetRecoverableStaleWithTTL_ReturnsDiskStaleValue(t *testing.T) { } } +func TestGetRecoverableStaleWithTTL_PrefersL1StaleValue(t *testing.T) { + c := NewWithMaxBytes(1*time.Millisecond, 100, 1024*1024) + defer c.Close() + + c.Set("stale-l1", []byte("payload")) + time.Sleep(5 * time.Millisecond) + + value, ttl, tier, ok := c.GetRecoverableStaleWithTTL("stale-l1") + if !ok { + t.Fatal("expected stale L1 payload") + } + if string(value) != "payload" { + t.Fatalf("unexpected payload %q", value) + } + if ttl >= 0 { + t.Fatalf("expected negative stale TTL, got %v", ttl) + } + if tier != "l1_memory" { + t.Fatalf("expected l1_memory stale tier, got %q", tier) + } + if stats := c.Stats(); stats.L1.StaleHits != 1 { + t.Fatalf("expected one stale L1 hit, got %+v", stats) + } +} + func TestCache_MaxEntrySizeBytes(t *testing.T) { c := NewWithMaxBytes(10*time.Second, 100, 1000) defer c.Close() diff --git a/internal/proxy/coverage_gaps_test.go b/internal/proxy/coverage_gaps_test.go index c2878906..3b2c69d2 
100644 --- a/internal/proxy/coverage_gaps_test.go +++ b/internal/proxy/coverage_gaps_test.go @@ -520,7 +520,7 @@ func TestDetectedFieldValues_FieldFilterFallbackKeepsValuesVisible(t *testing.T) } } -func TestDetectedFields_EmptyStrictQueryRelaxesCandidates(t *testing.T) { +func TestDetectedFields_EmptyStrictQueryDoesNotRelaxCandidates(t *testing.T) { const strictToken = "strict-only" var fieldNameQueries []string @@ -565,23 +565,13 @@ func TestDetectedFields_EmptyStrictQueryRelaxesCandidates(t *testing.T) { if w.Code != http.StatusOK { t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) } - if len(fieldNameQueries) < 2 { - t.Fatalf("expected strict+relaxed native field-name lookups, got %v", fieldNameQueries) - } - if !strings.Contains(fieldNameQueries[0], strictToken) { - t.Fatalf("expected first native field-name lookup to stay strict, got %v", fieldNameQueries) - } - if strings.Contains(fieldNameQueries[len(fieldNameQueries)-1], strictToken) { - t.Fatalf("expected final native field-name lookup to relax whole-query filters, got %v", fieldNameQueries) - } - if len(scanQueries) < 2 { - t.Fatalf("expected strict+relaxed field scans, got %v", scanQueries) + if len(fieldNameQueries) != 1 { + t.Fatalf("expected only the strict native field-name lookup, got %v", fieldNameQueries) } - if !strings.Contains(scanQueries[0], strictToken) { - t.Fatalf("expected first field scan to stay strict, got %v", scanQueries) - } - if strings.Contains(scanQueries[len(scanQueries)-1], strictToken) { - t.Fatalf("expected final field scan to relax whole-query filters, got %v", scanQueries) + for _, got := range scanQueries { + if !strings.Contains(got, strictToken) { + t.Fatalf("expected scan lookup to preserve strict filter, got %q", got) + } } var resp map[string]interface{} @@ -589,19 +579,8 @@ func TestDetectedFields_EmptyStrictQueryRelaxesCandidates(t *testing.T) { t.Fatalf("unmarshal response: %v", err) } fields, _ := resp["fields"].([]interface{}) - if len(fields) == 0 { 
- t.Fatalf("expected detected_fields payload after relaxed fallback, got %v", resp) - } - foundStatus := false - for _, raw := range fields { - item, _ := raw.(map[string]interface{}) - if item["label"] == "status" { - foundStatus = true - break - } - } - if !foundStatus { - t.Fatalf("expected relaxed detected_fields payload to include status, got %v", resp) + if len(fields) != 0 { + t.Fatalf("expected empty detected_fields payload for strict empty query, got %v", resp) } } diff --git a/internal/proxy/drilldown.go b/internal/proxy/drilldown.go index ab8650fa..362b10b8 100644 --- a/internal/proxy/drilldown.go +++ b/internal/proxy/drilldown.go @@ -1133,7 +1133,7 @@ func (p *Proxy) detectFields(ctx context.Context, query, start, end string, line candidates := fieldDetectionQueryCandidates(query) hadScanFailure := false var lastErr error - for i, candidate := range candidates { + for _, candidate := range candidates { logsqlQuery, err := p.translateQuery(candidate) if err != nil { lastErr = err @@ -1183,10 +1183,13 @@ func (p *Proxy) detectFields(ctx context.Context, query, start, end string, line p.setCachedDetectedFields(ctx, query, start, end, lineLimit, fieldList, fieldValues) return fieldList, fieldValues, nil } - if i+1 < len(candidates) { - p.observeInternalOperation(ctx, "discovery_fallback", "detected_fields_empty_primary", 0) - continue + if len(candidates) > 1 && !hadScanFailure { + emptyFields := []map[string]interface{}{} + emptyValues := map[string][]string{} + p.setCachedDetectedFields(ctx, query, start, end, lineLimit, emptyFields, emptyValues) + return emptyFields, emptyValues, nil } + break } if len(nativeFields) > 0 { @@ -1200,12 +1203,6 @@ func (p *Proxy) detectFields(ctx context.Context, query, start, end string, line p.setCachedDetectedFields(ctx, query, start, end, lineLimit, fieldList, fieldValues) return fieldList, fieldValues, nil } - if !hadScanFailure && lastErr == nil { - emptyFields := []map[string]interface{}{} - emptyValues := 
map[string][]string{} - p.setCachedDetectedFields(ctx, query, start, end, lineLimit, emptyFields, emptyValues) - return emptyFields, emptyValues, nil - } return nil, nil, lastErr } @@ -1506,22 +1503,14 @@ func (p *Proxy) fetchNativeFieldNamesForCandidate(ctx context.Context, candidate func (p *Proxy) fetchNativeFieldNames(ctx context.Context, query, start, end string) ([]string, error) { var lastErr error - candidates := fieldDetectionQueryCandidates(query) - for i, candidate := range candidates { + for _, candidate := range fieldDetectionQueryCandidates(query) { fields, err := p.fetchNativeFieldNamesForCandidate(ctx, candidate, start, end) if err != nil { lastErr = err continue } - if len(fields) == 0 && i+1 < len(candidates) { - p.observeInternalOperation(ctx, "discovery_fallback", "native_field_names_empty_primary", 0) - continue - } return fields, nil } - if lastErr == nil { - return []string{}, nil - } return nil, lastErr } @@ -1626,7 +1615,6 @@ func (p *Proxy) resolveNativeDetectedField(ctx context.Context, query, start, en } if i+1 < len(queryCandidates) { p.observeInternalOperation(ctx, "discovery_fallback", "native_detected_field_empty_primary", 0) - continue } return "", false, nil } @@ -1666,8 +1654,7 @@ func (p *Proxy) detectNativeLabels(ctx context.Context, query, start, end string func (p *Proxy) fetchNativeStreams(ctx context.Context, query, start, end string) (*vlStreamsResponse, error) { var lastErr error - candidates := fieldDetectionQueryCandidates(query) - for i, candidate := range candidates { + for _, candidate := range fieldDetectionQueryCandidates(query) { logsqlQuery, err := p.translateQuery(candidate) if err != nil { lastErr = err @@ -1691,15 +1678,8 @@ func (p *Proxy) fetchNativeStreams(ctx context.Context, query, start, end string lastErr = err continue } - if len(parsed.Values) == 0 && i+1 < len(candidates) { - p.observeInternalOperation(ctx, "discovery_fallback", "native_streams_empty_primary", 0) - continue - } return &parsed, nil } - 
if lastErr == nil { - return &vlStreamsResponse{}, nil - } return &vlStreamsResponse{}, lastErr } @@ -1774,13 +1754,9 @@ func (p *Proxy) detectScannedLabels(ctx context.Context, query, start, end strin summaries := scanDetectedLabelSummaries(body, p.labelTranslator) if len(summaries) == 0 && i+1 < len(candidates) { p.observeInternalOperation(ctx, "discovery_fallback", "detected_labels_empty_primary", 0) - continue } return summaries, nil } - if lastErr == nil { - return map[string]*detectedLabelSummary{}, nil - } return nil, lastErr } diff --git a/internal/proxy/drilldown_coverage_test.go b/internal/proxy/drilldown_coverage_test.go index 1be2e5ec..355abca1 100644 --- a/internal/proxy/drilldown_coverage_test.go +++ b/internal/proxy/drilldown_coverage_test.go @@ -132,7 +132,7 @@ func TestFetchNativeFieldValues_RelaxesAfterSuccessfulEmptyPrimary(t *testing.T) } } -func TestDetectScannedLabels_RelaxesOnSuccessfulEmptyPrimary(t *testing.T) { +func TestDetectScannedLabels_DoesNotRelaxOnSuccessfulEmptyPrimary(t *testing.T) { var calls atomic.Int32 backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/select/logsql/query" { @@ -153,11 +153,11 @@ func TestDetectScannedLabels_RelaxesOnSuccessfulEmptyPrimary(t *testing.T) { if err != nil { t.Fatalf("detectScannedLabels returned error: %v", err) } - if labels["service_name"] == nil { - t.Fatalf("expected relaxed fallback labels, got %#v", labels) + if len(labels) != 0 { + t.Fatalf("expected empty primary response to stop without relaxed fallback, got %#v", labels) } - if got := calls.Load(); got != 2 { - t.Fatalf("expected strict+relaxed backend requests, got %d", got) + if got := calls.Load(); got != 1 { + t.Fatalf("expected exactly one backend request, got %d", got) } } diff --git a/internal/proxy/gaps_test.go b/internal/proxy/gaps_test.go index b8a87f48..5fe9e115 100644 --- a/internal/proxy/gaps_test.go +++ b/internal/proxy/gaps_test.go @@ -708,8 +708,8 @@ func 
TestCoverage_HandleQuery_LogQuery(t *testing.T) { r := httptest.NewRequest("GET", `/loki/api/v1/query?query={app="nginx"}`, nil) p.handleQuery(w, r) - if w.Code != http.StatusOK { - t.Errorf("expected 200, got %d", w.Code) + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400, got %d", w.Code) } } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 2d64c374..e37536fc 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -2123,6 +2123,16 @@ func (p *Proxy) handleQueryRange(w http.ResponseWriter, r *http.Request) { logqlQuery = p.preferWorkingParser(r.Context(), logqlQuery, r.FormValue("start"), r.FormValue("end")) + if spec, ok := parseBareParserMetricCompatSpec(logqlQuery); ok { + p.proxyBareParserMetricQueryRange(w, r, start, logqlQuery, spec) + return + } + + if postAgg, ok := parseInstantMetricPostAggQuery(logqlQuery); ok { + p.handleInstantMetricPostAggregation(w, r, start, logqlQuery, postAgg) + return + } + logsqlQuery, err := p.translateQueryWithContext(r.Context(), logqlQuery) if err != nil { p.writeError(w, http.StatusBadRequest, err.Error()) @@ -2131,6 +2141,7 @@ func (p *Proxy) handleQueryRange(w http.ResponseWriter, r *http.Request) { } // Extract without() labels for post-processing logsqlQuery, withoutLabels := translator.ParseWithoutMarker(logsqlQuery) + logsqlQuery = preserveMetricStreamIdentity(logqlQuery, logsqlQuery, withoutLabels) p.log.Debug("translated query", "logsql", logsqlQuery, "without", withoutLabels) r = withOrgID(r) @@ -2235,6 +2246,20 @@ func (p *Proxy) handleQuery(w http.ResponseWriter, r *http.Request) { logqlQuery = p.preferWorkingParser(r.Context(), logqlQuery, r.FormValue("start"), r.FormValue("end")) + if spec, ok := parseBareParserMetricCompatSpec(logqlQuery); ok { + p.proxyBareParserMetricQuery(w, r, start, logqlQuery, spec) + return + } + if spec, ok := parseAbsentOverTimeCompatSpec(logqlQuery); ok { + p.proxyAbsentOverTimeQuery(w, r, start, logqlQuery, spec) + return + } + + if 
postAgg, ok := parseInstantMetricPostAggQuery(logqlQuery); ok { + p.handleInstantMetricPostAggregation(w, r, start, logqlQuery, postAgg) + return + } + logsqlQuery, err := p.translateQueryWithContext(r.Context(), logqlQuery) if err != nil { p.writeError(w, http.StatusBadRequest, err.Error()) @@ -2244,6 +2269,7 @@ func (p *Proxy) handleQuery(w http.ResponseWriter, r *http.Request) { // Extract without() labels for post-processing logsqlQuery, withoutLabels := translator.ParseWithoutMarker(logsqlQuery) + logsqlQuery = preserveMetricStreamIdentity(logqlQuery, logsqlQuery, withoutLabels) r = withOrgID(r) @@ -2263,7 +2289,7 @@ func (p *Proxy) handleQuery(w http.ResponseWriter, r *http.Request) { } else if isStatsQuery(logsqlQuery) { p.proxyStatsQuery(sc, r, logsqlQuery) } else { - p.proxyLogQuery(sc, r, logsqlQuery) + p.writeError(sc, http.StatusBadRequest, "log queries are not supported as an instant query type, please change your query to a range query type") } if bw != nil && len(withoutLabels) > 0 { @@ -2355,6 +2381,230 @@ func parseInstantVectorTime(timeParam string) int64 { return time.Now().UnixNano() } +type instantMetricPostAgg struct { + name string + inner string + k int +} + +func parseInstantMetricPostAggQuery(logql string) (instantMetricPostAgg, bool) { + logql = strings.TrimSpace(logql) + for _, name := range []string{"sort_desc", "sort"} { + prefix := name + "(" + if !strings.HasPrefix(logql, prefix) || !strings.HasSuffix(logql, ")") { + continue + } + inner := strings.TrimSpace(logql[len(prefix) : len(logql)-1]) + if inner == "" { + return instantMetricPostAgg{}, false + } + return instantMetricPostAgg{name: name, inner: inner}, true + } + for _, name := range []string{"topk", "bottomk"} { + prefix := name + "(" + if !strings.HasPrefix(logql, prefix) || !strings.HasSuffix(logql, ")") { + continue + } + args := strings.TrimSpace(logql[len(prefix) : len(logql)-1]) + comma := topLevelCommaIndex(args) + if comma <= 0 { + return instantMetricPostAgg{}, false 
+ } + k, err := strconv.Atoi(strings.TrimSpace(args[:comma])) + if err != nil || k <= 0 { + return instantMetricPostAgg{}, false + } + inner := strings.TrimSpace(args[comma+1:]) + if inner == "" { + return instantMetricPostAgg{}, false + } + return instantMetricPostAgg{name: name, inner: inner, k: k}, true + } + return instantMetricPostAgg{}, false +} + +func topLevelCommaIndex(s string) int { + depth := 0 + inQuote := false + for i, r := range s { + switch r { + case '"': + inQuote = !inQuote + case '(': + if !inQuote { + depth++ + } + case ')': + if !inQuote && depth > 0 { + depth-- + } + case ',': + if !inQuote && depth == 0 { + return i + } + } + } + return -1 +} + +func preserveMetricStreamIdentity(originalLogQL, translatedLogsQL string, withoutLabels []string) string { + if !isStatsQuery(translatedLogsQL) { + return translatedLogsQL + } + if strings.Contains(translatedLogsQL, "| stats by (") { + return translatedLogsQL + } + if len(withoutLabels) > 0 || isBareMetricFunctionQuery(strings.TrimSpace(originalLogQL)) { + return addStatsByStreamClause(translatedLogsQL) + } + return translatedLogsQL +} + +func isBareMetricFunctionQuery(logql string) bool { + for _, prefix := range []string{ + "rate(", + "count_over_time(", + "bytes_over_time(", + "bytes_rate(", + "sum_over_time(", + "avg_over_time(", + "max_over_time(", + "min_over_time(", + "first_over_time(", + "last_over_time(", + "stddev_over_time(", + "stdvar_over_time(", + "absent_over_time(", + "quantile_over_time(", + } { + if strings.HasPrefix(logql, prefix) { + return true + } + } + return false +} + +func addStatsByStreamClause(logsqlQuery string) string { + idx := strings.Index(logsqlQuery, "| stats ") + if idx < 0 { + return logsqlQuery + } + statsStart := idx + len("| stats ") + return logsqlQuery[:statsStart] + "by (_stream, level) " + logsqlQuery[statsStart:] +} + +func (p *Proxy) handleInstantMetricPostAggregation(w http.ResponseWriter, r *http.Request, start time.Time, originalQuery string, postAgg 
instantMetricPostAgg) { + translatedInner, err := p.translateQueryWithContext(r.Context(), postAgg.inner) + if err != nil { + p.writeError(w, http.StatusBadRequest, err.Error()) + p.metrics.RecordRequest("query", http.StatusBadRequest, time.Since(start)) + return + } + translatedInner, withoutLabels := translator.ParseWithoutMarker(translatedInner) + translatedInner = preserveMetricStreamIdentity(postAgg.inner, translatedInner, withoutLabels) + + r = withOrgID(r) + + bw := &bufferedResponseWriter{header: make(http.Header)} + sc := &statusCapture{ResponseWriter: bw, code: 200} + if outerFunc, innerQL, rng, step, ok := translator.ParseSubqueryExpr(translatedInner); ok { + p.proxySubquery(sc, r, outerFunc, innerQL, rng, step) + } else if op, left, right, vm, ok := translator.ParseBinaryMetricExprFull(translatedInner); ok { + p.proxyBinaryMetricQueryVM(sc, r, op, left, right, vm) + } else if isStatsQuery(translatedInner) { + p.proxyStatsQuery(sc, r, translatedInner) + } else { + p.writeError(w, http.StatusBadRequest, "unsupported instant aggregation target") + p.metrics.RecordRequest("query", http.StatusBadRequest, time.Since(start)) + return + } + + if len(withoutLabels) > 0 { + bw.body = applyWithoutGrouping(bw.body, withoutLabels) + } + + if sc.code >= http.StatusBadRequest { + copyHeaders(w.Header(), bw.Header()) + if w.Header().Get("Content-Type") == "" { + w.Header().Set("Content-Type", "application/json") + } + w.WriteHeader(sc.code) + _, _ = w.Write(bw.body) + elapsed := time.Since(start) + p.metrics.RecordRequest("query", sc.code, elapsed) + p.queryTracker.Record("query", originalQuery, elapsed, true) + return + } + + result := applyInstantVectorPostAggregation(bw.body, postAgg) + copyHeaders(w.Header(), bw.Header()) + if w.Header().Get("Content-Type") == "" { + w.Header().Set("Content-Type", "application/json") + } + _, _ = w.Write(result) + elapsed := time.Since(start) + p.metrics.RecordRequest("query", http.StatusOK, elapsed) + 
p.queryTracker.Record("query", originalQuery, elapsed, false) +} + +func applyInstantVectorPostAggregation(body []byte, postAgg instantMetricPostAgg) []byte { + var resp struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + Result []struct { + Metric map[string]interface{} `json:"metric"` + Value []interface{} `json:"value"` + } `json:"result"` + } `json:"data"` + } + if err := json.Unmarshal(body, &resp); err != nil || resp.Status != "success" || resp.Data.ResultType != "vector" { + return body + } + + sort.SliceStable(resp.Data.Result, func(i, j int) bool { + left := vectorPointValue(resp.Data.Result[i].Value) + right := vectorPointValue(resp.Data.Result[j].Value) + switch postAgg.name { + case "sort", "bottomk": + if left == right { + return metricKey(resp.Data.Result[i].Metric) < metricKey(resp.Data.Result[j].Metric) + } + return left < right + default: + if left == right { + return metricKey(resp.Data.Result[i].Metric) < metricKey(resp.Data.Result[j].Metric) + } + return left > right + } + }) + + if (postAgg.name == "topk" || postAgg.name == "bottomk") && postAgg.k < len(resp.Data.Result) { + resp.Data.Result = resp.Data.Result[:postAgg.k] + } + + out, err := json.Marshal(resp) + if err != nil { + return body + } + return out +} + +func vectorPointValue(value []interface{}) float64 { + if len(value) < 2 { + return 0 + } + switch raw := value[1].(type) { + case string: + parsed, _ := strconv.ParseFloat(raw, 64) + return parsed + case float64: + return raw + default: + return 0 + } +} + func applyConstantBinaryOp(left, right float64, op string) (float64, bool) { switch op { case "+": @@ -8544,6 +8794,10 @@ func (p *Proxy) proxyBinaryMetricVM(w http.ResponseWriter, r *http.Request, op, if len(vm.On) > 0 { result = applyOnMatching(leftBody, rightBody, op, vm.On, resultType) } else if len(vm.Ignoring) > 0 { + if err := validateVectorMatchCardinality(leftBody, rightBody, nil, vm.Ignoring, len(vm.GroupLeft) > 0, 
len(vm.GroupRight) > 0); err != nil { + p.writeError(w, http.StatusInternalServerError, err.Error()) + return + } result = applyIgnoringMatching(leftBody, rightBody, op, vm.Ignoring, resultType) } else { // group_left/group_right without on/ignoring — use default matching @@ -9685,6 +9939,24 @@ type requestedBucketRange struct { count int } +type bareParserMetricCompatSpec struct { + funcName string + baseQuery string + rangeWindow time.Duration + unwrapField string + quantile float64 +} + +type bareParserMetricSample struct { + tsNanos int64 + value float64 +} + +type bareParserMetricSeries struct { + metric map[string]string + samples []bareParserMetricSample +} + func parseFlexibleUnixSeconds(raw string) (int64, bool) { value := strings.TrimSpace(raw) if value == "" { @@ -9715,6 +9987,36 @@ func parseFlexibleUnixSeconds(raw string) (int64, bool) { return 0, false } +func parseFlexibleUnixNanos(raw string) (int64, bool) { + value := strings.TrimSpace(raw) + if value == "" { + return 0, false + } + if parsed, err := time.Parse(time.RFC3339Nano, value); err == nil { + return parsed.UnixNano(), true + } + if parsed, err := time.Parse(time.RFC3339, value); err == nil { + return parsed.UnixNano(), true + } + if integer, err := strconv.ParseInt(value, 10, 64); err == nil { + return normalizeUnixNanos(integer), true + } + if floating, err := strconv.ParseFloat(value, 64); err == nil { + abs := math.Abs(floating) + switch { + case abs >= 1_000_000_000_000_000_000: + return int64(floating), true + case abs >= 1_000_000_000_000_000: + return int64(floating * 1_000), true + case abs >= 1_000_000_000_000: + return int64(floating * 1_000_000), true + default: + return int64(floating * float64(time.Second)), true + } + } + return 0, false +} + func normalizeUnixSeconds(v int64) int64 { abs := v if abs < 0 { @@ -9732,6 +10034,23 @@ func normalizeUnixSeconds(v int64) int64 { } } +func normalizeUnixNanos(v int64) int64 { + abs := v + if abs < 0 { + abs = -abs + } + switch { + 
case abs >= 1_000_000_000_000_000_000: + return v + case abs >= 1_000_000_000_000_000: + return v * 1_000 + case abs >= 1_000_000_000_000: + return v * 1_000_000 + default: + return v * int64(time.Second) + } +} + func parsePositiveStepDuration(step string) (time.Duration, bool) { value := strings.TrimSpace(step) if value == "" { @@ -12044,6 +12363,16 @@ func (p *Proxy) preferWorkingParser(ctx context.Context, logql, start, end strin var metricParserProbeRE = regexp.MustCompile(`(?s)(?:count_over_time|bytes_over_time|rate|bytes_rate|sum_over_time|avg_over_time|max_over_time|min_over_time|first_over_time|last_over_time|stddev_over_time|stdvar_over_time|quantile_over_time)\((.*?)\[[^][]+\]\)`) +var ( + absentOverTimeCompatRE = regexp.MustCompile(`(?s)^\s*absent_over_time\(\s*(.*)\[([^][]+)\]\s*\)\s*$`) + bareParserMetricCompatRE = regexp.MustCompile(`(?s)^\s*(count_over_time|bytes_over_time|rate|bytes_rate|sum_over_time|avg_over_time|max_over_time|min_over_time|first_over_time|last_over_time|stddev_over_time|stdvar_over_time)\((.*)\[([^][]+)\]\)\s*$`) + bareParserQuantileCompatRE = regexp.MustCompile(`(?s)^\s*quantile_over_time\(\s*([0-9.]+)\s*,\s*(.*)\[([^][]+)\]\)\s*$`) + bareParserUnwrapFieldRE = regexp.MustCompile(`\|\s*unwrap\s+(?:(?:duration|bytes)\(([^)]+)\)|([A-Za-z0-9_.-]+))`) + regexpExtractingParserStageRE = regexp.MustCompile(`\|\s*regexp(?:\s+[^|]+)?`) + patternExtractingParserStageRE = regexp.MustCompile(`\|\s*pattern(?:\s+[^|]+)?`) + otherExtractingParserStageRE = regexp.MustCompile(`\|\s*(?:unpack|extract|extract_regexp)(?:\s+[^|]+)?`) +) + func extractParserProbeQuery(logql string) string { matches := metricParserProbeRE.FindStringSubmatch(logql) if len(matches) == 2 { @@ -12052,6 +12381,554 @@ func extractParserProbeQuery(logql string) string { return strings.TrimSpace(logql) } +func hasExtractingParserStage(logql string) bool { + for _, re := range []*regexp.Regexp{ + jsonParserStageRE, + logfmtParserStageRE, + regexpExtractingParserStageRE, + 
patternExtractingParserStageRE, + otherExtractingParserStageRE, + } { + if re.MatchString(logql) { + return true + } + } + return false +} + +func parseBareParserMetricCompatSpec(logql string) (bareParserMetricCompatSpec, bool) { + if matches := bareParserQuantileCompatRE.FindStringSubmatch(strings.TrimSpace(logql)); len(matches) == 4 { + baseQuery := strings.TrimSpace(matches[2]) + if baseQuery == "" || !hasExtractingParserStage(baseQuery) { + return bareParserMetricCompatSpec{}, false + } + window, ok := parsePositiveStepDuration(matches[3]) + if !ok { + return bareParserMetricCompatSpec{}, false + } + phi, err := strconv.ParseFloat(matches[1], 64) + if err != nil || phi < 0 || phi > 1 { + return bareParserMetricCompatSpec{}, false + } + unwrapField := extractBareParserUnwrapField(baseQuery) + if unwrapField == "" { + return bareParserMetricCompatSpec{}, false + } + return bareParserMetricCompatSpec{ + funcName: "quantile_over_time", + baseQuery: baseQuery, + rangeWindow: window, + unwrapField: unwrapField, + quantile: phi, + }, true + } + + matches := bareParserMetricCompatRE.FindStringSubmatch(strings.TrimSpace(logql)) + if len(matches) != 4 { + return bareParserMetricCompatSpec{}, false + } + baseQuery := strings.TrimSpace(matches[2]) + if baseQuery == "" || !hasExtractingParserStage(baseQuery) { + return bareParserMetricCompatSpec{}, false + } + window, ok := parsePositiveStepDuration(matches[3]) + if !ok { + return bareParserMetricCompatSpec{}, false + } + unwrapField := "" + switch matches[1] { + case "sum_over_time", "avg_over_time", "max_over_time", "min_over_time", "first_over_time", "last_over_time", "stddev_over_time", "stdvar_over_time": + unwrapField = extractBareParserUnwrapField(baseQuery) + if unwrapField == "" { + return bareParserMetricCompatSpec{}, false + } + } + return bareParserMetricCompatSpec{ + funcName: matches[1], + baseQuery: baseQuery, + rangeWindow: window, + unwrapField: unwrapField, + }, true +} + +func 
extractBareParserUnwrapField(query string) string { + matches := bareParserUnwrapFieldRE.FindStringSubmatch(query) + if len(matches) != 3 { + return "" + } + if field := strings.TrimSpace(matches[1]); field != "" { + return field + } + return strings.TrimSpace(matches[2]) +} + +func formatMetricSampleValue(v float64) string { + if math.IsNaN(v) { + return "NaN" + } + if math.IsInf(v, 1) { + return "+Inf" + } + if math.IsInf(v, -1) { + return "-Inf" + } + if math.Mod(v, 1) == 0 { + return strconv.FormatInt(int64(v), 10) + } + return strconv.FormatFloat(v, 'f', -1, 64) +} + +func metricWindowValue(funcName string, total float64, rangeWindow time.Duration) float64 { + switch funcName { + case "rate", "bytes_rate": + if rangeWindow <= 0 { + return 0 + } + return total / rangeWindow.Seconds() + default: + return total + } +} + +func (p *Proxy) fetchBareParserMetricSeries(ctx context.Context, originalQuery string, spec bareParserMetricCompatSpec, start, end string) ([]bareParserMetricSeries, error) { + logsqlQuery, err := p.translateQueryWithContext(ctx, spec.baseQuery) + if err != nil { + return nil, err + } + + params := url.Values{} + params.Set("query", logsqlQuery+" | sort by (_time)") + if start != "" { + params.Set("start", formatVLTimestamp(start)) + } + if end != "" { + params.Set("end", formatVLTimestamp(end)) + } + + resp, err := p.vlPost(ctx, "/select/logsql/query", params) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode >= http.StatusBadRequest { + body, _ := readBodyLimited(resp.Body, maxUpstreamErrorBodyBytes) + msg := strings.TrimSpace(string(body)) + if msg == "" { + msg = fmt.Sprintf("VL backend returned %d", resp.StatusCode) + } + return nil, &vlAPIError{status: resp.StatusCode, body: msg} + } + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 0, 64*1024), 8*1024*1024) + seriesByKey := make(map[string]*bareParserMetricSeries, 16) + streamLabelCache := make(map[string]map[string]string, 16) + 
streamDescriptorCache := make(map[string]cachedLogQueryStreamDescriptor, 16) + exposureCache := make(map[string][]metadataFieldExposure, 16) + + for scanner.Scan() { + line := bytes.TrimSpace(scanner.Bytes()) + if len(line) == 0 { + continue + } + + entry := vlEntryPool.Get().(map[string]interface{}) + for key := range entry { + delete(entry, key) + } + if err := json.Unmarshal(line, &entry); err != nil { + vlEntryPool.Put(entry) + continue + } + + tsNanos, ok := parseFlexibleUnixNanos(asString(entry["_time"])) + if !ok { + vlEntryPool.Put(entry) + continue + } + msg, _ := stringifyEntryValue(entry["_msg"]) + desc := p.logQueryStreamDescriptor(asString(entry["_stream"]), asString(entry["level"]), streamLabelCache, streamDescriptorCache) + _, parsedFields := p.classifyEntryMetadataFields(entry, desc.rawLabels, true, exposureCache) + metric := cloneStringMap(desc.translatedLabels) + for key, value := range parsedFields { + if spec.unwrapField != "" && key == spec.unwrapField { + continue + } + metric[key] = value + } + seriesKey := canonicalLabelsKey(metric) + series, ok := seriesByKey[seriesKey] + if !ok { + series = &bareParserMetricSeries{ + metric: metric, + samples: make([]bareParserMetricSample, 0, 8), + } + seriesByKey[seriesKey] = series + } + weight := 1.0 + if spec.unwrapField != "" { + rawValue, ok := stringifyEntryValue(entry[spec.unwrapField]) + if !ok { + vlEntryPool.Put(entry) + continue + } + parsedValue, err := strconv.ParseFloat(rawValue, 64) + if err != nil { + vlEntryPool.Put(entry) + continue + } + weight = parsedValue + } else if spec.funcName == "bytes_over_time" || spec.funcName == "bytes_rate" { + weight = float64(len(msg)) + } + series.samples = append(series.samples, bareParserMetricSample{tsNanos: tsNanos, value: weight}) + vlEntryPool.Put(entry) + } + if err := scanner.Err(); err != nil { + return nil, err + } + + result := make([]bareParserMetricSeries, 0, len(seriesByKey)) + for _, series := range seriesByKey { + result = append(result, 
*series) + } + sort.Slice(result, func(i, j int) bool { + return canonicalLabelsKey(result[i].metric) < canonicalLabelsKey(result[j].metric) + }) + return result, nil +} + +func bareParserMetricWindowValue(funcName string, window []bareParserMetricSample, spec bareParserMetricCompatSpec) float64 { + if len(window) == 0 { + return 0 + } + switch funcName { + case "count_over_time", "rate", "bytes_over_time", "bytes_rate", "sum_over_time": + total := 0.0 + for _, sample := range window { + total += sample.value + } + return metricWindowValue(funcName, total, spec.rangeWindow) + case "avg_over_time": + total := 0.0 + for _, sample := range window { + total += sample.value + } + return total / float64(len(window)) + case "max_over_time": + maxValue := window[0].value + for _, sample := range window[1:] { + if sample.value > maxValue { + maxValue = sample.value + } + } + return maxValue + case "min_over_time": + minValue := window[0].value + for _, sample := range window[1:] { + if sample.value < minValue { + minValue = sample.value + } + } + return minValue + case "first_over_time": + return window[0].value + case "last_over_time": + return window[len(window)-1].value + case "stddev_over_time", "stdvar_over_time": + mean := 0.0 + for _, sample := range window { + mean += sample.value + } + mean /= float64(len(window)) + variance := 0.0 + for _, sample := range window { + delta := sample.value - mean + variance += delta * delta + } + variance /= float64(len(window)) + if funcName == "stddev_over_time" { + return math.Sqrt(variance) + } + return variance + case "quantile_over_time": + values := make([]float64, 0, len(window)) + for _, sample := range window { + values = append(values, sample.value) + } + sort.Float64s(values) + if len(values) == 1 { + return values[0] + } + pos := spec.quantile * float64(len(values)-1) + lower := int(math.Floor(pos)) + upper := int(math.Ceil(pos)) + if lower == upper { + return values[lower] + } + weight := pos - float64(lower) + return 
values[lower] + ((values[upper] - values[lower]) * weight) + default: + return 0 + } +} + +func buildBareParserMetricMatrix(series []bareParserMetricSeries, startNanos, endNanos, stepNanos int64, spec bareParserMetricCompatSpec) map[string]interface{} { + result := make([]lokiMatrixResult, 0, len(series)) + windowNanos := int64(spec.rangeWindow) + for _, seriesItem := range series { + values := make([][]interface{}, 0, int(((endNanos-startNanos)/stepNanos)+1)) + samples := seriesItem.samples + left := 0 + right := 0 + for eval := startNanos; eval <= endNanos; eval += stepNanos { + lower := eval - windowNanos + for right < len(samples) && samples[right].tsNanos <= eval { + right++ + } + for left < right && samples[left].tsNanos < lower { + left++ + } + window := samples[left:right] + values = append(values, []interface{}{float64(eval) / float64(time.Second), formatMetricSampleValue(bareParserMetricWindowValue(spec.funcName, window, spec))}) + } + result = append(result, lokiMatrixResult{Metric: seriesItem.metric, Values: values}) + } + return map[string]interface{}{ + "status": "success", + "data": map[string]interface{}{ + "resultType": "matrix", + "result": result, + }, + } +} + +func buildBareParserMetricVector(series []bareParserMetricSeries, evalNanos int64, spec bareParserMetricCompatSpec) map[string]interface{} { + result := make([]lokiVectorResult, 0, len(series)) + windowNanos := int64(spec.rangeWindow) + for _, seriesItem := range series { + lower := evalNanos - windowNanos + window := make([]bareParserMetricSample, 0, len(seriesItem.samples)) + for _, sample := range seriesItem.samples { + if sample.tsNanos >= lower && sample.tsNanos <= evalNanos { + window = append(window, sample) + } + } + result = append(result, lokiVectorResult{ + Metric: seriesItem.metric, + Value: []interface{}{float64(evalNanos) / float64(time.Second), formatMetricSampleValue(bareParserMetricWindowValue(spec.funcName, window, spec))}, + }) + } + return map[string]interface{}{ + 
"status": "success", + "data": map[string]interface{}{ + "resultType": "vector", + "result": result, + }, + } +} + +type absentOverTimeCompatSpec struct { + baseQuery string + rangeWindow time.Duration +} + +func parseAbsentOverTimeCompatSpec(logql string) (absentOverTimeCompatSpec, bool) { + matches := absentOverTimeCompatRE.FindStringSubmatch(strings.TrimSpace(logql)) + if len(matches) != 3 { + return absentOverTimeCompatSpec{}, false + } + window, ok := parsePositiveStepDuration(matches[2]) + if !ok { + return absentOverTimeCompatSpec{}, false + } + baseQuery := strings.TrimSpace(matches[1]) + if baseQuery == "" { + return absentOverTimeCompatSpec{}, false + } + return absentOverTimeCompatSpec{baseQuery: baseQuery, rangeWindow: window}, true +} + +func extractAbsentMetricLabels(query string) map[string]string { + selector, _, ok := splitLeadingSelector(strings.TrimSpace(query)) + if !ok || len(selector) < 2 { + return map[string]string{} + } + matchers := splitSelectorMatchers(selector[1 : len(selector)-1]) + labels := make(map[string]string, len(matchers)) + for _, matcher := range matchers { + matcher = strings.TrimSpace(matcher) + if strings.Contains(matcher, "!=") || strings.Contains(matcher, "=~") || strings.Contains(matcher, "!~") { + continue + } + idx := strings.Index(matcher, "=") + if idx <= 0 { + continue + } + label := strings.TrimSpace(matcher[:idx]) + value := strings.TrimSpace(matcher[idx+1:]) + value = strings.Trim(value, "\"`") + if label == "" || value == "" { + continue + } + labels[label] = value + } + return labels +} + +func statsResponseIsEmpty(body []byte) bool { + var resp struct { + Data struct { + Result []lokiVectorResult `json:"result"` + } `json:"data"` + Results []lokiVectorResult `json:"results"` + } + if err := json.Unmarshal(body, &resp); err != nil { + return false + } + results := resp.Results + if len(results) == 0 { + results = resp.Data.Result + } + if len(results) == 0 { + return true + } + for _, item := range results { + 
if len(item.Value) < 2 { + continue + } + raw := fmt.Sprint(item.Value[1]) + raw = strings.Trim(raw, "\"") + value, err := strconv.ParseFloat(raw, 64) + if err != nil { + return false + } + if value != 0 { + return false + } + } + return true +} + +func buildAbsentInstantVector(evalRaw string, metric map[string]string) map[string]interface{} { + evalNs, ok := parseFlexibleUnixNanos(evalRaw) + if !ok { + evalNs = time.Now().UnixNano() + } + return map[string]interface{}{ + "status": "success", + "data": map[string]interface{}{ + "resultType": "vector", + "result": []lokiVectorResult{{ + Metric: metric, + Value: []interface{}{float64(evalNs) / float64(time.Second), "1"}, + }}, + }, + } +} + +func (p *Proxy) proxyAbsentOverTimeQuery(w http.ResponseWriter, r *http.Request, start time.Time, originalQuery string, spec absentOverTimeCompatSpec) { + logsqlQuery, err := p.translateQueryWithContext(r.Context(), originalQuery) + if err != nil { + p.writeError(w, http.StatusBadRequest, err.Error()) + p.metrics.RecordRequest("query", http.StatusBadRequest, time.Since(start)) + return + } + + params := url.Values{} + params.Set("query", logsqlQuery) + if t := r.FormValue("time"); t != "" { + params.Set("time", formatVLTimestamp(t)) + } + + resp, err := p.vlPost(r.Context(), "/select/logsql/stats_query", params) + if err != nil { + status := statusFromUpstreamErr(err) + p.writeError(w, status, err.Error()) + p.metrics.RecordRequest("query", status, time.Since(start)) + p.queryTracker.Record("query", originalQuery, time.Since(start), true) + return + } + defer resp.Body.Close() + + body, _ := readBodyLimited(resp.Body, maxBufferedBackendBodyBytes) + if resp.StatusCode >= http.StatusBadRequest { + p.writeError(w, resp.StatusCode, string(body)) + p.metrics.RecordRequest("query", resp.StatusCode, time.Since(start)) + p.queryTracker.Record("query", originalQuery, time.Since(start), true) + return + } + + body = p.translateStatsResponseLabelsWithContext(r.Context(), body, 
originalQuery) + var out []byte + if statsResponseIsEmpty(body) { + out, _ = json.Marshal(buildAbsentInstantVector(r.FormValue("time"), extractAbsentMetricLabels(spec.baseQuery))) + } else { + out = wrapAsLokiResponse([]byte(`{"result":[]}`), "vector") + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(out) + elapsed := time.Since(start) + p.metrics.RecordRequest("query", http.StatusOK, elapsed) + p.queryTracker.Record("query", originalQuery, elapsed, false) +} + +func (p *Proxy) proxyBareParserMetricQueryRange(w http.ResponseWriter, r *http.Request, start time.Time, originalQuery string, spec bareParserMetricCompatSpec) { + startNanos, ok := parseFlexibleUnixNanos(r.FormValue("start")) + if !ok { + p.writeError(w, http.StatusBadRequest, "invalid start timestamp") + p.metrics.RecordRequest("query_range", http.StatusBadRequest, time.Since(start)) + return + } + endNanos, ok := parseFlexibleUnixNanos(r.FormValue("end")) + if !ok || endNanos < startNanos { + p.writeError(w, http.StatusBadRequest, "invalid end timestamp") + p.metrics.RecordRequest("query_range", http.StatusBadRequest, time.Since(start)) + return + } + stepDur, ok := parsePositiveStepDuration(r.FormValue("step")) + if !ok { + p.writeError(w, http.StatusBadRequest, "invalid step") + p.metrics.RecordRequest("query_range", http.StatusBadRequest, time.Since(start)) + return + } + series, err := p.fetchBareParserMetricSeries(r.Context(), originalQuery, spec, r.FormValue("start"), r.FormValue("end")) + if err != nil { + status := statusFromUpstreamErr(err) + p.writeError(w, status, err.Error()) + p.metrics.RecordRequest("query_range", status, time.Since(start)) + p.queryTracker.Record("query_range", originalQuery, time.Since(start), true) + return + } + w.Header().Set("Content-Type", "application/json") + marshalJSON(w, buildBareParserMetricMatrix(series, startNanos, endNanos, int64(stepDur), spec)) + elapsed := time.Since(start) + p.metrics.RecordRequest("query_range", http.StatusOK, 
elapsed) + p.queryTracker.Record("query_range", originalQuery, elapsed, false) +} + +func (p *Proxy) proxyBareParserMetricQuery(w http.ResponseWriter, r *http.Request, start time.Time, originalQuery string, spec bareParserMetricCompatSpec) { + evalNanos, ok := parseFlexibleUnixNanos(r.FormValue("time")) + if !ok { + evalNanos = time.Now().UnixNano() + } + startWindow := strconv.FormatInt(evalNanos-int64(spec.rangeWindow), 10) + endWindow := strconv.FormatInt(evalNanos, 10) + series, err := p.fetchBareParserMetricSeries(r.Context(), originalQuery, spec, startWindow, endWindow) + if err != nil { + status := statusFromUpstreamErr(err) + p.writeError(w, status, err.Error()) + p.metrics.RecordRequest("query", status, time.Since(start)) + p.queryTracker.Record("query", originalQuery, time.Since(start), true) + return + } + w.Header().Set("Content-Type", "application/json") + marshalJSON(w, buildBareParserMetricVector(series, evalNanos, spec)) + elapsed := time.Since(start) + p.metrics.RecordRequest("query", http.StatusOK, elapsed) + p.queryTracker.Record("query", originalQuery, elapsed, false) +} + func (p *Proxy) translateStatsResponseLabelsWithContext(ctx context.Context, body []byte, originalQuery string) []byte { start := time.Now() var resp map[string]interface{} @@ -12095,6 +12972,19 @@ func (p *Proxy) translateStatsResponseLabelsWithContext(ctx context.Context, bod changed = true continue } + if k == "_stream" { + if rawStream, ok := v.(string); ok { + for streamKey, streamValue := range parseStreamLabels(rawStream) { + lokiKey := streamKey + if !p.labelTranslator.IsPassthrough() { + lokiKey = p.labelTranslator.ToLoki(streamKey) + } + translated[lokiKey] = streamValue + } + changed = true + continue + } + } lokiKey := k if !p.labelTranslator.IsPassthrough() { lokiKey = p.labelTranslator.ToLoki(k) @@ -12104,14 +12994,24 @@ func (p *Proxy) translateStatsResponseLabelsWithContext(ctx context.Context, bod } translated[lokiKey] = v } - if 
strings.Contains(originalQuery, "detected_level") { - if _, ok := translated["detected_level"]; !ok { - if value, ok := translated["level"]; ok { - translated["detected_level"] = value - delete(translated, "level") - changed = true - } + syntheticLabels := make(map[string]string, len(translated)) + for key, value := range translated { + if s, ok := value.(string); ok { + syntheticLabels[key] = s + } + } + beforeSyntheticCount := len(syntheticLabels) + ensureDetectedLevel(syntheticLabels) + ensureSyntheticServiceName(syntheticLabels) + if len(syntheticLabels) != beforeSyntheticCount { + changed = true + } + for key, value := range syntheticLabels { + if existing, ok := translated[key]; ok && existing == value { + continue } + translated[key] = value + changed = true } if changed { translatedMetrics++ diff --git a/internal/proxy/proxy_helpers_test.go b/internal/proxy/proxy_helpers_test.go index 984c6829..fb1e5550 100644 --- a/internal/proxy/proxy_helpers_test.go +++ b/internal/proxy/proxy_helpers_test.go @@ -2,6 +2,7 @@ package proxy import ( "net/http/httptest" + "reflect" "testing" "time" @@ -79,3 +80,104 @@ func TestProxyHelpers_CanonicalReadCacheKey_NormalizesDetectedLimitsAndDefaults( t.Fatalf("expected equivalent detected_fields requests to share cache key, got %q != %q", keyA, keyB) } } + +func TestProxyHelpers_AddStatsByStreamClause_PreservesLevelIdentity(t *testing.T) { + got := addStatsByStreamClause(`app:=api-gateway | stats count()`) + if got != `app:=api-gateway | stats by (_stream, level) count()` { + t.Fatalf("unexpected stats identity clause: %q", got) + } +} + +func TestProxyHelpers_PreserveMetricStreamIdentity_UsesStreamAndLevelForBareMetrics(t *testing.T) { + got := preserveMetricStreamIdentity(`rate({app="api-gateway"} |= "GET"[5m])`, `app:=api-gateway ~"GET" | stats rate()`, nil) + if got != `app:=api-gateway ~"GET" | stats by (_stream, level) rate()` { + t.Fatalf("unexpected preserved metric query: %q", got) + } +} + +func 
TestProxyHelpers_ParseBareParserMetricCompatSpec(t *testing.T) { + spec, ok := parseBareParserMetricCompatSpec(`count_over_time({app="api-gateway"} | json | status >= 500 [5m])`) + if !ok { + t.Fatal("expected bare parser metric query to be recognized") + } + if spec.funcName != "count_over_time" { + t.Fatalf("unexpected funcName %q", spec.funcName) + } + if spec.baseQuery != `{app="api-gateway"} | json | status >= 500` { + t.Fatalf("unexpected baseQuery %q", spec.baseQuery) + } + if spec.rangeWindow != 5*time.Minute { + t.Fatalf("unexpected rangeWindow %v", spec.rangeWindow) + } +} + +func TestProxyHelpers_ParseBareParserMetricCompatSpec_RejectsAggregatedQuery(t *testing.T) { + if _, ok := parseBareParserMetricCompatSpec(`sum by (app) (count_over_time({app="api-gateway"} | json | status >= 500 [5m]))`); ok { + t.Fatal("expected aggregated parser metric query to bypass bare compat handling") + } +} + +func TestProxyHelpers_ParseBareParserMetricCompatSpec_AcceptsUnwrapRangeFunctions(t *testing.T) { + spec, ok := parseBareParserMetricCompatSpec(`sum_over_time({app="api-gateway"} | json | unwrap duration_ms [5m])`) + if !ok { + t.Fatal("expected unwrap range function to use bare compat handling") + } + if spec.unwrapField != "duration_ms" { + t.Fatalf("unexpected unwrapField %q", spec.unwrapField) + } +} + +func TestProxyHelpers_ParseAbsentOverTimeCompatSpec(t *testing.T) { + spec, ok := parseAbsentOverTimeCompatSpec(`absent_over_time({app="missing"}[5m])`) + if !ok { + t.Fatal("expected absent_over_time to be recognized") + } + if spec.baseQuery != `{app="missing"}` { + t.Fatalf("unexpected baseQuery %q", spec.baseQuery) + } + if spec.rangeWindow != 5*time.Minute { + t.Fatalf("unexpected rangeWindow %v", spec.rangeWindow) + } +} + +func TestProxyHelpers_BuildBareParserMetricMatrix(t *testing.T) { + body := buildBareParserMetricMatrix([]bareParserMetricSeries{ + { + metric: map[string]string{"app": "api-gateway", "status": "500"}, + samples: []bareParserMetricSample{ 
+ {tsNanos: 120 * int64(time.Second), value: 1}, + {tsNanos: 180 * int64(time.Second), value: 1}, + }, + }, + }, 180*int64(time.Second), 300*int64(time.Second), int64(time.Minute), bareParserMetricCompatSpec{ + funcName: "count_over_time", + rangeWindow: 5 * time.Minute, + }) + + data := body["data"].(map[string]interface{}) + results := data["result"].([]lokiMatrixResult) + if len(results) != 1 { + t.Fatalf("expected single result series, got %d", len(results)) + } + got := results[0].Values + want := [][]interface{}{ + {float64(180), "2"}, + {float64(240), "2"}, + {float64(300), "2"}, + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("unexpected matrix values: got=%v want=%v", got, want) + } +} + +func TestProxyHelpers_StatsResponseIsEmpty(t *testing.T) { + if !statsResponseIsEmpty([]byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`)) { + t.Fatal("expected empty vector result to count as absent") + } + if !statsResponseIsEmpty([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[123,"0"]}]}}`)) { + t.Fatal("expected zero-valued vector result to count as absent") + } + if statsResponseIsEmpty([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[123,"2"]}]}}`)) { + t.Fatal("expected positive-valued vector result not to count as absent") + } +} diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index a7fdcde2..a8b9ebf8 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -619,7 +619,9 @@ func TestContract_Query_ResponseFormat(t *testing.T) { defer vlBackend.Close() resp := doGet(t, vlBackend.URL, "/loki/api/v1/query?query=*&limit=10") - assertLokiSuccess(t, resp) + if resp["status"] != "error" { + t.Fatalf("expected instant log query to fail, got %#v", resp) + } } // --- /loki/api/v1/series --- diff --git a/internal/proxy/query_semantics_compat_handlers_test.go b/internal/proxy/query_semantics_compat_handlers_test.go new 
file mode 100644 index 00000000..1cd2ca2a --- /dev/null +++ b/internal/proxy/query_semantics_compat_handlers_test.go @@ -0,0 +1,444 @@ +package proxy + +import ( + "errors" + "math" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "strings" + "testing" + "time" +) + +func TestContract_QueryRange_BareParserMetricCompatPreservesSeriesLabels(t *testing.T) { + startNanos := int64(1704067200 * 1e9) + endNanos := startNanos + int64(2*time.Minute) + + vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/select/logsql/query" { + t.Fatalf("unexpected path %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/x-ndjson") + _, _ = w.Write([]byte( + `{"_time":"` + strconv.FormatInt(startNanos+int64(30*time.Second), 10) + `","_stream":"{app=\"api-gateway\"}","level":"error","_msg":"{\"status\":500}","status":"500"}` + "\n" + + `{"_time":"` + strconv.FormatInt(startNanos+int64(90*time.Second), 10) + `","_stream":"{app=\"api-gateway\"}","level":"error","_msg":"{\"status\":500}","status":"500"}` + "\n", + )) + })) + defer vlBackend.Close() + + p := newTestProxy(t, vlBackend.URL) + req := httptest.NewRequest("GET", "/loki/api/v1/query_range?query="+url.QueryEscape(`count_over_time({app="api-gateway"} | json | status >= 500 [5m])`)+"&start="+strconv.FormatInt(startNanos, 10)+"&end="+strconv.FormatInt(endNanos, 10)+"&step=60", nil) + resp := httptest.NewRecorder() + + p.handleQueryRange(resp, req) + + if resp.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", resp.Code, resp.Body.String()) + } + var body map[string]interface{} + mustUnmarshal(t, resp.Body.Bytes(), &body) + data := assertDataIsObject(t, body) + if got := data["resultType"]; got != "matrix" { + t.Fatalf("expected matrix resultType, got %v", got) + } + result := assertResultIsArray(t, data) + if len(result) != 1 { + t.Fatalf("expected one result series, got %d", len(result)) + } + series := 
result[0].(map[string]interface{}) + metric := series["metric"].(map[string]interface{}) + if metric["app"] != "api-gateway" { + t.Fatalf("expected app label, got %v", metric) + } + if metric["status"] != "500" { + t.Fatalf("expected parser-derived status label, got %v", metric) + } + values := series["values"].([]interface{}) + if len(values) == 0 { + t.Fatalf("expected matrix values, got empty series: %v", series) + } +} + +func TestContract_Query_BareParserMetricCompatUnwrapVector(t *testing.T) { + evalNanos := int64(1704067200 * 1e9) + + vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/select/logsql/query" { + t.Fatalf("unexpected path %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/x-ndjson") + _, _ = w.Write([]byte( + `{"_time":"` + strconv.FormatInt(evalNanos-int64(2*time.Minute), 10) + `","_stream":"{app=\"api-gateway\"}","_msg":"first","duration_ms":"10"}` + "\n" + + `{"_time":"` + strconv.FormatInt(evalNanos-int64(1*time.Minute), 10) + `","_stream":"{app=\"api-gateway\"}","_msg":"second","duration_ms":"20"}` + "\n", + )) + })) + defer vlBackend.Close() + + p := newTestProxy(t, vlBackend.URL) + req := httptest.NewRequest("GET", "/loki/api/v1/query?query="+url.QueryEscape(`avg_over_time({app="api-gateway"} | json | unwrap duration_ms [5m])`)+"&time="+strconv.FormatInt(evalNanos, 10), nil) + resp := httptest.NewRecorder() + + p.handleQuery(resp, req) + + if resp.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", resp.Code, resp.Body.String()) + } + var body map[string]interface{} + mustUnmarshal(t, resp.Body.Bytes(), &body) + data := assertDataIsObject(t, body) + if got := data["resultType"]; got != "vector" { + t.Fatalf("expected vector resultType, got %v", got) + } + result := assertResultIsArray(t, data) + if len(result) != 1 { + t.Fatalf("expected one vector result, got %d", len(result)) + } + entry := result[0].(map[string]interface{}) + value := 
entry["value"].([]interface{}) + if value[1] != "15" { + t.Fatalf("expected averaged unwrap value 15, got %v", value) + } +} + +func TestContract_Query_AbsentOverTimeCompatReturnsSyntheticVector(t *testing.T) { + evalNanos := int64(1704067200 * 1e9) + + vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/select/logsql/stats_query" { + t.Fatalf("unexpected path %s", r.URL.Path) + } + _, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`)) + })) + defer vlBackend.Close() + + p := newTestProxy(t, vlBackend.URL) + req := httptest.NewRequest("GET", "/loki/api/v1/query?query="+url.QueryEscape(`absent_over_time({app="missing",env="dev",cluster=~"ops-.*"}[5m])`)+"&time="+strconv.FormatInt(evalNanos, 10), nil) + resp := httptest.NewRecorder() + + p.handleQuery(resp, req) + + if resp.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", resp.Code, resp.Body.String()) + } + var body map[string]interface{} + mustUnmarshal(t, resp.Body.Bytes(), &body) + data := assertDataIsObject(t, body) + result := assertResultIsArray(t, data) + if len(result) != 1 { + t.Fatalf("expected single absent vector sample, got %d", len(result)) + } + entry := result[0].(map[string]interface{}) + metric := entry["metric"].(map[string]interface{}) + if metric["app"] != "missing" || metric["env"] != "dev" { + t.Fatalf("expected exact-match labels only, got %v", metric) + } + if _, found := metric["cluster"]; found { + t.Fatalf("did not expect regex matcher label in absent vector metric: %v", metric) + } + value := entry["value"].([]interface{}) + if value[1] != "1" { + t.Fatalf("expected absent_over_time to return 1, got %v", value) + } +} + +func TestContract_Query_AbsentOverTimeCompatReturnsEmptyWhenPresent(t *testing.T) { + vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = 
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"app":"demo"},"value":[1704067200,"2"]}]}}`)) + })) + defer vlBackend.Close() + + p := newTestProxy(t, vlBackend.URL) + req := httptest.NewRequest("GET", "/loki/api/v1/query?query="+url.QueryEscape(`absent_over_time({app="demo"}[5m])`)+"&time=1704067200000000000", nil) + resp := httptest.NewRecorder() + + p.handleQuery(resp, req) + + if resp.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", resp.Code, resp.Body.String()) + } + var body map[string]interface{} + mustUnmarshal(t, resp.Body.Bytes(), &body) + data := assertDataIsObject(t, body) + result := assertResultIsArray(t, data) + if len(result) != 0 { + t.Fatalf("expected empty absent_over_time vector when source exists, got %v", result) + } +} + +func TestProxyHelpers_FetchBareParserMetricSeries_PropagatesBackendStatus(t *testing.T) { + vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, `backend overloaded`, http.StatusBadGateway) + })) + defer vlBackend.Close() + + p := newTestProxy(t, vlBackend.URL) + _, err := p.fetchBareParserMetricSeries(t.Context(), `count_over_time({app="api-gateway"} | json | status >= 500 [5m])`, bareParserMetricCompatSpec{ + funcName: "count_over_time", + baseQuery: `{app="api-gateway"} | json | status >= 500`, + rangeWindow: 5 * time.Minute, + }, "1704067200000000000", "1704067500000000000") + var apiErr *vlAPIError + if !errors.As(err, &apiErr) { + t.Fatalf("expected vlAPIError, got %v", err) + } + if apiErr.status != http.StatusBadGateway { + t.Fatalf("expected propagated 502, got %d", apiErr.status) + } +} + +func TestProxyHelpers_MetricValueFormattingAndWindowMath(t *testing.T) { + if got := formatMetricSampleValue(math.NaN()); got != "NaN" { + t.Fatalf("expected NaN formatting, got %q", got) + } + if got := formatMetricSampleValue(math.Inf(1)); got != "+Inf" { + t.Fatalf("expected +Inf formatting, got %q", got) + } + 
if got := formatMetricSampleValue(math.Inf(-1)); got != "-Inf" { + t.Fatalf("expected -Inf formatting, got %q", got) + } + if got := metricWindowValue("rate", 30, 10*time.Second); got != 3 { + t.Fatalf("expected rate window value 3, got %v", got) + } + window := []bareParserMetricSample{{value: 10}, {value: 20}, {value: 30}} + if got := bareParserMetricWindowValue("avg_over_time", window, bareParserMetricCompatSpec{rangeWindow: 5 * time.Minute}); got != 20 { + t.Fatalf("expected avg_over_time 20, got %v", got) + } + if got := bareParserMetricWindowValue("quantile_over_time", window, bareParserMetricCompatSpec{rangeWindow: 5 * time.Minute, quantile: 0.5}); got != 20 { + t.Fatalf("expected median quantile 20, got %v", got) + } + if got := metricWindowValue("count_over_time", 30, 10*time.Second); got != 30 { + t.Fatalf("expected non-rate metric window value 30, got %v", got) + } +} + +func TestProxyHelpers_ParseBareParserMetricCompatSpec_QuantileAndRejects(t *testing.T) { + spec, ok := parseBareParserMetricCompatSpec(`quantile_over_time(0.95, {app="api-gateway"} | json | unwrap duration_ms [5m])`) + if !ok { + t.Fatal("expected quantile_over_time unwrap query to be recognized") + } + if spec.funcName != "quantile_over_time" || spec.unwrapField != "duration_ms" || spec.quantile != 0.95 { + t.Fatalf("unexpected quantile compat spec: %+v", spec) + } + + if _, ok := parseBareParserMetricCompatSpec(`count_over_time({app="api-gateway"} |= "GET" [5m])`); ok { + t.Fatal("expected query without extracting parser stage to bypass bare compat handling") + } + if _, ok := parseBareParserMetricCompatSpec(`quantile_over_time(1.5, {app="api-gateway"} | json | unwrap duration_ms [5m])`); ok { + t.Fatal("expected invalid quantile to be rejected") + } + if got := extractBareParserUnwrapField(`{app="api-gateway"} | json | unwrap duration(duration_ms)`); got != "duration_ms" { + t.Fatalf("expected duration(...) 
unwrap field extraction, got %q", got) + } +} + +func TestProxyHelpers_BareParserMetricWindowValue_CoversMetricFunctions(t *testing.T) { + window := []bareParserMetricSample{{value: 10}, {value: 20}, {value: 30}} + spec := bareParserMetricCompatSpec{rangeWindow: 10 * time.Second, quantile: 0.5} + + tests := map[string]float64{ + "count_over_time": 60, + "rate": 6, + "bytes_over_time": 60, + "bytes_rate": 6, + "sum_over_time": 60, + "avg_over_time": 20, + "max_over_time": 30, + "min_over_time": 10, + "first_over_time": 10, + "last_over_time": 30, + "stdvar_over_time": 66.66666666666667, + } + + for funcName, want := range tests { + if got := bareParserMetricWindowValue(funcName, window, spec); math.Abs(got-want) > 1e-9 { + t.Fatalf("%s: got %v want %v", funcName, got, want) + } + } + if got := bareParserMetricWindowValue("stddev_over_time", window, spec); math.Abs(got-math.Sqrt(66.66666666666667)) > 1e-9 { + t.Fatalf("stddev_over_time: got %v", got) + } +} + +func TestContract_Query_AbsentOverTimeCompatReturnsBackendError(t *testing.T) { + vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, `backend unavailable`, http.StatusBadGateway) + })) + defer vlBackend.Close() + + p := newTestProxy(t, vlBackend.URL) + req := httptest.NewRequest("GET", "/loki/api/v1/query?query="+url.QueryEscape(`absent_over_time({app="missing"}[5m])`)+"&time=1704067200000000000", nil) + resp := httptest.NewRecorder() + + p.handleQuery(resp, req) + + if resp.Code != http.StatusBadGateway { + t.Fatalf("expected 502, got %d body=%s", resp.Code, resp.Body.String()) + } + if !strings.Contains(resp.Body.String(), "backend unavailable") { + t.Fatalf("expected backend error body, got %s", resp.Body.String()) + } +} + +func TestContract_QueryRange_BareParserMetricCompatValidationAndBackendError(t *testing.T) { + p := newTestProxy(t, "http://unused") + spec := bareParserMetricCompatSpec{ + funcName: "count_over_time", + baseQuery: 
`{app="api-gateway"} | json | status >= 500`, + rangeWindow: 5 * time.Minute, + } + + badStartReq := httptest.NewRequest("GET", "/loki/api/v1/query_range?query="+url.QueryEscape(`count_over_time({app="api-gateway"} | json | status >= 500 [5m])`)+"&start=bad&end=1704067500000000000&step=60", nil) + badStartResp := httptest.NewRecorder() + p.proxyBareParserMetricQueryRange(badStartResp, badStartReq, time.Now(), badStartReq.URL.Query().Get("query"), spec) + if badStartResp.Code != http.StatusBadRequest { + t.Fatalf("expected invalid start to return 400, got %d", badStartResp.Code) + } + + badStepReq := httptest.NewRequest("GET", "/loki/api/v1/query_range?query="+url.QueryEscape(`count_over_time({app="api-gateway"} | json | status >= 500 [5m])`)+"&start=1704067200000000000&end=1704067500000000000&step=0", nil) + badStepResp := httptest.NewRecorder() + p.proxyBareParserMetricQueryRange(badStepResp, badStepReq, time.Now(), badStepReq.URL.Query().Get("query"), spec) + if badStepResp.Code != http.StatusBadRequest { + t.Fatalf("expected invalid step to return 400, got %d", badStepResp.Code) + } + + vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, `upstream timeout`, http.StatusBadGateway) + })) + defer vlBackend.Close() + + p = newTestProxy(t, vlBackend.URL) + errReq := httptest.NewRequest("GET", "/loki/api/v1/query_range?query="+url.QueryEscape(`count_over_time({app="api-gateway"} | json | status >= 500 [5m])`)+"&start=1704067200000000000&end=1704067500000000000&step=60", nil) + errResp := httptest.NewRecorder() + p.proxyBareParserMetricQueryRange(errResp, errReq, time.Now(), errReq.URL.Query().Get("query"), spec) + if errResp.Code != http.StatusGatewayTimeout { + t.Fatalf("expected timeout-shaped upstream error to map to 504, got %d body=%s", errResp.Code, errResp.Body.String()) + } +} + +func TestContract_Query_BareParserMetricCompatPropagatesBackendError(t *testing.T) { + vlBackend := 
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, `backend unavailable`, http.StatusBadGateway) + })) + defer vlBackend.Close() + + p := newTestProxy(t, vlBackend.URL) + req := httptest.NewRequest("GET", "/loki/api/v1/query?query="+url.QueryEscape(`avg_over_time({app="api-gateway"} | json | unwrap duration_ms [5m])`)+"&time=1704067200000000000", nil) + resp := httptest.NewRecorder() + + p.handleQuery(resp, req) + + if resp.Code != http.StatusBadGateway { + t.Fatalf("expected upstream 502 to propagate, got %d body=%s", resp.Code, resp.Body.String()) + } +} + +func TestProxyHelpers_FetchBareParserMetricSeries_SkipsMalformedEntriesAndSortsSeries(t *testing.T) { + startNanos := int64(1704067200 * 1e9) + vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/select/logsql/query" { + t.Fatalf("unexpected path %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/x-ndjson") + _, _ = w.Write([]byte(strings.Join([]string{ + `not-json`, + `{"_time":"bad","_stream":"{app=\"api-gateway\"}","_msg":"{\"status\":500}","status":"500"}`, + `{"_time":"` + strconv.FormatInt(startNanos, 10) + `","_stream":"{app=\"api-gateway\"}","_msg":"{\"status\":500}","status":"500"}`, + `{"_time":"` + strconv.FormatInt(startNanos+int64(time.Second), 10) + `","_stream":"{app=\"worker\"}","_msg":"{\"status\":404}","status":"404"}`, + }, "\n"))) + })) + defer vlBackend.Close() + + p := newTestProxy(t, vlBackend.URL) + series, err := p.fetchBareParserMetricSeries(t.Context(), `count_over_time({app=~"api-gateway|worker"} | json | status >= 400 [5m])`, bareParserMetricCompatSpec{ + funcName: "count_over_time", + baseQuery: `{app=~"api-gateway|worker"} | json | status >= 400`, + rangeWindow: 5 * time.Minute, + }, strconv.FormatInt(startNanos, 10), strconv.FormatInt(startNanos+int64(2*time.Minute), 10)) + if err != nil { + t.Fatalf("expected successful bare parser series fetch, got 
%v", err) + } + if len(series) != 2 { + t.Fatalf("expected two valid series after skipping malformed entries, got %d", len(series)) + } + if series[0].metric["app"] != "api-gateway" || series[1].metric["app"] != "worker" { + t.Fatalf("expected canonical app ordering, got %+v", series) + } +} + +func TestProxyHelpers_FetchBareParserMetricSeries_UnwrapSkipsMissingAndNonNumericValues(t *testing.T) { + startNanos := int64(1704067200 * 1e9) + vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/select/logsql/query" { + t.Fatalf("unexpected path %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/x-ndjson") + _, _ = w.Write([]byte(strings.Join([]string{ + `{"_time":"` + strconv.FormatInt(startNanos, 10) + `","_stream":"{app=\"api-gateway\"}","_msg":"first"}`, + `{"_time":"` + strconv.FormatInt(startNanos+int64(time.Second), 10) + `","_stream":"{app=\"api-gateway\"}","_msg":"second","duration_ms":"not-a-number"}`, + `{"_time":"` + strconv.FormatInt(startNanos+int64(2*time.Second), 10) + `","_stream":"{app=\"api-gateway\"}","_msg":"third","duration_ms":"25"}`, + }, "\n"))) + })) + defer vlBackend.Close() + + p := newTestProxy(t, vlBackend.URL) + series, err := p.fetchBareParserMetricSeries(t.Context(), `avg_over_time({app="api-gateway"} | json | unwrap duration_ms [5m])`, bareParserMetricCompatSpec{ + funcName: "avg_over_time", + baseQuery: `{app="api-gateway"} | json | unwrap duration_ms`, + rangeWindow: 5 * time.Minute, + unwrapField: "duration_ms", + }, strconv.FormatInt(startNanos, 10), strconv.FormatInt(startNanos+int64(3*time.Second), 10)) + if err != nil { + t.Fatalf("expected successful unwrap fetch, got %v", err) + } + if len(series) != 1 { + t.Fatalf("expected one surviving series, got %d", len(series)) + } + if len(series[0].samples) != 1 || series[0].samples[0].value != 25 { + t.Fatalf("expected only numeric unwrap sample to survive, got %+v", series[0].samples) + } + if _, found := 
series[0].metric["duration_ms"]; found { + t.Fatalf("did not expect unwrap field to remain as a metric label: %+v", series[0].metric) + } +} + +func TestProxyHelpers_ParseAbsentOverTimeCompatSpec_RejectsInvalidInput(t *testing.T) { + if _, ok := parseAbsentOverTimeCompatSpec(`absent_over_time([5m])`); ok { + t.Fatal("expected empty absent_over_time query to be rejected") + } + if _, ok := parseAbsentOverTimeCompatSpec(`absent_over_time({app="demo"}[bad])`); ok { + t.Fatal("expected invalid absent_over_time range to be rejected") + } +} + +func TestProxyHelpers_BuildAbsentInstantVector_UsesCurrentTimeFallback(t *testing.T) { + before := time.Now().Unix() + body := buildAbsentInstantVector("bad-time", map[string]string{"app": "missing"}) + after := time.Now().Unix() + + data := body["data"].(map[string]interface{}) + result := data["result"].([]lokiVectorResult) + if len(result) != 1 { + t.Fatalf("expected one absent vector sample, got %d", len(result)) + } + ts := result[0].Value[0].(float64) + if ts < float64(before) || ts > float64(after)+1 { + t.Fatalf("expected fallback timestamp between %d and %d, got %v", before, after, ts) + } +} + +func TestContract_Query_AbsentOverTimeCompatTranslateErrorReturnsBadRequest(t *testing.T) { + p := newTestProxy(t, "http://unused") + req := httptest.NewRequest("GET", "/loki/api/v1/query?query=ignored&time=1704067200000000000", nil) + resp := httptest.NewRecorder() + + p.proxyAbsentOverTimeQuery(resp, req, time.Now(), `{app="demo"`, absentOverTimeCompatSpec{ + baseQuery: `{app="demo"`, + rangeWindow: 5 * time.Minute, + }) + + if resp.Code != http.StatusBadRequest { + t.Fatalf("expected translate error to return 400, got %d body=%s", resp.Code, resp.Body.String()) + } +} diff --git a/internal/proxy/request_helpers_coverage_test.go b/internal/proxy/request_helpers_coverage_test.go index ac5c1ef0..c5f9bf2d 100644 --- a/internal/proxy/request_helpers_coverage_test.go +++ b/internal/proxy/request_helpers_coverage_test.go @@ -99,8 
+99,8 @@ func TestTranslateStatsResponseLabelsWithContext_Coverage(t *testing.T) { if _, ok := metric["__name__"]; ok { t.Fatalf("expected __name__ to be removed, got %#v", metric) } - if _, ok := metric["level"]; ok { - t.Fatalf("expected level to be rewritten when detected_level requested, got %#v", metric) + if metric["level"] != "warn" { + t.Fatalf("expected original level to be preserved, got %#v", metric) } if metric["service_name"] != "api" || metric["detected_level"] != "warn" { t.Fatalf("unexpected translated metric labels %#v", metric) diff --git a/internal/proxy/stream_metadata_test.go b/internal/proxy/stream_metadata_test.go index c83d80db..4a9911e1 100644 --- a/internal/proxy/stream_metadata_test.go +++ b/internal/proxy/stream_metadata_test.go @@ -251,10 +251,12 @@ func TestQuery_DefaultRequestStaysTwoTupleForStrictDecoders(t *testing.T) { p := newStreamMetadataProxy(t, vlBackend.URL, true, false) q := url.Values{} q.Set("query", `{job="otel-proxy"}`) - req := httptest.NewRequest("GET", "/loki/api/v1/query?"+q.Encode(), nil) + q.Set("start", "1") + q.Set("end", "2") + req := httptest.NewRequest("GET", "/loki/api/v1/query_range?"+q.Encode(), nil) rec := httptest.NewRecorder() - p.handleQuery(rec, req) + p.handleQueryRange(rec, req) tuple := decodeFirstTuple(t, rec.Body.Bytes()) if len(tuple) != 2 { t.Fatalf("expected strict 2-tuple default query response, got %#v", tuple) @@ -268,11 +270,13 @@ func TestQuery_CategorizeLabelsReturnsThreeTuple(t *testing.T) { p := newStreamMetadataProxy(t, vlBackend.URL, true, false) q := url.Values{} q.Set("query", `{job="otel-proxy"}`) - req := httptest.NewRequest("GET", "/loki/api/v1/query?"+q.Encode(), nil) + q.Set("start", "1") + q.Set("end", "2") + req := httptest.NewRequest("GET", "/loki/api/v1/query_range?"+q.Encode(), nil) req.Header.Set("X-Loki-Response-Encoding-Flags", "categorize-labels") rec := httptest.NewRecorder() - p.handleQuery(rec, req) + p.handleQueryRange(rec, req) tuple := decodeFirstTuple(t, 
rec.Body.Bytes()) if len(tuple) != 3 { t.Fatalf("expected 3-tuple categorize-labels query response, got %#v", tuple) diff --git a/internal/proxy/tuple_contract_test.go b/internal/proxy/tuple_contract_test.go index 3c320713..e26c22f4 100644 --- a/internal/proxy/tuple_contract_test.go +++ b/internal/proxy/tuple_contract_test.go @@ -199,11 +199,12 @@ func TestTupleContract_DefaultQueryStrictTwoTuple(t *testing.T) { p := newTupleContractProxy(t, backend.URL, false) params := url.Values{} params.Set("query", `{job="tuple-contract"} |= "api" | json | logfmt | drop __error__, __error_details__`) - params.Set("time", "2") - req := httptest.NewRequest(http.MethodGet, "/loki/api/v1/query?"+params.Encode(), nil) + params.Set("start", "1") + params.Set("end", "2") + req := httptest.NewRequest(http.MethodGet, "/loki/api/v1/query_range?"+params.Encode(), nil) rec := httptest.NewRecorder() - p.handleQuery(rec, req) + p.handleQueryRange(rec, req) if rec.Code != http.StatusOK { t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) } @@ -279,12 +280,13 @@ func TestTupleContract_CategorizeLabelsQueryThreeTuple(t *testing.T) { p := newTupleContractProxy(t, backend.URL, false) params := url.Values{} params.Set("query", `{job="tuple-contract"} |= "api" | json | logfmt | drop __error__, __error_details__`) - params.Set("time", "2") - req := httptest.NewRequest(http.MethodGet, "/loki/api/v1/query?"+params.Encode(), nil) + params.Set("start", "1") + params.Set("end", "2") + req := httptest.NewRequest(http.MethodGet, "/loki/api/v1/query_range?"+params.Encode(), nil) req.Header.Set("X-Loki-Response-Encoding-Flags", "categorize-labels") rec := httptest.NewRecorder() - p.handleQuery(rec, req) + p.handleQueryRange(rec, req) if rec.Code != http.StatusOK { t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) } diff --git a/internal/proxy/vector_matching.go b/internal/proxy/vector_matching.go index 1cde1f39..40249e14 100644 --- a/internal/proxy/vector_matching.go 
+++ b/internal/proxy/vector_matching.go @@ -2,6 +2,7 @@ package proxy import ( "encoding/json" + "fmt" "net/http" "sort" "strconv" @@ -147,8 +148,12 @@ func applyWithoutMatrix(body []byte, exclude map[string]bool) []byte { return body } - // For matrix, just strip the excluded labels (don't re-aggregate across time) - var result []map[string]interface{} + type groupedSeries struct { + metric map[string]interface{} + values map[string]float64 + order map[string]interface{} + } + groups := make(map[string]*groupedSeries) for _, series := range resp.Data.Result { filtered := make(map[string]string) for k, v := range series.Metric { @@ -156,9 +161,55 @@ func applyWithoutMatrix(body []byte, exclude map[string]bool) []byte { filtered[k] = v } } + key := metricKeyStr(filtered) + group := groups[key] + if group == nil { + metric := make(map[string]interface{}, len(filtered)) + for k, v := range filtered { + metric[k] = v + } + group = &groupedSeries{ + metric: metric, + values: make(map[string]float64, len(series.Values)), + order: make(map[string]interface{}, len(series.Values)), + } + groups[key] = group + } + for _, point := range series.Values { + if len(point) < 2 { + continue + } + tsKey := fmt.Sprintf("%v", point[0]) + group.order[tsKey] = point[0] + switch raw := point[1].(type) { + case string: + parsed, _ := strconv.ParseFloat(raw, 64) + group.values[tsKey] += parsed + case float64: + group.values[tsKey] += raw + } + } + } + + var result []map[string]interface{} + for _, group := range groups { + timestamps := make([]string, 0, len(group.values)) + for ts := range group.values { + timestamps = append(timestamps, ts) + } + sort.Slice(timestamps, func(i, j int) bool { + return timestamps[i] < timestamps[j] + }) + values := make([][]interface{}, 0, len(timestamps)) + for _, ts := range timestamps { + values = append(values, []interface{}{ + group.order[ts], + strconv.FormatFloat(group.values[ts], 'f', -1, 64), + }) + } result = append(result, map[string]interface{}{ 
- "metric": filtered, - "values": series.Values, + "metric": group.metric, + "values": values, }) } @@ -172,6 +223,53 @@ func applyWithoutMatrix(body []byte, exclude map[string]bool) []byte { return out } +func validateVectorMatchCardinality(leftBody, rightBody []byte, onLabels []string, ignoringLabels []string, allowGroupLeft, allowGroupRight bool) error { + if allowGroupLeft || allowGroupRight { + return nil + } + + leftSeries := parseMetricSeries(leftBody) + rightSeries := parseMetricSeries(rightBody) + if len(leftSeries) == 0 || len(rightSeries) == 0 { + return nil + } + + var keyFn func(map[string]string) string + if len(onLabels) > 0 { + keyFn = func(metric map[string]string) string { + return subsetKey(metric, onLabels) + } + } else { + ignore := make(map[string]bool, len(ignoringLabels)) + for _, label := range ignoringLabels { + ignore[strings.TrimSpace(label)] = true + } + keyFn = func(metric map[string]string) string { + return excludeKey(metric, ignore) + } + } + + leftCounts := make(map[string]int) + rightCounts := make(map[string]int) + for _, series := range leftSeries { + leftCounts[keyFn(series.metric)]++ + } + for _, series := range rightSeries { + rightCounts[keyFn(series.metric)]++ + } + + for key, leftCount := range leftCounts { + rightCount := rightCounts[key] + if rightCount == 0 { + continue + } + if leftCount > 1 || rightCount > 1 { + return fmt.Errorf("multiple matches for labels: many-to-one matching must be explicit (group_left/group_right)") + } + } + return nil +} + // applyOnMatching joins two metric results by a specified label subset. // on(label1, label2) means: match series where label1 and label2 are equal. 
func applyOnMatching(leftBody, rightBody []byte, op string, onLabels []string, resultType string) []byte { diff --git a/internal/translator/fixes_test.go b/internal/translator/fixes_test.go index a65aa9d7..a6ca5536 100644 --- a/internal/translator/fixes_test.go +++ b/internal/translator/fixes_test.go @@ -64,3 +64,16 @@ func TestWithoutInQuotes_NotRejected(t *testing.T) { t.Errorf("should not reject 'without' inside quotes: %v", err) } } + +func TestMetricQuery_MissingRangeDoesNotTranslate(t *testing.T) { + tests := []string{ + `rate({app="nginx"})`, + `count_over_time({app="nginx"})`, + `quantile_over_time(0.95, {app="nginx"} | unwrap latency)`, + } + for _, logql := range tests { + if translated, err := TranslateLogQL(logql); err == nil && strings.Contains(translated, "| stats ") { + t.Fatalf("expected missing-range metric query to stay non-metric, got %q", translated) + } + } +} diff --git a/internal/translator/translator.go b/internal/translator/translator.go index b732bf45..1085c48e 100644 --- a/internal/translator/translator.go +++ b/internal/translator/translator.go @@ -322,12 +322,6 @@ func translatePipelineStage(stage string, labelFn LabelTranslateFunc) string { // rejects these, so treat them as a no-op stage. 
return "" } - if !hasNamedExtractPlaceholder(patternExpr) { - if filter, ok := patternExpressionToLineFilter(patternExpr); ok { - return filter - } - return "" - } return "| extract " + patternExpr } if strings.HasPrefix(stage, "regexp ") { @@ -340,12 +334,6 @@ func translatePipelineStage(stage string, labelFn LabelTranslateFunc) string { if isNoopPatternExpression(patternExpr) { return "" } - if !hasNamedExtractPlaceholder(patternExpr) { - if filter, ok := patternExpressionToLineFilter(patternExpr); ok { - return filter - } - return "" - } return "| extract " + patternExpr } @@ -412,52 +400,6 @@ func isNoopPatternExpression(expr string) bool { } } -func hasNamedExtractPlaceholder(expr string) bool { - pattern, ok := extractPatternExpressionValue(expr) - if !ok { - return false - } - for { - start := strings.IndexByte(pattern, '<') - if start < 0 { - return false - } - pattern = pattern[start+1:] - end := strings.IndexByte(pattern, '>') - if end < 0 { - return false - } - token := strings.TrimSpace(pattern[:end]) - if token != "" && token != "_" { - return true - } - pattern = pattern[end+1:] - } -} - -func patternExpressionToLineFilter(expr string) (string, bool) { - pattern, ok := extractPatternExpressionValue(expr) - if !ok { - return "", false - } - return "~" + strconv.Quote(patternFilterValueToRegex(pattern)), true -} - -func extractPatternExpressionValue(expr string) (string, bool) { - expr = strings.TrimSpace(expr) - if expr == "" { - return "", false - } - if strings.HasPrefix(expr, "`") && strings.HasSuffix(expr, "`") && len(expr) >= 2 { - return expr[1 : len(expr)-1], true - } - unquoted, err := strconv.Unquote(expr) - if err != nil { - return "", false - } - return unquoted, true -} - // translateLabelFilter handles label comparison filters. 
func translateLabelFilter(stage string, labelFn LabelTranslateFunc) string { if chained, ok := translateLogicalLabelFilterChain(stage, labelFn); ok { @@ -899,7 +841,10 @@ func tryTranslateMetricQuery(logql string, labelFn LabelTranslateFunc) (string, inner = inner[:end] // Extract the query and duration: {stream} | pipeline [5m] - query, _ := extractQueryAndDuration(inner) + query, duration := extractQueryAndDuration(inner) + if strings.TrimSpace(duration) == "" { + continue + } // Translate the inner log query part logsqlQuery, err := translateLogQuery(query, labelFn) @@ -1233,7 +1178,10 @@ func tryTranslateQuantileOverTime(innerExpr, _, byLabels string, labelFn LabelTr queryPart := strings.TrimSpace(body[commaIdx+1:]) // Extract query and duration - query, _ := extractQueryAndDuration(queryPart) + query, duration := extractQueryAndDuration(queryPart) + if strings.TrimSpace(duration) == "" { + return "", false + } // Translate the inner log query logsqlQuery, err := translateLogQuery(query, labelFn) diff --git a/internal/translator/translator_test.go b/internal/translator/translator_test.go index 51abc4eb..c175c9c5 100644 --- a/internal/translator/translator_test.go +++ b/internal/translator/translator_test.go @@ -96,16 +96,6 @@ func TestTranslateLogQL(t *testing.T) { logql: `{app="nginx"} | pattern " - - <_>"`, want: `app:=nginx | extract " - - <_>"`, }, - { - name: "pattern parser without named field falls back to line filter", - logql: `{app="nginx"} | pattern "Metrics"`, - want: `app:=nginx ~"Metrics"`, - }, - { - name: "extract parser without named field falls back to line filter", - logql: `{app="nginx"} | extract "Metrics"`, - want: `app:=nginx ~"Metrics"`, - }, { name: "pattern wildcard no-op is dropped", logql: `{app="nginx"} | pattern "(.*)" | status="500"`, diff --git a/scripts/smoke-test.sh b/scripts/smoke-test.sh index 5dc62585..cc15fefa 100755 --- a/scripts/smoke-test.sh +++ b/scripts/smoke-test.sh @@ -139,6 +139,23 @@ fetch_query_categorized() { 
"${base_url}/loki/api/v1/query" } +check_instant_log_query_error() { + local payload="$1" + local endpoint="$2" + + if ! echo "$payload" | jq -e ' + .status == "error" + and .errorType == "bad_data" + and (.error | type == "string") + and (.error | contains("log queries are not supported as an instant query type")) + ' >/dev/null; then + local sample + sample="$(echo "$payload" | jq -c '.')" + echo "expected Loki-compatible bad_data error for ${endpoint}; payload=${sample:-}" >&2 + return 1 + fi +} + fetch_with_retry() { local fetcher="$1" local endpoint="$2" @@ -165,13 +182,13 @@ fetch_with_retry() { } range_payload="$(fetch_with_retry fetch_query_range "/query_range" "$DEFAULT_PROXY_URL")" -query_payload="$(fetch_with_retry fetch_query "/query" "$DEFAULT_PROXY_URL")" range_categorized_payload="$(fetch_with_retry fetch_query_range_categorized "/query_range categorize-labels" "$CATEGORIZED_PROXY_URL")" -query_categorized_payload="$(fetch_with_retry fetch_query_categorized "/query categorize-labels" "$CATEGORIZED_PROXY_URL")" +query_payload="$(fetch_query "$DEFAULT_PROXY_URL")" +query_categorized_payload="$(fetch_query_categorized "$CATEGORIZED_PROXY_URL")" check_strict_two_tuple "$range_payload" "/query_range" -check_strict_two_tuple "$query_payload" "/query" check_categorize_three_tuple "$range_categorized_payload" "/query_range" -check_categorize_three_tuple "$query_categorized_payload" "/query" +check_instant_log_query_error "$query_payload" "/query" +check_instant_log_query_error "$query_categorized_payload" "/query categorize-labels" -echo "tuple contract smoke check passed (default=${DEFAULT_PROXY_URL} strict 2-tuple; categorized=${CATEGORIZED_PROXY_URL} 3-tuple)" +echo "tuple contract smoke check passed (query_range strict 2-tuple; query_range categorize-labels 3-tuple; query instant log selector returns Loki-compatible bad_data)" diff --git a/test/e2e-compat/query-semantics-matrix.json b/test/e2e-compat/query-semantics-matrix.json new file mode 100644 index 
00000000..c59039d9 --- /dev/null +++ b/test/e2e-compat/query-semantics-matrix.json @@ -0,0 +1,596 @@ +{ + "description": "Manifest-driven Loki semantics parity cases. These use real Loki in the compose stack as the oracle for Loki-facing query behavior.", + "oracle": "docker-compose Loki reference", + "cases": [ + { + "id": "selector_exact_instant", + "family": "selector", + "endpoint": "query", + "query": "{app=\"api-gateway\",namespace=\"prod\",level=\"error\"}", + "expectation": "client_error" + }, + { + "id": "selector_exact_range", + "family": "selector", + "endpoint": "query_range", + "query": "{app=\"api-gateway\",namespace=\"prod\",level=\"error\"}", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "multi_label_regex_range", + "family": "selector", + "endpoint": "query_range", + "query": "{app=~\"api-.*\",namespace=\"prod\"}", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "multi_label_negative_regex", + "family": "selector", + "endpoint": "query_range", + "query": "{app=~\"api-.*|payment-.*|nginx-.*|java-.*\",namespace!~\"kube-.*\"}", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "line_filters_chained", + "family": "line_filter", + "endpoint": "query_range", + "query": "{app=\"api-gateway\",level=\"error\"} |= \"error\" != \"not found\"", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "line_filters_chain_mixed", + "family": "line_filter", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} |= \"api\" |~ \"GET|POST\" != \"health\"", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "line_filters_negative_regex", + 
"family": "line_filter", + "endpoint": "query_range", + "query": "{app=\"nginx-ingress\"} !~ \"sqlmap\"", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "json_pipeline_field_filter", + "family": "parser_pipeline", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} | json | method=\"GET\"", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "json_pipeline_numeric_filter", + "family": "parser_pipeline", + "endpoint": "query_range", + "query": "{app=\"api-gateway\", level=\"info\"} | json | duration_ms>100", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "json_pipeline_regex_filter", + "family": "parser_pipeline", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} | json | path=~\"/api/v1/.*\"", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "regexp_pipeline_field_filter", + "family": "parser_pipeline", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} | regexp \"\\\"status\\\":(?P[0-9]+)\" | status=~\"5..\"", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "pattern_pipeline_field_filter", + "family": "parser_pipeline", + "endpoint": "query_range", + "query": "{app=\"nginx-ingress\"} | pattern \"<_> - - <_> \\\" <_>\\\" \" | status >= 400", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "logfmt_pipeline_field_filter", + "family": "parser_pipeline", + "endpoint": "query_range", + "query": "{app=\"payment-service\"} | logfmt | level=\"error\"", + "expectation": "success", + "expect_result_type": "streams", + "compare": 
"line_count", + "require_non_empty": true + }, + { + "id": "json_then_line_format", + "family": "formatting", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} | json | line_format \"{{.method}} {{.path}} {{.status}}\"", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "count_over_time_by_level", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "sum by (level) (count_over_time({app=\"api-gateway\"}[5m]))", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "sum_rate_by_app", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "sum by (app) (rate({namespace=\"prod\"}[5m]))", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "negative_regex_metric_range", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "count_over_time({app=\"nginx-ingress\"} !~ \"sqlmap\"[5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "parser_metric_range_filter", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "count_over_time({app=\"api-gateway\"} | json | status >= 500 [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "parser_bytes_metric_range_filter", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "bytes_over_time({app=\"api-gateway\"} | json | status >= 500 [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "parser_bytes_rate_metric_range_filter", + 
"family": "metric_aggregation", + "endpoint": "query_range", + "query": "bytes_rate({app=\"api-gateway\"} | json | status >= 500 [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "sum_over_time_unwrap", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "sum_over_time({app=\"api-gateway\"} | json | unwrap duration_ms [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "avg_over_time_unwrap", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "avg_over_time({app=\"api-gateway\"} | json | unwrap duration_ms [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "max_over_time_unwrap", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "max_over_time({app=\"api-gateway\"} | json | unwrap duration_ms [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "min_over_time_unwrap", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "min_over_time({app=\"api-gateway\"} | json | unwrap duration_ms [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "first_over_time_unwrap", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "first_over_time({app=\"api-gateway\"} | json | unwrap duration_ms [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "last_over_time_unwrap", + "family": "metric_aggregation", + "endpoint": "query_range", + 
"query": "last_over_time({app=\"api-gateway\"} | json | unwrap duration_ms [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "stddev_over_time_unwrap", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "stddev_over_time({app=\"api-gateway\"} | json | unwrap duration_ms [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "stdvar_over_time_unwrap", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "stdvar_over_time({app=\"api-gateway\"} | json | unwrap duration_ms [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "unwrap_quantile_over_time", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "quantile_over_time(0.95, {app=\"api-gateway\"} | json | unwrap duration_ms [5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "metric_keys", + "require_non_empty": true + }, + { + "id": "absent_over_time_missing", + "family": "metric_aggregation", + "endpoint": "query", + "query": "absent_over_time({app=\"definitely-missing-app\"}[5m])", + "expectation": "success", + "expect_result_type": "vector", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "comparison_scalar_metric_gt_zero", + "family": "binary_scalar", + "endpoint": "query_range", + "query": "sum by (level) (count_over_time({app=\"api-gateway\"}[5m])) > 0", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "comparison_scalar_metric_bool_gt_zero", + "family": "binary_scalar", + "endpoint": "query_range", + "query": "sum by (app) 
(count_over_time({namespace=\"prod\"}[5m])) > bool 0", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "without_grouping_metric", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "sum without (level) (count_over_time({app=\"api-gateway\"}[5m]))", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "scalar_binary_metric", + "family": "binary_scalar", + "endpoint": "query_range", + "query": "sum by (level) (count_over_time({app=\"api-gateway\"}[5m])) * 100", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "rate_with_line_filter", + "family": "metric_aggregation", + "endpoint": "query_range", + "query": "rate({app=\"api-gateway\"} |= \"GET\"[5m])", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "vector_or_metric", + "family": "binary_vector", + "endpoint": "query_range", + "query": "sum by (level) (count_over_time({app=\"api-gateway\"}[5m])) or sum by (level) (count_over_time({app=\"api-gateway\",level=\"debug\"}[5m]))", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "vector_unless_metric", + "family": "binary_vector", + "endpoint": "query_range", + "query": "sum by (level) (count_over_time({app=\"api-gateway\"}[5m])) unless sum by (level) (count_over_time({app=\"api-gateway\",level=\"debug\"}[5m]))", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "vector_binary_metric", + "family": "binary_vector", + "endpoint": "query_range", + 
"query": "sum by (level) (count_over_time({app=\"api-gateway\"}[5m])) / sum by (level) (count_over_time({app=\"api-gateway\"}[5m]))", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "on_grouping_metric", + "family": "binary_vector", + "endpoint": "query_range", + "query": "sum by (app, pod) (count_over_time({cluster=\"us-east-1\"}[5m])) / on(app) sum by (app) (count_over_time({cluster=\"us-east-1\", level=\"error\"}[5m]))", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "ignoring_grouping_metric", + "family": "binary_vector", + "endpoint": "query_range", + "query": "sum by (app, level) (count_over_time({app=\"api-gateway\"}[5m])) / ignoring(level) sum by (app) (count_over_time({app=\"api-gateway\"}[5m]))", + "step": "60", + "expectation": "server_error" + }, + { + "id": "group_left_join_metric", + "family": "binary_vector", + "endpoint": "query_range", + "query": "sum by (app, pod) (count_over_time({cluster=\"us-east-1\"}[5m])) * on(app) group_left(level) sum by (app, level) (count_over_time({cluster=\"us-east-1\", level=\"error\"}[5m]))", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "vector_and_metric", + "family": "binary_vector", + "endpoint": "query_range", + "query": "sum by (level) (count_over_time({app=\"api-gateway\"}[5m])) and sum by (level) (count_over_time({app=\"api-gateway\"}[5m]))", + "step": "60", + "expectation": "success", + "expect_result_type": "matrix", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "topk_metric", + "family": "aggregation", + "endpoint": "query", + "query": "topk(2, sum by (app) (count_over_time({namespace=\"prod\"}[5m])))", + "expectation": "success", + "expect_result_type": "vector", + 
"compare": "series_count", + "require_non_empty": true + }, + { + "id": "sort_desc_metric", + "family": "aggregation", + "endpoint": "query", + "query": "sort_desc(sum by (app) (count_over_time({namespace=\"prod\"}[5m])))", + "expectation": "success", + "expect_result_type": "vector", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "bottomk_metric", + "family": "aggregation", + "endpoint": "query", + "query": "bottomk(1, sum by (app) (count_over_time({namespace=\"prod\"}[5m])))", + "expectation": "success", + "expect_result_type": "vector", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "sort_metric", + "family": "aggregation", + "endpoint": "query", + "query": "sort(sum by (app) (count_over_time({namespace=\"prod\"}[5m])))", + "expectation": "success", + "expect_result_type": "vector", + "compare": "series_count", + "require_non_empty": true + }, + { + "id": "label_format_pipeline", + "family": "formatting", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} | json | label_format method_alias=\"{{.method}}\", status_code=\"{{.status}}\"", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "keep_pipeline", + "family": "formatting", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} | json | keep method, path, status", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "drop_pipeline", + "family": "formatting", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} | json | drop trace_id, upstream", + "expectation": "success", + "expect_result_type": "streams", + "compare": "line_count", + "require_non_empty": true + }, + { + "id": "impossible_filter_empty_success", + "family": "empty_result", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} |= \"IMPOSSIBLE_STRING_THAT_NEVER_EXISTS_12345\"", + 
"expectation": "success", + "expect_result_type": "streams", + "compare": "line_count" + }, + { + "id": "invalid_metric_aggregation_on_log_query", + "family": "invalid", + "endpoint": "query", + "query": "sum by(job) ({deployment_environment=\"dev\", k8s_cluster_name=\"ops-sand\"})", + "expectation": "client_error" + }, + { + "id": "invalid_topk_on_log_query", + "family": "invalid", + "endpoint": "query", + "query": "topk(2, {app=\"api-gateway\"})", + "expectation": "client_error" + }, + { + "id": "invalid_sort_on_log_query", + "family": "invalid", + "endpoint": "query", + "query": "sort({app=\"api-gateway\"})", + "expectation": "client_error" + }, + { + "id": "invalid_bottomk_on_log_query", + "family": "invalid", + "endpoint": "query", + "query": "bottomk(1, {app=\"api-gateway\"})", + "expectation": "client_error" + }, + { + "id": "invalid_sort_desc_on_log_query", + "family": "invalid", + "endpoint": "query", + "query": "sort_desc({app=\"api-gateway\"})", + "expectation": "client_error" + }, + { + "id": "invalid_rate_missing_range", + "family": "invalid", + "endpoint": "query", + "query": "rate({app=\"api-gateway\"})", + "expectation": "client_error" + }, + { + "id": "invalid_comparison_log_query_scalar", + "family": "invalid", + "endpoint": "query", + "query": "{app=\"api-gateway\"} > 0", + "expectation": "client_error" + }, + { + "id": "invalid_malformed_selector", + "family": "invalid", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"", + "expectation": "client_error" + }, + { + "id": "invalid_parser_tail", + "family": "invalid", + "endpoint": "query_range", + "query": "{app=\"api-gateway\"} | json |", + "expectation": "client_error" + }, + { + "id": "invalid_binary_log_query_scalar", + "family": "invalid", + "endpoint": "query", + "query": "{app=\"api-gateway\"} / 2", + "expectation": "client_error" + } + ] +} diff --git a/test/e2e-compat/query-semantics-operations.json b/test/e2e-compat/query-semantics-operations.json new file mode 100644 index 
00000000..45dc7518 --- /dev/null +++ b/test/e2e-compat/query-semantics-operations.json @@ -0,0 +1,146 @@ +{ + "description": "Tracked Loki-facing query operations that must stay covered by the e2e semantics matrix.", + "operations": [ + { + "name": "selector_exact", + "category": "selector", + "cases": ["selector_exact_instant", "selector_exact_range"] + }, + { + "name": "selector_regex", + "category": "selector", + "cases": ["multi_label_regex_range", "multi_label_negative_regex"] + }, + { + "name": "line_filter_substring", + "category": "line_filter", + "cases": ["line_filters_chained", "line_filters_chain_mixed"] + }, + { + "name": "line_filter_negative_regex", + "category": "line_filter", + "cases": ["line_filters_negative_regex"] + }, + { + "name": "parser_json", + "category": "parser", + "cases": ["json_pipeline_field_filter", "json_pipeline_numeric_filter", "json_pipeline_regex_filter"] + }, + { + "name": "parser_logfmt", + "category": "parser", + "cases": ["logfmt_pipeline_field_filter"] + }, + { + "name": "parser_regexp", + "category": "parser", + "cases": ["regexp_pipeline_field_filter"] + }, + { + "name": "parser_pattern", + "category": "parser", + "cases": ["pattern_pipeline_field_filter"] + }, + { + "name": "format_line", + "category": "formatting", + "cases": ["json_then_line_format"] + }, + { + "name": "format_label", + "category": "formatting", + "cases": ["label_format_pipeline"] + }, + { + "name": "format_keep_drop", + "category": "formatting", + "cases": ["keep_pipeline", "drop_pipeline"] + }, + { + "name": "metric_count_rate", + "category": "metric_function", + "cases": ["count_over_time_by_level", "sum_rate_by_app", "rate_with_line_filter"] + }, + { + "name": "metric_bytes", + "category": "metric_function", + "cases": ["parser_bytes_metric_range_filter", "parser_bytes_rate_metric_range_filter"] + }, + { + "name": "metric_parser_cardinality", + "category": "metric_function", + "cases": ["parser_metric_range_filter", "unwrap_quantile_over_time"] 
+ }, + { + "name": "metric_unwrap_range_functions", + "category": "metric_function", + "cases": [ + "sum_over_time_unwrap", + "avg_over_time_unwrap", + "max_over_time_unwrap", + "min_over_time_unwrap", + "first_over_time_unwrap", + "last_over_time_unwrap", + "stddev_over_time_unwrap", + "stdvar_over_time_unwrap", + "unwrap_quantile_over_time" + ] + }, + { + "name": "metric_absent", + "category": "metric_function", + "cases": ["absent_over_time_missing"] + }, + { + "name": "metric_grouping", + "category": "aggregation", + "cases": ["without_grouping_metric", "negative_regex_metric_range"] + }, + { + "name": "instant_post_aggregation", + "category": "aggregation", + "cases": ["topk_metric", "bottomk_metric", "sort_metric", "sort_desc_metric"] + }, + { + "name": "binary_scalar", + "category": "binary", + "cases": ["comparison_scalar_metric_gt_zero", "comparison_scalar_metric_bool_gt_zero", "scalar_binary_metric"] + }, + { + "name": "binary_vector", + "category": "binary", + "cases": ["vector_binary_metric", "on_grouping_metric", "group_left_join_metric", "vector_and_metric", "vector_or_metric", "vector_unless_metric"] + }, + { + "name": "binary_vector_invalid_matching", + "category": "binary", + "cases": ["ignoring_grouping_metric"] + }, + { + "name": "empty_success", + "category": "result_shape", + "cases": ["impossible_filter_empty_success"] + }, + { + "name": "invalid_log_metric_mix", + "category": "invalid", + "cases": [ + "invalid_metric_aggregation_on_log_query", + "invalid_topk_on_log_query", + "invalid_sort_on_log_query", + "invalid_bottomk_on_log_query", + "invalid_sort_desc_on_log_query" + ] + }, + { + "name": "invalid_metric_forms", + "category": "invalid", + "cases": ["invalid_rate_missing_range", "invalid_comparison_log_query_scalar", "invalid_binary_log_query_scalar"] + }, + { + "name": "invalid_syntax", + "category": "invalid", + "cases": ["invalid_malformed_selector", "invalid_parser_tail"] + } + ] +} diff --git 
a/test/e2e-compat/query_semantics_matrix_test.go b/test/e2e-compat/query_semantics_matrix_test.go new file mode 100644 index 00000000..feaf280c --- /dev/null +++ b/test/e2e-compat/query_semantics_matrix_test.go @@ -0,0 +1,293 @@ +//go:build e2e + +package e2e_compat + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "os" + "sort" + "strings" + "testing" + "time" +) + +type querySemanticsMatrix struct { + Description string `json:"description"` + Oracle string `json:"oracle"` + Cases []querySemanticsCase `json:"cases"` +} + +type querySemanticsOperations struct { + Description string `json:"description"` + Operations []querySemanticsOperation `json:"operations"` +} + +type querySemanticsOperation struct { + Name string `json:"name"` + Category string `json:"category"` + Cases []string `json:"cases"` +} + +type querySemanticsCase struct { + ID string `json:"id"` + Family string `json:"family"` + Endpoint string `json:"endpoint"` + Query string `json:"query"` + Step string `json:"step"` + Expectation string `json:"expectation"` + ExpectResultType string `json:"expect_result_type"` + Compare string `json:"compare"` + RequireNonEmpty bool `json:"require_non_empty"` +} + +func loadQuerySemanticsMatrix(t *testing.T) querySemanticsMatrix { + t.Helper() + + data, err := os.ReadFile("query-semantics-matrix.json") + if err != nil { + t.Fatalf("read query semantics matrix: %v", err) + } + var matrix querySemanticsMatrix + if err := json.Unmarshal(data, &matrix); err != nil { + t.Fatalf("decode query semantics matrix: %v", err) + } + if len(matrix.Cases) == 0 { + t.Fatal("query semantics matrix must not be empty") + } + for _, tc := range matrix.Cases { + if tc.ID == "" || tc.Endpoint == "" || tc.Query == "" || tc.Expectation == "" { + t.Fatalf("query semantics case must define id, endpoint, query, and expectation: %+v", tc) + } + switch tc.Endpoint { + case "query", "query_range": + default: + t.Fatalf("unsupported query semantics endpoint %q in case %q", 
tc.Endpoint, tc.ID) + } + switch tc.Expectation { + case "success", "client_error", "server_error": + default: + t.Fatalf("unsupported query semantics expectation %q in case %q", tc.Expectation, tc.ID) + } + } + return matrix +} + +func loadQuerySemanticsOperations(t *testing.T) querySemanticsOperations { + t.Helper() + + data, err := os.ReadFile("query-semantics-operations.json") + if err != nil { + t.Fatalf("read query semantics operations inventory: %v", err) + } + var inventory querySemanticsOperations + if err := json.Unmarshal(data, &inventory); err != nil { + t.Fatalf("decode query semantics operations inventory: %v", err) + } + if inventory.Description == "" || len(inventory.Operations) == 0 { + t.Fatal("query semantics operations inventory must not be empty") + } + for _, op := range inventory.Operations { + if op.Name == "" || op.Category == "" || len(op.Cases) == 0 { + t.Fatalf("invalid query semantics operation inventory entry: %+v", op) + } + } + return inventory +} + +func querySemanticsGET(t *testing.T, baseURL string, tc querySemanticsCase) (int, string, map[string]interface{}) { + t.Helper() + + params := url.Values{} + params.Set("query", tc.Query) + + now := time.Now() + if tc.Endpoint == "query_range" { + params.Set("start", fmt.Sprintf("%d", now.Add(-10*time.Minute).UnixNano())) + params.Set("end", fmt.Sprintf("%d", now.UnixNano())) + params.Set("limit", "1000") + if tc.Step != "" { + params.Set("step", tc.Step) + } + return doJSONGET(t, baseURL+"/loki/api/v1/query_range?"+params.Encode(), nil) + } + + params.Set("time", fmt.Sprintf("%d", now.UnixNano())) + return doJSONGET(t, baseURL+"/loki/api/v1/query?"+params.Encode(), nil) +} + +func querySemanticsResultCount(resp map[string]interface{}) int { + data := extractMap(resp, "data") + return len(extractArray(data, "result")) +} + +func querySemanticsStatusString(resp map[string]interface{}) string { + if resp == nil { + return "" + } + if status, _ := resp["status"].(string); status != "" { + 
return status + } + return "" +} + +func querySemanticsErrorType(resp map[string]interface{}) string { + if resp == nil { + return "" + } + if errorType, _ := resp["errorType"].(string); errorType != "" { + return errorType + } + return "" +} + +func querySemanticsMetricKeySet(resp map[string]interface{}) []string { + data := extractMap(resp, "data") + results := extractArray(data, "result") + keys := make([]string, 0, len(results)) + for _, item := range results { + entry, ok := item.(map[string]interface{}) + if !ok { + continue + } + metric := extractMap(entry, "metric") + parts := make([]string, 0, len(metric)) + for key, value := range metric { + parts = append(parts, fmt.Sprintf("%s=%v", key, value)) + } + sort.Strings(parts) + keys = append(keys, strings.Join(parts, ",")) + } + sort.Strings(keys) + return keys +} + +func TestQuerySemanticsMatrixManifest(t *testing.T) { + matrix := loadQuerySemanticsMatrix(t) + if matrix.Description == "" || matrix.Oracle == "" { + t.Fatalf("query semantics matrix must describe its purpose and oracle: %+v", matrix) + } +} + +func TestQuerySemanticsOperationsInventory(t *testing.T) { + matrix := loadQuerySemanticsMatrix(t) + inventory := loadQuerySemanticsOperations(t) + + caseIDs := make(map[string]struct{}, len(matrix.Cases)) + for _, tc := range matrix.Cases { + caseIDs[tc.ID] = struct{}{} + } + + seenCaseRefs := make(map[string]struct{}, len(matrix.Cases)) + for _, op := range inventory.Operations { + for _, caseID := range op.Cases { + if _, ok := caseIDs[caseID]; !ok { + t.Fatalf("operation %q references unknown query semantics case %q", op.Name, caseID) + } + seenCaseRefs[caseID] = struct{}{} + } + } + + for _, tc := range matrix.Cases { + if _, ok := seenCaseRefs[tc.ID]; !ok { + t.Fatalf("query semantics case %q is not tracked by the operations inventory", tc.ID) + } + } +} + +func TestQuerySemanticsMatrix(t *testing.T) { + ensureDataIngested(t) + matrix := loadQuerySemanticsMatrix(t) + + for _, tc := range 
matrix.Cases { + tc := tc + t.Run(tc.ID, func(t *testing.T) { + proxyStatus, proxyBody, proxyResp := querySemanticsGET(t, proxyURL, tc) + lokiStatus, lokiBody, lokiResp := querySemanticsGET(t, lokiURL, tc) + + switch tc.Expectation { + case "success": + if proxyStatus != http.StatusOK || lokiStatus != http.StatusOK { + t.Fatalf("expected 200 from both backends, proxy=%d loki=%d proxyBody=%s lokiBody=%s", proxyStatus, lokiStatus, proxyBody, lokiBody) + } + if !checkStatus(proxyResp) || !checkStatus(lokiResp) { + t.Fatalf("expected success payload from both backends, proxy=%v loki=%v", proxyResp, lokiResp) + } + + proxyData := extractMap(proxyResp, "data") + lokiData := extractMap(lokiResp, "data") + if proxyData["resultType"] != lokiData["resultType"] { + t.Fatalf("resultType mismatch for %s: proxy=%v loki=%v", tc.ID, proxyData["resultType"], lokiData["resultType"]) + } + if tc.ExpectResultType != "" && proxyData["resultType"] != tc.ExpectResultType { + t.Fatalf("unexpected resultType for %s: got=%v want=%s", tc.ID, proxyData["resultType"], tc.ExpectResultType) + } + + switch tc.Compare { + case "line_count": + proxyCount := countLogLines(proxyResp) + lokiCount := countLogLines(lokiResp) + if tc.RequireNonEmpty && (proxyCount == 0 || lokiCount == 0) { + t.Fatalf("expected non-empty log results for %s, proxy=%d loki=%d", tc.ID, proxyCount, lokiCount) + } + if proxyCount != lokiCount { + t.Fatalf("line-count mismatch for %s, proxy=%d loki=%d", tc.ID, proxyCount, lokiCount) + } + case "series_count": + proxyCount := querySemanticsResultCount(proxyResp) + lokiCount := querySemanticsResultCount(lokiResp) + if tc.RequireNonEmpty && (proxyCount == 0 || lokiCount == 0) { + t.Fatalf("expected non-empty result series for %s, proxy=%d loki=%d", tc.ID, proxyCount, lokiCount) + } + if proxyCount != lokiCount { + t.Fatalf("series-count mismatch for %s, proxy=%d loki=%d", tc.ID, proxyCount, lokiCount) + } + case "metric_keys": + proxyKeys := querySemanticsMetricKeySet(proxyResp) + 
lokiKeys := querySemanticsMetricKeySet(lokiResp) + if tc.RequireNonEmpty && (len(proxyKeys) == 0 || len(lokiKeys) == 0) { + t.Fatalf("expected non-empty metric key results for %s, proxy=%v loki=%v", tc.ID, proxyKeys, lokiKeys) + } + if len(proxyKeys) != len(lokiKeys) { + t.Fatalf("metric-key cardinality mismatch for %s, proxy=%v loki=%v", tc.ID, proxyKeys, lokiKeys) + } + for i := range proxyKeys { + if proxyKeys[i] != lokiKeys[i] { + t.Fatalf("metric-key mismatch for %s, proxy=%v loki=%v", tc.ID, proxyKeys, lokiKeys) + } + } + case "": + default: + t.Fatalf("unsupported comparison mode %q in case %q", tc.Compare, tc.ID) + } + + case "client_error": + if proxyStatus < 400 || proxyStatus >= 500 || lokiStatus < 400 || lokiStatus >= 500 { + t.Fatalf("expected 4xx from both backends, proxy=%d loki=%d proxyBody=%s lokiBody=%s", proxyStatus, lokiStatus, proxyBody, lokiBody) + } + if proxyStatus != lokiStatus { + t.Fatalf("expected matching 4xx status for %s, proxy=%d loki=%d proxyBody=%s lokiBody=%s", tc.ID, proxyStatus, lokiStatus, proxyBody, lokiBody) + } + proxyPayloadStatus := querySemanticsStatusString(proxyResp) + lokiPayloadStatus := querySemanticsStatusString(lokiResp) + if proxyPayloadStatus != "" && lokiPayloadStatus != "" && proxyPayloadStatus != lokiPayloadStatus { + t.Fatalf("expected matching payload status for %s, proxy=%q loki=%q proxyBody=%s lokiBody=%s", tc.ID, querySemanticsStatusString(proxyResp), querySemanticsStatusString(lokiResp), proxyBody, lokiBody) + } + proxyErrorType := querySemanticsErrorType(proxyResp) + lokiErrorType := querySemanticsErrorType(lokiResp) + if proxyErrorType != "" && lokiErrorType != "" && proxyErrorType != lokiErrorType { + t.Fatalf("expected matching errorType for %s, proxy=%q loki=%q proxyBody=%s lokiBody=%s", tc.ID, querySemanticsErrorType(proxyResp), querySemanticsErrorType(lokiResp), proxyBody, lokiBody) + } + case "server_error": + if proxyStatus < 500 || proxyStatus >= 600 || lokiStatus < 500 || lokiStatus >= 600 { + 
t.Fatalf("expected 5xx from both backends, proxy=%d loki=%d proxyBody=%s lokiBody=%s", proxyStatus, lokiStatus, proxyBody, lokiBody) + } + if proxyStatus != lokiStatus { + t.Fatalf("expected matching 5xx status for %s, proxy=%d loki=%d proxyBody=%s lokiBody=%s", tc.ID, proxyStatus, lokiStatus, proxyBody, lokiBody) + } + } + }) + } +} diff --git a/test/e2e-ui/tests/logs-drilldown.spec.ts b/test/e2e-ui/tests/logs-drilldown.spec.ts index 5087497c..87835062 100644 --- a/test/e2e-ui/tests/logs-drilldown.spec.ts +++ b/test/e2e-ui/tests/logs-drilldown.spec.ts @@ -29,7 +29,7 @@ async function waitForDrilldownDetails(page: Page) { await expect(page.getByRole("combobox", { name: "Filter by fields" })).toBeVisible({ timeout: 30_000, }); - await expect(page.getByRole("tab", { name: /Logs\d+/ })).toBeVisible({ + await expect(page.getByRole("tab", { name: /^Logs/i }).first()).toBeVisible({ timeout: 30_000, }); } @@ -93,6 +93,18 @@ function uniqueQueries(queries: string[]) { return result; } +function extractExactServiceHint(query: string) { + const serviceMatch = query.match(/\{\s*service_name="([^"]+)"/); + if (serviceMatch?.[1]) { + return serviceMatch[1]; + } + const appMatch = query.match(/\{\s*app="([^"]+)"/); + if (appMatch?.[1]) { + return appMatch[1]; + } + return ""; +} + async function seedPatternsStream(page: Page) { const now = new Date(); const lines = [ @@ -228,11 +240,19 @@ async function waitForAutodetectedPatterns( lastPatternsPayload = null; } + const patternsData = (lastPatternsPayload as { data?: unknown[] } | null)?.data; + if (!Array.isArray(patternsData) || patternsData.length === 0) { + continue; + } + + const exactServiceHint = extractExactServiceHint(query); + if (exactServiceHint) { + return exactServiceHint; + } + if ( !seedResponse.ok() || - (lastSeedPayload as { status?: string } | null)?.status !== "success" || - !Array.isArray((lastPatternsPayload as { data?: unknown[] } | null)?.data) || - ((lastPatternsPayload as { data?: unknown[] 
}).data?.length ?? 0) === 0 + (lastSeedPayload as { status?: string } | null)?.status !== "success" ) { continue; } diff --git a/test/e2e-ui/tests/url-state.spec.ts b/test/e2e-ui/tests/url-state.spec.ts index 6f929b76..9094c3bf 100644 --- a/test/e2e-ui/tests/url-state.spec.ts +++ b/test/e2e-ui/tests/url-state.spec.ts @@ -35,8 +35,10 @@ test("buildServiceDrilldownUrl preserves service filter state across reloadable ); expect(built.pathname).toBe(`${DRILLDOWN_URL}/service/api-gateway/logs`); expect(built.searchParams.get("var-ds")).toBe("proxy-uid"); + expect(built.searchParams.get("var-primary_label")).toBe("service_name|=~|.+"); expect(built.searchParams.get("var-filters")).toBe("service_name|=|api-gateway"); expect(built.searchParams.get("var-fields")).toBe("method|=|GET"); + expect(built.searchParams.get("var-all-fields")).toBe("method|=|GET"); }); test("buildServiceDrilldownUrl keeps dotted field triplet for event-details filters @drilldown-core", async () => { diff --git a/test/e2e-ui/tests/url-state.ts b/test/e2e-ui/tests/url-state.ts index 60eeecc5..7f1d9d93 100644 --- a/test/e2e-ui/tests/url-state.ts +++ b/test/e2e-ui/tests/url-state.ts @@ -74,11 +74,16 @@ export function buildServiceDrilldownUrl( view: DrilldownView = "logs", overrides: Record = {} ): string { + const fieldFilter = overrides["var-fields"]; const params = new URLSearchParams({ ...baseDrilldownState(datasourceUid), + "var-primary_label": "service_name|=~|.+", "var-filters": `service_name|=|${serviceName}`, displayedFields: "[]", urlColumns: "[]", + ...(fieldFilter && !("var-all-fields" in overrides) + ? { "var-all-fields": fieldFilter } + : {}), ...overrides, });