Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ ligradle
.DS_Store
*.patch
*/metastore_db
.pyc
.pyc
__pycache__/
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ public Table getTable(String name) {

// Dispatch based on CoralTable implementation type
if (coralTable instanceof IcebergTable) {
ViewDependencyTracker.get().recordBaseTableDependency(ViewDependencyTracker.OPENHOUSE_CATALOG, dbName, name);
return new IcebergCalciteTableAdapter((IcebergTable) coralTable);
} else if (coralTable instanceof HiveTable) {
HiveTable hiveTable = (HiveTable) coralTable;
// Check if it's a view
if (hiveTable.tableType() == VIEW) {
return new HiveCalciteViewAdapter(hiveTable, ImmutableList.of(CoralRootSchema.ROOT_SCHEMA, dbName));
} else {
ViewDependencyTracker.get().recordBaseTableDependency(ViewDependencyTracker.HIVE_CATALOG, dbName, name);
return new HiveCalciteTableAdapter(hiveTable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public Table getTable(String name) {
case VIRTUAL_VIEW:
return new HiveCalciteViewAdapter(table, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
default:
ViewDependencyTracker.get().recordBaseTableDependency(ViewDependencyTracker.HIVE_CATALOG, dbName, name);
return new HiveCalciteTableAdapter(table);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,10 @@ public RelNode convertSql(String sql) {
* @return Calcite {@link RelNode} representation of hive view definition
*/
public RelNode convertView(String hiveDbName, String hiveViewName) {
SqlNode sqlNode = processView(hiveDbName, hiveViewName);
return toRel(sqlNode);
return ViewDependencyTracker.get().withViewExpansion(ViewDependencyTracker.HIVE_CATALOG, hiveDbName, hiveViewName, () -> {
SqlNode sqlNode = processView(hiveDbName, hiveViewName);
return toRel(sqlNode);
});
}

// TODO change back to protected once the relevant tests move to the common package
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright 2017-2026 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;


/**
* Represents a single node in the view dependency chain.
* Names are in "catalog.db.table" format (e.g. "hive.db.v1", "openhouse.db.t1").
* For a view "hive.db.v1" that depends on "hive.db.v2" and "openhouse.db.t1",
* this would be: ViewDependency("hive.db.v1", ["hive.db.v2", "openhouse.db.t1"])
*/
public class ViewDependency {
private final String viewName;
private final List<String> dependencies;

public ViewDependency(String viewName, List<String> dependencies) {
this.viewName = viewName;
this.dependencies = dependencies;
}

public String getViewName() {
return viewName;
}

public List<String> getDependencies() {
return dependencies;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ViewDependency that = (ViewDependency) o;
return Objects.equals(viewName, that.viewName) && Objects.equals(dependencies, that.dependencies);
}

@Override
public int hashCode() {
return Objects.hash(viewName, dependencies);
}

@Override
public String toString() {
return "ViewDependency{view=" + viewName + ", deps=" + dependencies + "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright 2017-2026 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;


/**
* Thread-local tracker that records view to [immediate dependencies] mappings
* during Calcite view expansion.
*/
public class ViewDependencyTracker {
public static final String HIVE_CATALOG = "hive";
public static final String OPENHOUSE_CATALOG = "openhouse";

private static final ThreadLocal<ViewDependencyTracker> INSTANCE =
ThreadLocal.withInitial(ViewDependencyTracker::new);

private final Map<String, List<String>> viewDeps = new LinkedHashMap<>();

private final Deque<String> expansionStack = new ArrayDeque<>();

public static ViewDependencyTracker get() {
return INSTANCE.get();
}

public static void reset() {
INSTANCE.remove();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After each resolution, the state needs to be cleared - not just in tests, but also for 'regular' use

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, this method will be invoked from the caller who invokes the rel converter of Coral


/**
* Tracks a view expansion, recording the view as a dependency of any parent view
* currently being expanded, then executing the supplied work within the scope of
* this view. The expansion stack is managed automatically.
*/
public <T> T withViewExpansion(String catalog, String dbName, String tableName, Supplier<T> work) {
String qualifiedName = catalog + "." + dbName + "." + tableName;
if (!expansionStack.isEmpty()) {
String parent = expansionStack.peek();
viewDeps.computeIfAbsent(parent, k -> new ArrayList<>()).add(qualifiedName);
}
expansionStack.push(qualifiedName);
try {
return work.get();
} finally {
expansionStack.pop();
}
}

/**
* Records a base table (non-view) as a dependency of the view currently being expanded.
*/
public void recordBaseTableDependency(String catalog, String dbName, String tableName) {
String qualifiedName = catalog + "." + dbName + "." + tableName;
if (!expansionStack.isEmpty()) {
String parent = expansionStack.peek();
// Skip self-dependency: this occurs when convertView is called on a base table,
// which pushes the table name onto the stack and then resolves itself.
if (!qualifiedName.equals(parent)) {
viewDeps.computeIfAbsent(parent, k -> new ArrayList<>()).add(qualifiedName);
}
}
}

/**
* Returns the collected view dependency chain.
* Each entry represents a view and its immediate dependencies (both views and base tables).
*/
public List<ViewDependency> getViewDependencies() {
List<ViewDependency> result = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : viewDeps.entrySet()) {
if (!entry.getValue().isEmpty()) {
result.add(new ViewDependency(entry.getKey(), entry.getValue()));
}
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2026 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -18,6 +18,7 @@
import org.apache.calcite.util.Util;

import com.linkedin.coral.common.FuzzyUnionSqlRewriter;
import com.linkedin.coral.common.ViewDependencyTracker;


/**
Expand All @@ -44,8 +45,10 @@ public RelRoot expandView(RelDataType rowType, String queryString, List<String>
String dbName = Util.last(schemaPath);
String tableName = viewPath.get(0);

SqlNode sqlNode = hiveToRelConverter.processView(dbName, tableName)
.accept(new FuzzyUnionSqlRewriter(tableName, hiveToRelConverter));
return hiveToRelConverter.getSqlToRelConverter().convertQuery(sqlNode, true, true);
return ViewDependencyTracker.get().withViewExpansion(ViewDependencyTracker.HIVE_CATALOG, dbName, tableName, () -> {
SqlNode sqlNode = hiveToRelConverter.processView(dbName, tableName)
.accept(new FuzzyUnionSqlRewriter(tableName, hiveToRelConverter));
return hiveToRelConverter.getSqlToRelConverter().convertQuery(sqlNode, true, true);
});
}
}
38 changes: 34 additions & 4 deletions coral-spark/src/main/java/com/linkedin/coral/spark/CoralSpark.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.linkedin.coral.com.google.common.collect.ImmutableList;
import com.linkedin.coral.common.HiveMetastoreClient;
import com.linkedin.coral.common.ViewDependency;
import com.linkedin.coral.common.ViewDependencyTracker;
import com.linkedin.coral.spark.containers.SparkRelInfo;
import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.dialect.SparkSqlDialect;
Expand All @@ -43,14 +45,16 @@
public class CoralSpark {

private final List<String> baseTables;
private final List<ViewDependency> viewDependencies;
private final List<SparkUDFInfo> sparkUDFInfoList;
private final HiveMetastoreClient hiveMetastoreClient;
private final SqlNode sqlNode;
private final String sparkSql;

private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList, String sparkSql,
HiveMetastoreClient hmsClient, SqlNode sqlNode) {
private CoralSpark(List<String> baseTables, List<ViewDependency> viewDependencies,
List<SparkUDFInfo> sparkUDFInfoList, String sparkSql, HiveMetastoreClient hmsClient, SqlNode sqlNode) {
this.baseTables = baseTables;
this.viewDependencies = viewDependencies;
this.sparkUDFInfoList = sparkUDFInfoList;
this.sparkSql = sparkSql;
this.hiveMetastoreClient = hmsClient;
Expand All @@ -74,13 +78,16 @@ private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList,
* @return [[CoralSpark]]
*/
public static CoralSpark create(RelNode irRelNode, HiveMetastoreClient hmsClient) {
// Capture view dependencies that were collected during convertView -> expandView chain
List<ViewDependency> viewDeps = ViewDependencyTracker.get().getViewDependencies();

SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
SqlNode sparkSqlNode = constructSparkSqlNode(sparkRelNode, sparkUDFInfos, hmsClient);
String sparkSQL = constructSparkSQL(sparkSqlNode);
List<String> baseTables = constructBaseTables(sparkRelNode);
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient, sparkSqlNode);
return new CoralSpark(baseTables, viewDeps, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient, sparkSqlNode);
}

/**
Expand All @@ -99,6 +106,9 @@ public static CoralSpark create(RelNode irRelNode, Schema schema, HiveMetastoreC
}

private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliases, HiveMetastoreClient hmsClient) {
// Capture view dependencies that were collected during convertView -> expandView chain
List<ViewDependency> viewDeps = ViewDependencyTracker.get().getViewDependencies();

SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
Expand All @@ -111,7 +121,7 @@ private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliase
}
String sparkSQL = constructSparkSQL(sparkSqlNode);
List<String> baseTables = constructBaseTables(sparkRelNode);
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient, sparkSqlNode);
return new CoralSpark(baseTables, viewDeps, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient, sparkSqlNode);
}

private static SqlNode constructSparkSqlNode(RelNode sparkRelNode, Set<SparkUDFInfo> sparkUDFInfos,
Expand Down Expand Up @@ -165,6 +175,26 @@ public List<String> getBaseTables() {
return baseTables;
}

/**
* Getter for the view dependency chain collected during view expansion.
* Each {@link ViewDependency} represents a view and its immediate dependencies
* (which can be other views or base tables) in "catalog.database_name.table_name" format
* (e.g. "hive.db.v1", "openhouse.db.t1").
*
* <p>For example, if view "hive.db.v1" depends on view "hive.db.v2" and table "openhouse.db.t1",
* and "hive.db.v2" depends on tables "hive.db.t3" and "openhouse.db.t4", this returns:
* <pre>
* [ViewDependency("hive.db.v1", ["hive.db.v2", "openhouse.db.t1"]),
* ViewDependency("hive.db.v2", ["hive.db.t3", "openhouse.db.t4"])]
* </pre>
*
* @return List of {@link ViewDependency} representing the view dependency chain,
* or an empty list if no views were involved.
*/
public List<ViewDependency> getViewDependencies() {
return viewDependencies;
}

/**
* Getter for Spark UDF information list:
* Additional information required to use an UDF (for details, read [[SparkUDFInfo]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.testng.annotations.Test;

import com.linkedin.coral.com.google.common.collect.ImmutableList;
import com.linkedin.coral.common.ViewDependency;
import com.linkedin.coral.common.ViewDependencyTracker;
import com.linkedin.coral.hive.hive2rel.functions.StaticHiveFunctionRegistry;
import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.exceptions.UnsupportedUDFException;
Expand Down Expand Up @@ -74,6 +76,60 @@ public void testGetBaseTablesFromView() {
assertTrue(base_tables.contains("default.bar"));
}

@Test
public void testGetViewDependenciesFromNestedView() {
// foo_bar_view depends on foo_view (a view) and bar (a table)
// foo_view depends on foo (a table)
// Expected chain:
// hive.default.foo_bar_view -> [hive.default.foo_view, hive.default.bar]
// hive.default.foo_view -> [hive.default.foo]
ViewDependencyTracker.reset();
RelNode relNode = TestUtils.toRelNode("default", "foo_bar_view");
CoralSpark coralSpark = createCoralSpark(relNode);
List<ViewDependency> viewDeps = coralSpark.getViewDependencies();

assertFalse(viewDeps.isEmpty(), "Expected view dependencies for nested view");

ViewDependency fooBarViewDep =
viewDeps.stream().filter(vd -> vd.getViewName().equals("hive.default.foo_bar_view")).findFirst().orElse(null);
assertNotNull(fooBarViewDep, "Expected dependency entry for foo_bar_view");
assertTrue(fooBarViewDep.getDependencies().contains("hive.default.foo_view"), "foo_bar_view should depend on foo_view");
assertTrue(fooBarViewDep.getDependencies().contains("hive.default.bar"), "foo_bar_view should depend on bar");

ViewDependency fooViewDep =
viewDeps.stream().filter(vd -> vd.getViewName().equals("hive.default.foo_view")).findFirst().orElse(null);
assertNotNull(fooViewDep, "Expected dependency entry for foo_view");
assertTrue(fooViewDep.getDependencies().contains("hive.default.foo"), "foo_view should depend on foo");
}

@Test
public void testGetViewDependenciesFromSimpleView() {
// foo_view depends only on foo (a base table)
// Expected: single entry hive.default.foo_view -> [hive.default.foo]
ViewDependencyTracker.reset();
RelNode relNode = TestUtils.toRelNode("default", "foo_view");
CoralSpark coralSpark = createCoralSpark(relNode);
List<ViewDependency> viewDeps = coralSpark.getViewDependencies();

assertFalse(viewDeps.isEmpty(), "Expected view dependencies for simple view");
assertEquals(viewDeps.size(), 1, "Expected exactly one view dependency entry");

ViewDependency dep = viewDeps.get(0);
assertEquals(dep.getViewName(), "hive.default.foo_view");
assertTrue(dep.getDependencies().contains("hive.default.foo"), "foo_view should depend on foo");
}

@Test
public void testGetViewDependenciesFromBaseTable() {
// A base table should have no view dependencies
ViewDependencyTracker.reset();
RelNode relNode = TestUtils.toRelNode("default", "foo");
CoralSpark coralSpark = createCoralSpark(relNode);
List<ViewDependency> viewDeps = coralSpark.getViewDependencies();

assertTrue(viewDeps.isEmpty(), "Expected no view dependencies for a base table");
}

@Test
public void testQuotingKeywords() {
RelNode relNode = TestUtils.toRelNode("default", "baz_view");
Expand Down