diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt index 93915358..d106a9c8 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt @@ -27,7 +27,8 @@ class Finding( * Keeps the track of the workflow-monitor exact execution. * Used for filtering the data when chaining monitors in a workflow. */ - val executionId: String? = null + val executionId: String? = null, + val additionalFields: Map? = mapOf() ) : Writeable, ToXContent { constructor( @@ -46,7 +47,8 @@ class Finding( index = index, docLevelQueries = docLevelQueries, timestamp = timestamp, - executionId = null + executionId = null, + additionalFields = null ) @Throws(IOException::class) @@ -59,7 +61,8 @@ class Finding( index = sin.readString(), docLevelQueries = sin.readList((DocLevelQuery)::readFrom), timestamp = sin.readInstant(), - executionId = sin.readOptionalString() + executionId = sin.readOptionalString(), + additionalFields = sin.readMap() ) fun asTemplateArg(): Map { @@ -87,6 +90,7 @@ class Finding( .field(QUERIES_FIELD, docLevelQueries.toTypedArray()) .field(TIMESTAMP_FIELD, timestamp.toEpochMilli()) .field(EXECUTION_ID_FIELD, executionId) + .field(ADDITIONAL_FIELDS_FIELD, additionalFields) builder.endObject() return builder } @@ -102,6 +106,7 @@ class Finding( out.writeCollection(docLevelQueries) out.writeInstant(timestamp) out.writeOptionalString(executionId) + out.writeMap(additionalFields) } companion object { @@ -115,6 +120,7 @@ class Finding( const val TIMESTAMP_FIELD = "timestamp" const val EXECUTION_ID_FIELD = "execution_id" const val NO_ID = "" + const val ADDITIONAL_FIELDS_FIELD = "additional_fields" @JvmStatic @Throws(IOException::class) @@ -128,6 +134,7 @@ class Finding( val queries: MutableList = mutableListOf() lateinit var timestamp: Instant var executionId: String? = null + var additionalFields: Map? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -161,6 +168,7 @@ class Finding( timestamp = requireNotNull(xcp.instant()) } EXECUTION_ID_FIELD -> executionId = xcp.textOrNull() + ADDITIONAL_FIELDS_FIELD -> additionalFields = xcp.map() } } @@ -173,7 +181,8 @@ class Finding( index = index, docLevelQueries = queries, timestamp = timestamp, - executionId = executionId + executionId = executionId, + additionalFields = additionalFields ) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index 18fdde5c..a0db5fe4 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -45,7 +45,8 @@ data class Monitor( val dataSources: DataSources = DataSources(), val deleteQueryIndexInEveryRun: Boolean? = false, val shouldCreateSingleAlertForFindings: Boolean? = false, - val owner: String? = "alerting" + val owner: String? = "alerting", + val metadataForFindings: List? = listOf() ) : ScheduledJob { override val type = MONITOR_TYPE @@ -123,7 +124,8 @@ data class Monitor( } else { false }, - owner = sin.readOptionalString() + owner = sin.readOptionalString(), + metadataForFindings = sin.readOptionalStringList() ) // This enum classifies different Monitors @@ -185,6 +187,7 @@ data class Monitor( builder.field(DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD, deleteQueryIndexInEveryRun) builder.field(SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD, shouldCreateSingleAlertForFindings) builder.field(OWNER_FIELD, owner) + builder.field(METADATA_FOR_FINDINGS_FIELD, metadataForFindings) if (params.paramAsBoolean("with_type", false)) builder.endObject() return builder.endObject() } @@ -242,6 +245,7 @@ data class Monitor( out.writeOptionalBoolean(shouldCreateSingleAlertForFindings) } out.writeOptionalString(owner) + out.writeOptionalStringCollection(metadataForFindings) } companion object { @@ -264,6 +268,7 @@ data class Monitor( const val DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD = "delete_query_index_in_every_run" const val SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD = "should_create_single_alert_for_findings" const val OWNER_FIELD = "owner" + const val METADATA_FOR_FINDINGS_FIELD = "metadata_for_findings" val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}") // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all @@ -294,6 +299,7 @@ data class Monitor( var deleteQueryIndexInEveryRun = false var delegateMonitor = false var owner = "alerting" + var metadataForFindings: MutableList = mutableListOf() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -357,6 +363,17 @@ data class Monitor( xcp.booleanValue() } OWNER_FIELD -> owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text() + METADATA_FOR_FINDINGS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + metadataForFindings.add(xcp.text()) + } + } else -> { xcp.skipChildren() } @@ -385,7 +402,8 @@ data class Monitor( dataSources, deleteQueryIndexInEveryRun, delegateMonitor, - owner + owner, + metadataForFindings ) } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt index 0c30b640..3b7a65de 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt @@ -24,13 +24,20 @@ internal class GetFindingsResponseTests { "monitor_name1", "test_index1", listOf(DocLevelQuery("1", "myQuery", listOf(), "fieldA:valABC", listOf())), - Instant.now() + Instant.now(), + additionalFields = mapOf(Pair("field1", 1), Pair("field2", "value")) ) val findingDocument1 = FindingDocument("test_index1", "doc1", true, "document 1 payload") val findingDocument2 = FindingDocument("test_index1", "doc2", true, "document 2 payload") val findingDocument3 = FindingDocument("test_index1", "doc3", true, "document 3 payload") + val findingDocument4 = FindingDocument( + "test_index1", + "doc4", + true, + "document 4 payload" + ) - val findingWithDocs1 = FindingWithDocs(finding1, listOf(findingDocument1, findingDocument2, findingDocument3)) + val findingWithDocs1 = FindingWithDocs(finding1, listOf(findingDocument1, findingDocument2, findingDocument3, findingDocument4)) // Alerting GetFindingsResponse mock #2 @@ -43,12 +50,19 @@ internal class GetFindingsResponseTests { "monitor_name2", "test_index2", listOf(DocLevelQuery("1", "myQuery", listOf(), "fieldA:valABC", listOf())), - Instant.now() + Instant.now(), + additionalFields = mapOf(Pair("field1", 1), Pair("field2", "value")) ) val findingDocument21 = FindingDocument("test_index2", "doc21", true, "document 21 payload") val findingDocument22 = FindingDocument("test_index2", "doc22", true, "document 22 payload") + val findingDocument24 = FindingDocument( + "test_index2", + "doc22", + true, + "document 22 payload" + ) - val findingWithDocs2 = FindingWithDocs(finding2, listOf(findingDocument21, findingDocument22)) + val findingWithDocs2 = FindingWithDocs(finding2, listOf(findingDocument21, findingDocument22, findingDocument24)) val req = GetFindingsResponse(RestStatus.OK, 2, listOf(findingWithDocs1, findingWithDocs2)) Assertions.assertNotNull(req) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt index 6aecb888..8cd50aa7 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt @@ -18,6 +18,7 @@ import org.opensearch.commons.alerting.randomBucketLevelTrigger import org.opensearch.commons.alerting.randomBucketLevelTriggerRunResult import org.opensearch.commons.alerting.randomChainedAlertTrigger import org.opensearch.commons.alerting.randomDocLevelQuery +import org.opensearch.commons.alerting.randomDocumentLevelMonitor import org.opensearch.commons.alerting.randomDocumentLevelMonitorRunResult import org.opensearch.commons.alerting.randomDocumentLevelTrigger import org.opensearch.commons.alerting.randomInputRunResults @@ -113,6 +114,20 @@ class WriteableTests { Assertions.assertEquals(newWorkflow, workflow, "Round tripping Workflow failed") } + @Test + fun `test query-level monitor with metadata for findings as stream`() { + val monitor = randomDocumentLevelMonitor().copy( + inputs = listOf(DocLevelMonitorInput(indices = listOf(""), queries = emptyList())), + triggers = emptyList(), + metadataForFindings = listOf("field1", "field2") + ) + val out = BytesStreamOutput() + monitor.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newMonitor = Monitor(sin) + Assertions.assertEquals(monitor, newMonitor, "Round tripping QueryLevelMonitor doesn't work") + } + @Test fun `test query-level trigger as stream`() { val trigger = randomQueryLevelTrigger() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index 252be78f..b47e688b 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -23,6 +23,7 @@ import org.opensearch.commons.alerting.randomAlert import org.opensearch.commons.alerting.randomBucketLevelMonitor import org.opensearch.commons.alerting.randomBucketLevelTrigger import org.opensearch.commons.alerting.randomDocLevelQuery +import org.opensearch.commons.alerting.randomDocumentLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser import org.opensearch.commons.alerting.randomQueryLevelTrigger @@ -127,6 +128,7 @@ class XContentTests { } } + @Test fun `test query-level monitor parsing`() { val monitor = randomQueryLevelMonitor() @@ -135,6 +137,19 @@ class XContentTests { assertEquals("Round tripping QueryLevelMonitor doesn't work", monitor, parsedMonitor) } + @Test + fun `test doc-level monitor parsing`() { + val monitor = randomDocumentLevelMonitor().copy( + inputs = listOf(DocLevelMonitorInput(indices = listOf(""), queries = emptyList())), + triggers = emptyList(), + metadataForFindings = listOf("field1", "field2") + ) + + val monitorString = monitor.toJsonStringWithUser() + val parsedMonitor = Monitor.parse(parser(monitorString)) + assertEquals("Round tripping QueryLevelMonitor doesn't work", monitor, parsedMonitor) + } + @Test fun `test monitor parsing with no name`() { val monitorStringWithoutName = """