mirror of https://github.com/apache/druid.git
OpenTelemetry emitter extension (#12015)
* Add OpenTelemetry emitter extension * Fix build * Fix checkstyle * Add used undeclared dependencies * Ignore unused declared dependencies
This commit is contained in:
parent
e0c4c568cb
commit
6a93872586
|
@ -614,6 +614,8 @@
|
|||
<argument>org.apache.druid.extensions.contrib:gce-extensions</argument>
|
||||
<argument>-c</argument>
|
||||
<argument>org.apache.druid.extensions.contrib:aliyun-oss-extensions</argument>
|
||||
<argument>-c</argument>
|
||||
<argument>org.apache.druid.extensions.contrib:opentelemetry-emitter</argument>
|
||||
</arguments>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing,
|
||||
~ software distributed under the License is distributed on an
|
||||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
~ KIND, either express or implied. See the License for the
|
||||
~ specific language governing permissions and limitations
|
||||
~ under the License.
|
||||
-->
|
||||
|
||||
# OpenTelemetry Emitter
|
||||
|
||||
The [OpenTelemetry](https://opentelemetry.io/) emitter generates OpenTelemetry Spans for queries.
|
||||
|
||||
## How OpenTelemetry emitter works
|
||||
|
||||
The [OpenTelemetry](https://opentelemetry.io/) emitter processes `ServiceMetricEvent` events for the `query/time`
|
||||
metric. It extracts OpenTelemetry context from
|
||||
the [query context](https://druid.apache.org/docs/latest/querying/query-context.html). To link druid spans to parent
|
||||
traces, the query context should contain at least `traceparent` key.
|
||||
See [context propagation](https://www.w3.org/TR/trace-context/) for more information. If no `traceparent` key is
|
||||
provided, then spans are created without `parentTraceId` and are not linked to the parent span. In addition, the emitter
|
||||
also adds other druid context entries to the span attributes.
|
||||
|
||||
## Configuration
|
||||
|
||||
### Enabling
|
||||
|
||||
To enable the OpenTelemetry emitter, add the extension and enable the emitter in `common.runtime.properties`.
|
||||
|
||||
Load the plugin:
|
||||
|
||||
```
|
||||
druid.extensions.loadList=[..., "opentelemetry-emitter"]
|
||||
```
|
||||
|
||||
Then there are 2 options:
|
||||
|
||||
* You want to use only `opentelemetry-emitter`
|
||||
|
||||
```
|
||||
druid.emitter=opentelemetry
|
||||
```
|
||||
|
||||
* You want to use `opentelemetry-emitter` with other emitters
|
||||
|
||||
```
|
||||
druid.emitter=composing
|
||||
druid.emitter.composing.emitters=[..., "opentelemetry"]
|
||||
```
|
||||
|
||||
_*More about Druid configuration [here](https://druid.apache.org/docs/latest/configuration/index.html)._
|
||||
|
||||
## Testing
|
||||
|
||||
### Part 1: Run zipkin and otel-collector
|
||||
|
||||
Create `docker-compose.yaml` in your working dir:
|
||||
|
||||
```
|
||||
version: "2"
|
||||
services:
|
||||
|
||||
zipkin-all-in-one:
|
||||
image: openzipkin/zipkin:latest
|
||||
ports:
|
||||
- "9411:9411"
|
||||
|
||||
otel-collector:
|
||||
image: otel/opentelemetry-collector:latest
|
||||
command: ["--config=otel-local-config.yaml", "${OTELCOL_ARGS}"]
|
||||
volumes:
|
||||
- ${PWD}/config.yaml:/otel-local-config.yaml
|
||||
ports:
|
||||
- "4317:4317"
|
||||
```
|
||||
|
||||
Create `config.yaml` file with configuration for otel-collector:
|
||||
|
||||
```
|
||||
version: "2"
|
||||
receivers:
|
||||
receivers:
|
||||
otlp:
|
||||
protocols:
|
||||
grpc:
|
||||
|
||||
exporters:
|
||||
zipkin:
|
||||
endpoint: "http://zipkin-all-in-one:9411/api/v2/spans"
|
||||
format: proto
|
||||
|
||||
logging:
|
||||
|
||||
processors:
|
||||
batch:
|
||||
|
||||
service:
|
||||
pipelines:
|
||||
traces:
|
||||
receivers: [otlp]
|
||||
processors: [batch]
|
||||
exporters: [logging, zipkin]
|
||||
```
|
||||
|
||||
*_How to configure otel-collector you can read [here](https://opentelemetry.io/docs/collector/configuration/)._
|
||||
|
||||
Run otel-collector and zipkin.
|
||||
|
||||
```
|
||||
docker-compose up
|
||||
```
|
||||
|
||||
### Part 2: Run Druid
|
||||
|
||||
Build Druid:
|
||||
|
||||
```
|
||||
mvn clean install -Pdist
|
||||
tar -C /tmp -xf distribution/target/apache-druid-0.21.0-bin.tar.gz
|
||||
cd /tmp/apache-druid-0.21.0
|
||||
```
|
||||
|
||||
Edit `conf/druid/single-server/micro-quickstart/_common/common.runtime.properties` to enable the emitter (
|
||||
see `Configuration` section above).
|
||||
|
||||
Start the quickstart with the apppropriate environment variables for opentelemetry autoconfiguration:
|
||||
|
||||
```
|
||||
OTEL_SERVICE_NAME="org.apache.druid" OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" bin/start-micro-quickstart
|
||||
```
|
||||
|
||||
*_More about opentelemetry
|
||||
autoconfiguration [here](https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure)_
|
||||
|
||||
Load sample data - [example](https://druid.apache.org/docs/latest/tutorials/index.html#step-4-load-data).
|
||||
|
||||
### Part 3: Send queries
|
||||
|
||||
Create `query.json`:
|
||||
|
||||
```
|
||||
{
|
||||
"query":"SELECT COUNT(*) as total FROM wiki WHERE countryName IS NOT NULL",
|
||||
"context":{
|
||||
"traceparent":"00-54ef39243e3feb12072e0f8a74c1d55a-ad6d5b581d7c29c1-01"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Send query:
|
||||
|
||||
```
|
||||
curl -XPOST -H'Content-Type: application/json' http://localhost:8888/druid/v2/sql/ -d @query.json
|
||||
```
|
||||
|
||||
Then open `http://localhost:9411/zipkin/` and you can see there your spans.
|
|
@ -0,0 +1,217 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing,
|
||||
~ software distributed under the License is distributed on an
|
||||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
~ KIND, either express or implied. See the License for the
|
||||
~ specific language governing permissions and limitations
|
||||
~ under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.23.0-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>org.apache.druid.extensions.contrib</groupId>
|
||||
<artifactId>opentelemetry-emitter</artifactId>
|
||||
<name>opentelemetry-emitter</name>
|
||||
<description>Extension support for emitting OpenTelemetry spans for Druid queries</description>
|
||||
|
||||
<properties>
|
||||
<opentelemetry.version>1.7.0</opentelemetry.version>
|
||||
<opentelemetry.instrumentation.version>1.7.0-alpha</opentelemetry.instrumentation.version>
|
||||
<!-- These guava and grpc versions are used only in the opentelemetry-extension.
|
||||
Look at build section for more details about shading. -->
|
||||
<shade.guava.version>30.1.1-jre</shade.guava.version>
|
||||
<shade.grpc.version>1.41.0</shade.grpc.version>
|
||||
</properties>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-bom</artifactId>
|
||||
<version>${opentelemetry.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-bom-alpha</artifactId>
|
||||
<version>${opentelemetry.version}-alpha</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry.instrumentation</groupId>
|
||||
<artifactId>opentelemetry-instrumentation-bom-alpha</artifactId>
|
||||
<version>${opentelemetry.instrumentation.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid-core</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-context</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-sdk</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-sdk-trace</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-sdk-common</artifactId>
|
||||
</dependency>
|
||||
<!-- OpenTelemetry extension bundles the OpenTelemetry auto-instrumentation,
|
||||
So it could potentially affect performance -->
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.grpc</groupId>
|
||||
<artifactId>grpc-netty-shaded</artifactId>
|
||||
<version>${shade.grpc.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.findbugs</groupId>
|
||||
<artifactId>jsr305</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.inject</groupId>
|
||||
<artifactId>guice</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>${shade.guava.version}</version>
|
||||
</dependency>
|
||||
<!-- explicitly include perfmark dependency of grpc we exclude from the shaded jar
|
||||
Note: we could use promoteTransitiveDependencies=true in the shade plugin, but that promotes all
|
||||
transitive dependencies as well, which unnecessarily pollutes the final pom -->
|
||||
<dependency>
|
||||
<groupId>io.perfmark</groupId>
|
||||
<artifactId>perfmark-api</artifactId>
|
||||
<version>0.23.0</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>joda-time</groupId>
|
||||
<artifactId>joda-time</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>pl.pragmatists</groupId>
|
||||
<artifactId>JUnitParams</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<configuration>
|
||||
<ignoredUnusedDeclaredDependencies>
|
||||
<!-- Transitive dependencies from opentelemetry but explicitly added to be shadowed -->
|
||||
<ignoredUnusedDeclaredDependency>io.grpc:grpc-netty-shaded</ignoredUnusedDeclaredDependency>
|
||||
<ignoredUnusedDeclaredDependency>com.google.guava:guava</ignoredUnusedDeclaredDependency>
|
||||
</ignoredUnusedDeclaredDependencies>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>opentelemetry-extension</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<transformers>
|
||||
<!-- grpc stores service providers in META-INF/services/* files,
|
||||
so we need to relocate the class names of the implementation classes.
|
||||
More about SPI - https://docs.oracle.com/javase/tutorial/ext/basics/spi.html.
|
||||
https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html. -->
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
|
||||
</transformers>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>io.opentelemetry</include>
|
||||
<include>io.grpc</include>
|
||||
<include>com.google.guava</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
<relocations>
|
||||
<relocation>
|
||||
<pattern>com.google.common</pattern>
|
||||
<shadedPattern>org.apache.druid.opentelemetry.shaded.com.google.common</shadedPattern>
|
||||
</relocation>
|
||||
<relocation>
|
||||
<pattern>io.grpc</pattern>
|
||||
<shadedPattern>org.apache.druid.opentelemetry.shaded.io.grpc</shadedPattern>
|
||||
<includes>
|
||||
<include>io.grpc.*</include>
|
||||
</includes>
|
||||
</relocation>
|
||||
<relocation>
|
||||
<pattern>io.opentelemetry</pattern>
|
||||
<shadedPattern>org.apache.druid.opentelemetry.shaded.io.opentelemetry</shadedPattern>
|
||||
</relocation>
|
||||
</relocations>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.opentelemetry;
|
||||
|
||||
import io.opentelemetry.context.propagation.TextMapGetter;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Implementation of a text-based approach to read the W3C Trace Context from the metric query context.
|
||||
* <a href="https://opentelemetry.io/docs/java/manual_instrumentation/#context-propagation">Context propagation</a>
|
||||
* <a href="https://www.w3.org/TR/trace-context/">W3C Trace Context</a>
|
||||
*/
|
||||
public class DruidContextTextMapGetter implements TextMapGetter<ServiceMetricEvent>
|
||||
{
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, Object> getContext(ServiceMetricEvent event)
|
||||
{
|
||||
Object context = event.getUserDims().get("context");
|
||||
if (context instanceof Map) {
|
||||
return (Map<String, Object>) context;
|
||||
}
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String get(ServiceMetricEvent event, String key)
|
||||
{
|
||||
return Optional.ofNullable(getContext(event).get(key)).map(Objects::toString).orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<String> keys(ServiceMetricEvent event)
|
||||
{
|
||||
return getContext(event).keySet();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,130 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.opentelemetry;
|
||||
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.api.trace.Tracer;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.context.propagation.TextMapPropagator;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.emitter.core.Emitter;
|
||||
import org.apache.druid.java.util.emitter.core.Event;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class OpenTelemetryEmitter implements Emitter
|
||||
{
|
||||
static final DruidContextTextMapGetter DRUID_CONTEXT_TEXT_MAP_GETTER = new DruidContextTextMapGetter();
|
||||
static final HashSet<String> TRACEPARENT_PROPAGATION_FIELDS = new HashSet<>(Arrays.asList(
|
||||
"traceparent",
|
||||
"tracestate"
|
||||
));
|
||||
private static final Logger log = new Logger(OpenTelemetryEmitter.class);
|
||||
private final Tracer tracer;
|
||||
private final TextMapPropagator propagator;
|
||||
|
||||
OpenTelemetryEmitter(OpenTelemetry openTelemetry)
|
||||
{
|
||||
tracer = openTelemetry.getTracer("druid-opentelemetry-extension");
|
||||
propagator = openTelemetry.getPropagators().getTextMapPropagator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start()
|
||||
{
|
||||
log.debug("Starting OpenTelemetryEmitter");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void emit(Event e)
|
||||
{
|
||||
if (!(e instanceof ServiceMetricEvent)) {
|
||||
return;
|
||||
}
|
||||
ServiceMetricEvent event = (ServiceMetricEvent) e;
|
||||
|
||||
// We only generate spans for the following types of events:
|
||||
// query/time
|
||||
if (!event.getMetric().equals("query/time")) {
|
||||
return;
|
||||
}
|
||||
|
||||
emitQueryTimeEvent(event);
|
||||
}
|
||||
|
||||
private void emitQueryTimeEvent(ServiceMetricEvent event)
|
||||
{
|
||||
Context opentelemetryContext = propagator.extract(Context.current(), event, DRUID_CONTEXT_TEXT_MAP_GETTER);
|
||||
|
||||
try (Scope scope = opentelemetryContext.makeCurrent()) {
|
||||
DateTime endTime = event.getCreatedTime();
|
||||
DateTime startTime = endTime.minusMillis(event.getValue().intValue());
|
||||
|
||||
Span span = tracer.spanBuilder(event.getService())
|
||||
.setStartTimestamp(startTime.getMillis(), TimeUnit.MILLISECONDS)
|
||||
.startSpan();
|
||||
|
||||
getContext(event).entrySet()
|
||||
.stream()
|
||||
.filter(entry -> entry.getValue() != null)
|
||||
.filter(entry -> !TRACEPARENT_PROPAGATION_FIELDS.contains(entry.getKey()))
|
||||
.forEach(entry -> span.setAttribute(entry.getKey(), entry.getValue().toString()));
|
||||
|
||||
Object status = event.getUserDims().get("success");
|
||||
if (status == null) {
|
||||
span.setStatus(StatusCode.UNSET);
|
||||
} else if (status.toString().equals("true")) {
|
||||
span.setStatus(StatusCode.OK);
|
||||
} else {
|
||||
span.setStatus(StatusCode.ERROR);
|
||||
}
|
||||
|
||||
span.end(endTime.getMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private static Map<String, Object> getContext(ServiceMetricEvent event)
|
||||
{
|
||||
Object context = event.getUserDims().get("context");
|
||||
if (context instanceof Map) {
|
||||
return (Map<String, Object>) context;
|
||||
}
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.opentelemetry;
|
||||
|
||||
/**
|
||||
* The placeholder for future configurations but there is no configuration yet
|
||||
*/
|
||||
public class OpenTelemetryEmitterConfig
|
||||
{
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.opentelemetry;
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.name.Named;
|
||||
import io.opentelemetry.sdk.autoconfigure.OpenTelemetrySdkAutoConfiguration;
|
||||
import org.apache.druid.guice.JsonConfigProvider;
|
||||
import org.apache.druid.guice.ManageLifecycle;
|
||||
import org.apache.druid.initialization.DruidModule;
|
||||
import org.apache.druid.java.util.emitter.core.Emitter;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class OpenTelemetryEmitterModule implements DruidModule
|
||||
{
|
||||
private static final String EMITTER_TYPE = "opentelemetry";
|
||||
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, OpenTelemetryEmitterConfig.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@ManageLifecycle
|
||||
@Named(EMITTER_TYPE)
|
||||
public Emitter getEmitter(OpenTelemetryEmitterConfig config, ObjectMapper mapper)
|
||||
{
|
||||
// It's a good practice to not set the GlobalOpenTelemetry since there's no need to do that
|
||||
return new OpenTelemetryEmitter(OpenTelemetrySdkAutoConfiguration.initialize(false));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.druid.emitter.opentelemetry.OpenTelemetryEmitterModule
|
|
@ -0,0 +1,267 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.emitter.opentelemetry;
|
||||
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
|
||||
import io.opentelemetry.context.propagation.ContextPropagators;
|
||||
import io.opentelemetry.sdk.OpenTelemetrySdk;
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.trace.SdkTracerProvider;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
|
||||
import io.opentelemetry.sdk.trace.export.SpanExporter;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.emitter.core.Event;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
public class OpenTelemetryEmitterTest
|
||||
{
|
||||
private static class NoopExporter implements SpanExporter
|
||||
{
|
||||
public Collection<SpanData> spanDataCollection;
|
||||
|
||||
@Override
|
||||
public CompletableResultCode export(Collection<SpanData> collection)
|
||||
{
|
||||
this.spanDataCollection = collection;
|
||||
return CompletableResultCode.ofSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode flush()
|
||||
{
|
||||
return CompletableResultCode.ofSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode shutdown()
|
||||
{
|
||||
return CompletableResultCode.ofSuccess();
|
||||
}
|
||||
}
|
||||
|
||||
private static final DateTime TIMESTAMP = DateTimes.of(2021, 11, 5, 1, 1);
|
||||
|
||||
private OpenTelemetry openTelemetry;
|
||||
private NoopExporter noopExporter;
|
||||
private OpenTelemetryEmitter emitter;
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
noopExporter = new NoopExporter();
|
||||
openTelemetry = OpenTelemetrySdk.builder()
|
||||
.setTracerProvider(SdkTracerProvider.builder()
|
||||
.addSpanProcessor(SimpleSpanProcessor.create(
|
||||
noopExporter))
|
||||
.build())
|
||||
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
|
||||
.build();
|
||||
emitter = new OpenTelemetryEmitter(openTelemetry);
|
||||
}
|
||||
|
||||
// Check that we don't call "emitQueryTimeEvent" method for event that is not instance of ServiceMetricEvent
|
||||
@Test
|
||||
public void testNoEmitNotServiceMetric()
|
||||
{
|
||||
final Event notServiceMetricEvent =
|
||||
new Event()
|
||||
{
|
||||
@Override
|
||||
public Map<String, Object> toMap()
|
||||
{
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFeed()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
emitter.emit(notServiceMetricEvent);
|
||||
Assert.assertNull(noopExporter.spanDataCollection);
|
||||
}
|
||||
|
||||
// Check that we don't call "emitQueryTimeEvent" method for ServiceMetricEvent that is not "query/time" type
|
||||
@Test
|
||||
public void testNoEmitNotQueryTimeMetric()
|
||||
{
|
||||
final ServiceMetricEvent notQueryTimeMetric =
|
||||
new ServiceMetricEvent.Builder().build(
|
||||
TIMESTAMP,
|
||||
"query/cache/total/hitRate",
|
||||
0.54
|
||||
)
|
||||
.build(
|
||||
"broker",
|
||||
"brokerHost1"
|
||||
);
|
||||
|
||||
emitter.emit(notQueryTimeMetric);
|
||||
Assert.assertNull(noopExporter.spanDataCollection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTraceparentId()
|
||||
{
|
||||
final String traceId = "00-54ef39243e3feb12072e0f8a74c1d55a-ad6d5b581d7c29c1-01";
|
||||
final String expectedParentTraceId = "54ef39243e3feb12072e0f8a74c1d55a";
|
||||
final String expectedParentSpanId = "ad6d5b581d7c29c1";
|
||||
final Map<String, String> context = new HashMap<>();
|
||||
context.put("traceparent", traceId);
|
||||
|
||||
final String serviceName = "druid/broker";
|
||||
final DateTime createdTime = TIMESTAMP;
|
||||
final long metricValue = 100;
|
||||
|
||||
final ServiceMetricEvent queryTimeMetric =
|
||||
new ServiceMetricEvent.Builder().setDimension("context", context)
|
||||
.build(
|
||||
createdTime,
|
||||
"query/time",
|
||||
metricValue
|
||||
)
|
||||
.build(
|
||||
serviceName,
|
||||
"host"
|
||||
);
|
||||
|
||||
emitter.emit(queryTimeMetric);
|
||||
|
||||
Assert.assertEquals(1, noopExporter.spanDataCollection.size());
|
||||
|
||||
SpanData actualSpanData = noopExporter.spanDataCollection.iterator().next();
|
||||
Assert.assertEquals(serviceName, actualSpanData.getName());
|
||||
Assert.assertEquals((createdTime.getMillis() - metricValue) * 1_000_000, actualSpanData.getStartEpochNanos());
|
||||
Assert.assertEquals(expectedParentTraceId, actualSpanData.getParentSpanContext().getTraceId());
|
||||
Assert.assertEquals(expectedParentSpanId, actualSpanData.getParentSpanContext().getSpanId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAttributes()
|
||||
{
|
||||
final Map<String, String> context = new HashMap<>();
|
||||
final String expectedAttributeKey = "attribute";
|
||||
final String expectedAttributeValue = "value";
|
||||
context.put(expectedAttributeKey, expectedAttributeValue);
|
||||
|
||||
final ServiceMetricEvent queryTimeMetricWithAttributes =
|
||||
new ServiceMetricEvent.Builder().setDimension("context", context)
|
||||
.build(
|
||||
TIMESTAMP,
|
||||
"query/time",
|
||||
100
|
||||
)
|
||||
.build(
|
||||
"druid/broker",
|
||||
"host"
|
||||
);
|
||||
|
||||
emitter.emit(queryTimeMetricWithAttributes);
|
||||
|
||||
SpanData actualSpanData = noopExporter.spanDataCollection.iterator().next();
|
||||
Assert.assertEquals(1, actualSpanData.getAttributes().size());
|
||||
Assert.assertEquals(
|
||||
expectedAttributeValue,
|
||||
actualSpanData.getAttributes().get(AttributeKey.stringKey(expectedAttributeKey))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterNullValue()
|
||||
{
|
||||
final Map<String, String> context = new HashMap<>();
|
||||
context.put("attributeKey", null);
|
||||
|
||||
final ServiceMetricEvent queryTimeMetric =
|
||||
new ServiceMetricEvent.Builder().setDimension("context", context)
|
||||
.build(
|
||||
TIMESTAMP,
|
||||
"query/time",
|
||||
100
|
||||
)
|
||||
.build(
|
||||
"druid/broker",
|
||||
"host"
|
||||
);
|
||||
|
||||
emitter.emit(queryTimeMetric);
|
||||
|
||||
SpanData actualSpanData = noopExporter.spanDataCollection.iterator().next();
|
||||
Assert.assertEquals(0, actualSpanData.getAttributes().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOkStatus()
|
||||
{
|
||||
final ServiceMetricEvent queryTimeMetric =
|
||||
new ServiceMetricEvent.Builder().setDimension("success", "true")
|
||||
.build(
|
||||
TIMESTAMP,
|
||||
"query/time",
|
||||
100
|
||||
)
|
||||
.build(
|
||||
"druid/broker",
|
||||
"host"
|
||||
);
|
||||
|
||||
emitter.emit(queryTimeMetric);
|
||||
|
||||
SpanData actualSpanData = noopExporter.spanDataCollection.iterator().next();
|
||||
Assert.assertEquals(StatusCode.OK, actualSpanData.getStatus().getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testErrorStatus()
|
||||
{
|
||||
final ServiceMetricEvent queryTimeMetric =
|
||||
new ServiceMetricEvent.Builder().setDimension("success", "false")
|
||||
.build(
|
||||
TIMESTAMP,
|
||||
"query/time",
|
||||
100
|
||||
)
|
||||
.build(
|
||||
"druid/broker",
|
||||
"host"
|
||||
);
|
||||
|
||||
emitter.emit(queryTimeMetric);
|
||||
|
||||
SpanData actualSpanData = noopExporter.spanDataCollection.iterator().next();
|
||||
Assert.assertEquals(StatusCode.ERROR, actualSpanData.getStatus().getStatusCode());
|
||||
}
|
||||
}
|
3
pom.xml
3
pom.xml
|
@ -198,6 +198,7 @@
|
|||
<module>extensions-contrib/gce-extensions</module>
|
||||
<module>extensions-contrib/aliyun-oss-extensions</module>
|
||||
<module>extensions-contrib/prometheus-emitter</module>
|
||||
<module>extensions-contrib/opentelemetry-emitter</module>
|
||||
<!-- distribution packaging -->
|
||||
<module>distribution</module>
|
||||
</modules>
|
||||
|
@ -1567,7 +1568,7 @@
|
|||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>2.2</version>
|
||||
<version>3.2.4</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
|
|
Loading…
Reference in New Issue