diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java new file mode 100644 index 00000000..2b3b30d4 --- /dev/null +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryIntegrationTest.java @@ -0,0 +1,281 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.hypertrace.core.documentstore.DatastoreProvider; +import org.hypertrace.core.documentstore.commons.SchemaRegistry; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +class PostgresSchemaRegistryIntegrationTest { + + private static final String TABLE_NAME = "myTestFlat"; + + private static GenericContainer postgres; + private static PostgresDatastore datastore; + private static SchemaRegistry registry; + + @BeforeAll + static void init() throws Exception { + postgres = + new GenericContainer<>(DockerImageName.parse("postgres:13.1")) + .withEnv("POSTGRES_PASSWORD", "postgres") + .withEnv("POSTGRES_USER", "postgres") + .withExposedPorts(5432) + .waitingFor(Wait.forListeningPort()); + postgres.start(); + + String connectionUrl = + String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432)); + + Map postgresConfig = new HashMap<>(); + postgresConfig.put("url", connectionUrl); + postgresConfig.put("user", "postgres"); + postgresConfig.put("password", "postgres"); + Config config = ConfigFactory.parseMap(postgresConfig); + + datastore = (PostgresDatastore) DatastoreProvider.getDatastore("Postgres", config); + + createFlatTable(); + + registry = datastore.getSchemaRegistry(); + } + + private static void createFlatTable() throws Exception { + String createTableSQL = + String.format( + "CREATE TABLE IF NOT EXISTS \"%s\" (" + + "\"_id\" INTEGER PRIMARY KEY," + + "\"item\" TEXT," + + "\"price\" INTEGER," + + "\"quantity\" BIGINT," + + "\"rating\" REAL," + + "\"score\" DOUBLE PRECISION," + + "\"date\" TIMESTAMPTZ," + + "\"created_date\" DATE," + + "\"in_stock\" BOOLEAN," + + "\"tags\" TEXT[]," + + "\"props\" JSONB" + + ");", + TABLE_NAME); + + try (Connection connection = datastore.getPostgresClient(); + PreparedStatement statement = connection.prepareStatement(createTableSQL)) { + statement.execute(); + System.out.println("Created flat table: " + TABLE_NAME); + } + } + + @BeforeEach + void setUp() { + registry.invalidate(TABLE_NAME); + } + + @AfterAll + static void shutdown() { + if (postgres != null) { + postgres.stop(); + } + } + + @Test + void getSchemaReturnsAllColumns() { + Map schema = registry.getSchema(TABLE_NAME); + + assertNotNull(schema); + assertEquals(11, schema.size()); + assertTrue(schema.containsKey("_id")); + assertTrue(schema.containsKey("item")); + assertTrue(schema.containsKey("price")); + assertTrue(schema.containsKey("quantity")); + assertTrue(schema.containsKey("rating")); + assertTrue(schema.containsKey("score")); + assertTrue(schema.containsKey("date")); + assertTrue(schema.containsKey("created_date")); + assertTrue(schema.containsKey("in_stock")); + assertTrue(schema.containsKey("tags")); + assertTrue(schema.containsKey("props")); + } + + @Test + void getSchemaReturnsCorrectIntegerMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata idMeta = schema.get("_id"); + assertEquals("_id", idMeta.getName()); + assertEquals(DataType.INTEGER, idMeta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, idMeta.getPostgresType()); + assertFalse(idMeta.isNullable()); + + PostgresColumnMetadata priceMeta = schema.get("price"); + assertEquals(DataType.INTEGER, priceMeta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, priceMeta.getPostgresType()); + assertTrue(priceMeta.isNullable()); + } + + @Test + void getSchemaReturnsCorrectBigintMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata quantityMeta = schema.get("quantity"); + assertEquals(DataType.LONG, quantityMeta.getCanonicalType()); + assertEquals(PostgresDataType.BIGINT, quantityMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectFloatMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata ratingMeta = schema.get("rating"); + assertEquals(DataType.FLOAT, ratingMeta.getCanonicalType()); + assertEquals(PostgresDataType.REAL, ratingMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectDoubleMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata scoreMeta = schema.get("score"); + assertEquals(DataType.DOUBLE, scoreMeta.getCanonicalType()); + assertEquals(PostgresDataType.DOUBLE_PRECISION, scoreMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectTextMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata itemMeta = schema.get("item"); + assertEquals(DataType.STRING, itemMeta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, itemMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectBooleanMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata inStockMeta = schema.get("in_stock"); + assertEquals(DataType.BOOLEAN, inStockMeta.getCanonicalType()); + assertEquals(PostgresDataType.BOOLEAN, inStockMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectTimestamptzMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata dateMeta = schema.get("date"); + assertEquals(DataType.TIMESTAMPTZ, dateMeta.getCanonicalType()); + assertEquals(PostgresDataType.TIMESTAMPTZ, dateMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectDateMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata createdDateMeta = schema.get("created_date"); + assertEquals(DataType.DATE, createdDateMeta.getCanonicalType()); + assertEquals(PostgresDataType.DATE, createdDateMeta.getPostgresType()); + } + + @Test + void getSchemaReturnsCorrectJsonbMapping() { + Map schema = registry.getSchema(TABLE_NAME); + + PostgresColumnMetadata propsMeta = schema.get("props"); + assertEquals(DataType.JSON, propsMeta.getCanonicalType()); + assertEquals(PostgresDataType.JSONB, propsMeta.getPostgresType()); + } + + @Test + void getColumnOrRefreshReturnsExistingColumn() { + Optional result = registry.getColumnOrRefresh(TABLE_NAME, "item"); + + assertTrue(result.isPresent()); + assertEquals("item", result.get().getName()); + assertEquals(DataType.STRING, result.get().getCanonicalType()); + } + + @Test + void getColumnOrRefreshReturnsEmptyForNonExistentColumn() { + Optional result = + registry.getColumnOrRefresh(TABLE_NAME, "nonexistent_column"); + + assertFalse(result.isPresent()); + } + + @Test + void getColumnOrRefreshFindsNewlyAddedColumnAfterInvalidation() throws Exception { + // First, verify the new column doesn't exist + Optional before = registry.getColumnOrRefresh(TABLE_NAME, "new_column"); + assertFalse(before.isPresent()); + + // Add a new column to the table + try (Connection connection = datastore.getPostgresClient(); + PreparedStatement statement = + connection.prepareStatement( + String.format("ALTER TABLE \"%s\" ADD COLUMN \"new_column\" TEXT", TABLE_NAME))) { + statement.execute(); + } + + // Invalidate cache to force reload + registry.invalidate(TABLE_NAME); + + // Now the registry should find the new column after reload + Optional after = registry.getColumnOrRefresh(TABLE_NAME, "new_column"); + assertTrue(after.isPresent()); + assertEquals("new_column", after.get().getName()); + assertEquals(DataType.STRING, after.get().getCanonicalType()); + + // Cleanup: drop the column + try (Connection connection = datastore.getPostgresClient(); + PreparedStatement statement = + connection.prepareStatement( + String.format("ALTER TABLE \"%s\" DROP COLUMN \"new_column\"", TABLE_NAME))) { + statement.execute(); + } + } + + @Test + void cacheReturnsSameInstanceOnSubsequentCalls() { + Map schema1 = registry.getSchema(TABLE_NAME); + Map schema2 = registry.getSchema(TABLE_NAME); + + // Should be the same cached instance + assertSame(schema1, schema2); + } + + @Test + void invalidateCausesReload() { + Map schema1 = registry.getSchema(TABLE_NAME); + + registry.invalidate(TABLE_NAME); + + Map schema2 = registry.getSchema(TABLE_NAME); + + // Should be different instances after invalidation + assertNotSame(schema1, schema2); + // But same content + assertEquals(schema1.keySet(), schema2.keySet()); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java index b426cb06..4022dd46 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java @@ -2,6 +2,8 @@ import java.util.Map; import java.util.Set; +import org.hypertrace.core.documentstore.commons.ColumnMetadata; +import org.hypertrace.core.documentstore.commons.SchemaRegistry; import org.hypertrace.core.documentstore.metric.DocStoreMetricProvider; public interface Datastore { @@ -19,6 +21,10 @@ public interface Datastore { @SuppressWarnings("unused") DocStoreMetricProvider getDocStoreMetricProvider(); + default SchemaRegistry getSchemaRegistry() { + return null; + } + void close(); /** diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java index c1fcacde..dda12fff 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java @@ -93,6 +93,8 @@ public DatastoreConfig convert(final Config config) { connectionConfig.credentials(), connectionConfig.applicationName(), connectionConfig.connectionPoolConfig(), + connectionConfig.schemaCacheExpiry(), + connectionConfig.schemaRefreshCooldown(), connectionConfig.customParameters()) { @Override public String toConnectionString() { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java new file mode 100644 index 00000000..6fa89dc3 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java @@ -0,0 +1,11 @@ +package org.hypertrace.core.documentstore.commons; + +import org.hypertrace.core.documentstore.expression.impl.DataType; + +public interface ColumnMetadata { + String getName(); + + DataType getCanonicalType(); + + boolean isNullable(); +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java new file mode 100644 index 00000000..68782184 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java @@ -0,0 +1,13 @@ +package org.hypertrace.core.documentstore.commons; + +import java.util.Map; +import java.util.Optional; + +public interface SchemaRegistry { + + Map getSchema(String tableName); + + void invalidate(String tableName); + + Optional getColumnOrRefresh(String tableName, String colName); +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/expression/impl/DataType.java b/document-store/src/main/java/org/hypertrace/core/documentstore/expression/impl/DataType.java index 9eec68b9..44ed4d2c 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/expression/impl/DataType.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/expression/impl/DataType.java @@ -22,6 +22,7 @@ public enum DataType { FLOAT, DOUBLE, BOOLEAN, + JSON, // timestamp with time-zone information. For example: 2004-10-19 10:23:54+02. // For more info, see: https://www.postgresql.org/docs/current/datatype-datetime.html TIMESTAMPTZ, diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java index 18d9975c..2960a2d2 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java @@ -126,6 +126,8 @@ public ConnectionConfig build() { credentials, applicationName, connectionPoolConfig, + null, + null, customParameters); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java index 925e4b1e..2d1ab176 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java @@ -3,6 +3,7 @@ import static java.util.Collections.unmodifiableList; import static java.util.function.Predicate.not; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -34,8 +35,13 @@ public class PostgresConnectionConfig extends ConnectionConfig { .password(PostgresDefaults.DEFAULT_PASSWORD) .build(); + private static final Duration DEFAULT_SCHEMA_CACHE_EXPIRY = Duration.ofHours(24); + private static final Duration DEFAULT_SCHEMA_REFRESH_COOLDOWN = Duration.ofMinutes(15); + @NonNull String applicationName; @NonNull ConnectionPoolConfig connectionPoolConfig; + @NonNull Duration schemaCacheExpiry; + @NonNull Duration schemaRefreshCooldown; public static ConnectionConfigBuilder builder() { return ConnectionConfig.builder().type(DatabaseType.POSTGRES); @@ -47,6 +53,8 @@ public PostgresConnectionConfig( @Nullable final ConnectionCredentials credentials, @NonNull final String applicationName, @Nullable final ConnectionPoolConfig connectionPoolConfig, + @Nullable final Duration schemaCacheExpiry, + @Nullable final Duration schemaRefreshCooldown, @NonNull final Map customParameters) { super( ensureSingleEndpoint(endpoints), @@ -55,6 +63,8 @@ public PostgresConnectionConfig( customParameters); this.applicationName = applicationName; this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig); + this.schemaCacheExpiry = getSchemaCacheExpiryOrDefault(schemaCacheExpiry); + this.schemaRefreshCooldown = getSchemaRefreshCooldownOrDefault(schemaRefreshCooldown); } public String toConnectionString() { @@ -127,4 +137,15 @@ private ConnectionPoolConfig getConnectionPoolConfigOrDefault( @Nullable final ConnectionPoolConfig connectionPoolConfig) { return Optional.ofNullable(connectionPoolConfig).orElse(ConnectionPoolConfig.builder().build()); } + + @NonNull + private Duration getSchemaCacheExpiryOrDefault(@Nullable final Duration schemaCacheExpiry) { + return Optional.ofNullable(schemaCacheExpiry).orElse(DEFAULT_SCHEMA_CACHE_EXPIRY); + } + + @NonNull + private Duration getSchemaRefreshCooldownOrDefault( + @Nullable final Duration schemaRefreshCooldown) { + return Optional.ofNullable(schemaRefreshCooldown).orElse(DEFAULT_SCHEMA_REFRESH_COOLDOWN); + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 0b7afc62..6dd89e1b 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -37,8 +37,14 @@ public class FlatPostgresCollection extends PostgresCollection { private static final String WRITE_NOT_SUPPORTED = "Write operations are not supported for flat collections yet!"; - FlatPostgresCollection(final PostgresClient client, final String collectionName) { + private final PostgresSchemaRegistry schemaRegistry; + + FlatPostgresCollection( + final PostgresClient client, + final String collectionName, + final PostgresSchemaRegistry schemaRegistry) { super(client, collectionName); + this.schemaRegistry = schemaRegistry; } @Override diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java index afbb6f98..4e9fb3e2 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java @@ -20,11 +20,13 @@ import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.DocumentType; +import org.hypertrace.core.documentstore.commons.SchemaRegistry; import org.hypertrace.core.documentstore.metric.DocStoreMetricProvider; import org.hypertrace.core.documentstore.metric.postgres.PostgresDocStoreMetricProvider; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; import org.hypertrace.core.documentstore.model.config.DatastoreConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +39,7 @@ public class PostgresDatastore implements Datastore { private final PostgresClient client; private final String database; private final DocStoreMetricProvider docStoreMetricProvider; + private final SchemaRegistry schemaRegistry; public PostgresDatastore(@NonNull final DatastoreConfig datastoreConfig) { final ConnectionConfig connectionConfig = datastoreConfig.connectionConfig(); @@ -57,6 +60,11 @@ public PostgresDatastore(@NonNull final DatastoreConfig datastoreConfig) { client = new PostgresClient(postgresConnectionConfig); database = connectionConfig.database(); docStoreMetricProvider = new PostgresDocStoreMetricProvider(this, postgresConnectionConfig); + schemaRegistry = + new PostgresSchemaRegistry( + new PostgresMetadataFetcher(client), + postgresConnectionConfig.schemaCacheExpiry(), + postgresConnectionConfig.schemaRefreshCooldown()); } catch (final IllegalArgumentException e) { throw new IllegalArgumentException( String.format("Unable to instantiate PostgresClient with config:%s", connectionConfig), @@ -163,7 +171,8 @@ public Collection getCollectionForType(String collectionName, DocumentType docum switch (documentType) { case FLAT: { - return new FlatPostgresCollection(client, collectionName); + return new FlatPostgresCollection( + client, collectionName, (PostgresSchemaRegistry) schemaRegistry); } case NESTED: return getCollection(collectionName); @@ -189,6 +198,12 @@ public DocStoreMetricProvider getDocStoreMetricProvider() { return docStoreMetricProvider; } + @SuppressWarnings("unchecked") + @Override + public SchemaRegistry getSchemaRegistry() { + return schemaRegistry; + } + @Override public void close() { try { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java new file mode 100644 index 00000000..2d2be0ec --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java @@ -0,0 +1,125 @@ +package org.hypertrace.core.documentstore.postgres; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import lombok.AllArgsConstructor; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; + +/** + * Fetches schema metadata directly from Postgres system catalogs. Hardcoded to query + * information_schema.columns. + */ +@AllArgsConstructor +public class PostgresMetadataFetcher { + + private final PostgresClient client; + + private static final String DISCOVERY_SQL = + "SELECT column_name, udt_name, is_nullable " + + "FROM information_schema.columns " + + "WHERE table_schema = 'public' AND table_name = ?"; + + public Map fetch(String tableName) { + Map metadataMap = new HashMap<>(); + + try (Connection conn = client.getPooledConnection(); + PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) { + + ps.setString(1, tableName); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + String columnName = rs.getString("column_name"); + String udtName = rs.getString("udt_name"); + boolean isNullable = "YES".equalsIgnoreCase(rs.getString("is_nullable")); + metadataMap.put( + columnName, + new PostgresColumnMetadata( + columnName, + mapToCanonicalType(udtName), + mapToPostgresType(udtName), + udtName, + isNullable)); + } + } + return metadataMap; + } catch (SQLException e) { + throw new RuntimeException("Failed to fetch Postgres metadata for table: " + tableName, e); + } + } + + /** Maps Postgres udt_name to canonical DataType. */ + private DataType mapToCanonicalType(String udtName) { + if (udtName == null) { + return DataType.UNSPECIFIED; + } + + switch (udtName.toLowerCase()) { + case "int4": + case "int2": + return DataType.INTEGER; + case "int8": + return DataType.LONG; + case "float4": + return DataType.FLOAT; + case "float8": + case "numeric": + return DataType.DOUBLE; + case "bool": + return DataType.BOOLEAN; + case "timestamptz": + return DataType.TIMESTAMPTZ; + case "date": + return DataType.DATE; + case "text": + case "varchar": + case "bpchar": + case "uuid": + return DataType.STRING; + case "jsonb": + return DataType.JSON; + default: + return DataType.UNSPECIFIED; + } + } + + /** Maps Postgres udt_name to PostgresDataType. */ + private PostgresDataType mapToPostgresType(String udtName) { + if (udtName == null) { + return PostgresDataType.UNKNOWN; + } + + switch (udtName.toLowerCase()) { + case "int4": + case "int2": + return PostgresDataType.INTEGER; + case "int8": + return PostgresDataType.BIGINT; + case "float4": + return PostgresDataType.REAL; + case "float8": + case "numeric": + return PostgresDataType.DOUBLE_PRECISION; + case "bool": + return PostgresDataType.BOOLEAN; + case "timestamptz": + return PostgresDataType.TIMESTAMPTZ; + case "date": + return PostgresDataType.DATE; + case "text": + case "varchar": + case "bpchar": + case "uuid": + return PostgresDataType.TEXT; + case "jsonb": + return PostgresDataType.JSONB; + default: + return PostgresDataType.UNKNOWN; + } + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java new file mode 100644 index 00000000..e8fc9b19 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistry.java @@ -0,0 +1,158 @@ +package org.hypertrace.core.documentstore.postgres; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.hypertrace.core.documentstore.commons.SchemaRegistry; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; + +/** + * A lazily-loaded, cached schema registry for PostgreSQL tables. + * + *

This registry fetches and caches column metadata from PostgreSQL's {@code information_schema} + * on demand. It provides: + * + *

    + *
  • Lazy loading: Schema metadata is fetched only when first requested for the + * particular table. + *
  • TTL-based caching: Cached schemas expire after a configurable duration (default: 24 + * hours). + *
  • Circuit breaker: Prevents excessive database calls by enforcing a cooldown period + * between refresh attempts for missing columns (default: 15 minutes). + *
+ * + *

Usage Example

+ * + *
{@code
+ * PostgresMetadataFetcher fetcher = new PostgresMetadataFetcher(datastore);
+ * PostgresSchemaRegistry registry = new PostgresSchemaRegistry(fetcher);
+ *
+ * // Get all columns for a table
+ * Map schema = registry.getSchema("my_table");
+ *
+ * // Get a specific column, refreshing if not found (respects cooldown)
+ * PostgresColumnMetadata column = registry.getColumnOrRefresh("my_table", "my_column");
+ * }
+ * + *

Thread Safety

+ * + *

This class is thread-safe. The underlying Guava {@link LoadingCache} and {@link + * ConcurrentHashMap} handle concurrent access. + * + * @see PostgresMetadataFetcher + * @see PostgresColumnMetadata + */ +public class PostgresSchemaRegistry implements SchemaRegistry { + + private final LoadingCache> cache; + private final ConcurrentHashMap lastRefreshTimes; + private final Duration refreshCooldown; + + /** + * Creates a new schema registry with custom cache settings. + * + * @param fetcher the metadata fetcher to use for loading schema information + * @param cacheExpiry how long to keep cached schemas before they expire + * @param refreshCooldown minimum time between refresh attempts for missing columns + */ + public PostgresSchemaRegistry( + PostgresMetadataFetcher fetcher, Duration cacheExpiry, Duration refreshCooldown) { + this.refreshCooldown = refreshCooldown; + this.lastRefreshTimes = new ConcurrentHashMap<>(); + this.cache = + CacheBuilder.newBuilder() + .expireAfterWrite(cacheExpiry.toMinutes(), TimeUnit.MINUTES) + .build( + new CacheLoader<>() { + @Override + public Map load(String tableName) { + lastRefreshTimes.put(tableName, Instant.now()); + return fetcher.fetch(tableName); + } + }); + } + + /** + * Gets the schema for a table, loading it from the database if not cached. + * + * @param tableName the name of the table + * @return a map of column names to their metadata + * @throws RuntimeException if the schema cannot be fetched + */ + @Override + public Map getSchema(String tableName) { + try { + return cache.get(tableName); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to fetch schema for " + tableName, e.getCause()); + } + } + + /** + * Invalidates the cached schema for a specific table. + * + *

The next call to {@link #getSchema(String)} will reload the schema from the database. + * + * @param tableName the name of the table to invalidate + */ + @Override + public void invalidate(String tableName) { + cache.invalidate(tableName); + } + + /** + * Gets metadata for a specific column, optionally refreshing the schema if the column is not + * found. + * + *

This method implements a circuit breaker pattern: + * + *

    + *
  • If the column exists in the cached schema, it is returned immediately. + *
  • If the column is not found and the cooldown period has elapsed since the last refresh, + * the schema is reloaded from the database. + *
  • If the column is not found but the cooldown period has not elapsed, {@code null} is + * returned without hitting the database. + *
+ * + *

Note that this is a check-then-act sequence that should ideally be atomic. However, this + * method is deliberately not thread-safe since even in case of a data race, it will result in one + * extra call to the DB, which will not be allowed anyway due to the cooldown period having been + * reset by the previous call. This is likely to be more performant than contending for locks. + * + * @param tableName the name of the table + * @param colName the name of the column + * @return an Optional containing the column metadata, or empty if the column does not exist + */ + @Override + public Optional getColumnOrRefresh(String tableName, String colName) { + Map schema = getSchema(tableName); + + if (!schema.containsKey(colName) && canRefresh(tableName)) { + invalidate(tableName); + schema = getSchema(tableName); + } + + return Optional.ofNullable(schema.get(colName)); + } + + /** + * Checks if a refresh is allowed for the given table based on the cooldown period. + * + * @param tableName the name of the table + * @return {@code true} if the cooldown period has elapsed since the last refresh + */ + private boolean canRefresh(String tableName) { + Instant lastRefresh = lastRefreshTimes.get(tableName); + if (lastRefresh == null) { + return true; + } + return Duration.between(lastRefresh, Instant.now()).compareTo(refreshCooldown) >= 0; + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java new file mode 100644 index 00000000..508b3dc6 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java @@ -0,0 +1,34 @@ +package org.hypertrace.core.documentstore.postgres.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import org.hypertrace.core.documentstore.commons.ColumnMetadata; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; + +@Builder +@AllArgsConstructor +public class PostgresColumnMetadata implements ColumnMetadata { + + private final String colName; + private final DataType canonicalType; + @Getter private final PostgresDataType postgresType; + private final String pgType; + private final boolean nullable; + + @Override + public String getName() { + return colName; + } + + @Override + public DataType getCanonicalType() { + return canonicalType; + } + + @Override + public boolean isNullable() { + return nullable; + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresDataType.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresDataType.java index 89626e7c..c920473f 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresDataType.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresDataType.java @@ -15,6 +15,7 @@ public enum PostgresDataType { REAL("float4"), DOUBLE_PRECISION("float8"), BOOLEAN("bool"), + JSONB("jsonb"), TIMESTAMPTZ("timestamptz"), DATE("date"), UNKNOWN(null); diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java new file mode 100644 index 00000000..6fa32d91 --- /dev/null +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java @@ -0,0 +1,310 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class PostgresMetadataFetcherTest { + + private static final String TEST_TABLE = "test_table"; + + @Mock private PostgresClient client; + @Mock private Connection connection; + @Mock private PreparedStatement preparedStatement; + @Mock private ResultSet resultSet; + + private PostgresMetadataFetcher fetcher; + + @BeforeEach + void setUp() throws SQLException { + when(client.getPooledConnection()).thenReturn(connection); + when(connection.prepareStatement(anyString())).thenReturn(preparedStatement); + when(preparedStatement.executeQuery()).thenReturn(resultSet); + fetcher = new PostgresMetadataFetcher(client); + } + + @Test + void fetchReturnsEmptyMapForTableWithNoColumns() throws SQLException { + when(resultSet.next()).thenReturn(false); + + Map result = fetcher.fetch(TEST_TABLE); + + assertTrue(result.isEmpty()); + verify(preparedStatement).setString(1, TEST_TABLE); + } + + @Test + void fetchReturnsSingleColumn() throws SQLException { + when(resultSet.next()).thenReturn(true, false); + when(resultSet.getString("column_name")).thenReturn("id"); + when(resultSet.getString("udt_name")).thenReturn("int4"); + when(resultSet.getString("is_nullable")).thenReturn("NO"); + + Map result = fetcher.fetch(TEST_TABLE); + + assertEquals(1, result.size()); + PostgresColumnMetadata metadata = result.get("id"); + assertNotNull(metadata); + assertEquals("id", metadata.getName()); + assertEquals(DataType.INTEGER, metadata.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, metadata.getPostgresType()); + assertFalse(metadata.isNullable()); + } + + @Test + void fetchReturnsMultipleColumns() throws SQLException { + when(resultSet.next()).thenReturn(true, true, true, false); + when(resultSet.getString("column_name")).thenReturn("id", "name", "price"); + when(resultSet.getString("udt_name")).thenReturn("int8", "text", "float8"); + when(resultSet.getString("is_nullable")).thenReturn("NO", "YES", "YES"); + + Map result = fetcher.fetch(TEST_TABLE); + + assertEquals(3, result.size()); + + // Verify id column + PostgresColumnMetadata idMeta = result.get("id"); + assertEquals(DataType.LONG, idMeta.getCanonicalType()); + assertEquals(PostgresDataType.BIGINT, idMeta.getPostgresType()); + assertFalse(idMeta.isNullable()); + + // Verify name column + PostgresColumnMetadata nameMeta = result.get("name"); + assertEquals(DataType.STRING, nameMeta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, nameMeta.getPostgresType()); + assertTrue(nameMeta.isNullable()); + + // Verify price column + PostgresColumnMetadata priceMeta = result.get("price"); + assertEquals(DataType.DOUBLE, priceMeta.getCanonicalType()); + assertEquals(PostgresDataType.DOUBLE_PRECISION, priceMeta.getPostgresType()); + assertTrue(priceMeta.isNullable()); + } + + @Test + void fetchMapsInt4ToInteger() throws SQLException { + setupSingleColumnResult("col", "int4", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.INTEGER, meta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, meta.getPostgresType()); + } + + @Test + void fetchMapsInt2ToInteger() throws SQLException { + setupSingleColumnResult("col", "int2", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.INTEGER, meta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, meta.getPostgresType()); + } + + @Test + void fetchMapsInt8ToLong() throws SQLException { + setupSingleColumnResult("col", "int8", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.LONG, meta.getCanonicalType()); + assertEquals(PostgresDataType.BIGINT, meta.getPostgresType()); + } + + @Test + void fetchMapsFloat4ToFloat() throws SQLException { + setupSingleColumnResult("col", "float4", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.FLOAT, meta.getCanonicalType()); + assertEquals(PostgresDataType.REAL, meta.getPostgresType()); + } + + @Test + void fetchMapsFloat8ToDouble() throws SQLException { + setupSingleColumnResult("col", "float8", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.DOUBLE, meta.getCanonicalType()); + assertEquals(PostgresDataType.DOUBLE_PRECISION, meta.getPostgresType()); + } + + @Test + void fetchMapsNumericToDouble() throws SQLException { + setupSingleColumnResult("col", "numeric", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.DOUBLE, meta.getCanonicalType()); + assertEquals(PostgresDataType.DOUBLE_PRECISION, meta.getPostgresType()); + } + + @Test + void fetchMapsBoolToBoolean() throws SQLException { + setupSingleColumnResult("col", "bool", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.BOOLEAN, meta.getCanonicalType()); + assertEquals(PostgresDataType.BOOLEAN, meta.getPostgresType()); + } + + @Test + void fetchMapsTextToString() throws SQLException { + setupSingleColumnResult("col", "text", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.STRING, meta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, meta.getPostgresType()); + } + + @Test + void fetchMapsVarcharToString() throws SQLException { + setupSingleColumnResult("col", "varchar", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.STRING, meta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, meta.getPostgresType()); + } + + @Test + void fetchMapsBpcharToString() throws SQLException { + setupSingleColumnResult("col", "bpchar", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.STRING, meta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, meta.getPostgresType()); + } + + @Test + void fetchMapsUuidToString() throws SQLException { + setupSingleColumnResult("col", "uuid", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.STRING, meta.getCanonicalType()); + assertEquals(PostgresDataType.TEXT, meta.getPostgresType()); + } + + @Test + void fetchMapsJsonbToJson() throws SQLException { + setupSingleColumnResult("col", "jsonb", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.JSON, meta.getCanonicalType()); + assertEquals(PostgresDataType.JSONB, meta.getPostgresType()); + } + + @Test + void fetchMapsTimestamptzToTimestamptz() throws SQLException { + setupSingleColumnResult("col", "timestamptz", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.TIMESTAMPTZ, meta.getCanonicalType()); + assertEquals(PostgresDataType.TIMESTAMPTZ, meta.getPostgresType()); + } + + @Test + void fetchMapsDateToDate() throws SQLException { + setupSingleColumnResult("col", "date", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.DATE, meta.getCanonicalType()); + assertEquals(PostgresDataType.DATE, meta.getPostgresType()); + } + + @Test + void fetchMapsUnknownTypeToUnspecified() throws SQLException { + setupSingleColumnResult("col", "unknown_type", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.UNSPECIFIED, meta.getCanonicalType()); + assertEquals(PostgresDataType.UNKNOWN, meta.getPostgresType()); + } + + @Test + void fetchMapsNullUdtNameToUnspecified() throws SQLException { + setupSingleColumnResult("col", null, "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.UNSPECIFIED, meta.getCanonicalType()); + assertEquals(PostgresDataType.UNKNOWN, meta.getPostgresType()); + } + + @Test + void fetchHandlesNullableColumn() throws SQLException { + setupSingleColumnResult("col", "text", "YES"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertTrue(meta.isNullable()); + } + + @Test + void fetchHandlesNonNullableColumn() throws SQLException { + setupSingleColumnResult("col", "text", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertFalse(meta.isNullable()); + } + + @Test + void fetchHandlesCaseInsensitiveUdtName() throws SQLException { + setupSingleColumnResult("col", "INT4", "NO"); + + PostgresColumnMetadata meta = fetcher.fetch(TEST_TABLE).get("col"); + + assertEquals(DataType.INTEGER, meta.getCanonicalType()); + assertEquals(PostgresDataType.INTEGER, meta.getPostgresType()); + } + + @Test + void fetchThrowsRuntimeExceptionOnSqlException() throws SQLException { + when(preparedStatement.executeQuery()).thenThrow(new SQLException("Connection failed")); + + RuntimeException exception = + assertThrows(RuntimeException.class, () -> fetcher.fetch(TEST_TABLE)); + + assertTrue(exception.getMessage().contains(TEST_TABLE)); + assertTrue(exception.getCause() instanceof SQLException); + } + + private void setupSingleColumnResult(String colName, String udtName, String isNullable) + throws SQLException { + when(resultSet.next()).thenReturn(true, false); + when(resultSet.getString("column_name")).thenReturn(colName); + when(resultSet.getString("udt_name")).thenReturn(udtName); + when(resultSet.getString("is_nullable")).thenReturn(isNullable); + } +} diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java new file mode 100644 index 00000000..7fa22d5f --- /dev/null +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresSchemaRegistryTest.java @@ -0,0 +1,306 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class PostgresSchemaRegistryTest { + + private static final String TEST_TABLE = "test_table"; + private static final String COL_ID = "id"; + private static final String COL_NAME = "name"; + private static final String COL_PRICE = "price"; + private static final Duration CACHE_EXPIRY = Duration.ofHours(24); + private static final Duration REFRESH_COOLDOWN = Duration.ofMillis(50); + + @Mock private PostgresMetadataFetcher fetcher; + + private PostgresSchemaRegistry registry; + + @BeforeEach + void setUp() { + registry = new PostgresSchemaRegistry(fetcher, CACHE_EXPIRY, REFRESH_COOLDOWN); + } + + @Test + void getSchemaLoadsFromFetcherOnCacheMiss() { + Map expectedSchema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(expectedSchema); + + Map result = registry.getSchema(TEST_TABLE); + + assertEquals(expectedSchema, result); + verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void getSchemaReturnsCachedValueOnSubsequentCalls() { + Map expectedSchema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(expectedSchema); + + // First call - loads from fetcher + Map result1 = registry.getSchema(TEST_TABLE); + // Second call - should use cache + Map result2 = registry.getSchema(TEST_TABLE); + + assertEquals(expectedSchema, result1); + assertEquals(expectedSchema, result2); + // Fetcher should only be called once + verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void getSchemaLoadsEachTableIndependently() { + String table1 = "table1"; + String table2 = "table2"; + Map schema1 = createTestSchema(); + Map schema2 = new HashMap<>(); + schema2.put( + "other_col", + PostgresColumnMetadata.builder() + .colName("other_col") + .canonicalType(DataType.BOOLEAN) + .postgresType(PostgresDataType.BOOLEAN) + .pgType("bool") + .nullable(false) + .build()); + + when(fetcher.fetch(table1)).thenReturn(schema1); + when(fetcher.fetch(table2)).thenReturn(schema2); + + Map result1 = registry.getSchema(table1); + Map result2 = registry.getSchema(table2); + + assertEquals(schema1, result1); + assertEquals(schema2, result2); + verify(fetcher, times(1)).fetch(table1); + verify(fetcher, times(1)).fetch(table2); + } + + @Test + void invalidateClearsSpecificTableCache() { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // Load into cache + registry.getSchema(TEST_TABLE); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Invalidate the cache + registry.invalidate(TEST_TABLE); + + // Next call should reload from fetcher + registry.getSchema(TEST_TABLE); + verify(fetcher, times(2)).fetch(TEST_TABLE); + } + + @Test + void invalidateDoesNotAffectOtherTables() { + String table1 = "table1"; + String table2 = "table2"; + Map schema1 = createTestSchema(); + Map schema2 = new HashMap<>(); + + when(fetcher.fetch(table1)).thenReturn(schema1); + when(fetcher.fetch(table2)).thenReturn(schema2); + + // Load both tables into cache + registry.getSchema(table1); + registry.getSchema(table2); + + // Invalidate only table1 + registry.invalidate(table1); + + // table1 should reload, table2 should use cache + registry.getSchema(table1); + registry.getSchema(table2); + + verify(fetcher, times(2)).fetch(table1); + verify(fetcher, times(1)).fetch(table2); + } + + @Test + void getColumnOrRefreshReturnsColumnIfExists() { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + Optional result = registry.getColumnOrRefresh(TEST_TABLE, COL_ID); + + assertTrue(result.isPresent()); + assertEquals(COL_ID, result.get().getName()); + assertEquals(DataType.INTEGER, result.get().getCanonicalType()); + // Should only call fetcher once (initial load) + verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshRefreshesSchemaIfColumnMissingAndCooldownExpired() throws Exception { + // Initial schema without the "new_col" + Map initialSchema = createTestSchema(); + + // Updated schema with the "new_col" + Map updatedSchema = new HashMap<>(initialSchema); + updatedSchema.put( + "new_col", + PostgresColumnMetadata.builder() + .colName("new_col") + .canonicalType(DataType.STRING) + .postgresType(PostgresDataType.TEXT) + .pgType("text") + .nullable(true) + .build()); + + when(fetcher.fetch(TEST_TABLE)).thenReturn(initialSchema).thenReturn(updatedSchema); + + // First call loads the schema + registry.getSchema(TEST_TABLE); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Wait past cooldown period + Thread.sleep(REFRESH_COOLDOWN.toMillis() + 10); + + // Now try to get missing column - should trigger refresh + Optional result = registry.getColumnOrRefresh(TEST_TABLE, "new_col"); + + assertTrue(result.isPresent()); + assertEquals("new_col", result.get().getName()); + // Should call fetcher twice (initial load + refresh after cooldown) + verify(fetcher, times(2)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshDoesNotRefreshIfWithinCooldownPeriod() { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // First call loads the schema + registry.getSchema(TEST_TABLE); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Try to get a missing column - should NOT refresh because we're within cooldown + Optional result = + registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + + assertFalse(result.isPresent()); + // Should only call fetcher once (initial load, no refresh due to cooldown) + verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshRefreshesAfterCooldownExpires() throws Exception { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // First call loads the schema + registry.getSchema(TEST_TABLE); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Try immediately - should NOT refresh (within cooldown) + registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + verify(fetcher, times(1)).fetch(TEST_TABLE); + + // Wait past cooldown + Thread.sleep(REFRESH_COOLDOWN.toMillis() + 10); + + // Try again - should refresh now + registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + verify(fetcher, times(2)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshReturnsEmptyIfColumnStillMissingAfterRefresh() throws Exception { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // First call loads the schema + registry.getSchema(TEST_TABLE); + + // Wait past cooldown + Thread.sleep(REFRESH_COOLDOWN.toMillis() + 10); + + // Try to get a column that doesn't exist even after refresh + Optional result = + registry.getColumnOrRefresh(TEST_TABLE, "nonexistent_col"); + + assertFalse(result.isPresent()); + // Should call fetcher twice (initial load + refresh attempt after cooldown) + verify(fetcher, times(2)).fetch(TEST_TABLE); + } + + @Test + void getColumnOrRefreshUsesExistingCacheBeforeRefresh() { + Map schema = createTestSchema(); + when(fetcher.fetch(TEST_TABLE)).thenReturn(schema); + + // Pre-populate cache + registry.getSchema(TEST_TABLE); + + // Get existing column - should not trigger refresh + Optional result = registry.getColumnOrRefresh(TEST_TABLE, COL_NAME); + + assertTrue(result.isPresent()); + assertEquals(COL_NAME, result.get().getName()); + // Should only call fetcher once (initial getSchema) + verify(fetcher, times(1)).fetch(TEST_TABLE); + } + + @Test + void getSchemaThrowsRuntimeExceptionWhenFetcherFails() { + when(fetcher.fetch(TEST_TABLE)).thenThrow(new RuntimeException("Database error")); + + RuntimeException exception = + assertThrows(RuntimeException.class, () -> registry.getSchema(TEST_TABLE)); + + assertEquals("Database error", exception.getCause().getMessage()); + } + + private Map createTestSchema() { + Map schema = new HashMap<>(); + schema.put( + COL_ID, + PostgresColumnMetadata.builder() + .colName(COL_ID) + .canonicalType(DataType.INTEGER) + .postgresType(PostgresDataType.INTEGER) + .pgType("int4") + .nullable(false) + .build()); + schema.put( + COL_NAME, + PostgresColumnMetadata.builder() + .colName(COL_NAME) + .canonicalType(DataType.STRING) + .postgresType(PostgresDataType.TEXT) + .pgType("text") + .nullable(true) + .build()); + schema.put( + COL_PRICE, + PostgresColumnMetadata.builder() + .colName(COL_PRICE) + .canonicalType(DataType.DOUBLE) + .postgresType(PostgresDataType.DOUBLE_PRECISION) + .pgType("float8") + .nullable(true) + .build()); + return schema; + } +}