Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/aerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,18 @@ def construct_schema_aerospike(
if sample_size:
query.max_records = sample_size
query.records_per_second = records_per_second

policy: Dict[str, Any] = {"max_retries": 0}
Comment thread
rajatoss marked this conversation as resolved.
if socket_timeout_ms is not None:
query.socket_timeout = socket_timeout_ms # type: ignore[attr-defined]
policy["socket_timeout"] = socket_timeout_ms
policy["total_timeout"] = socket_timeout_ms

try:
res = query.results()
res = query.results(policy)
records = [{**record[2], "PK": record[0][2]} for record in res]
except Exception as e:
logger.error(f"Error querying Aerospike set: {e}")
records = []
except Exception:
logger.exception("Error querying Aerospike set %s.%s", as_set.ns, as_set.set)
raise
return construct_schema(records, delimiter)


Expand Down Expand Up @@ -559,7 +562,9 @@ def _infer_schema_metadata(
)
)

set_schema = self._limit_schema_size(set_full_schema, custom_properties)
set_schema = self._limit_schema_size(
set_full_schema, custom_properties, dataset_name
)

set_fields: Union[List[SchemaDescription], ValuesView[SchemaDescription]] = (
set_schema.values()
Expand Down Expand Up @@ -593,6 +598,7 @@ def _limit_schema_size(
self,
schema: Dict[Tuple[str, ...], SchemaDescription],
custom_properties: Dict[str, str],
dataset_name: str,
) -> Dict[Tuple[str, ...], SchemaDescription]:
"""
Limits the size of the schema to the max_schema_size and infer_schema_depth.
Expand All @@ -607,13 +613,13 @@ def _limit_schema_size(
}
if len(truncated_schema) < len(schema):
logger.debug(
f"Truncated schema from {len(schema)} to {len(truncated_schema)}"
f"Truncated schema for {dataset_name} from {len(schema)} to {len(truncated_schema)}"
)
schema_depth = max([len(k) for k in schema])
self.report.report_warning(
title="Schema depth limit reached",
message="Truncating the collection schema because it has too many nested levels.",
context=f"Schema Depth: {len(schema)}, Configured threshold: {self.config.infer_schema_depth}",
context=f"{dataset_name}: Schema Depth: {len(schema)}, Configured threshold: {self.config.infer_schema_depth}",
)
custom_properties["schema.truncated"] = "True"
custom_properties["schema.totalDepth"] = f"{schema_depth}"
Expand All @@ -625,7 +631,7 @@ def _limit_schema_size(
self.report.report_warning(
title="Too many schema fields",
message="Downsampling the collection schema because it has too many schema fields.",
context=f"Schema Size: {schema_size}, Configured threshold: {max_schema_size}",
context=f"{dataset_name}: Schema Size: {schema_size}, Configured threshold: {max_schema_size}",
)
custom_properties["schema.downsampled"] = "True"
custom_properties["schema.totalFields"] = f"{schema_size}"
Expand All @@ -650,6 +656,9 @@ def xdr_sets(self, namespace: str, sets: List[str]) -> Dict[str, List[str]]:
.split(",")
)
except Exception as e:
logger.exception(
"XDR cluster-wide query failed for namespace %s", namespace
)
self.report.warning(
message="Failed to retrieve XDR config from Aerospike",
context=namespace,
Expand All @@ -669,6 +678,7 @@ def xdr_sets(self, namespace: str, sets: List[str]) -> Dict[str, List[str]]:
.split("\n")[0]
)
except Exception as e:
logger.exception("XDR per-DC query failed for %s/%s", namespace, dc)
self.report.warning(
message="Failed to retrieve XDR config for DC",
context=f"{namespace}/{dc}",
Expand Down
Loading
Loading