Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/flyquery/core/agents/column_name_proposer_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ def build_column_name_proposer_agent(settings):
output_type=ProposedColumnNames,
instructions=prompt.instructions,
settings=settings,
# Deterministic naming: identical re-ingests must yield identical
# column names, otherwise reconcile sees phantom schema churn.
extra_settings={"temperature": 0.0},
)


Expand Down
3 changes: 3 additions & 0 deletions src/flyquery/core/agents/describe_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,7 @@ def build_describe_agent(settings):
output_type=DescribedObjects,
instructions=prompt.instructions,
settings=settings,
# Deterministic descriptions/semantic types: identical columns on
# re-ingest must produce identical metadata, no schema-change churn.
extra_settings={"temperature": 0.0},
)
3 changes: 3 additions & 0 deletions src/flyquery/core/agents/rename_detection_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,7 @@ def build_rename_detection_agent(settings):
settings=settings,
# Rename detection is a short task; cap output tokens tightly
max_output_tokens=2048,
# Deterministic: the same removed/candidate pair must always
# resolve the same way so re-ingests don't flip-flop renames.
extra_settings={"temperature": 0.0},
)
8 changes: 8 additions & 0 deletions src/flyquery/core/services/examples/auto_learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class AutoLearner:
Skips when:
- ``retries > 0`` (query required critic refinement)
- PII findings were detected in the result
- the query returned no rows (``row_count`` is 0, when provided)

Called by QueryService (Phase D) after a successful execution.
"""
Expand All @@ -46,6 +47,7 @@ async def maybe_propose(
retries: int,
pii_findings: list[Any],
query_id: uuid.UUID,
row_count: int | None = None,
) -> None:
"""Insert a flyquery_examples row when all criteria pass.

Expand All @@ -57,11 +59,17 @@ async def maybe_propose(
:param retries: number of critic refinement loops (must be 0 to propose)
:param pii_findings: any PII signals detected (must be empty to propose)
:param query_id: UUID of the parent query record
:param row_count: number of rows the query returned; when provided it
must be > 0 to propose (a valid-but-wrong query returning 0 rows
would otherwise poison grounding). When ``None`` the row gate is
skipped to preserve behaviour for callers that do not pass it.
"""
if retries > 0:
return
if pii_findings:
return
if row_count is not None and row_count <= 0:
return
await self._service.create(
tenant_id,
workspace_id,
Expand Down
19 changes: 17 additions & 2 deletions src/flyquery/core/services/execution/ast_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,23 @@ def classify(self, sql: str) -> AstClassification:
# pyright does not unify with the public ``Expression`` base below.
kind = self._kind(stmt) # pyright: ignore[reportArgumentType]

# Collect table refs — skip anonymous subquery aliases
tables = tuple(sorted({t.name for t in stmt.find_all(sqlglot.expressions.Table) if t.name}))
# Collect table refs — skip anonymous subquery aliases AND
# CTE-defined names. sqlglot represents a reference to a CTE
# (``FROM base`` where ``WITH base AS (...)``) as an ``exp.Table``
# node, so without this filter the CTE alias leaks into
# ``table_refs``; the downstream bad-tables guard then flags it
# as a non-existent table and the (otherwise valid) query is
# rejected — see QueryService bad-tables set-difference.
cte_names = {cte.alias_or_name for cte in stmt.find_all(sqlglot.expressions.CTE) if cte.alias_or_name}
tables = tuple(
sorted(
{
t.name
for t in stmt.find_all(sqlglot.expressions.Table)
if t.name and t.name not in cte_names
}
)
)
columns = tuple(sorted({c.name for c in stmt.find_all(sqlglot.expressions.Column) if c.name}))
has_subquery = bool(list(stmt.find_all(sqlglot.expressions.Subquery)))

Expand Down
51 changes: 49 additions & 2 deletions src/flyquery/core/services/execution/table_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from __future__ import annotations

import json
import uuid

import sqlalchemy as sa
Expand All @@ -54,6 +55,7 @@ async def resolve(
dataset_id: uuid.UUID,
table_names: list[str],
object_store_base: str | None = None,
pins: dict[str, str] | None = None,
) -> dict[str, str]:
"""Return a mapping of table name → absolute parquet path.

Expand All @@ -65,6 +67,11 @@ async def resolve(
:param dataset_id: dataset to scope the lookup
:param table_names: unqualified table names from the AST
:param object_store_base: override for ``settings.object_store_base``
:param pins: optional ``{table_name: snapshot_id}`` — a follow-up
drill-down turn pins each table to the snapshot it resolved to
on the first turn, so a mid-conversation re-ingest does not
silently switch the answer to a newer schema. Unpinned tables
fall back to ``current_snapshot_id``.
:return: ``{name: path}`` dict for all resolvable tables
"""
if not table_names:
Expand All @@ -77,16 +84,56 @@ async def resolve(
SELECT t.name, ss.parquet_object_key
FROM flyquery_tables t
JOIN flyquery_schema_snapshots ss
ON ss.id = t.current_snapshot_id
ON ss.table_id = t.id
AND ss.id = COALESCE(
(CAST(:pins AS jsonb) ->> t.name)::uuid,
t.current_snapshot_id
)
WHERE t.dataset_id = :ds
AND t.name = ANY(:names)
AND t.is_active = true
"""),
{"ds": dataset_id, "names": list(table_names)},
{"ds": dataset_id, "names": list(table_names), "pins": json.dumps(pins or {})},
)

out: dict[str, str] = {}
for r in rows.mappings():
key: str = r["parquet_object_key"]
out[r["name"]] = f"{base}/{key}"
return out

async def table_kinds_by_name(self, dataset_id: uuid.UUID, table_names: list[str]) -> dict[str, str]:
    """Look up the ``kind`` of each requested active table in a dataset.

    Serves the firewall/bad-tables guard. The query is kept in the service
    layer so that no raw SQL has to live in the web tier.

    :param dataset_id: dataset whose tables are searched
    :param table_names: table names to look up
    :return: ``{name: kind}`` for every matching table flagged active;
        an empty dict when ``table_names`` is empty
    """
    # Nothing requested: answer immediately without touching the database.
    if not table_names:
        return {}

    query = sa.text("""
SELECT name, kind FROM flyquery_tables
WHERE dataset_id = :ds AND name = ANY(:names) AND is_active = true
""")
    # Copy the names into a fresh list before binding them to ``ANY(:names)``.
    bind_params = {"ds": dataset_id, "names": list(table_names)}
    result = await self._session.execute(query, bind_params)

    kinds: dict[str, str] = {}
    for record in result.mappings():
        kinds[record["name"]] = record["kind"]
    return kinds

async def current_snapshots(self, dataset_id: uuid.UUID, table_names: list[str]) -> dict[str, str]:
    """Fetch the current snapshot id of each requested active table.

    The result records the snapshot pins for THIS turn, so that a later
    drill-down turn can reproduce the exact schema version it answered
    against.

    :param dataset_id: dataset whose tables are searched
    :param table_names: table names to look up
    :return: ``{table_name: current_snapshot_id}`` with the id rendered via
        ``str()``; tables whose ``current_snapshot_id`` is NULL are left
        out, and an empty ``table_names`` yields an empty dict
    """
    # Nothing requested: answer immediately without touching the database.
    if not table_names:
        return {}

    query = sa.text("""
SELECT name, current_snapshot_id
FROM flyquery_tables
WHERE dataset_id = :ds AND name = ANY(:names) AND is_active = true
AND current_snapshot_id IS NOT NULL
""")
    # Copy the names into a fresh list before binding them to ``ANY(:names)``.
    bind_params = {"ds": dataset_id, "names": list(table_names)}
    result = await self._session.execute(query, bind_params)

    pins: dict[str, str] = {}
    for record in result.mappings():
        # Stringify the id so the mapping holds plain strings.
        pins[record["name"]] = str(record["current_snapshot_id"])
    return pins
Loading
Loading