diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts index 4cd0feebc..b154f5a2e 100644 --- a/hypertrace-ingester/build.gradle.kts +++ b/hypertrace-ingester/build.gradle.kts @@ -26,8 +26,8 @@ hypertraceDocker { dependencies { implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.datamodel:data-model:0.1.18") implementation("org.hypertrace.core.viewgenerator:view-generator-framework:0.3.1") implementation("com.typesafe:config:1.4.1") @@ -40,6 +40,9 @@ dependencies { implementation(project(":raw-spans-grouper:raw-spans-grouper")) implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher")) implementation(project(":hypertrace-view-generator:hypertrace-view-generator")) + implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator")) + implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor")) + implementation(project(":hypertrace-metrics-exporter:hypertrace-metrics-exporter")) testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.mockito:mockito-core:3.8.0") @@ -64,7 +67,10 @@ tasks.register("copyServiceConfigs") { createCopySpec("span-normalizer", "span-normalizer", "main", "common"), createCopySpec("raw-spans-grouper", "raw-spans-grouper", "main", "common"), createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "main", "common"), - createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common") + createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common"), + createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "main", "common"), + createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "main", "common"), + createCopySpec("hypertrace-metrics-exporter", "hypertrace-metrics-exporter", "main", "common") ).into("./build/resources/main/configs/") } @@ -101,10 +107,13 @@ tasks.test { tasks.register("copyServiceConfigsTest") { with( - createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"), - createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"), - createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"), - createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator") + createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"), + createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"), + createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"), + createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator"), + createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "test", "hypertrace-metrics-generator"), + createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "test", "hypertrace-metrics-processor"), + createCopySpec("hypertrace-metrics-exporter", 
"hypertrace-metrics-exporter", "test", "hypertrace-metrics-exporter") ).into("./build/resources/test/configs/") } diff --git a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java index 4ab59dedf..a5de7f1e1 100644 --- a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java +++ b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java @@ -17,6 +17,9 @@ import org.hypertrace.core.serviceframework.config.ConfigUtils; import org.hypertrace.core.spannormalizer.SpanNormalizer; import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher; +import org.hypertrace.metrics.exporter.MetricsExporterEntryService; +import org.hypertrace.metrics.generator.MetricsGenerator; +import org.hypertrace.metrics.processor.MetricsProcessor; import org.hypertrace.traceenricher.trace.enricher.TraceEnricher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,9 +32,34 @@ public class HypertraceIngester extends KafkaStreamsApp { private static final String HYPERTRACE_INGESTER_JOB_CONFIG = "hypertrace-ingester-job-config"; private Map> jobNameToSubTopology = new HashMap<>(); + private MetricsExporterEntryService metricsExporter; + private Thread metricsExporterThread; public HypertraceIngester(ConfigClient configClient) { super(configClient); + metricsExporter = + new MetricsExporterEntryService( + configClient, getSubJobConfig("hypertrace-metrics-exporter")); + } + + @Override + protected void doInit() { + super.doInit(); + metricsExporter.doInit(); + } + + @Override + protected void doStart() { + super.doStart(); + metricsExporterThread = new Thread(() -> metricsExporter.doStart()); + metricsExporterThread.start(); + } + + @Override + protected void doStop() { + super.doStop(); + metricsExporter.doStop(); + metricsExporterThread.stop(); } private KafkaStreamsApp getSubTopologyInstance(String name) { @@ -49,6 +77,12 @@ private KafkaStreamsApp getSubTopologyInstance(String name) { case "all-views": kafkaStreamsApp = new MultiViewGeneratorLauncher(ConfigClientFactory.getClient()); break; + case "hypertrace-metrics-generator": + kafkaStreamsApp = new MetricsGenerator(ConfigClientFactory.getClient()); + break; + case "hypertrace-metrics-processor": + kafkaStreamsApp = new MetricsProcessor(ConfigClientFactory.getClient()); + break; default: throw new RuntimeException(String.format("Invalid configured sub-topology : [%s]", name)); } diff --git a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf index 5c9946df6..093475361 100644 --- a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf +++ b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf @@ -3,7 +3,14 @@ main.class = org.hypertrace.ingester.HypertraceIngester service.name = hypertrace-ingester service.admin.port = 8099 -sub.topology.names = ["span-normalizer", "raw-spans-grouper", "hypertrace-trace-enricher", "all-views"] +sub.topology.names = [ + "span-normalizer", + "raw-spans-grouper", + "hypertrace-trace-enricher", + "all-views", + "hypertrace-metrics-generator", + "hypertrace-metrics-processor" +] precreate.topics = false precreate.topics = ${?PRE_CREATE_TOPICS} diff --git a/hypertrace-ingester/src/main/resources/log4j2.properties b/hypertrace-ingester/src/main/resources/log4j2.properties new 
diff --git a/hypertrace-ingester/src/main/resources/log4j2.properties b/hypertrace-ingester/src/main/resources/log4j2.properties
new file mode 100644
index 000000000..bdcf9b332
--- /dev/null
+++ b/hypertrace-ingester/src/main/resources/log4j2.properties
@@ -0,0 +1,29 @@
+status = error
+name = PropertiesConfig
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
+
+appender.rolling.type = RollingFile
+appender.rolling.name = ROLLING_FILE
+appender.rolling.fileName = ${sys:service.name:-hypertrace-ingester}.log
+appender.rolling.filePattern = ${sys:service.name:-hypertrace-ingester}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz
+appender.rolling.layout.type = PatternLayout
+appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
+appender.rolling.policies.type = Policies
+appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
+appender.rolling.policies.time.interval = 3600
+appender.rolling.policies.time.modulate = true
+appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size = 20MB
+appender.rolling.strategy.type = DefaultRolloverStrategy
+appender.rolling.strategy.max = 5
+
+rootLogger.level = INFO
+rootLogger.appenderRef.stdout.ref = STDOUT
+rootLogger.appenderRef.rolling.ref = ROLLING_FILE
+
+
diff --git a/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/build.gradle.kts
new file mode 100644
index 000000000..612342977
--- /dev/null
+++ b/hypertrace-metrics-exporter/build.gradle.kts
@@ -0,0 +1,3 @@
+subprojects {
+  group = "org.hypertrace.metrics.exporter"
+}
\ No newline at end of file
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts
new file mode 100644
index 000000000..a784fd8f0
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts
@@ -0,0 +1,57 @@
+plugins {
+  java
+  application
+  jacoco
+  id("org.hypertrace.docker-java-application-plugin")
+  id("org.hypertrace.docker-publish-plugin")
+  id("org.hypertrace.jacoco-report-plugin")
+}
+
+application {
+  mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
+}
+
+hypertraceDocker {
+  defaultImage {
+    javaApplication {
+      serviceName.set("${project.name}")
+      adminPort.set(8099)
+    }
+  }
+}
+
+tasks.test {
+  useJUnitPlatform()
+}
+
+dependencies {
+  // common and framework
+  implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
+  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT")
+  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT")
+  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")
+
+  // open telemetry
+  implementation("io.opentelemetry:opentelemetry-api:1.4.1")
+  implementation("io.opentelemetry:opentelemetry-api-metrics:1.4.1-alpha")
+  implementation("io.opentelemetry:opentelemetry-sdk:1.4.1")
+  implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.4.1")
+  implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.4.1-alpha")
+  implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.4.1-alpha")
+  implementation("io.opentelemetry:opentelemetry-exporter-prometheus:1.4.1-alpha")
+
+  // jetty server
+  implementation("org.eclipse.jetty:jetty-server:9.4.42.v20210604")
+  implementation("org.eclipse.jetty:jetty-servlet:9.4.42.v20210604")
+
+  // prometheus metrics servlet
+  implementation("io.prometheus:simpleclient_servlet:0.6.0")
+
+  // kafka
+  implementation("org.apache.kafka:kafka-clients:2.6.0")
+
+  // test
+  testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
+  testImplementation("org.mockito:mockito-core:3.8.0")
+  testImplementation("com.google.code.gson:gson:2.8.7")
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/InMemoryMetricsProducer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/InMemoryMetricsProducer.java
new file mode 100644
index 000000000..57e19142d
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/InMemoryMetricsProducer.java
@@ -0,0 +1,58 @@
+package org.hypertrace.metrics.exporter;
+
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.export.MetricProducer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InMemoryMetricsProducer implements MetricProducer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryMetricsProducer.class);
+  private BlockingQueue<MetricData> metricDataQueue;
+  private final AtomicBoolean commitOffset = new AtomicBoolean(false);
+
+  public InMemoryMetricsProducer(int maxQueueSize) {
+    this.metricDataQueue = new ArrayBlockingQueue<>(maxQueueSize);
+  }
+
+  public void addMetricData(List<ResourceMetrics> resourceMetrics) {
+    try {
+      for (ResourceMetrics rm : resourceMetrics) {
+        List<MetricData> metricData = OtlpToObjectConverter.toMetricData(rm);
+        for (MetricData md : metricData) {
+          this.metricDataQueue.put(md);
+        }
+      }
+    } catch (InterruptedException exception) {
+      LOGGER.warn("Interrupted while buffering metrics; some metrics may be lost", exception);
+    }
+  }
+
+  public Collection<MetricData> collectAllMetrics() {
+    List<MetricData> metricDataList = new ArrayList<>();
+    int max = 0;
+    while (max < 100 && this.metricDataQueue.peek() != null) {
+      metricDataList.add(this.metricDataQueue.poll());
+      max++;
+    }
+    return metricDataList;
+  }
+
+  public void setCommitOffset() {
+    commitOffset.set(true);
+  }
+
+  public void clearCommitOffset() {
+    commitOffset.set(false);
+  }
+
+  public boolean isCommitOffset() {
+    return commitOffset.get();
+  }
+}
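A usage sketch of the buffer above (illustrative, not part of the diff; the empty ResourceMetrics payload stands in for real decoded records):

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
import java.util.List;
import org.hypertrace.metrics.exporter.InMemoryMetricsProducer;

class InMemoryMetricsProducerSketch {
  public static void main(String[] args) {
    // Bounded buffer: addMetricData blocks once maxQueueSize points are queued.
    InMemoryMetricsProducer producer = new InMemoryMetricsProducer(100);
    // Consumer side: decoded OTLP protos go in.
    producer.addMetricData(List.of(ResourceMetrics.getDefaultInstance()));
    // Exporter side: each collection cycle drains up to 100 buffered points.
    Collection<MetricData> batch = producer.collectAllMetrics();
    System.out.println("collected " + batch.size() + " metric(s)");
  }
}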
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java
new file mode 100644
index 000000000..c0eb194ed
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java
@@ -0,0 +1,123 @@
+package org.hypertrace.metrics.exporter;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.typesafe.config.Config;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsConsumer implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsConsumer.class);
+  private static final int CONSUMER_POLL_TIMEOUT_MS = 100;
+  private static final int POLL_LOOP_BACKOFF_MS = 100;
+
+  private static final String KAFKA_CONFIG_KEY = "kafka.config";
+  private static final String INPUT_TOPIC_KEY = "input.topic";
+
+  private final KafkaConsumer<byte[], byte[]> consumer;
+  private final InMemoryMetricsProducer inMemoryMetricsProducer;
+  private final AtomicBoolean running = new AtomicBoolean(false);
+
+  public MetricsConsumer(Config config, InMemoryMetricsProducer inMemoryMetricsProducer) {
+    Properties props = new Properties();
+    props.putAll(
+        mergeProperties(getBaseProperties(), getFlatMapConfig(config.getConfig(KAFKA_CONFIG_KEY))));
+    consumer = new KafkaConsumer<>(props);
+    consumer.subscribe(Collections.singletonList(config.getString(INPUT_TOPIC_KEY)));
+    this.inMemoryMetricsProducer = inMemoryMetricsProducer;
+  }
+
+  public void run() {
+    running.set(true);
+    while (running.get()) {
+      // check if any message to commit
+      if (inMemoryMetricsProducer.isCommitOffset()) {
+        // consumer.commitSync();
+        inMemoryMetricsProducer.clearCommitOffset();
+      }
+
+      // read new data
+      List<ResourceMetrics> resourceMetrics = consume();
+      if (!resourceMetrics.isEmpty()) {
+        inMemoryMetricsProducer.addMetricData(resourceMetrics);
+      }
+      waitForMillis(POLL_LOOP_BACKOFF_MS);
+    }
+  }
+
+  public void stop() {
+    running.set(false);
+  }
+
+  public List<ResourceMetrics> consume() {
+    List<ResourceMetrics> resourceMetrics = new ArrayList<>();
+
+    ConsumerRecords<byte[], byte[]> records =
+        consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS));
+    records.forEach(
+        record -> {
+          try {
+            resourceMetrics.add(ResourceMetrics.parseFrom(record.value()));
+          } catch (InvalidProtocolBufferException e) {
+            LOGGER.error("Invalid record with exception", e);
+          }
+        });
+
+    return resourceMetrics;
+  }
+
+  public void close() {
+    consumer.close();
+  }
+
+  private Map<String, Object> getBaseProperties() {
+    Map<String, Object> baseProperties = new HashMap<>();
+    baseProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "hypertrace-metrics-exporter");
+    baseProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    baseProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+    baseProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+    baseProperties.put(
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    baseProperties.put(
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    return baseProperties;
+  }
+
+  private Map<String, Object> getFlatMapConfig(Config config) {
+    Map<String, Object> propertiesMap = new HashMap<>();
+    config
+        .entrySet()
+        .forEach(entry -> propertiesMap.put(entry.getKey(), config.getString(entry.getKey())));
+    return propertiesMap;
+  }
+
+  private Map<String, Object> mergeProperties(
+      Map<String, Object> baseProps, Map<String, Object> props) {
+    Objects.requireNonNull(baseProps);
+    props.forEach(baseProps::put);
+    return baseProps;
+  }
+
+  private void waitForMillis(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Wait between polls was interrupted");
+    }
+  }
+}
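A sketch of wiring the consumer by hand with an inline Typesafe Config (illustrative; the broker address is a placeholder, and the topic matches the exporter's application.conf further below):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.hypertrace.metrics.exporter.InMemoryMetricsProducer;
import org.hypertrace.metrics.exporter.MetricsConsumer;

class MetricsConsumerSketch {
  public static void main(String[] args) {
    // Inline equivalent of the keys MetricsConsumer reads: kafka.config and input.topic.
    Config config =
        ConfigFactory.parseString(
            "input.topic = enriched-otlp-metrics\n"
                + "kafka.config { bootstrap.servers = \"localhost:9092\" }");
    InMemoryMetricsProducer producer = new InMemoryMetricsProducer(5000);
    MetricsConsumer consumer = new MetricsConsumer(config, producer);
    new Thread(consumer).start(); // poll loop fills the in-memory buffer
  }
}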
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterEntryService.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterEntryService.java
new file mode 100644
index 000000000..70e067acc
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterEntryService.java
@@ -0,0 +1,83 @@
+package org.hypertrace.metrics.exporter;
+
+import com.typesafe.config.Config;
+import org.hypertrace.core.serviceframework.PlatformService;
+import org.hypertrace.core.serviceframework.config.ConfigClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsExporterEntryService extends PlatformService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporterEntryService.class);
+  private static final String OTLP_CONFIG_KEY = "otlp.collector.config";
+  private static final String PULL_EXPORTER_CONFIG_KEY = "pull.exporter";
+
+  private MetricsConsumer metricsConsumer;
+  private OtlpGrpcExporter otlpGrpcExporter;
+  private Config config;
+  private InMemoryMetricsProducer inMemoryMetricsProducer;
+  private Boolean isPullExporter;
+  private MetricsServer metricsServer;
+
+  public MetricsExporterEntryService(ConfigClient configClient, Config config) {
+    super(configClient);
+    this.config = config;
+  }
+
+  @Override
+  public void doInit() {
+    config = (config != null) ? config : getAppConfig();
+    inMemoryMetricsProducer = new InMemoryMetricsProducer(5000);
+    metricsConsumer = new MetricsConsumer(config, inMemoryMetricsProducer);
+    isPullExporter = config.getBoolean(PULL_EXPORTER_CONFIG_KEY);
+    if (!isPullExporter) {
+      otlpGrpcExporter = new OtlpGrpcExporter(config.getConfig(OTLP_CONFIG_KEY));
+    } else {
+      metricsServer = new MetricsServer(config, inMemoryMetricsProducer);
+    }
+  }
+
+  @Override
+  public void doStart() {
+    Thread metricsConsumerThread = new Thread(metricsConsumer);
+    Thread metricsExporterThread;
+    if (isPullExporter) {
+      metricsExporterThread = new Thread(() -> metricsServer.start());
+    } else {
+      metricsExporterThread = new Thread(otlpGrpcExporter);
+    }
+
+    metricsExporterThread.start();
+    metricsConsumerThread.start();
+
+    try {
+      metricsExporterThread.join();
+    } catch (InterruptedException exception) {
+      LOGGER.error("Interrupted while waiting for the exporter thread", exception);
+    }
+
+    // stop consuming if metric server thread has stopped
+    metricsConsumer.stop();
+    try {
+      metricsConsumerThread.join();
+    } catch (InterruptedException exception) {
+      LOGGER.error("Interrupted while waiting for the consumer thread", exception);
+    }
+  }
+
+  @Override
+  public void doStop() {
+    metricsConsumer.close();
+    if (!isPullExporter) {
+      otlpGrpcExporter.close();
+    } else {
+      metricsServer.stop();
+    }
+  }
+
+  @Override
+  public boolean healthCheck() {
+    return true;
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterServlet.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterServlet.java
new file mode 100644
index 000000000..a273f8c18
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterServlet.java
@@ -0,0 +1,36 @@
+package org.hypertrace.metrics.exporter;
+
+import io.opentelemetry.exporter.prometheus.PrometheusCollector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.MetricsServlet;
+import java.io.IOException;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public class MetricsExporterServlet extends MetricsServlet {
+  private PrometheusCollector prometheusCollector;
+  private static final CollectorRegistry collectorRegistry = new CollectorRegistry(false);
+  private InMemoryMetricsProducer inMemoryMetricsProducer;
+
+  public MetricsExporterServlet(InMemoryMetricsProducer producer) {
+    super(collectorRegistry);
+    prometheusCollector = PrometheusCollector.builder().setMetricProducer(producer).build();
+    collectorRegistry.register(prometheusCollector);
+    inMemoryMetricsProducer = producer;
+  }
+
+  @Override
+  protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
+      throws ServletException, IOException {
+    super.doGet(req, resp);
+    // Signal the consumer thread that the scraped batch can have its offsets committed.
+    inMemoryMetricsProducer.setCommitOffset();
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsServer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsServer.java
new file mode 100644
index 000000000..d0147229b
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsServer.java
@@ -0,0 +1,43 @@
+package org.hypertrace.metrics.exporter;
+
+import com.typesafe.config.Config;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsServer.class);
+  private static final int METRICS_PORT = 8098;
+
+  private Server server;
+
+  public MetricsServer(Config config, InMemoryMetricsProducer producer) {
+    server = new Server(METRICS_PORT);
+    ServletContextHandler context = new ServletContextHandler();
+    context.setContextPath("/");
+    this.server.setHandler(context);
+    this.server.setStopAtShutdown(true);
+    this.server.setStopTimeout(2000L);
+    context.addServlet(
+        new ServletHolder(new MetricsExporterServlet(producer)), "/ingestion/metrics");
+  }
+
+  public void start() {
+    try {
+      this.server.start();
+      LOGGER.info("Started metrics service on port: {}.", METRICS_PORT);
+      this.server.join();
+    } catch (Exception e) {
+      LOGGER.error("Failed to start metrics server.", e);
+    }
+  }
+
+  public void stop() {
+    try {
+      server.stop();
+    } catch (Exception e) {
+      LOGGER.error("Error stopping metrics server", e);
+    }
+  }
+}
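The pull path can be spot-checked with a plain HTTP GET, assuming the server above is running locally; port 8098 and the /ingestion/metrics path are the values hard-coded in MetricsServer:

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

class ScrapeSketch {
  public static void main(String[] args) throws IOException {
    // A successful scrape also flags the Kafka consumer to commit its offsets.
    HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8098/ingestion/metrics").openConnection();
    System.out.println("scrape returned HTTP " + conn.getResponseCode());
    conn.disconnect();
  }
}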
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java
new file mode 100644
index 000000000..ec4afe23c
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java
@@ -0,0 +1,93 @@
+package org.hypertrace.metrics.exporter;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Status;
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
+import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
+import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc.MetricsServiceFutureStub;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OtlpGrpcExporter implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OtlpGrpcExporter.class);
+
+  private MetricsServiceFutureStub metricsService;
+  private ManagedChannel managedChannel;
+  private long timeoutNanos;
+
+  public OtlpGrpcExporter(Config config) {
+    String host = config.getString("host");
+    int port = config.getInt("port");
+    int timeOut = config.hasPath("timeout_nanos") ? config.getInt("timeout_nanos") : 0;
+
+    ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
+
+    managedChannel = channel;
+    timeoutNanos = timeOut;
+    metricsService = MetricsServiceGrpc.newFutureStub(channel);
+  }
+
+  public void run() {
+    // No-op: the push-mode loop that drains the in-memory buffer is not wired up yet;
+    // callers invoke export() directly.
+  }
+
+  public CompletableResultCode export(List<ResourceMetrics> metrics) {
+    ExportMetricsServiceRequest exportMetricsServiceRequest =
+        ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(metrics).build();
+
+    final CompletableResultCode result = new CompletableResultCode();
+
+    MetricsServiceFutureStub exporter;
+    if (timeoutNanos > 0) {
+      exporter = metricsService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS);
+    } else {
+      exporter = metricsService;
+    }
+
+    Futures.addCallback(
+        exporter.export(exportMetricsServiceRequest),
+        new FutureCallback<ExportMetricsServiceResponse>() {
+          @Override
+          public void onSuccess(@Nullable ExportMetricsServiceResponse response) {
+            result.succeed();
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            Status status = Status.fromThrowable(t);
+            switch (status.getCode()) {
+              case UNIMPLEMENTED:
+                LOGGER.error("Failed to export metrics. Server responded with UNIMPLEMENTED. ", t);
+                break;
+              case UNAVAILABLE:
+                LOGGER.error("Failed to export metrics. Server is UNAVAILABLE. ", t);
+                break;
+              default:
+                LOGGER.warn("Failed to export metrics. Error message: " + t.getMessage());
+                break;
+            }
+            result.fail();
+          }
+        },
+        MoreExecutors.directExecutor());
+    return result;
+  }
+
+  public void close() {
+    try {
+      managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.error("Failed to shutdown the gRPC channel", e);
+    }
+  }
+}
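The push path, sketched under the assumption that an OTLP collector is listening on the defaults from otlp.collector.config; export() returns a CompletableResultCode the caller can block on:

import com.typesafe.config.ConfigFactory;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.hypertrace.metrics.exporter.OtlpGrpcExporter;

class OtlpPushSketch {
  public static void main(String[] args) {
    // Placeholder endpoint matching the defaults in the exporter's application.conf.
    OtlpGrpcExporter exporter =
        new OtlpGrpcExporter(ConfigFactory.parseString("host = localhost\nport = 4317"));
    CompletableResultCode result =
        exporter.export(List.of(ResourceMetrics.getDefaultInstance()));
    result.join(10, TimeUnit.SECONDS); // wait for the gRPC callback to settle
    System.out.println("export success: " + result.isSuccess());
    exporter.close();
  }
}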
Error message: " + t.getMessage()); + break; + } + result.fail(); + } + }, + MoreExecutors.directExecutor()); + return result; + } + + public void close() { + try { + managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Failed to shutdown the gRPC channel", e); + } + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java new file mode 100644 index 000000000..9ceba19e0 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java @@ -0,0 +1,127 @@ +package org.hypertrace.metrics.exporter; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.Metric.DataCase; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.Sum; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoubleGaugeData; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.DoubleSumData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class OtlpToObjectConverter { + + public static Resource toResource(io.opentelemetry.proto.resource.v1.Resource otlpResource) { + return Resource.create(toAttributes(otlpResource.getAttributesList())); + } + + public static InstrumentationLibraryInfo toInstrumentationLibraryInfo( + InstrumentationLibrary otlpInstrumentationLibraryInfo) { + return InstrumentationLibraryInfo.create( + otlpInstrumentationLibraryInfo.getName(), otlpInstrumentationLibraryInfo.getVersion()); + } + + public static Attributes toAttributes(List keyValues) { + AttributesBuilder attributesBuilder = Attributes.builder(); + keyValues.forEach( + keyValue -> { + attributesBuilder.put(keyValue.getKey(), keyValue.getValue().getStringValue()); + }); + return attributesBuilder.build(); + } + + public static List toDoublePointData(List numberDataPoints) { + return numberDataPoints.stream() + .map( + numberDataPoint -> + DoublePointData.create( + numberDataPoint.getStartTimeUnixNano(), + numberDataPoint.getTimeUnixNano(), + toAttributes(numberDataPoint.getAttributesList()), + numberDataPoint.getAsInt())) + .collect(Collectors.toList()); + } + + public static List toMetricData(ResourceMetrics resourceMetrics) { + List metricData = new ArrayList<>(); + Resource resource = toResource(resourceMetrics.getResource()); + resourceMetrics + .getInstrumentationLibraryMetricsList() + .forEach( + instrumentationLibraryMetrics -> { + InstrumentationLibraryInfo instrumentationLibraryInfo = + toInstrumentationLibraryInfo( + instrumentationLibraryMetrics.getInstrumentationLibrary()); + instrumentationLibraryMetrics + .getMetricsList() + .forEach( + metric -> { + // get type : for now only support gauge + if 
(metric.getDataCase().equals(DataCase.GAUGE)) {
+                          Gauge gaugeMetric = metric.getGauge();
+                          String name = metric.getName();
+                          String description = metric.getDescription();
+                          String unit = metric.getUnit();
+                          DoubleGaugeData data =
+                              DoubleGaugeData.create(
+                                  toDoublePointData(gaugeMetric.getDataPointsList()));
+                          MetricData md =
+                              MetricData.createDoubleGauge(
+                                  resource,
+                                  instrumentationLibraryInfo,
+                                  name,
+                                  description,
+                                  unit,
+                                  data);
+                          metricData.add(md);
+                        } else if (metric.getDataCase().equals(DataCase.SUM)) {
+                          Sum sumMetric = metric.getSum();
+                          boolean isMonotonic = sumMetric.getIsMonotonic();
+                          AggregationTemporality temporality;
+                          if (sumMetric
+                              .getAggregationTemporality()
+                              .equals(
+                                  io.opentelemetry.proto.metrics.v1.AggregationTemporality
+                                      .AGGREGATION_TEMPORALITY_CUMULATIVE)) {
+                            temporality = AggregationTemporality.CUMULATIVE;
+                          } else if (sumMetric
+                              .getAggregationTemporality()
+                              .equals(
+                                  io.opentelemetry.proto.metrics.v1.AggregationTemporality
+                                      .AGGREGATION_TEMPORALITY_DELTA)) {
+                            temporality = AggregationTemporality.DELTA;
+                          } else {
+                            temporality = AggregationTemporality.CUMULATIVE;
+                          }
+
+                          DoubleSumData doubleSumData =
+                              DoubleSumData.create(
+                                  isMonotonic,
+                                  temporality,
+                                  toDoublePointData(sumMetric.getDataPointsList()));
+                          MetricData md =
+                              MetricData.createDoubleSum(
+                                  resource,
+                                  instrumentationLibraryInfo,
+                                  metric.getName(),
+                                  metric.getDescription(),
+                                  metric.getUnit(),
+                                  doubleSumData);
+                          metricData.add(md);
+                        }
+                      });
+            });
+    return metricData;
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf
new file mode 100644
index 000000000..69694c89b
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf
@@ -0,0 +1,29 @@
+service.name = hypertrace-metrics-exporter
+service.admin.port = 8099
+
+main.class = org.hypertrace.metrics.exporter.MetricsExporterEntryService
+
+input.topic = "enriched-otlp-metrics"
+
+pull.exporter = true
+
+otlp.collector.config = {
+  host = localhost
+  host = ${?OTLP_COLLECTOR_HOST}
+  port = 4317
+  port = ${?OTLP_COLLECTOR_PORT}
+}
+
+kafka.config = {
+  application.id = metrics-from-enriched-otlp-metrics-job
+  bootstrap.servers = "localhost:9092"
+  bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
+}
+
+logger.names = ["file"]
+logger.file.dir = "/var/logs/metrics-exporter"
+
+metrics.reporter.prefix = org.hypertrace.metrics.exporter.MetricsExporterEntryService
+metrics.reporter.names = ["prometheus"]
+metrics.reportInterval = 60
\ No newline at end of file
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties
new file mode 100644
index 000000000..d91bc7bfe
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties
@@ -0,0 +1,23 @@
+status=error
+name=PropertiesConfig
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
+appender.rolling.type=RollingFile
+appender.rolling.name=ROLLING_FILE
+appender.rolling.fileName=${sys:service.name:-service}.log
+appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz
+appender.rolling.layout.type=PatternLayout
+appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
+appender.rolling.policies.type=Policies
+appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
+appender.rolling.policies.time.interval=3600
+appender.rolling.policies.time.modulate=true
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=20MB
+appender.rolling.strategy.type=DefaultRolloverStrategy
+appender.rolling.strategy.max=5
+rootLogger.level=INFO
+rootLogger.appenderRef.stdout.ref=STDOUT
+rootLogger.appenderRef.rolling.ref=ROLLING_FILE
diff --git a/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/build.gradle.kts
new file mode 100644
index 000000000..b1720497c
--- /dev/null
+++ b/hypertrace-metrics-generator/build.gradle.kts
@@ -0,0 +1,3 @@
+subprojects {
+  group = "org.hypertrace.metrics.generator"
+}
diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts
new file mode 100644
index 000000000..8afca34c7
--- /dev/null
+++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts
@@ -0,0 +1,46 @@
+plugins {
+  java
+  application
+  jacoco
+  id("org.hypertrace.docker-java-application-plugin")
+  id("org.hypertrace.docker-publish-plugin")
+  id("org.hypertrace.jacoco-report-plugin")
+}
+
+application {
+  mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
+}
+
+hypertraceDocker {
+  defaultImage {
+    javaApplication {
+      serviceName.set("${project.name}")
+      adminPort.set(8099)
+    }
+  }
+}
+
+tasks.test {
+  useJUnitPlatform()
+}
+
+dependencies {
+  // common and framework
+  implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
+  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT")
+  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT")
+  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")
+
+  // open telemetry
+  implementation("io.opentelemetry:opentelemetry-api:1.4.1")
+  implementation("io.opentelemetry:opentelemetry-api-metrics:1.4.1-alpha")
+  implementation("io.opentelemetry:opentelemetry-sdk:1.4.1")
+  implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.4.1")
+  implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.4.1-alpha")
+  implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.4.1-alpha")
+
+  // test
+  testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
+  testImplementation("org.mockito:mockito-core:3.8.0")
+  testImplementation("com.google.code.gson:gson:2.8.7")
+}
diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsExtractor.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsExtractor.java
new file mode 100644
index 000000000..dd13c3392
--- /dev/null
+++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsExtractor.java
@@ -0,0 +1,66 @@
+package org.hypertrace.metrics.generator;
+
+import io.opentelemetry.api.metrics.GlobalMeterProvider;
+import io.opentelemetry.api.metrics.LongUpDownCounter;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.common.Labels;
+import io.opentelemetry.exporter.otlp.internal.MetricAdapter;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import java.util.Collection;
+import java.util.List;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.hypertrace.viewgenerator.api.RawServiceView;
+
+public class MetricsExtractor
+    implements Transformer<String, RawServiceView, KeyValue<byte[], ResourceMetrics>> {
+
+  private LongUpDownCounter longUpDownCounter;
+  private long resetCounter = 0;
+  private SdkMeterProvider sdkMeterProvider;
+
+  @Override
+  public void init(ProcessorContext context) {
+    SdkMeterProviderBuilder sdkMeterProviderBuilder = SdkMeterProvider.builder();
+    sdkMeterProvider = sdkMeterProviderBuilder.buildAndRegisterGlobal();
+    resetCounter = 0;
+
+    Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.example.metrics", "1.4.1");
+
+    this.longUpDownCounter =
+        meter
+            .longUpDownCounterBuilder("num_calls")
+            .setDescription("Measure the number of calls")
+            .setUnit("1")
+            .build();
+  }
+
+  @Override
+  public KeyValue<byte[], ResourceMetrics> transform(String key, RawServiceView value) {
+    Labels labels =
+        Labels.of(
+            "tenant_id", value.getTenantId(),
+            "consumer_id", "1",
+            "service_id", value.getServiceId(),
+            "service_name", value.getServiceName(),
+            "api_id", value.getApiId(),
+            "api_name", value.getApiName());
+    longUpDownCounter.add(value.getNumCalls(), labels);
+    resetCounter++;
+    if (resetCounter % 10 == 0) {
+      resetCounter = 0;
+      Collection<MetricData> metricData = sdkMeterProvider.collectAllMetrics();
+      List<ResourceMetrics> resourceMetrics = MetricAdapter.toProtoResourceMetrics(metricData);
+      if (resourceMetrics.size() > 0) {
+        return new KeyValue<>(null, resourceMetrics.get(0));
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void close() {}
+}
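MetricsExtractor leans on MetricAdapter (an io.opentelemetry internal class, subject to change between releases) to turn SDK MetricData into OTLP protos. The conversion step in isolation, with an illustrative counter:

import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.exporter.otlp.internal.MetricAdapter;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import java.util.List;

class MetricAdapterSketch {
  public static void main(String[] args) {
    SdkMeterProvider provider = SdkMeterProvider.builder().build();
    LongUpDownCounter counter =
        provider.get("sketch").longUpDownCounterBuilder("num_calls").build();
    counter.add(5, Labels.of("tenant_id", "example"));
    // Same conversion MetricsExtractor performs on every tenth record.
    List<ResourceMetrics> protos =
        MetricAdapter.toProtoResourceMetrics(provider.collectAllMetrics());
    System.out.println("resource metrics batches: " + protos.size());
  }
}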
diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java
new file mode 100644
index 000000000..a369e742e
--- /dev/null
+++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java
@@ -0,0 +1,76 @@
+package org.hypertrace.metrics.generator;
+
+import com.typesafe.config.Config;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
+import org.hypertrace.core.serviceframework.config.ConfigClient;
+import org.hypertrace.viewgenerator.api.RawServiceView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsGenerator extends KafkaStreamsApp {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsGenerator.class);
+  private static final String INPUT_TOPIC_CONFIG_KEY = "input.topic";
+  private static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic";
+  private static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config";
+
+  public MetricsGenerator(ConfigClient configClient) {
+    super(configClient);
+  }
+
+  @Override
+  public StreamsBuilder buildTopology(
+      Map<String, Object> streamsProperties,
+      StreamsBuilder streamsBuilder,
+      Map<String, KStream<?, ?>> inputStreams) {
+
+    Config jobConfig = getJobConfig(streamsProperties);
+    String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
+    String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);
+    KStream<String, RawServiceView> inputStream =
+        (KStream<String, RawServiceView>) inputStreams.get(inputTopic);
+    if (inputStream == null) {
+      inputStream =
+          streamsBuilder.stream(
+              inputTopic, Consumed.with(Serdes.String(), (Serde<RawServiceView>) null));
+      inputStreams.put(inputTopic, inputStream);
+    }
+
+    inputStream
+        .transform(MetricsExtractor::new)
+        .to(outputTopic, Produced.with(Serdes.ByteArray(), new OtelMetricsSerde()));
+
+    return streamsBuilder;
+  }
+
+  @Override
+  public String getJobConfigKey() {
+    return METRICS_GENERATOR_JOB_CONFIG;
+  }
+
+  @Override
+  public Logger getLogger() {
+    return LOGGER;
+  }
+
+  @Override
+  public List<String> getInputTopics(Map<String, Object> properties) {
+    Config jobConfig = getJobConfig(properties);
+    return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY));
+  }
+
+  @Override
+  public List<String> getOutputTopics(Map<String, Object> properties) {
+    Config jobConfig = getJobConfig(properties);
+    return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY));
+  }
+
+  private Config getJobConfig(Map<String, Object> properties) {
+    return (Config) properties.get(getJobConfigKey());
+  }
+}
diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtelMetricsSerde.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtelMetricsSerde.java
new file mode 100644
index 000000000..16130e807
--- /dev/null
+++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtelMetricsSerde.java
@@ -0,0 +1,45 @@
+package org.hypertrace.metrics.generator;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class OtelMetricsSerde implements Serde<ResourceMetrics> {
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {}
+
+  @Override
+  public void close() {}
+
+  @Override
+  public Serializer<ResourceMetrics> serializer() {
+    return new OtelMetricsSerde.Ser();
+  }
+
+  @Override
+  public Deserializer<ResourceMetrics> deserializer() {
+    return new OtelMetricsSerde.De();
+  }
+
+  public static class Ser implements Serializer<ResourceMetrics> {
+    @Override
+    public byte[] serialize(String topic, ResourceMetrics data) {
+      return data.toByteArray();
+    }
+  }
+
+  public static class De implements Deserializer<ResourceMetrics> {
+    @Override
+    public ResourceMetrics deserialize(String topic, byte[] data) {
+      try {
+        return ResourceMetrics.parseFrom(data);
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf
new file mode 100644
index 000000000..af1d5ad52
--- /dev/null
+++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf
@@ -0,0 +1,34 @@
+service.name = hypertrace-metrics-generator
+service.admin.port = 8099
+
+main.class =
org.hypertrace.metrics.generator.MetricsGenerator + +input.topic = "raw-service-view-events" +output.topic = "otlp-metrics" +input.class = org.hypertrace.viewgenerator.api.RawServiceView +precreate.topics = false +precreate.topics = ${?PRE_CREATE_TOPICS} + +kafka.streams.config = { + application.id = metrics-from-raw-service-view-events-job + num.stream.threads = 2 + num.stream.threads = ${?NUM_STREAM_THREADS} + + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} + + schema.registry.url = "http://localhost:8081" + schema.registry.url = ${?SCHEMA_REGISTRY_URL} + value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" +} + +processor { + defaultTenantId = ${?DEFAULT_TENANT_ID} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-generator" + +metrics.reporter.prefix = org.hypertrace.metrics.generator.MetricsGenerator +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties new file mode 100644 index 000000000..d91bc7bfe --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties @@ -0,0 +1,23 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.type=RollingFile +appender.rolling.name=ROLLING_FILE +appender.rolling.fileName=${sys:service.name:-service}.log +appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type=PatternLayout +appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type=Policies +appender.rolling.policies.time.type=TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval=3600 +appender.rolling.policies.time.modulate=true +appender.rolling.policies.size.type=SizeBasedTriggeringPolicy +appender.rolling.policies.size.size=20MB +appender.rolling.strategy.type=DefaultRolloverStrategy +appender.rolling.strategy.max=5 +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT +rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/build.gradle.kts new file mode 100644 index 000000000..870768c84 --- /dev/null +++ b/hypertrace-metrics-processor/build.gradle.kts @@ -0,0 +1,3 @@ +subprojects { + group = "org.hypertrace.metrics.processor" +} \ No newline at end of file diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts new file mode 100644 index 000000000..8afca34c7 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts @@ -0,0 +1,46 @@ +plugins { + java + application + jacoco + id("org.hypertrace.docker-java-application-plugin") + id("org.hypertrace.docker-publish-plugin") + id("org.hypertrace.jacoco-report-plugin") +} + +application { + mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher") +} + +hypertraceDocker { + defaultImage { + javaApplication { + serviceName.set("${project.name}") + adminPort.set(8099) + } + } +} + 
+tasks.test {
+  useJUnitPlatform()
+}
+
+dependencies {
+  // common and framework
+  implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
+  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT")
+  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT")
+  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")
+
+  // open telemetry
+  implementation("io.opentelemetry:opentelemetry-api:1.4.1")
+  implementation("io.opentelemetry:opentelemetry-api-metrics:1.4.1-alpha")
+  implementation("io.opentelemetry:opentelemetry-sdk:1.4.1")
+  implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.4.1")
+  implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.4.1-alpha")
+  implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.4.1-alpha")
+
+  // test
+  testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
+  testImplementation("org.mockito:mockito-core:3.8.0")
+  testImplementation("com.google.code.gson:gson:2.8.7")
+}
diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java
new file mode 100644
index 000000000..32b2a4e69
--- /dev/null
+++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java
@@ -0,0 +1,22 @@
+package org.hypertrace.metrics.processor;
+
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public class MetricsEnricher
+    implements Transformer<byte[], ResourceMetrics, KeyValue<byte[], ResourceMetrics>> {
+
+  @Override
+  public void init(ProcessorContext context) {}
+
+  @Override
+  public KeyValue<byte[], ResourceMetrics> transform(byte[] key, ResourceMetrics value) {
+    // noop enricher for now
+    return new KeyValue<>(key, value);
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java
new file mode 100644
index 000000000..e16da2518
--- /dev/null
+++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java
@@ -0,0 +1,22 @@
+package org.hypertrace.metrics.processor;
+
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public class MetricsNormalizer
+    implements Transformer<byte[], ResourceMetrics, KeyValue<byte[], ResourceMetrics>> {
+
+  @Override
+  public void init(ProcessorContext context) {}
+
+  @Override
+  public KeyValue<byte[], ResourceMetrics> transform(byte[] key, ResourceMetrics value) {
+    // noop normalizer for now
+    return new KeyValue<>(key, value);
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java
new file mode 100644
index 000000000..81c89ac27
--- /dev/null
+++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java
@@ -0,0 +1,80 @@
+package org.hypertrace.metrics.processor;
+
+import com.typesafe.config.Config;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
+import org.hypertrace.core.serviceframework.config.ConfigClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsProcessor extends KafkaStreamsApp {
+  private static final Logger logger = LoggerFactory.getLogger(MetricsProcessor.class);
+  private static final String INPUT_TOPIC_CONFIG_KEY = "input.topic";
+  private static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic";
+  private static final String METRICS_PROCESSOR_JOB_CONFIG = "metrics-processor-job-config";
+
+  public MetricsProcessor(ConfigClient configClient) {
+    super(configClient);
+  }
+
+  @Override
+  public StreamsBuilder buildTopology(
+      Map<String, Object> streamsProperties,
+      StreamsBuilder streamsBuilder,
+      Map<String, KStream<?, ?>> inputStreams) {
+
+    Config jobConfig = getJobConfig(streamsProperties);
+    String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
+    String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);
+
+    // input stream
+    KStream<byte[], ResourceMetrics> inputStream =
+        (KStream<byte[], ResourceMetrics>) inputStreams.get(inputTopic);
+    if (inputStream == null) {
+      inputStream =
+          streamsBuilder.stream(
+              inputTopic, Consumed.with(Serdes.ByteArray(), new OtlpMetricsSerde()));
+      inputStreams.put(inputTopic, inputStream);
+    }
+
+    inputStream
+        .transform(MetricsNormalizer::new)
+        .transform(MetricsEnricher::new)
+        .to(outputTopic, Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde()));
+
+    return streamsBuilder;
+  }
+
+  @Override
+  public String getJobConfigKey() {
+    return METRICS_PROCESSOR_JOB_CONFIG;
+  }
+
+  @Override
+  public Logger getLogger() {
+    return logger;
+  }
+
+  @Override
+  public List<String> getInputTopics(Map<String, Object> properties) {
+    Config jobConfig = getJobConfig(properties);
+    return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY));
+  }
+
+  @Override
+  public List<String> getOutputTopics(Map<String, Object> properties) {
+    Config jobConfig = getJobConfig(properties);
+    return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY));
+  }
+
+  private Config getJobConfig(Map<String, Object> properties) {
+    return (Config) properties.get(getJobConfigKey());
+  }
+}
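The topology above moves ResourceMetrics through Kafka via the OtlpMetricsSerde that follows. Since the serde is a thin protobuf wrapper, a round-trip check is straightforward (the topic argument is ignored by these implementations):

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import org.hypertrace.metrics.processor.OtlpMetricsSerde;

class SerdeRoundTripSketch {
  public static void main(String[] args) {
    OtlpMetricsSerde serde = new OtlpMetricsSerde();
    ResourceMetrics original = ResourceMetrics.getDefaultInstance();
    byte[] bytes = serde.serializer().serialize("otlp-metrics", original);
    ResourceMetrics decoded = serde.deserializer().deserialize("otlp-metrics", bytes);
    System.out.println("round-trip equal: " + original.equals(decoded));
  }
}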
diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java
new file mode 100644
index 000000000..21c82cb75
--- /dev/null
+++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java
@@ -0,0 +1,45 @@
+package org.hypertrace.metrics.processor;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class OtlpMetricsSerde implements Serde<ResourceMetrics> {
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {}
+
+  @Override
+  public void close() {}
+
+  @Override
+  public Serializer<ResourceMetrics> serializer() {
+    return new OtlpMetricsSerde.Ser();
+  }
+
+  @Override
+  public Deserializer<ResourceMetrics> deserializer() {
+    return new OtlpMetricsSerde.De();
+  }
+
+  public static class Ser implements Serializer<ResourceMetrics> {
+    @Override
+    public byte[] serialize(String topic, ResourceMetrics data) {
+      return data.toByteArray();
+    }
+  }
+
+  public static class De implements Deserializer<ResourceMetrics> {
+    @Override
+    public ResourceMetrics deserialize(String topic, byte[] data) {
+      try {
+        return ResourceMetrics.parseFrom(data);
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf
new file mode 100644
index 000000000..a66eb4749
--- /dev/null
+++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf
@@ -0,0 +1,34 @@
+service.name = hypertrace-metrics-processor
+service.admin.port = 8099
+
+main.class = org.hypertrace.metrics.processor.MetricsProcessor
+
+input.topic = "otlp-metrics"
+output.topic = "enriched-otlp-metrics"
+
+precreate.topics = false
+precreate.topics = ${?PRE_CREATE_TOPICS}
+
+kafka.streams.config = {
+  application.id = metrics-processor-job
+  num.stream.threads = 2
+  num.stream.threads = ${?NUM_STREAM_THREADS}
+
+  bootstrap.servers = "localhost:9092"
+  bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
+
+  schema.registry.url = "http://localhost:8081"
+  schema.registry.url = ${?SCHEMA_REGISTRY_URL}
+  value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
+}
+
+processor {
+  defaultTenantId = ${?DEFAULT_TENANT_ID}
+}
+
+logger.names = ["file"]
+logger.file.dir = "/var/logs/metrics-processor"
+
+metrics.reporter.prefix = org.hypertrace.metrics.processor.MetricsProcessor
+metrics.reporter.names = ["prometheus"]
+metrics.reportInterval = 60
diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties
new file mode 100644
index 000000000..d91bc7bfe
--- /dev/null
+++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties
@@ -0,0 +1,23 @@
+status=error
+name=PropertiesConfig
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
+appender.rolling.type=RollingFile
+appender.rolling.name=ROLLING_FILE
+appender.rolling.fileName=${sys:service.name:-service}.log
+appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz
+appender.rolling.layout.type=PatternLayout
+appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
+appender.rolling.policies.type=Policies
+appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
+appender.rolling.policies.time.interval=3600
+appender.rolling.policies.time.modulate=true
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=20MB
+appender.rolling.strategy.type=DefaultRolloverStrategy
+appender.rolling.strategy.max=5
+rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT +rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts index 818180c41..4fb9c8bcf 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts @@ -36,8 +36,8 @@ tasks.test { dependencies { implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher-impl")) implementation("org.hypertrace.core.datamodel:data-model:0.1.18") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.entity.service:entity-service-client:0.8.0") implementation("com.typesafe:config:1.4.1") diff --git a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts index 85b3c7431..a473a8f87 100644 --- a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts +++ b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts @@ -39,8 +39,8 @@ dependencies { } implementation(project(":span-normalizer:span-normalizer-api")) implementation("org.hypertrace.core.datamodel:data-model:0.1.18") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") implementation("com.typesafe:config:1.4.1") diff --git a/settings.gradle.kts b/settings.gradle.kts index 2ca1d9e44..352b5771b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -34,6 +34,11 @@ include("span-normalizer:span-normalizer") include("span-normalizer:raw-span-constants") include("span-normalizer:span-normalizer-constants") +// metrics pipeline +include("hypertrace-metrics-generator:hypertrace-metrics-generator") +include("hypertrace-metrics-processor:hypertrace-metrics-processor") +include("hypertrace-metrics-exporter:hypertrace-metrics-exporter") + // e2e pipeline include("hypertrace-ingester") include("semantic-convention-utils") diff --git a/span-normalizer/span-normalizer-api/build.gradle.kts b/span-normalizer/span-normalizer-api/build.gradle.kts index 81a6197b9..08b8d7f91 100644 --- a/span-normalizer/span-normalizer-api/build.gradle.kts +++ b/span-normalizer/span-normalizer-api/build.gradle.kts @@ -63,4 +63,5 @@ dependencies { because("Multiple vulnerabilities in avro-declared version") } } + api("io.opentelemetry:opentelemetry-proto:0.3.0") } diff --git a/span-normalizer/span-normalizer/build.gradle.kts b/span-normalizer/span-normalizer/build.gradle.kts index 2b2c92b3f..27652e8ea 100644 --- a/span-normalizer/span-normalizer/build.gradle.kts +++ b/span-normalizer/span-normalizer/build.gradle.kts @@ -35,8 +35,8 @@ dependencies { implementation(project(":semantic-convention-utils")) implementation("org.hypertrace.core.datamodel:data-model:0.1.18") - 
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") // Required for the GRPC clients.