From 00e9a9c5b421ef8a337abb1c5aeb30bfe5066307 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Sun, 28 Dec 2025 23:17:01 +0530 Subject: [PATCH 01/14] Added PostgresSchemaRegistry.java --- .../core/documentstore/Datastore.java | 6 + .../documentstore/commons/ColumnMetadata.java | 13 ++ .../documentstore/commons/SchemaRegistry.java | 12 ++ .../postgres/FlatPostgresCollection.java | 119 ++++++++++++++++- .../postgres/PostgresDatastore.java | 20 ++- .../postgres/PostgresMetadataFetcher.java | 126 ++++++++++++++++++ .../postgres/PostgresSchemaRegistry.java | 56 ++++++++ .../model/PostgresColumnMetadata.java | 39 ++++++ 8 files changed, 384 insertions(+), 7 deletions(-) create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java index b426cb06..4022dd46 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java @@ -2,6 +2,8 @@ import java.util.Map; import java.util.Set; +import org.hypertrace.core.documentstore.commons.ColumnMetadata; +import org.hypertrace.core.documentstore.commons.SchemaRegistry; import org.hypertrace.core.documentstore.metric.DocStoreMetricProvider; public interface Datastore { @@ -19,6 +21,10 @@ public interface Datastore { 
@SuppressWarnings("unused") DocStoreMetricProvider getDocStoreMetricProvider(); + default SchemaRegistry getSchemaRegistry() { + return null; + } + void close(); /** diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java new file mode 100644 index 00000000..2850485a --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java @@ -0,0 +1,13 @@ +package org.hypertrace.core.documentstore.commons; + +import org.hypertrace.core.documentstore.expression.impl.DataType; + +public interface ColumnMetadata { + String getName(); + + DataType getCanonicalType(); + + String getInternalType(); + + boolean isNullable(); +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java new file mode 100644 index 00000000..544743d5 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java @@ -0,0 +1,12 @@ +package org.hypertrace.core.documentstore.commons; + +import java.util.Map; + +public interface SchemaRegistry { + + Map getSchema(String tableName); + + void invalidate(String tableName); + + T getColumnOrRefresh(String tableName, String colName); +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 0b7afc62..99b9fe0e 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -1,10 +1,18 @@ package org.hypertrace.core.documentstore.postgres; +import 
com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest; import org.hypertrace.core.documentstore.BulkDeleteResult; import org.hypertrace.core.documentstore.BulkUpdateRequest; @@ -18,6 +26,7 @@ import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser; import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer; import org.hypertrace.core.documentstore.query.Query; @@ -34,11 +43,18 @@ public class FlatPostgresCollection extends PostgresCollection { private static final Logger LOGGER = LoggerFactory.getLogger(FlatPostgresCollection.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String WRITE_NOT_SUPPORTED = "Write operations are not supported for flat collections yet!"; - FlatPostgresCollection(final PostgresClient client, final String collectionName) { + private final PostgresSchemaRegistry schemaRegistry; + + FlatPostgresCollection( + final PostgresClient client, + final String collectionName, + final PostgresSchemaRegistry schemaRegistry) { super(client, collectionName); + this.schemaRegistry = schemaRegistry; } @Override @@ -81,7 +97,106 @@ public boolean upsert(Key key, Document document) throws IOException { @Override public Document 
upsertAndReturn(Key key, Document document) throws IOException { - throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); + String tableName = tableIdentifier.getTableName(); + Map schema = schemaRegistry.getSchema(tableName); + + if (schema.isEmpty()) { + throw new IOException("No schema found for table: " + tableName); + } + + try { + JsonNode docJson = OBJECT_MAPPER.readTree(document.toJson()); + List columns = new ArrayList<>(); + List values = new ArrayList<>(); + + // Extract fields from document that exist in schema + Iterator> fields = docJson.fields(); + while (fields.hasNext()) { + Map.Entry field = fields.next(); + String colName = field.getKey(); + PostgresColumnMetadata colMeta = schemaRegistry.getColumnOrRefresh(tableName, colName); + if (colMeta != null) { + columns.add(colName); + values.add(extractValue(field.getValue())); + } + } + + if (columns.isEmpty()) { + throw new IOException("No matching columns found in schema for document"); + } + + // Build UPSERT SQL: INSERT ... ON CONFLICT DO UPDATE + String columnList = String.join(", ", columns); + String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", ")); + String updateSet = columns.stream() + .map(c -> c + " = EXCLUDED." + c) + .collect(Collectors.joining(", ")); + + // Determine primary key column (assume first column or 'id') + String pkColumn = schema.containsKey("id") ? "id" : columns.get(0); + + String sql = String.format( + "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s RETURNING *", + tableIdentifier, columnList, placeholders, pkColumn, updateSet); + + try (PreparedStatement ps = client.getConnection().prepareStatement(sql)) { + for (int i = 0; i < values.size(); i++) { + ps.setObject(i + 1, values.get(i)); + } + + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return resultSetToDocument(rs, columns); + } + } + } + return document; + } catch (SQLException e) { + LOGGER.error("SQLException in upsertAndReturn. 
key: {} document: {}", key, document, e); + throw new IOException(e); + } + } + + private Object extractValue(JsonNode node) { + if (node.isNull()) { + return null; + } else if (node.isBoolean()) { + return node.booleanValue(); + } else if (node.isInt()) { + return node.intValue(); + } else if (node.isLong()) { + return node.longValue(); + } else if (node.isDouble() || node.isFloat()) { + return node.doubleValue(); + } else if (node.isTextual()) { + return node.textValue(); + } else { + return node.toString(); + } + } + + private Document resultSetToDocument(ResultSet rs, List columns) + throws SQLException, IOException { + StringBuilder json = new StringBuilder("{"); + for (int i = 0; i < columns.size(); i++) { + if (i > 0) { + json.append(","); + } + String col = columns.get(i); + Object value = rs.getObject(col); + json.append("\"").append(col).append("\":"); + if (value == null) { + json.append("null"); + } else if (value instanceof String) { + json.append("\"").append(value).append("\""); + } else if (value instanceof Boolean) { + json.append(value); + } else { + json.append(value); + } + } + json.append("}"); + return new org.hypertrace.core.documentstore.JSONDocument(json.toString()); } @Override diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java index afbb6f98..4b740322 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java @@ -20,11 +20,14 @@ import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.DocumentType; +import org.hypertrace.core.documentstore.commons.ColumnMetadata; +import org.hypertrace.core.documentstore.commons.SchemaRegistry; import 
org.hypertrace.core.documentstore.metric.DocStoreMetricProvider; import org.hypertrace.core.documentstore.metric.postgres.PostgresDocStoreMetricProvider; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; import org.hypertrace.core.documentstore.model.config.DatastoreConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +40,7 @@ public class PostgresDatastore implements Datastore { private final PostgresClient client; private final String database; private final DocStoreMetricProvider docStoreMetricProvider; + private final SchemaRegistry schemaRegistry; public PostgresDatastore(@NonNull final DatastoreConfig datastoreConfig) { final ConnectionConfig connectionConfig = datastoreConfig.connectionConfig(); @@ -57,6 +61,7 @@ public PostgresDatastore(@NonNull final DatastoreConfig datastoreConfig) { client = new PostgresClient(postgresConnectionConfig); database = connectionConfig.database(); docStoreMetricProvider = new PostgresDocStoreMetricProvider(this, postgresConnectionConfig); + schemaRegistry = new PostgresSchemaRegistry(new PostgresMetadataFetcher(this)); } catch (final IllegalArgumentException e) { throw new IllegalArgumentException( String.format("Unable to instantiate PostgresClient with config:%s", connectionConfig), @@ -81,7 +86,7 @@ public Set listCollections() { Set collections = new HashSet<>(); try { DatabaseMetaData metaData = client.getConnection().getMetaData(); - ResultSet tables = metaData.getTables(null, null, "%", new String[] {"TABLE"}); + ResultSet tables = metaData.getTables(null, null, "%", new String[]{"TABLE"}); while (tables.next()) { Optional nonPublicSchema = Optional.ofNullable(tables.getString("TABLE_SCHEM")) @@ -161,10 +166,9 @@ public Collection getCollection(String collectionName) { @Override public Collection 
getCollectionForType(String collectionName, DocumentType documentType) { switch (documentType) { - case FLAT: - { - return new FlatPostgresCollection(client, collectionName); - } + case FLAT: { + return new FlatPostgresCollection(client, collectionName, (PostgresSchemaRegistry) schemaRegistry); + } case NESTED: return getCollection(collectionName); default: @@ -189,6 +193,12 @@ public DocStoreMetricProvider getDocStoreMetricProvider() { return docStoreMetricProvider; } + @SuppressWarnings("unchecked") + @Override + public SchemaRegistry getSchemaRegistry() { + return schemaRegistry; + } + @Override public void close() { try { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java new file mode 100644 index 00000000..b2e2e957 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java @@ -0,0 +1,126 @@ +package org.hypertrace.core.documentstore.postgres; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import lombok.AllArgsConstructor; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; + +/** + * Fetches schema metadata directly from Postgres system catalogs. Hardcoded to query + * information_schema.columns. 
+ */ +@AllArgsConstructor +public class PostgresMetadataFetcher { + + private final PostgresDatastore datastore; + + // Hardcoded SQL for high-performance schema discovery + private static final String DISCOVERY_SQL = + "SELECT column_name, udt_name, is_nullable " + + "FROM information_schema.columns " + + "WHERE table_schema = 'public' AND table_name = ?"; + + public Map fetch(String tableName) { + Map metadataMap = new HashMap<>(); + + try (Connection conn = datastore.getPostgresClient(); + PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) { + + ps.setString(1, tableName); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + String columnName = rs.getString("column_name"); + String udtName = rs.getString("udt_name"); + boolean isNullable = "YES".equalsIgnoreCase(rs.getString("is_nullable")); + metadataMap.put( + columnName, + new PostgresColumnMetadata( + columnName, + mapToCanonicalType(udtName), + mapToPostgresType(udtName), + udtName, + isNullable)); + } + } + return metadataMap; + } catch (SQLException e) { + throw new RuntimeException("Failed to fetch Postgres metadata for table: " + tableName, e); + } + } + + /** + * Maps Postgres udt_name to canonical DataType. + */ + private DataType mapToCanonicalType(String udtName) { + if (udtName == null) { + return DataType.UNSPECIFIED; + } + + switch (udtName.toLowerCase()) { + case "int4": + case "int2": + return DataType.INTEGER; + case "int8": + return DataType.LONG; + case "float4": + return DataType.FLOAT; + case "float8": + case "numeric": + return DataType.DOUBLE; + case "bool": + return DataType.BOOLEAN; + case "timestamptz": + return DataType.TIMESTAMPTZ; + case "date": + return DataType.DATE; + case "text": + case "varchar": + case "bpchar": + case "uuid": + return DataType.STRING; + default: + return DataType.UNSPECIFIED; + } + } + + /** + * Maps Postgres udt_name to PostgresDataType. 
+ */ + private PostgresDataType mapToPostgresType(String udtName) { + if (udtName == null) { + return PostgresDataType.UNKNOWN; + } + + switch (udtName.toLowerCase()) { + case "int4": + case "int2": + return PostgresDataType.INTEGER; + case "int8": + return PostgresDataType.BIGINT; + case "float4": + return PostgresDataType.REAL; + case "float8": + case "numeric": + return PostgresDataType.DOUBLE_PRECISION; + case "bool": + return PostgresDataType.BOOLEAN; + case "timestamptz": + return PostgresDataType.TIMESTAMPTZ; + case "date": + return PostgresDataType.DATE; + case "text": + case "varchar": + case "bpchar": + case "uuid": + return PostgresDataType.TEXT; + default: + return PostgresDataType.UNKNOWN; + } + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java new file mode 100644 index 00000000..bc5ad46e --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java @@ -0,0 +1,56 @@ +package org.hypertrace.core.documentstore.postgres; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.hypertrace.core.documentstore.commons.SchemaRegistry; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; + +public class PostgresSchemaRegistry implements SchemaRegistry { + + private final LoadingCache> cache; + private final PostgresMetadataFetcher fetcher; + + public PostgresSchemaRegistry(PostgresMetadataFetcher fetcher) { + this.fetcher = fetcher; + this.cache = + CacheBuilder.newBuilder() + .expireAfterWrite(24, TimeUnit.HOURS) + .build( + new CacheLoader<>() { + @Override + public Map load(String tableName) { + return 
fetcher.fetch(tableName); // Hardcoded SQL Discovery + } + }); + } + + @Override + public Map getSchema(String tableName) { + try { + return cache.get(tableName); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to fetch schema for " + tableName, e.getCause()); + } + } + + @Override + public void invalidate(String tableName) { + cache.invalidate(tableName); + } + + @Override + public PostgresColumnMetadata getColumnOrRefresh(String tableName, String colName) { + Map schema = getSchema(tableName); + + if (!schema.containsKey(colName)) { + invalidate(tableName); + schema = getSchema(tableName); + } + + return schema.get(colName); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java new file mode 100644 index 00000000..7f2a18fd --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java @@ -0,0 +1,39 @@ +package org.hypertrace.core.documentstore.postgres.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import org.hypertrace.core.documentstore.commons.ColumnMetadata; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; + +@Builder +@AllArgsConstructor +public class PostgresColumnMetadata implements ColumnMetadata { + + private final String colName; + private final DataType canonicalType; + @Getter private final PostgresDataType postgresType; + private final String pgType; + private final boolean nullable; + + @Override + public String getName() { + return colName; + } + + @Override + public DataType getCanonicalType() { + return canonicalType; + } + + @Override + public String getInternalType() { + return pgType; + } + + @Override + public 
boolean isNullable() { + return nullable; + } +} From 31846e946d90dd7e3e525d7fc769e152bc575a59 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Sun, 28 Dec 2025 23:17:12 +0530 Subject: [PATCH 02/14] Spotless --- .../postgres/FlatPostgresCollection.java | 12 ++++++------ .../documentstore/postgres/PostgresDatastore.java | 11 ++++++----- .../postgres/PostgresMetadataFetcher.java | 8 ++------ 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 99b9fe0e..fa1c1ea1 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -128,16 +128,16 @@ public Document upsertAndReturn(Key key, Document document) throws IOException { // Build UPSERT SQL: INSERT ... ON CONFLICT DO UPDATE String columnList = String.join(", ", columns); String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", ")); - String updateSet = columns.stream() - .map(c -> c + " = EXCLUDED." + c) - .collect(Collectors.joining(", ")); + String updateSet = + columns.stream().map(c -> c + " = EXCLUDED." + c).collect(Collectors.joining(", ")); // Determine primary key column (assume first column or 'id') String pkColumn = schema.containsKey("id") ? 
"id" : columns.get(0); - String sql = String.format( - "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s RETURNING *", - tableIdentifier, columnList, placeholders, pkColumn, updateSet); + String sql = + String.format( + "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s RETURNING *", + tableIdentifier, columnList, placeholders, pkColumn, updateSet); try (PreparedStatement ps = client.getConnection().prepareStatement(sql)) { for (int i = 0; i < values.size(); i++) { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java index 4b740322..9a7fa206 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java @@ -20,7 +20,6 @@ import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.DocumentType; -import org.hypertrace.core.documentstore.commons.ColumnMetadata; import org.hypertrace.core.documentstore.commons.SchemaRegistry; import org.hypertrace.core.documentstore.metric.DocStoreMetricProvider; import org.hypertrace.core.documentstore.metric.postgres.PostgresDocStoreMetricProvider; @@ -86,7 +85,7 @@ public Set listCollections() { Set collections = new HashSet<>(); try { DatabaseMetaData metaData = client.getConnection().getMetaData(); - ResultSet tables = metaData.getTables(null, null, "%", new String[]{"TABLE"}); + ResultSet tables = metaData.getTables(null, null, "%", new String[] {"TABLE"}); while (tables.next()) { Optional nonPublicSchema = Optional.ofNullable(tables.getString("TABLE_SCHEM")) @@ -166,9 +165,11 @@ public Collection getCollection(String collectionName) { @Override public Collection getCollectionForType(String collectionName, DocumentType 
documentType) { switch (documentType) { - case FLAT: { - return new FlatPostgresCollection(client, collectionName, (PostgresSchemaRegistry) schemaRegistry); - } + case FLAT: + { + return new FlatPostgresCollection( + client, collectionName, (PostgresSchemaRegistry) schemaRegistry); + } case NESTED: return getCollection(collectionName); default: diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java index b2e2e957..474df319 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java @@ -54,9 +54,7 @@ public Map fetch(String tableName) { } } - /** - * Maps Postgres udt_name to canonical DataType. - */ + /** Maps Postgres udt_name to canonical DataType. */ private DataType mapToCanonicalType(String udtName) { if (udtName == null) { return DataType.UNSPECIFIED; @@ -89,9 +87,7 @@ private DataType mapToCanonicalType(String udtName) { } } - /** - * Maps Postgres udt_name to PostgresDataType. - */ + /** Maps Postgres udt_name to PostgresDataType. 
*/ private PostgresDataType mapToPostgresType(String udtName) { if (udtName == null) { return PostgresDataType.UNKNOWN; From 2fdbf0eb7c3d34051c4e60a129058c300f8c981a Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 14:41:02 +0530 Subject: [PATCH 03/14] WIP --- .../postgres/PostgresSchemaRegistry.java | 142 +++++++- .../postgres/PostgresSchemaRegistryTest.java | 338 ++++++++++++++++++ 2 files changed, 475 insertions(+), 5 deletions(-) create mode 100644 document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java index bc5ad46e..f3c06673 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java @@ -3,31 +3,124 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.hypertrace.core.documentstore.commons.SchemaRegistry; import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +/** + * A lazily-loaded, cached schema registry for PostgreSQL tables. + * + *

<p>This registry fetches and caches column metadata from PostgreSQL's {@code information_schema} + * on demand. It provides: + * + * 

    + *
<ul> + * <li>Lazy loading: Schema metadata is fetched only when first requested for the + * particular table. + * <li>TTL-based caching: Cached schemas expire after a configurable duration (default: 24 + * hours). + * <li>Circuit breaker: Prevents excessive database calls by enforcing a cooldown period + * between refresh attempts for missing columns (default: 15 minutes). + * </ul> + * + * 

<h3>Usage Example</h3> + * + * <pre>{@code
+ * PostgresMetadataFetcher fetcher = new PostgresMetadataFetcher(datastore);
+ * PostgresSchemaRegistry registry = new PostgresSchemaRegistry(fetcher);
+ *
+ * // Get all columns for a table
+ * Map<String, PostgresColumnMetadata> schema = registry.getSchema("my_table");
+ *
+ * // Get a specific column, refreshing if not found (respects cooldown)
+ * PostgresColumnMetadata column = registry.getColumnOrRefresh("my_table", "my_column");
+ * }</pre>
+ * + * <h3>Thread Safety</h3> + * + * <p>

This class is thread-safe. The underlying Guava {@link LoadingCache} and {@link + * ConcurrentHashMap} handle concurrent access. + * + * @see PostgresMetadataFetcher + * @see PostgresColumnMetadata + */ public class PostgresSchemaRegistry implements SchemaRegistry { + /** Default cache expiry time: 24 hours. */ + private static final Duration DEFAULT_CACHE_EXPIRY = Duration.ofHours(24); + + /** Default cooldown period between refresh attempts: 15 minutes. */ + private static final Duration DEFAULT_REFRESH_COOLDOWN = Duration.ofMinutes(15); + private final LoadingCache> cache; - private final PostgresMetadataFetcher fetcher; + private final Map lastRefreshTimes; + private final Duration refreshCooldown; + private final Clock clock; + /** + * Creates a new schema registry with default settings. + * + *

Uses default cache expiry of 24 hours and refresh cooldown of 15 minutes. + * + * @param fetcher the metadata fetcher to use for loading schema information + */ public PostgresSchemaRegistry(PostgresMetadataFetcher fetcher) { - this.fetcher = fetcher; + this(fetcher, DEFAULT_CACHE_EXPIRY, DEFAULT_REFRESH_COOLDOWN, Clock.systemUTC()); + } + + /** + * Creates a new schema registry with custom cache settings. + * + * @param fetcher the metadata fetcher to use for loading schema information + * @param cacheExpiry how long to keep cached schemas before they expire + * @param refreshCooldown minimum time between refresh attempts for missing columns + */ + public PostgresSchemaRegistry( + PostgresMetadataFetcher fetcher, Duration cacheExpiry, Duration refreshCooldown) { + this(fetcher, cacheExpiry, refreshCooldown, Clock.systemUTC()); + } + + /** + * Creates a new schema registry with custom settings and clock (for testing). + * + * @param fetcher the metadata fetcher to use for loading schema information + * @param cacheExpiry how long to keep cached schemas before they expire + * @param refreshCooldown minimum time between refresh attempts for missing columns + * @param clock the clock to use for time-based operations + */ + PostgresSchemaRegistry( + PostgresMetadataFetcher fetcher, + Duration cacheExpiry, + Duration refreshCooldown, + Clock clock) { + this.refreshCooldown = refreshCooldown; + this.clock = clock; + this.lastRefreshTimes = new ConcurrentHashMap<>(); this.cache = CacheBuilder.newBuilder() - .expireAfterWrite(24, TimeUnit.HOURS) + .expireAfterWrite(cacheExpiry.toMinutes(), TimeUnit.MINUTES) .build( new CacheLoader<>() { @Override public Map load(String tableName) { - return fetcher.fetch(tableName); // Hardcoded SQL Discovery + lastRefreshTimes.put(tableName, clock.instant()); + return fetcher.fetch(tableName); } }); } + /** + * Gets the schema for a table, loading it from the database if not cached. 
+ * + * @param tableName the name of the table + * @return a map of column names to their metadata + * @throws RuntimeException if the schema cannot be fetched + */ @Override public Map getSchema(String tableName) { try { @@ -37,20 +130,59 @@ public Map getSchema(String tableName) { } } + /** + * Invalidates the cached schema for a specific table. + * + *

The next call to {@link #getSchema(String)} will reload the schema from the database. + * + * @param tableName the name of the table to invalidate + */ @Override public void invalidate(String tableName) { cache.invalidate(tableName); } + /** + * Gets metadata for a specific column, optionally refreshing the schema if the column is not + * found. + * + *

<p>This method implements a circuit breaker pattern: + * + * 

    + *
<ul> + * <li>If the column exists in the cached schema, it is returned immediately. + * <li>If the column is not found and the cooldown period has elapsed since the last refresh, + * the schema is reloaded from the database. + * <li>If the column is not found but the cooldown period has not elapsed, {@code null} is + * returned without hitting the database. + * </ul>
+ * + * @param tableName the name of the table + * @param colName the name of the column + * @return the column metadata, or {@code null} if the column does not exist + */ @Override public PostgresColumnMetadata getColumnOrRefresh(String tableName, String colName) { Map schema = getSchema(tableName); - if (!schema.containsKey(colName)) { + if (!schema.containsKey(colName) && canRefresh(tableName)) { invalidate(tableName); schema = getSchema(tableName); } return schema.get(colName); } + + /** + * Checks if a refresh is allowed for the given table based on the cooldown period. + * + * @param tableName the name of the table + * @return {@code true} if the cooldown period has elapsed since the last refresh + */ + private boolean canRefresh(String tableName) { + Instant lastRefresh = lastRefreshTimes.get(tableName); + if (lastRefresh == null) { + return true; + } + return Duration.between(lastRefresh, clock.instant()).compareTo(refreshCooldown) >= 0; + } } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java new file mode 100644 index 00000000..6eb85b01 --- /dev/null +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java @@ -0,0 +1,338 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import 
org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class PostgresSchemaRegistryTest { + + private static final String TEST_TABLE = "test_table"; + private static final String COL_ID = "id"; + private static final String COL_NAME = "name"; + private static final String COL_PRICE = "price"; + private static final Duration CACHE_EXPIRY = Duration.ofHours(24); + private static final Duration REFRESH_COOLDOWN = Duration.ofMinutes(15); + + @Mock private PostgresMetadataFetcher fetcher; + + private PostgresSchemaRegistry registry; + private MutableClock mutableClock; + + @BeforeEach + void setUp() { + mutableClock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + registry = new PostgresSchemaRegistry(fetcher, CACHE_EXPIRY, REFRESH_COOLDOWN, mutableClock); + } + + @Test + void getSchemaLoadsFromFetcherOnCacheMiss() { + Map expectedSchema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(expectedSchema); + + Map result = registry.getSchema(TEST_TABLE); + + assertEquals(expectedSchema, result); + verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void getSchemaReturnsCachedValueOnSubsequentCalls() { + Map expectedSchema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(expectedSchema); + + // First call - loads from fetcher + Map result1 = registry.getSchema(TEST_TABLE); + // Second call - should use cache + Map result2 = registry.getSchema(TEST_TABLE); + + assertEquals(expectedSchema, result1); + assertEquals(expectedSchema, result2); + // Fetcher should only be called once + 
verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void getSchemaLoadsEachTableIndependently() { + String table1 = "table1"; + String table2 = "table2"; + Map schema1 = createTestSchema(); + Map schema2 = new HashMap<>(); + schema2.put( + "other_col", + PostgresColumnMetadata.builder() + .colName("other_col") + .canonicalType(DataType.BOOLEAN) + .postgresType(PostgresDataType.BOOLEAN) + .pgType("bool") + .nullable(false) + .build()); + + when(fetcher.fetch(table1)).thenReturn(schema1); + when(fetcher.fetch(table2)).thenReturn(schema2); + + Map result1 = registry.getSchema(table1); + Map result2 = registry.getSchema(table2); + + assertEquals(schema1, result1); + assertEquals(schema2, result2); + verify(fetcher, times(1)).fetch(table1); + verify(fetcher, times(1)).fetch(table2); + } + + @Test + void invalidateClearsSpecificTableCache() { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // Load into cache + registry.getSchema(TEST_TABLE); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Invalidate the cache + registry.invalidate(TEST_TABLE); + + // Next call should reload from fetcher + registry.getSchema(TEST_TABLE); + verify(fetcher, times(2)).fetch(TEST_TABLE); + } + + @Test + void invalidateDoesNotAffectOtherTables() { + String table1 = "table1"; + String table2 = "table2"; + Map schema1 = createTestSchema(); + Map schema2 = new HashMap<>(); + + when(fetcher.fetch(table1)).thenReturn(schema1); + when(fetcher.fetch(table2)).thenReturn(schema2); + + // Load both tables into cache + registry.getSchema(table1); + registry.getSchema(table2); + + // Invalidate only table1 + registry.invalidate(table1); + + // table1 should reload, table2 should use cache + registry.getSchema(table1); + registry.getSchema(table2); + + verify(fetcher, times(2)).fetch(table1); + verify(fetcher, times(1)).fetch(table2); + } + + @Test + void getColumnOrRefreshReturnsColumnIfExists() { + Map schema = createTestSchema(); + 
when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, COL_ID); + + assertNotNull(result); + assertEquals(COL_ID, result.getName()); + assertEquals(DataType.INTEGER, result.getCanonicalType()); + // Should only call fetcher once (initial load) + verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshRefreshesSchemaIfColumnMissingAndCooldownExpired() { + // Initial schema without the "new_col" + Map initialSchema = createTestSchema(); + + // Updated schema with the "new_col" + Map updatedSchema = new HashMap<>(initialSchema); + updatedSchema.put( + "new_col", + PostgresColumnMetadata.builder() + .colName("new_col") + .canonicalType(DataType.STRING) + .postgresType(PostgresDataType.TEXT) + .pgType("text") + .nullable(true) + .build()); + + when(fetcher.fetch(TEST_TABLE)).thenReturn(initialSchema).thenReturn(updatedSchema); + + // First call loads the schema + registry.getSchema(TEST_TABLE); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Advance time past cooldown period + mutableClock.advance(REFRESH_COOLDOWN.plusMinutes(1)); + + // Now try to get missing column - should trigger refresh + PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, "new_col"); + + assertNotNull(result); + assertEquals("new_col", result.getName()); + // Should call fetcher twice (initial load + refresh after cooldown) + verify(fetcher, times(2)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshDoesNotRefreshIfWithinCooldownPeriod() { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // First call loads the schema + registry.getSchema(TEST_TABLE); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Try to get a missing column - should NOT refresh because we're within cooldown + PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + + assertNull(result); + // Should only call 
fetcher once (initial load, no refresh due to cooldown) + verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshRefreshesAfterCooldownExpires() { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // First call loads the schema + registry.getSchema(TEST_TABLE); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Try immediately - should NOT refresh (within cooldown) + registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Advance time past cooldown + mutableClock.advance(REFRESH_COOLDOWN.plusSeconds(1)); + + // Try again - should refresh now + registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + verify(fetcher, times(2)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshReturnsNullIfColumnStillMissingAfterRefresh() { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // First call loads the schema + registry.getSchema(TEST_TABLE); + + // Advance past cooldown + mutableClock.advance(REFRESH_COOLDOWN.plusMinutes(1)); + + // Try to get a column that doesn't exist even after refresh + PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + + assertNull(result); + // Should call fetcher twice (initial load + refresh attempt after cooldown) + verify(fetcher, times(2)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshUsesExistingCacheBeforeRefresh() { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // Pre-populate cache + registry.getSchema(TEST_TABLE); + + // Get existing column - should not trigger refresh + PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, COL_NAME); + + assertNotNull(result); + assertEquals(COL_NAME, result.getName()); + // Should only call fetcher once (initial getSchema) + verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void 
getSchemaThrowsRuntimeExceptionWhenFetcherFails() { + when(fetcher.fetch(TEST_TABLE)).thenThrow(new RuntimeException("Database error")); + + RuntimeException exception = + assertThrows(RuntimeException.class, () -> registry.getSchema(TEST_TABLE)); + + assertEquals("Database error", exception.getCause().getMessage()); + } + + private Map createTestSchema() { + Map schema = new HashMap<>(); + schema.put( + COL_ID, + PostgresColumnMetadata.builder() + .colName(COL_ID) + .canonicalType(DataType.INTEGER) + .postgresType(PostgresDataType.INTEGER) + .pgType("int4") + .nullable(false) + .build()); + schema.put( + COL_NAME, + PostgresColumnMetadata.builder() + .colName(COL_NAME) + .canonicalType(DataType.STRING) + .postgresType(PostgresDataType.TEXT) + .pgType("text") + .nullable(true) + .build()); + schema.put( + COL_PRICE, + PostgresColumnMetadata.builder() + .colName(COL_PRICE) + .canonicalType(DataType.DOUBLE) + .postgresType(PostgresDataType.DOUBLE_PRECISION) + .pgType("float8") + .nullable(true) + .build()); + return schema; + } + + /** A mutable clock for testing time-dependent behavior. 
*/ + private static class MutableClock extends Clock { + private Instant currentInstant; + private final ZoneId zone; + + MutableClock(Instant initialInstant) { + this.currentInstant = initialInstant; + this.zone = ZoneId.of("UTC"); + } + + void advance(Duration duration) { + currentInstant = currentInstant.plus(duration); + } + + @Override + public ZoneId getZone() { + return zone; + } + + @Override + public Clock withZone(ZoneId zone) { + return this; + } + + @Override + public Instant instant() { + return currentInstant; + } + } +} From 1727dd006441af7cbec718460b3c02e72ee776e9 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 15:04:53 +0530 Subject: [PATCH 04/14] Spotless --- .../documentstore/commons/SchemaRegistry.java | 3 +- .../postgres/FlatPostgresCollection.java | 3 +- .../postgres/PostgresMetadataFetcher.java | 1 - .../postgres/PostgresSchemaRegistry.java | 7 ++-- .../postgres/PostgresSchemaRegistryTest.java | 35 ++++++++++--------- 5 files changed, 26 insertions(+), 23 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java index 544743d5..68782184 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java @@ -1,6 +1,7 @@ package org.hypertrace.core.documentstore.commons; import java.util.Map; +import java.util.Optional; public interface SchemaRegistry { @@ -8,5 +9,5 @@ public interface SchemaRegistry { void invalidate(String tableName); - T getColumnOrRefresh(String tableName, String colName); + Optional getColumnOrRefresh(String tableName, String colName); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java 
b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index fa1c1ea1..7798e016 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -114,8 +114,7 @@ public Document upsertAndReturn(Key key, Document document) throws IOException { while (fields.hasNext()) { Map.Entry field = fields.next(); String colName = field.getKey(); - PostgresColumnMetadata colMeta = schemaRegistry.getColumnOrRefresh(tableName, colName); - if (colMeta != null) { + if (schemaRegistry.getColumnOrRefresh(tableName, colName).isPresent()) { columns.add(colName); values.add(extractValue(field.getValue())); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java index 474df319..c32842bc 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java @@ -20,7 +20,6 @@ public class PostgresMetadataFetcher { private final PostgresDatastore datastore; - // Hardcoded SQL for high-performance schema discovery private static final String DISCOVERY_SQL = "SELECT column_name, udt_name, is_nullable " + "FROM information_schema.columns " diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java index f3c06673..ff7d6164 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java +++ 
b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java @@ -7,6 +7,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -158,10 +159,10 @@ public void invalidate(String tableName) { * * @param tableName the name of the table * @param colName the name of the column - * @return the column metadata, or {@code null} if the column does not exist + * @return an Optional containing the column metadata, or empty if the column does not exist */ @Override - public PostgresColumnMetadata getColumnOrRefresh(String tableName, String colName) { + public Optional getColumnOrRefresh(String tableName, String colName) { Map schema = getSchema(tableName); if (!schema.containsKey(colName) && canRefresh(tableName)) { @@ -169,7 +170,7 @@ public PostgresColumnMetadata getColumnOrRefresh(String tableName, String colNam schema = getSchema(tableName); } - return schema.get(colName); + return Optional.ofNullable(schema.get(colName)); } /** diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java index 6eb85b01..fa8e4a03 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java @@ -1,9 +1,9 @@ package org.hypertrace.core.documentstore.postgres; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertFalse; import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -14,6 +14,7 @@ import java.time.ZoneId; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.hypertrace.core.documentstore.expression.impl.DataType; import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; @@ -146,11 +147,11 @@ void getColumnOrRefreshReturnsColumnIfExists() { Map schema = createTestSchema(); when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); - PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, COL_ID); + Optional result = registry.getColumnOrRefresh(TEST_TABLE, COL_ID); - assertNotNull(result); - assertEquals(COL_ID, result.getName()); - assertEquals(DataType.INTEGER, result.getCanonicalType()); + assertTrue(result.isPresent()); + assertEquals(COL_ID, result.get().getName()); + assertEquals(DataType.INTEGER, result.get().getCanonicalType()); // Should only call fetcher once (initial load) verify(fetcher, times(1)).fetch(TEST_TABLE); } @@ -182,10 +183,10 @@ void getColumnOrRefreshRefreshesSchemaIfColumnMissingAndCooldownExpired() { mutableClock.advance(REFRESH_COOLDOWN.plusMinutes(1)); // Now try to get missing column - should trigger refresh - PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, "new_col"); + Optional result = registry.getColumnOrRefresh(TEST_TABLE, "new_col"); - assertNotNull(result); - assertEquals("new_col", result.getName()); + assertTrue(result.isPresent()); + assertEquals("new_col", result.get().getName()); // Should call fetcher twice (initial load + refresh after cooldown) verify(fetcher, times(2)).fetch(TEST_TABLE); } @@ -200,9 +201,10 @@ void 
getColumnOrRefreshDoesNotRefreshIfWithinCooldownPeriod() { verify(fetcher, times(1)).fetch(TEST_TABLE); // Try to get a missing column - should NOT refresh because we're within cooldown - PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + Optional result = + registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); - assertNull(result); + assertFalse(result.isPresent()); // Should only call fetcher once (initial load, no refresh due to cooldown) verify(fetcher, times(1)).fetch(TEST_TABLE); } @@ -240,9 +242,10 @@ void getColumnOrRefreshReturnsNullIfColumnStillMissingAfterRefresh() { mutableClock.advance(REFRESH_COOLDOWN.plusMinutes(1)); // Try to get a column that doesn't exist even after refresh - PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + Optional result = + registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); - assertNull(result); + assertFalse(result.isPresent()); // Should call fetcher twice (initial load + refresh attempt after cooldown) verify(fetcher, times(2)).fetch(TEST_TABLE); } @@ -256,10 +259,10 @@ void getColumnOrRefreshUsesExistingCacheBeforeRefresh() { registry.getSchema(TEST_TABLE); // Get existing column - should not trigger refresh - PostgresColumnMetadata result = registry.getColumnOrRefresh(TEST_TABLE, COL_NAME); + Optional result = registry.getColumnOrRefresh(TEST_TABLE, COL_NAME); - assertNotNull(result); - assertEquals(COL_NAME, result.getName()); + assertTrue(result.isPresent()); + assertEquals(COL_NAME, result.get().getName()); // Should only call fetcher once (initial getSchema) verify(fetcher, times(1)).fetch(TEST_TABLE); } From a62fbc218a6b3a68fd8dbd2d49e6a4b07d5ed39c Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 15:06:01 +0530 Subject: [PATCH 05/14] Remove unused method in SchemaRegistry --- .../hypertrace/core/documentstore/commons/SchemaRegistry.java | 2 -- 1 file changed, 2 deletions(-) diff --git 
a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java index 68782184..ec3ea43a 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java @@ -7,7 +7,5 @@ public interface SchemaRegistry { Map getSchema(String tableName); - void invalidate(String tableName); - Optional getColumnOrRefresh(String tableName, String colName); } From 6b7595bafcb036e553d8ee1780108cfeb42150d0 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 15:07:11 +0530 Subject: [PATCH 06/14] Remove unused method in ColumnMetadata --- .../hypertrace/core/documentstore/commons/ColumnMetadata.java | 2 -- .../hypertrace/core/documentstore/commons/SchemaRegistry.java | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java index 2850485a..6fa89dc3 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java @@ -7,7 +7,5 @@ public interface ColumnMetadata { DataType getCanonicalType(); - String getInternalType(); - boolean isNullable(); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java index ec3ea43a..68782184 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java @@ -7,5 +7,7 @@ public interface 
SchemaRegistry { Map getSchema(String tableName); + void invalidate(String tableName); + Optional getColumnOrRefresh(String tableName, String colName); } From 7b4ef2a7e2eeef4d8d184c935dc4182d0ecd883d Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 15:07:26 +0530 Subject: [PATCH 07/14] WIP --- .../documentstore/postgres/model/PostgresColumnMetadata.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java index 7f2a18fd..508b3dc6 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java @@ -27,11 +27,6 @@ public DataType getCanonicalType() { return canonicalType; } - @Override - public String getInternalType() { - return pgType; - } - @Override public boolean isNullable() { return nullable; From 598cb25e9e65a02e2e18aa332834a711ce622ff1 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 15:28:47 +0530 Subject: [PATCH 08/14] WIP --- .../postgres/FlatPostgresCollection.java | 110 +----------------- .../postgres/PostgresDatastore.java | 2 +- .../postgres/PostgresMetadataFetcher.java | 4 +- 3 files changed, 4 insertions(+), 112 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 7798e016..6dd89e1b 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -1,18 +1,10 @@ package 
org.hypertrace.core.documentstore.postgres; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest; import org.hypertrace.core.documentstore.BulkDeleteResult; import org.hypertrace.core.documentstore.BulkUpdateRequest; @@ -26,7 +18,6 @@ import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; -import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser; import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer; import org.hypertrace.core.documentstore.query.Query; @@ -43,7 +34,6 @@ public class FlatPostgresCollection extends PostgresCollection { private static final Logger LOGGER = LoggerFactory.getLogger(FlatPostgresCollection.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String WRITE_NOT_SUPPORTED = "Write operations are not supported for flat collections yet!"; @@ -97,105 +87,7 @@ public boolean upsert(Key key, Document document) throws IOException { @Override public Document upsertAndReturn(Key key, Document document) throws IOException { - String tableName = tableIdentifier.getTableName(); - Map schema = schemaRegistry.getSchema(tableName); - - if (schema.isEmpty()) { - throw new IOException("No schema found for table: " + tableName); - } - - try { - JsonNode docJson = 
OBJECT_MAPPER.readTree(document.toJson()); - List columns = new ArrayList<>(); - List values = new ArrayList<>(); - - // Extract fields from document that exist in schema - Iterator> fields = docJson.fields(); - while (fields.hasNext()) { - Map.Entry field = fields.next(); - String colName = field.getKey(); - if (schemaRegistry.getColumnOrRefresh(tableName, colName).isPresent()) { - columns.add(colName); - values.add(extractValue(field.getValue())); - } - } - - if (columns.isEmpty()) { - throw new IOException("No matching columns found in schema for document"); - } - - // Build UPSERT SQL: INSERT ... ON CONFLICT DO UPDATE - String columnList = String.join(", ", columns); - String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", ")); - String updateSet = - columns.stream().map(c -> c + " = EXCLUDED." + c).collect(Collectors.joining(", ")); - - // Determine primary key column (assume first column or 'id') - String pkColumn = schema.containsKey("id") ? "id" : columns.get(0); - - String sql = - String.format( - "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s RETURNING *", - tableIdentifier, columnList, placeholders, pkColumn, updateSet); - - try (PreparedStatement ps = client.getConnection().prepareStatement(sql)) { - for (int i = 0; i < values.size(); i++) { - ps.setObject(i + 1, values.get(i)); - } - - try (ResultSet rs = ps.executeQuery()) { - if (rs.next()) { - return resultSetToDocument(rs, columns); - } - } - } - return document; - } catch (SQLException e) { - LOGGER.error("SQLException in upsertAndReturn. 
key: {} document: {}", key, document, e); - throw new IOException(e); - } - } - - private Object extractValue(JsonNode node) { - if (node.isNull()) { - return null; - } else if (node.isBoolean()) { - return node.booleanValue(); - } else if (node.isInt()) { - return node.intValue(); - } else if (node.isLong()) { - return node.longValue(); - } else if (node.isDouble() || node.isFloat()) { - return node.doubleValue(); - } else if (node.isTextual()) { - return node.textValue(); - } else { - return node.toString(); - } - } - - private Document resultSetToDocument(ResultSet rs, List columns) - throws SQLException, IOException { - StringBuilder json = new StringBuilder("{"); - for (int i = 0; i < columns.size(); i++) { - if (i > 0) { - json.append(","); - } - String col = columns.get(i); - Object value = rs.getObject(col); - json.append("\"").append(col).append("\":"); - if (value == null) { - json.append("null"); - } else if (value instanceof String) { - json.append("\"").append(value).append("\""); - } else if (value instanceof Boolean) { - json.append(value); - } else { - json.append(value); - } - } - json.append("}"); - return new org.hypertrace.core.documentstore.JSONDocument(json.toString()); + throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); } @Override diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java index 9a7fa206..3b1fb345 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java @@ -60,7 +60,7 @@ public PostgresDatastore(@NonNull final DatastoreConfig datastoreConfig) { client = new PostgresClient(postgresConnectionConfig); database = connectionConfig.database(); docStoreMetricProvider = new PostgresDocStoreMetricProvider(this, 
postgresConnectionConfig); - schemaRegistry = new PostgresSchemaRegistry(new PostgresMetadataFetcher(this)); + schemaRegistry = new PostgresSchemaRegistry(new PostgresMetadataFetcher(client)); } catch (final IllegalArgumentException e) { throw new IllegalArgumentException( String.format("Unable to instantiate PostgresClient with config:%s", connectionConfig), diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java index c32842bc..e61b26b7 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java @@ -18,7 +18,7 @@ @AllArgsConstructor public class PostgresMetadataFetcher { - private final PostgresDatastore datastore; + private final PostgresClient client; private static final String DISCOVERY_SQL = "SELECT column_name, udt_name, is_nullable " @@ -28,7 +28,7 @@ public class PostgresMetadataFetcher { public Map fetch(String tableName) { Map metadataMap = new HashMap<>(); - try (Connection conn = datastore.getPostgresClient(); + try (Connection conn = client.getPooledConnection(); PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) { ps.setString(1, tableName); From 9c173b96e5a25c898a9b226070970df4867a5bba Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 15:45:23 +0530 Subject: [PATCH 09/14] Configure cache expiry and cooldown --- .../TypesafeDatastoreConfigAdapter.java | 2 ++ .../model/config/ConnectionConfig.java | 2 ++ .../postgres/PostgresConnectionConfig.java | 21 +++++++++++++++++++ .../postgres/PostgresDatastore.java | 6 +++++- .../postgres/PostgresSchemaRegistry.java | 17 --------------- 5 files changed, 30 insertions(+), 18 deletions(-) diff --git 
a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java index c1fcacde..dda12fff 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java @@ -93,6 +93,8 @@ public DatastoreConfig convert(final Config config) { connectionConfig.credentials(), connectionConfig.applicationName(), connectionConfig.connectionPoolConfig(), + connectionConfig.schemaCacheExpiry(), + connectionConfig.schemaRefreshCooldown(), connectionConfig.customParameters()) { @Override public String toConnectionString() { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java index 18d9975c..2960a2d2 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java @@ -126,6 +126,8 @@ public ConnectionConfig build() { credentials, applicationName, connectionPoolConfig, + null, + null, customParameters); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java index 925e4b1e..2d1ab176 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java @@ -3,6 +3,7 @@ import static java.util.Collections.unmodifiableList; import static 
java.util.function.Predicate.not; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -34,8 +35,13 @@ public class PostgresConnectionConfig extends ConnectionConfig { .password(PostgresDefaults.DEFAULT_PASSWORD) .build(); + private static final Duration DEFAULT_SCHEMA_CACHE_EXPIRY = Duration.ofHours(24); + private static final Duration DEFAULT_SCHEMA_REFRESH_COOLDOWN = Duration.ofMinutes(15); + @NonNull String applicationName; @NonNull ConnectionPoolConfig connectionPoolConfig; + @NonNull Duration schemaCacheExpiry; + @NonNull Duration schemaRefreshCooldown; public static ConnectionConfigBuilder builder() { return ConnectionConfig.builder().type(DatabaseType.POSTGRES); @@ -47,6 +53,8 @@ public PostgresConnectionConfig( @Nullable final ConnectionCredentials credentials, @NonNull final String applicationName, @Nullable final ConnectionPoolConfig connectionPoolConfig, + @Nullable final Duration schemaCacheExpiry, + @Nullable final Duration schemaRefreshCooldown, @NonNull final Map customParameters) { super( ensureSingleEndpoint(endpoints), @@ -55,6 +63,8 @@ public PostgresConnectionConfig( customParameters); this.applicationName = applicationName; this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig); + this.schemaCacheExpiry = getSchemaCacheExpiryOrDefault(schemaCacheExpiry); + this.schemaRefreshCooldown = getSchemaRefreshCooldownOrDefault(schemaRefreshCooldown); } public String toConnectionString() { @@ -127,4 +137,15 @@ private ConnectionPoolConfig getConnectionPoolConfigOrDefault( @Nullable final ConnectionPoolConfig connectionPoolConfig) { return Optional.ofNullable(connectionPoolConfig).orElse(ConnectionPoolConfig.builder().build()); } + + @NonNull + private Duration getSchemaCacheExpiryOrDefault(@Nullable final Duration schemaCacheExpiry) { + return Optional.ofNullable(schemaCacheExpiry).orElse(DEFAULT_SCHEMA_CACHE_EXPIRY); + } + + @NonNull + private Duration 
getSchemaRefreshCooldownOrDefault( + @Nullable final Duration schemaRefreshCooldown) { + return Optional.ofNullable(schemaRefreshCooldown).orElse(DEFAULT_SCHEMA_REFRESH_COOLDOWN); + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java index 3b1fb345..4e9fb3e2 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java @@ -60,7 +60,11 @@ public PostgresDatastore(@NonNull final DatastoreConfig datastoreConfig) { client = new PostgresClient(postgresConnectionConfig); database = connectionConfig.database(); docStoreMetricProvider = new PostgresDocStoreMetricProvider(this, postgresConnectionConfig); - schemaRegistry = new PostgresSchemaRegistry(new PostgresMetadataFetcher(client)); + schemaRegistry = + new PostgresSchemaRegistry( + new PostgresMetadataFetcher(client), + postgresConnectionConfig.schemaCacheExpiry(), + postgresConnectionConfig.schemaRefreshCooldown()); } catch (final IllegalArgumentException e) { throw new IllegalArgumentException( String.format("Unable to instantiate PostgresClient with config:%s", connectionConfig), diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java index ff7d6164..49f3a0c2 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java @@ -52,28 +52,11 @@ */ public class PostgresSchemaRegistry implements SchemaRegistry { - /** Default cache expiry time: 24 hours. 
*/ - private static final Duration DEFAULT_CACHE_EXPIRY = Duration.ofHours(24); - - /** Default cooldown period between refresh attempts: 15 minutes. */ - private static final Duration DEFAULT_REFRESH_COOLDOWN = Duration.ofMinutes(15); - private final LoadingCache> cache; private final Map lastRefreshTimes; private final Duration refreshCooldown; private final Clock clock; - /** - * Creates a new schema registry with default settings. - * - *

<p>Uses default cache expiry of 24 hours and refresh cooldown of 15 minutes. - * - * @param fetcher the metadata fetcher to use for loading schema information - */ - public PostgresSchemaRegistry(PostgresMetadataFetcher fetcher) { - this(fetcher, DEFAULT_CACHE_EXPIRY, DEFAULT_REFRESH_COOLDOWN, Clock.systemUTC()); - } - /** * Creates a new schema registry with custom cache settings. * From 7bf77c5cf7b39ac3cfe04086f57b53c63f618fcb Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 15:55:12 +0530 Subject: [PATCH 10/14] Added PostgresMetadataFetcherTest --- .../expression/impl/DataType.java | 1 + .../postgres/PostgresMetadataFetcher.java | 4 + .../nonjson/field/PostgresDataType.java | 1 + .../postgres/PostgresMetadataFetcherTest.java | 310 ++++++++++++++++++ 4 files changed, 316 insertions(+) create mode 100644 document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/expression/impl/DataType.java b/document-store/src/main/java/org/hypertrace/core/documentstore/expression/impl/DataType.java index 9eec68b9..44ed4d2c 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/expression/impl/DataType.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/expression/impl/DataType.java @@ -22,6 +22,7 @@ public enum DataType { FLOAT, DOUBLE, BOOLEAN, + JSON, // timestamp with time-zone information. For example: 2004-10-19 10:23:54+02.
// For more info, see: https://www.postgresql.org/docs/current/datatype-datetime.html TIMESTAMPTZ, diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java index e61b26b7..2d2be0ec 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java @@ -81,6 +81,8 @@ private DataType mapToCanonicalType(String udtName) { case "bpchar": case "uuid": return DataType.STRING; + case "jsonb": + return DataType.JSON; default: return DataType.UNSPECIFIED; } @@ -114,6 +116,8 @@ private PostgresDataType mapToPostgresType(String udtName) { case "bpchar": case "uuid": return PostgresDataType.TEXT; + case "jsonb": + return PostgresDataType.JSONB; default: return PostgresDataType.UNKNOWN; } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresDataType.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresDataType.java index 89626e7c..c920473f 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresDataType.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresDataType.java @@ -15,6 +15,7 @@ public enum PostgresDataType { REAL("float4"), DOUBLE_PRECISION("float8"), BOOLEAN("bool"), + JSONB("jsonb"), TIMESTAMPTZ("timestamptz"), DATE("date"), UNKNOWN(null); diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java new file 
mode 100644 index 00000000..6fa32d91 --- /dev/null +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java @@ -0,0 +1,310 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class PostgresMetadataFetcherTest { + + private static final String TEST_TABLE = "test_table"; + + @Mock private PostgresClient client; + @Mock private Connection connection; + @Mock private PreparedStatement preparedStatement; + @Mock private ResultSet resultSet; + + private PostgresMetadataFetcher fetcher; + + @BeforeEach + void setUp() throws SQLException { + when(client.getPooledConnection()).thenReturn(connection); + when(connection.prepareStatement(anyString())).thenReturn(preparedStatement); + when(preparedStatement.executeQuery()).thenReturn(resultSet); + fetcher = new PostgresMetadataFetcher(client); + } + + @Test + void 
fetchReturnsEmptyMapForTableWithNoColumns() throws SQLException { + when(resultSet.next()).thenReturn(false); + + Map result = fetcher.fetch(TEST_TABLE); + + assertTrue(result.isEmpty()); + verify(preparedStatement).setString(1, TEST_TABLE); + } + + @Test + void fetchReturnsSingleColumn() throws SQLException { + when(resultSet.next()).thenReturn(true, false); + when(resultSet.getString("column_name")).thenReturn("id"); + when(resultSet.getString("udt_name")).thenReturn("int4"); + when(resultSet.getString("is_nullable")).thenReturn("NO"); + + Map result = fetcher.fetch(TEST_TABLE); + + assertEquals(1, result.size()); + PostgresColumnMetadata metadata = result.get("id"); + assertNotNull(metadata); + assertEquals("id", metadata.getName()); + assertEquals(DataType.INTEGER, metadata.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, metadata.getPostgresType()); + assertFalse(metadata.isNullable()); + } + + @Test + void fetchReturnsMultipleColumns() throws SQLException { + when(resultSet.next()).thenReturn(true, true, true, false); + when(resultSet.getString("column_name")).thenReturn("id", "name", "price"); + when(resultSet.getString("udt_name")).thenReturn("int8", "text", "float8"); + when(resultSet.getString("is_nullable")).thenReturn("NO", "YES", "YES"); + + Map result = fetcher.fetch(TEST_TABLE); + + assertEquals(3, result.size()); + + // Verify id column + PostgresColumnMetadata idMeta = result.get("id"); + assertEquals(DataType.LONG, idMeta.getCanonicalType()); + assertEquals(PostgresDataType.BIGINT, idMeta.getPostgresType()); + assertFalse(idMeta.isNullable()); + + // Verify name column + PostgresColumnMetadata nameMeta = result.get("name"); + assertEquals(DataType.STRING, nameMeta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, nameMeta.getPostgresType()); + assertTrue(nameMeta.isNullable()); + + // Verify price column + PostgresColumnMetadata priceMeta = result.get("price"); + assertEquals(DataType.DOUBLE, priceMeta.getCanonicalType()); + 
assertEquals(PostgresDataType.DOUBLE_PRECISION, priceMeta.getPostgresType()); + assertTrue(priceMeta.isNullable()); + } + + @Test + void fetchMapsInt4ToInteger() throws SQLException { + setupSingleColumnResult("col", "int4", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.INTEGER, meta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, meta.getPostgresType()); + } + + @Test + void fetchMapsInt2ToInteger() throws SQLException { + setupSingleColumnResult("col", "int2", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.INTEGER, meta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, meta.getPostgresType()); + } + + @Test + void fetchMapsInt8ToLong() throws SQLException { + setupSingleColumnResult("col", "int8", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.LONG, meta.getCanonicalType()); + assertEquals(PostgresDataType.BIGINT, meta.getPostgresType()); + } + + @Test + void fetchMapsFloat4ToFloat() throws SQLException { + setupSingleColumnResult("col", "float4", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.FLOAT, meta.getCanonicalType()); + assertEquals(PostgresDataType.REAL, meta.getPostgresType()); + } + + @Test + void fetchMapsFloat8ToDouble() throws SQLException { + setupSingleColumnResult("col", "float8", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.DOUBLE, meta.getCanonicalType()); + assertEquals(PostgresDataType.DOUBLE_PRECISION, meta.getPostgresType()); + } + + @Test + void fetchMapsNumericToDouble() throws SQLException { + setupSingleColumnResult("col", "numeric", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.DOUBLE, meta.getCanonicalType()); + assertEquals(PostgresDataType.DOUBLE_PRECISION, 
meta.getPostgresType()); + } + + @Test + void fetchMapsBoolToBoolean() throws SQLException { + setupSingleColumnResult("col", "bool", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.BOOLEAN, meta.getCanonicalType()); + assertEquals(PostgresDataType.BOOLEAN, meta.getPostgresType()); + } + + @Test + void fetchMapsTextToString() throws SQLException { + setupSingleColumnResult("col", "text", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.STRING, meta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, meta.getPostgresType()); + } + + @Test + void fetchMapsVarcharToString() throws SQLException { + setupSingleColumnResult("col", "varchar", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.STRING, meta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, meta.getPostgresType()); + } + + @Test + void fetchMapsBpcharToString() throws SQLException { + setupSingleColumnResult("col", "bpchar", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.STRING, meta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, meta.getPostgresType()); + } + + @Test + void fetchMapsUuidToString() throws SQLException { + setupSingleColumnResult("col", "uuid", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.STRING, meta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, meta.getPostgresType()); + } + + @Test + void fetchMapsJsonbToJson() throws SQLException { + setupSingleColumnResult("col", "jsonb", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.JSON, meta.getCanonicalType()); + assertEquals(PostgresDataType.JSONB, meta.getPostgresType()); + } + + @Test + void fetchMapsTimestamptzToTimestamptz() throws SQLException { + 
setupSingleColumnResult("col", "timestamptz", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.TIMESTAMPTZ, meta.getCanonicalType()); + assertEquals(PostgresDataType.TIMESTAMPTZ, meta.getPostgresType()); + } + + @Test + void fetchMapsDateToDate() throws SQLException { + setupSingleColumnResult("col", "date", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.DATE, meta.getCanonicalType()); + assertEquals(PostgresDataType.DATE, meta.getPostgresType()); + } + + @Test + void fetchMapsUnknownTypeToUnspecified() throws SQLException { + setupSingleColumnResult("col", "unknown_type", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.UNSPECIFIED, meta.getCanonicalType()); + assertEquals(PostgresDataType.UNKNOWN, meta.getPostgresType()); + } + + @Test + void fetchMapsNullUdtNameToUnspecified() throws SQLException { + setupSingleColumnResult("col", null, "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.UNSPECIFIED, meta.getCanonicalType()); + assertEquals(PostgresDataType.UNKNOWN, meta.getPostgresType()); + } + + @Test + void fetchHandlesNullableColumn() throws SQLException { + setupSingleColumnResult("col", "text", "YES"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertTrue(meta.isNullable()); + } + + @Test + void fetchHandlesNonNullableColumn() throws SQLException { + setupSingleColumnResult("col", "text", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertFalse(meta.isNullable()); + } + + @Test + void fetchHandlesCaseInsensitiveUdtName() throws SQLException { + setupSingleColumnResult("col", "INT4", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.INTEGER, meta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, 
meta.getPostgresType()); + } + + @Test + void fetchThrowsRuntimeExceptionOnSqlException() throws SQLException { + when(preparedStatement.executeQuery()).thenThrow(new SQLException("Connection failed")); + + RuntimeException exception = + assertThrows(RuntimeException.class, () -> fetcher.fetch(TEST_TABLE)); + + assertTrue(exception.getMessage().contains(TEST_TABLE)); + assertTrue(exception.getCause() instanceof SQLException); + } + + private void setupSingleColumnResult(String colName, String udtName, String isNullable) + throws SQLException { + when(resultSet.next()).thenReturn(true, false); + when(resultSet.getString("column_name")).thenReturn(colName); + when(resultSet.getString("udt_name")).thenReturn(udtName); + when(resultSet.getString("is_nullable")).thenReturn(isNullable); + } +} From 6d03cd53c77c2c598da9d05e0493b7fb8258002f Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 16:04:20 +0530 Subject: [PATCH 11/14] WIP --- .../postgres/PostgresSchemaRegistry.java | 23 +------- .../postgres/PostgresSchemaRegistryTest.java | 57 ++++--------------- 2 files changed, 13 insertions(+), 67 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java index 49f3a0c2..c4a0ae98 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java @@ -3,7 +3,6 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Map; @@ -55,7 +54,6 @@ public class PostgresSchemaRegistry implements SchemaRegistry> cache; private final Map lastRefreshTimes; private final Duration 
refreshCooldown; - private final Clock clock; /** * Creates a new schema registry with custom cache settings. @@ -66,24 +64,7 @@ public class PostgresSchemaRegistry implements SchemaRegistry(); this.cache = CacheBuilder.newBuilder() @@ -92,7 +73,7 @@ public PostgresSchemaRegistry( new CacheLoader<>() { @Override public Map load(String tableName) { - lastRefreshTimes.put(tableName, clock.instant()); + lastRefreshTimes.put(tableName, Instant.now()); return fetcher.fetch(tableName); } }); @@ -167,6 +148,6 @@ private boolean canRefresh(String tableName) { if (lastRefresh == null) { return true; } - return Duration.between(lastRefresh, clock.instant()).compareTo(refreshCooldown) >= 0; + return Duration.between(lastRefresh, Instant.now()).compareTo(refreshCooldown) >= 0; } } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java index fa8e4a03..7fa22d5f 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java @@ -8,10 +8,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.time.Clock; import java.time.Duration; -import java.time.Instant; -import java.time.ZoneId; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -32,17 +29,15 @@ class PostgresSchemaRegistryTest { private static final String COL_NAME = "name"; private static final String COL_PRICE = "price"; private static final Duration CACHE_EXPIRY = Duration.ofHours(24); - private static final Duration REFRESH_COOLDOWN = Duration.ofMinutes(15); + private static final Duration REFRESH_COOLDOWN = Duration.ofMillis(50); @Mock private PostgresMetadataFetcher fetcher; private PostgresSchemaRegistry registry; - private 
MutableClock mutableClock; @BeforeEach void setUp() { - mutableClock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); - registry = new PostgresSchemaRegistry(fetcher, CACHE_EXPIRY, REFRESH_COOLDOWN, mutableClock); + registry = new PostgresSchemaRegistry(fetcher, CACHE_EXPIRY, REFRESH_COOLDOWN); } @Test @@ -157,7 +152,7 @@ void getColumnOrRefreshReturnsColumnIfExists() { } @Test - void getColumnOrRefreshRefreshesSchemaIfColumnMissingAndCooldownExpired() { + void getColumnOrRefreshRefreshesSchemaIfColumnMissingAndCooldownExpired() throws Exception { // Initial schema without the "new_col" Map initialSchema = createTestSchema(); @@ -179,8 +174,8 @@ void getColumnOrRefreshRefreshesSchemaIfColumnMissingAndCooldownExpired() { registry.getSchema(TEST_TABLE); verify(fetcher, times(1)).fetch(TEST_TABLE); - // Advance time past cooldown period - mutableClock.advance(REFRESH_COOLDOWN.plusMinutes(1)); + // Wait past cooldown period + Thread.sleep(REFRESH_COOLDOWN.toMillis() + 10); // Now try to get missing column - should trigger refresh Optional result = registry.getColumnOrRefresh(TEST_TABLE, "new_col"); @@ -210,7 +205,7 @@ void getColumnOrRefreshDoesNotRefreshIfWithinCooldownPeriod() { } @Test - void getColumnOrRefreshRefreshesAfterCooldownExpires() { + void getColumnOrRefreshRefreshesAfterCooldownExpires() throws Exception { Map schema = createTestSchema(); when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); @@ -222,8 +217,8 @@ void getColumnOrRefreshRefreshesAfterCooldownExpires() { registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); verify(fetcher, times(1)).fetch(TEST_TABLE); - // Advance time past cooldown - mutableClock.advance(REFRESH_COOLDOWN.plusSeconds(1)); + // Wait past cooldown + Thread.sleep(REFRESH_COOLDOWN.toMillis() + 10); // Try again - should refresh now registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); @@ -231,15 +226,15 @@ void getColumnOrRefreshRefreshesAfterCooldownExpires() { } @Test - void 
getColumnOrRefreshReturnsNullIfColumnStillMissingAfterRefresh() { + void getColumnOrRefreshReturnsEmptyIfColumnStillMissingAfterRefresh() throws Exception { Map schema = createTestSchema(); when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); // First call loads the schema registry.getSchema(TEST_TABLE); - // Advance past cooldown - mutableClock.advance(REFRESH_COOLDOWN.plusMinutes(1)); + // Wait past cooldown + Thread.sleep(REFRESH_COOLDOWN.toMillis() + 10); // Try to get a column that doesn't exist even after refresh Optional result = @@ -308,34 +303,4 @@ private Map createTestSchema() { .build()); return schema; } - - /** A mutable clock for testing time-dependent behavior. */ - private static class MutableClock extends Clock { - private Instant currentInstant; - private final ZoneId zone; - - MutableClock(Instant initialInstant) { - this.currentInstant = initialInstant; - this.zone = ZoneId.of("UTC"); - } - - void advance(Duration duration) { - currentInstant = currentInstant.plus(duration); - } - - @Override - public ZoneId getZone() { - return zone; - } - - @Override - public Clock withZone(ZoneId zone) { - return this; - } - - @Override - public Instant instant() { - return currentInstant; - } - } } From c3f5f7e9ba7ff93a51384d8d23b658dce7ec4c1c Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 16:27:49 +0530 Subject: [PATCH 12/14] Added docs on thread safety --- .../documentstore/postgres/PostgresSchemaRegistry.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java index c4a0ae98..e8fc9b19 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java @@ -52,7 +52,7 
@@ public class PostgresSchemaRegistry implements SchemaRegistry { private final LoadingCache> cache; - private final Map lastRefreshTimes; + private final ConcurrentHashMap lastRefreshTimes; private final Duration refreshCooldown; /** @@ -121,6 +121,11 @@ public void invalidate(String tableName) { * returned without hitting the database. * * + *

Note that this is a check-then-act sequence that should ideally be atomic. However, this + * method is deliberately not thread-safe: in case of a data race, the worst outcome is one extra + * call to the DB, after which further refreshes are blocked anyway because that call has reset + * the cooldown period. This is likely to be more performant than contending for locks. + * + * @param tableName the name of the table + * @param colName the name of the column + * @return an Optional containing the column metadata, or empty if the column does not exist From 827381f7a9111e666f0c4a7ff2ed465f5a997c73 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 16:34:54 +0530 Subject: [PATCH 13/14] Added PostgresSchemaRegistryIntegrationTest.java --- ...PostgresSchemaRegistryIntegrationTest.java | 279 ++++++++++++++++++ 1 file changed, 279 insertions(+) create mode 100644 document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java new file mode 100644 index 00000000..9286d7e5 --- /dev/null +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java @@ -0,0 +1,279 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.HashMap; +import java.util.Map; +import 
java.util.Optional; +import org.hypertrace.core.documentstore.DatastoreProvider; +import org.hypertrace.core.documentstore.commons.SchemaRegistry; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +class PostgresSchemaRegistryIntegrationTest { + + private static final String TABLE_NAME = "myTestFlat"; + + private static GenericContainer postgres; + private static PostgresDatastore datastore; + private static SchemaRegistry registry; + + @BeforeAll + static void init() throws Exception { + postgres = + new GenericContainer<>(DockerImageName.parse("postgres:13.1")) + .withEnv("POSTGRES_PASSWORD", "postgres") + .withEnv("POSTGRES_USER", "postgres") + .withExposedPorts(5432) + .waitingFor(Wait.forListeningPort()); + postgres.start(); + + String connectionUrl = + String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432)); + + Map postgresConfig = new HashMap<>(); + postgresConfig.put("url", connectionUrl); + postgresConfig.put("user", "postgres"); + postgresConfig.put("password", "postgres"); + Config config = ConfigFactory.parseMap(postgresConfig); + + datastore = (PostgresDatastore) DatastoreProvider.getDatastore("Postgres", config); + + createFlatTable(); + + registry = datastore.getSchemaRegistry(); + } + + private static void createFlatTable() throws Exception { + String createTableSQL = + String.format( + "CREATE TABLE IF NOT EXISTS \"%s\" (" + + 
"\"_id\" INTEGER PRIMARY KEY," + + "\"item\" TEXT," + + "\"price\" INTEGER," + + "\"quantity\" BIGINT," + + "\"rating\" REAL," + + "\"score\" DOUBLE PRECISION," + + "\"date\" TIMESTAMPTZ," + + "\"created_date\" DATE," + + "\"in_stock\" BOOLEAN," + + "\"tags\" TEXT[]," + + "\"props\" JSONB" + + ");", + TABLE_NAME); + + try (Connection connection = datastore.getPostgresClient(); + PreparedStatement statement = connection.prepareStatement(createTableSQL)) { + statement.execute(); + System.out.println("Created flat table: " + TABLE_NAME); + } + } + + @BeforeEach + void setUp() { + registry.invalidate(TABLE_NAME); + } + + @AfterAll + static void shutdown() { + if (postgres != null) { + postgres.stop(); + } + } + + @Test + void getSchemaReturnsAllColumns() { + Map schema = registry.getSchema(TABLE_NAME); + + assertNotNull(schema); + assertEquals(11, schema.size()); + assertTrue(schema.containsKey("_id")); + assertTrue(schema.containsKey("item")); + assertTrue(schema.containsKey("price")); + assertTrue(schema.containsKey("quantity")); + assertTrue(schema.containsKey("rating")); + assertTrue(schema.containsKey("score")); + assertTrue(schema.containsKey("date")); + assertTrue(schema.containsKey("created_date")); + assertTrue(schema.containsKey("in_stock")); + assertTrue(schema.containsKey("tags")); + assertTrue(schema.containsKey("props")); + } + + @Test + void getSchemaReturnsCorrectIntegerMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata idMeta = schema.get("_id"); + assertEquals("_id", idMeta.getName()); + assertEquals(DataType.INTEGER, idMeta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, idMeta.getPostgresType()); + assertFalse(idMeta.isNullable()); + + PostgresColumnMetadata priceMeta = schema.get("price"); + assertEquals(DataType.INTEGER, priceMeta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, priceMeta.getPostgresType()); + assertTrue(priceMeta.isNullable()); + } + + @Test + void 
getSchemaReturnsCorrectBigintMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata quantityMeta = schema.get("quantity"); + assertEquals(DataType.LONG, quantityMeta.getCanonicalType()); + assertEquals(PostgresDataType.BIGINT, quantityMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectFloatMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata ratingMeta = schema.get("rating"); + assertEquals(DataType.FLOAT, ratingMeta.getCanonicalType()); + assertEquals(PostgresDataType.REAL, ratingMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectDoubleMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata scoreMeta = schema.get("score"); + assertEquals(DataType.DOUBLE, scoreMeta.getCanonicalType()); + assertEquals(PostgresDataType.DOUBLE_PRECISION, scoreMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectTextMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata itemMeta = schema.get("item"); + assertEquals(DataType.STRING, itemMeta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, itemMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectBooleanMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata inStockMeta = schema.get("in_stock"); + assertEquals(DataType.BOOLEAN, inStockMeta.getCanonicalType()); + assertEquals(PostgresDataType.BOOLEAN, inStockMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectTimestamptzMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata dateMeta = schema.get("date"); + assertEquals(DataType.TIMESTAMPTZ, dateMeta.getCanonicalType()); + assertEquals(PostgresDataType.TIMESTAMPTZ, dateMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectDateMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata createdDateMeta = 
schema.get("created_date"); + assertEquals(DataType.DATE, createdDateMeta.getCanonicalType()); + assertEquals(PostgresDataType.DATE, createdDateMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectJsonbMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata propsMeta = schema.get("props"); + assertEquals(DataType.JSON, propsMeta.getCanonicalType()); + assertEquals(PostgresDataType.JSONB, propsMeta.getPostgresType()); + } + + @Test + void getColumnOrRefreshReturnsExistingColumn() { + Optional result = registry.getColumnOrRefresh(TABLE_NAME, "item"); + + assertTrue(result.isPresent()); + assertEquals("item", result.get().getName()); + assertEquals(DataType.STRING, result.get().getCanonicalType()); + } + + @Test + void getColumnOrRefreshReturnsEmptyForNonExistentColumn() { + Optional result = + registry.getColumnOrRefresh(TABLE_NAME, "nonexistent_column"); + + assertFalse(result.isPresent()); + } + + @Test + void getColumnOrRefreshFindsNewlyAddedColumnAfterInvalidation() throws Exception { + // First, verify the new column doesn't exist + Optional before = registry.getColumnOrRefresh(TABLE_NAME, "new_column"); + assertFalse(before.isPresent()); + + // Add a new column to the table + try (Connection connection = datastore.getPostgresClient(); + PreparedStatement statement = + connection.prepareStatement( + String.format("ALTER TABLE \"%s\" ADD COLUMN \"new_column\" TEXT", TABLE_NAME))) { + statement.execute(); + } + + // Invalidate cache to force reload + registry.invalidate(TABLE_NAME); + + // Now the registry should find the new column after reload + Optional after = registry.getColumnOrRefresh(TABLE_NAME, "new_column"); + assertTrue(after.isPresent()); + assertEquals("new_column", after.get().getName()); + assertEquals(DataType.STRING, after.get().getCanonicalType()); + + // Cleanup: drop the column + try (Connection connection = datastore.getPostgresClient(); + PreparedStatement statement = + 
connection.prepareStatement( + String.format("ALTER TABLE \"%s\" DROP COLUMN \"new_column\"", TABLE_NAME))) { + statement.execute(); + } + } + + @Test + void cacheReturnsSameInstanceOnSubsequentCalls() { + Map schema1 = registry.getSchema(TABLE_NAME); + Map schema2 = registry.getSchema(TABLE_NAME); + + // Should be the same cached instance + assertTrue(schema1 == schema2); + } + + @Test + void invalidateCausesReload() { + Map schema1 = registry.getSchema(TABLE_NAME); + + registry.invalidate(TABLE_NAME); + + Map schema2 = registry.getSchema(TABLE_NAME); + + // Should be different instances after invalidation + assertFalse(schema1 == schema2); + // But same content + assertEquals(schema1.keySet(), schema2.keySet()); + } +} From 602037bfb07552a3bd17af7414606892f84bb032 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 29 Dec 2025 16:36:28 +0530 Subject: [PATCH 14/14] WIP --- .../postgres/PostgresSchemaRegistryIntegrationTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java index 9286d7e5..2b3b30d4 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java @@ -3,6 +3,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import com.typesafe.config.Config; @@ -260,7 
+262,7 @@ void cacheReturnsSameInstanceOnSubsequentCalls() { Map schema2 = registry.getSchema(TABLE_NAME); // Should be the same cached instance - assertTrue(schema1 == schema2); + assertSame(schema1, schema2); } @Test @@ -272,7 +274,7 @@ void invalidateCausesReload() { Map schema2 = registry.getSchema(TABLE_NAME); // Should be different instances after invalidation - assertFalse(schema1 == schema2); + assertNotSame(schema1, schema2); // But same content assertEquals(schema1.keySet(), schema2.keySet()); }