> getTableReplication(GetTableResponseBody response) {
// At least one replication config must be present
if (response == null
|| response.getPolicies() == null
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java
index f775cfd0d..155e0b18b 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java
@@ -24,6 +24,9 @@
/**
* A callable class to apply an operation to some entity (table/database) by running a Spark job.
* Takes care of the job lifecycle using /jobs API.
+ *
+ * NOTE: Every implementation must implement a static {@code OPERATION_TYPE} field in order for
+ * the job scheduler to load the OperationTask.
*/
@Slf4j
@Getter
@@ -270,6 +273,20 @@ private void reportJobState(
AppConstants.JOB_DURATION,
System.currentTimeMillis() - startTime,
attributes);
+
+ // Granular attributes to publish entity level job metrics
+ Attributes granularAttributes =
+ Attributes.of(
+ AttributeKey.stringKey(AppConstants.ENTITY_NAME),
+ metadata.getEntityName(),
+ AttributeKey.stringKey(AppConstants.ENTITY_TYPE),
+ metadata.getClass().getSimpleName().replace("Metadata", ""),
+ AttributeKey.stringKey(AppConstants.JOB_TYPE),
+ getType().getValue(),
+ AttributeKey.stringKey(AppConstants.JOB_STATE),
+ state.name());
+
+ otelEmitter.count(METRICS_SCOPE, "maintenance_job_completed", 1, granularAttributes);
}
protected abstract boolean launchJob();
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
index fe0290b36..37e3df399 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
@@ -9,12 +9,15 @@
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.scheduler.JobsScheduler;
+import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.DataLayoutUtil;
import com.linkedin.openhouse.jobs.util.DatabaseMetadata;
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
import com.linkedin.openhouse.jobs.util.Metadata;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadata;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -168,6 +171,15 @@ private List> processMetadataList(
if (optionalOperationTask.isPresent()) {
taskList.add(optionalOperationTask.get());
}
+
+ // Publish entity metrics for triggered tasks
+ Attributes taskAttributes =
+ Attributes.of(
+ AttributeKey.stringKey(AppConstants.ENTITY_NAME), metadata.getEntityName(),
+ AttributeKey.stringKey(AppConstants.ENTITY_TYPE),
+ metadata.getClass().getSimpleName().replace("Metadata", ""),
+ AttributeKey.stringKey(AppConstants.JOB_TYPE), jobType.getValue());
+ otelEmitter.count(METRICS_SCOPE, "maintenance_job_triggered", 1, taskAttributes);
}
return taskList;
}
@@ -183,6 +195,15 @@ public Optional> processMetadata(
task.setOtelEmitter(otelEmitter);
if (!task.shouldRun()) {
log.info("Skipping task {}", task);
+
+ // Publish entity metrics for skipped tasks
+ Attributes taskAttributes =
+ Attributes.of(
+ AttributeKey.stringKey(AppConstants.ENTITY_NAME), metadata.getEntityName(),
+ AttributeKey.stringKey(AppConstants.ENTITY_TYPE),
+ metadata.getClass().getSimpleName().replace("Metadata", ""),
+ AttributeKey.stringKey(AppConstants.JOB_TYPE), task.getType().getValue());
+ otelEmitter.count(METRICS_SCOPE, "maintenance_job_skipped", 1, taskAttributes);
return Optional.empty();
} else {
if (OperationMode.SUBMIT.equals(operationMode)) {
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BaseApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BaseApp.java
index d6b315926..2c4a8d8d7 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BaseApp.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BaseApp.java
@@ -65,20 +65,30 @@ protected static CommandLine createCommandLine(String[] args, List