From aa7aee53ce524b7887b218333166941654788794 Mon Sep 17 00:00:00 2001 From: QiuMM Date: Wed, 14 Feb 2018 05:10:22 +0800 Subject: [PATCH] Opentsdb emitter extension (#5380) * opentsdb emitter extension * doc for opentsdb emitter extension * update opentsdb emitter doc * add the ms unit to the constant name * add a configurable event limit * fix version to 0.13.0-SNAPSHOT * using a thread to consume metric event * rename method and parameter --- distribution/pom.xml | 2 + .../extensions-contrib/opentsdb-emitter.md | 42 ++++ docs/content/development/extensions.md | 1 + extensions-contrib/opentsdb-emitter/pom.xml | 64 ++++++ .../emitter/opentsdb/EventConverter.java | 109 ++++++++++ .../emitter/opentsdb/OpentsdbEmitter.java | 82 +++++++ .../opentsdb/OpentsdbEmitterConfig.java | 158 ++++++++++++++ .../opentsdb/OpentsdbEmitterModule.java | 58 +++++ .../druid/emitter/opentsdb/OpentsdbEvent.java | 109 ++++++++++ .../emitter/opentsdb/OpentsdbSender.java | 129 +++++++++++ .../io.druid.initialization.DruidModule | 1 + .../src/main/resources/defaultMetrics.json | 205 ++++++++++++++++++ .../emitter/opentsdb/EventConverterTest.java | 81 +++++++ .../opentsdb/OpentsdbEmitterConfigTest.java | 48 ++++ .../emitter/opentsdb/OpentsdbEventTest.java | 54 +++++ .../emitter/opentsdb/OpentsdbSenderTest.java | 34 +++ pom.xml | 1 + 17 files changed, 1178 insertions(+) create mode 100644 docs/content/development/extensions-contrib/opentsdb-emitter.md create mode 100644 extensions-contrib/opentsdb-emitter/pom.xml create mode 100644 extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/EventConverter.java create mode 100644 extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitter.java create mode 100644 extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitterConfig.java create mode 100644 extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitterModule.java create mode 100644 extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEvent.java create mode 100644 extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbSender.java create mode 100644 extensions-contrib/opentsdb-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json create mode 100644 extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/EventConverterTest.java create mode 100644 extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java create mode 100644 extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbEventTest.java create mode 100644 extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbSenderTest.java diff --git a/distribution/pom.xml b/distribution/pom.xml index 1059f5ecd6d..e93dbdb1628 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -224,6 +224,8 @@ -c io.druid.extensions.contrib:graphite-emitter -c + io.druid.extensions.contrib:druid-opentsdb-emitter + -c io.druid.extensions.contrib:druid-orc-extensions -c io.druid.extensions.contrib:druid-parquet-extensions diff --git a/docs/content/development/extensions-contrib/opentsdb-emitter.md b/docs/content/development/extensions-contrib/opentsdb-emitter.md new file mode 100644 index 00000000000..dafdfd07552 --- /dev/null +++ b/docs/content/development/extensions-contrib/opentsdb-emitter.md @@ -0,0 +1,42 @@ +--- +layout: doc_page +--- + +# Opentsdb Emitter + +To use this extension, make sure to [include](../../operations/including-extensions.html) `opentsdb-emitter` extension. + +## Introduction + +This extension emits druid metrics to [OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP. And this emitter only emits service metric events to OpenTSDB (See http://druid.io/docs/latest/operations/metrics.html for a list of metrics). + +## Configuration + +All the configuration parameters for the opentsdb emitter are under `druid.emitter.opentsdb`. + +|property|description|required?|default| +|--------|-----------|---------|-------| +|`druid.emitter.opentsdb.host`|The host of the OpenTSDB server.|yes|none| +|`druid.emitter.opentsdb.port`|The port of the OpenTSDB server.|yes|none| +|`druid.emitter.opentsdb.connectionTimeout`|Connection timeout(in milliseconds).|no|2000| +|`druid.emitter.opentsdb.readTimeout`|Read timeout(in milliseconds).|no|2000| +|`druid.emitter.opentsdb.flushThreshold`|Queue flushing threshold.(Events will be sent as one batch)|no|100| +|`druid.emitter.opentsdb.maxQueueSize`|Maximum size of the queue used to buffer events.|no|1000| +|`druid.emitter.opentsdb.metricMapPath`|JSON file defining the desired metrics and dimensions for every Druid metric|no|./src/main/resources/defaultMetrics.json| + +### Druid to OpenTSDB Event Converter + +The opentsdb emitter will send only the desired metrics and dimensions which is defined in a JSON file. +If the user does not specify their own JSON file, a default file is used. All metrics are expected to be configured in the JSON file. Metrics which are not configured will be logged. +Desired metrics and dimensions is organized using the following schema:` : [ ]`
+e.g. + +```json +"query/time": [ + "dataSource", + "type" +] +``` + +For most use-cases, the default configuration is sufficient. + diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index a9e1d1ee2f0..4e2bea613fa 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -71,6 +71,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)| |kafka-emitter|Kafka metrics emitter|[link](../development/extensions-contrib/kafka-emitter.html)| |druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)| +|druid-opentsdb-emitter|OpenTSDB metrics emitter |[link](../development/extensions-contrib/opentsdb-emitter.html)| ## Promoting Community Extension to Core Extension diff --git a/extensions-contrib/opentsdb-emitter/pom.xml b/extensions-contrib/opentsdb-emitter/pom.xml new file mode 100644 index 00000000000..22f475dd5fc --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/pom.xml @@ -0,0 +1,64 @@ + + + + + + 4.0.0 + + io.druid.extensions.contrib + druid-opentsdb-emitter + druid-opentsdb-emitter + + + io.druid + druid + 0.13.0-SNAPSHOT + ../../pom.xml + + + + + io.druid + druid-api + ${project.parent.version} + provided + + + io.druid + druid-server + ${project.parent.version} + provided + + + com.sun.jersey + jersey-client + ${jersey.version} + + + + + junit + junit + test + + + diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/EventConverter.java b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/EventConverter.java new file mode 100644 index 00000000000..83f8986ef52 --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/EventConverter.java @@ -0,0 +1,109 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import io.druid.java.util.emitter.service.ServiceMetricEvent; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class EventConverter +{ + private static final Logger log = new Logger(EventConverter.class); + private static final Pattern WHITESPACE = Pattern.compile("[\\s]+"); + + private final Map> metricMap; + + public EventConverter(ObjectMapper mapper, String metricMapPath) + { + metricMap = readMap(mapper, metricMapPath); + } + + protected String sanitize(String metric) + { + return WHITESPACE.matcher(metric.trim()).replaceAll("_").replaceAll("/", "."); + } + + /** + * This function will convert a druid event to a opentsdb event. + * Also this function acts as a filter. It returns null if the event is not suppose to be emitted to Opentsdb. + * And it will filter out dimensions which is not suppose to be emitted. + * + * @param serviceMetricEvent Druid event ot type {@link ServiceMetricEvent} + * + * @return {@link OpentsdbEvent} or null + */ + public OpentsdbEvent convert(ServiceMetricEvent serviceMetricEvent) + { + String metric = serviceMetricEvent.getMetric(); + if (!metricMap.containsKey(metric)) { + return null; + } + + long timestamp = serviceMetricEvent.getCreatedTime().getMillis() / 1000L; + Number value = serviceMetricEvent.getValue(); + + Map tags = new HashMap<>(); + String service = serviceMetricEvent.getService(); + String host = serviceMetricEvent.getHost(); + tags.put("service", service); + tags.put("host", host); + + Map userDims = serviceMetricEvent.getUserDims(); + for (String dim : metricMap.get(metric)) { + if (userDims.containsKey(dim)) { + tags.put(dim, userDims.get(dim)); + } + } + + return new OpentsdbEvent(sanitize(metric), timestamp, value, tags); + } + + private Map> readMap(ObjectMapper mapper, String metricMapPath) + { + try { + InputStream is; + if (Strings.isNullOrEmpty(metricMapPath)) { + log.info("Using default metric map"); + is = this.getClass().getClassLoader().getResourceAsStream("defaultMetrics.json"); + } else { + log.info("Using default metric map located at [%s]", metricMapPath); + is = new FileInputStream(new File(metricMapPath)); + } + return mapper.reader(new TypeReference>>() + { + }).readValue(is); + } + catch (IOException e) { + throw new ISE(e, "Failed to parse metrics and dimensions"); + } + } +} diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitter.java b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitter.java new file mode 100644 index 00000000000..dd5ab7ae849 --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitter.java @@ -0,0 +1,82 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.emitter.core.Emitter; +import io.druid.java.util.emitter.core.Event; +import io.druid.java.util.emitter.service.ServiceMetricEvent; + +import java.io.IOException; + +public class OpentsdbEmitter implements Emitter +{ + private static final Logger log = new Logger(OpentsdbEmitter.class); + + private final OpentsdbSender sender; + private final EventConverter converter; + + public OpentsdbEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper) + { + this.sender = new OpentsdbSender( + config.getHost(), + config.getPort(), + config.getConnectionTimeout(), + config.getReadTimeout(), + config.getFlushThreshold(), + config.getMaxQueueSize() + ); + this.converter = new EventConverter(mapper, config.getMetricMapPath()); + } + + @Override + public void start() + { + } + + @Override + public void emit(Event event) + { + if (event instanceof ServiceMetricEvent) { + OpentsdbEvent opentsdbEvent = converter.convert((ServiceMetricEvent) event); + if (opentsdbEvent != null) { + sender.enqueue(opentsdbEvent); + } else { + log.debug( + "Metric=[%s] has not been configured to be emitted to opentsdb", + ((ServiceMetricEvent) event).getMetric() + ); + } + } + } + + @Override + public void flush() throws IOException + { + sender.flush(); + } + + @Override + public void close() throws IOException + { + sender.close(); + } +} diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitterConfig.java b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitterConfig.java new file mode 100644 index 00000000000..1a3a5d47852 --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitterConfig.java @@ -0,0 +1,158 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +public class OpentsdbEmitterConfig +{ + private static final int DEFAULT_FLUSH_THRESHOLD = 100; + private static final int DEFAULT_MAX_QUEUE_SIZE = 1000; + private static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 2000; + private static final int DEFAULT_READ_TIMEOUT_MILLIS = 2000; + + @JsonProperty + private final String host; + + @JsonProperty + private final int port; + + @JsonProperty + private final int connectionTimeout; + + @JsonProperty + private final int readTimeout; + + @JsonProperty + private final int flushThreshold; + + @JsonProperty + private final int maxQueueSize; + + @JsonProperty + private final String metricMapPath; + + @JsonCreator + public OpentsdbEmitterConfig( + @JsonProperty("host") String host, + @JsonProperty("port") Integer port, + @JsonProperty("connectionTimeout") Integer connectionTimeout, + @JsonProperty("readTimeout") Integer readTimeout, + @JsonProperty("flushThreshold") Integer flushThreshold, + @JsonProperty("maxQueueSize") Integer maxQueueSize, + @JsonProperty("metricMapPath") String metricMapPath + ) + { + this.host = Preconditions.checkNotNull(host, "host can not be null."); + this.port = Preconditions.checkNotNull(port, "port can not be null"); + this.connectionTimeout = (connectionTimeout == null || connectionTimeout < 0) + ? DEFAULT_CONNECTION_TIMEOUT_MILLIS + : connectionTimeout; + this.readTimeout = + (readTimeout == null || readTimeout < 0) ? DEFAULT_READ_TIMEOUT_MILLIS : readTimeout; + this.flushThreshold = (flushThreshold == null || flushThreshold < 0) ? DEFAULT_FLUSH_THRESHOLD : flushThreshold; + this.maxQueueSize = (maxQueueSize == null || maxQueueSize < 0) ? DEFAULT_MAX_QUEUE_SIZE : maxQueueSize; + this.metricMapPath = metricMapPath; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + OpentsdbEmitterConfig that = (OpentsdbEmitterConfig) o; + + if (!host.equals(that.host)) { + return false; + } + if (port != that.port) { + return false; + } + if (connectionTimeout != that.connectionTimeout) { + return false; + } + if (readTimeout != that.readTimeout) { + return false; + } + if (flushThreshold != that.flushThreshold) { + return false; + } + if (maxQueueSize != that.maxQueueSize) { + return false; + } + return metricMapPath != null ? metricMapPath.equals(that.metricMapPath) + : that.metricMapPath == null; + } + + @Override + public int hashCode() + { + int result = host.hashCode(); + result = 31 * result + port; + result = 31 * result + connectionTimeout; + result = 31 * result + readTimeout; + result = 31 * result + flushThreshold; + result = 31 * result + maxQueueSize; + result = 31 * result + (metricMapPath != null ? metricMapPath.hashCode() : 0); + return result; + } + + public String getHost() + { + return host; + } + + public int getPort() + { + return port; + } + + public int getConnectionTimeout() + { + return connectionTimeout; + } + + public int getReadTimeout() + { + return readTimeout; + } + + public int getFlushThreshold() + { + return flushThreshold; + } + + public int getMaxQueueSize() + { + return maxQueueSize; + } + + public String getMetricMapPath() + { + return metricMapPath; + } +} diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitterModule.java b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitterModule.java new file mode 100644 index 00000000000..fdd2adc444c --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEmitterModule.java @@ -0,0 +1,58 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import io.druid.java.util.emitter.core.Emitter; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +public class OpentsdbEmitterModule implements DruidModule +{ + private static final String EMITTER_TYPE = "opentsdb"; + + @Override + public List getJacksonModules() + { + return Collections.EMPTY_LIST; + } + + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, OpentsdbEmitterConfig.class); + } + + @Provides + @ManageLifecycle + @Named(EMITTER_TYPE) + public Emitter getEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper) + { + return new OpentsdbEmitter(config, mapper); + } +} diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEvent.java b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEvent.java new file mode 100644 index 00000000000..d828c491f9b --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbEvent.java @@ -0,0 +1,109 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Map; + +public class OpentsdbEvent +{ + + @JsonProperty + private final String metric; + + // timestamp in seconds + @JsonProperty + private final long timestamp; + + @JsonProperty + private final Object value; + + @JsonProperty + private final Map tags; + + public OpentsdbEvent( + @JsonProperty("metric") String metric, + @JsonProperty("timestamp") Long timestamp, + @JsonProperty("value") Object value, + @JsonProperty("tags") Map tags + ) + { + this.metric = Preconditions.checkNotNull(metric, "metric can not be null."); + this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp can not be null."); + this.value = Preconditions.checkNotNull(value, "value can not be null."); + this.tags = Preconditions.checkNotNull(tags, "tags can not be null."); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + OpentsdbEvent that = (OpentsdbEvent) o; + + if (!metric.equals(that.metric)) { + return false; + } + if (timestamp != that.timestamp) { + return false; + } + if (!value.equals(that.value)) { + return false; + } + return tags.equals(that.tags); + } + + @Override + public int hashCode() + { + int result = metric.hashCode(); + result = 31 * result + (int) timestamp; + result = 31 * result + value.hashCode(); + result = 31 * result + tags.hashCode(); + return result; + } + + public String getMetric() + { + return metric; + } + + public long getTimestamp() + { + return timestamp; + } + + public Object getValue() + { + return value; + } + + public Map getTags() + { + return tags; + } +} diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbSender.java b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbSender.java new file mode 100644 index 00000000000..15a12cb7a26 --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbSender.java @@ -0,0 +1,129 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.WebResource; +import io.druid.java.util.common.logger.Logger; + +import javax.ws.rs.core.MediaType; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +public class OpentsdbSender +{ + /** + * @see Opentsdb - /api/put + */ + private static final String PATH = "/api/put"; + private static final Logger log = new Logger(OpentsdbSender.class); + + private final AtomicLong countLostEvents = new AtomicLong(0); + private final int flushThreshold; + private final List events; + private final BlockingQueue eventQueue; + private final Client client; + private final WebResource webResource; + private final ExecutorService executor = Executors.newFixedThreadPool(1); + private volatile boolean running = true; + + public OpentsdbSender(String host, int port, int connectionTimeout, int readTimeout, int flushThreshold, int maxQueueSize) + { + this.flushThreshold = flushThreshold; + events = new ArrayList<>(flushThreshold); + eventQueue = new ArrayBlockingQueue<>(maxQueueSize); + + client = Client.create(); + client.setConnectTimeout(connectionTimeout); + client.setReadTimeout(readTimeout); + webResource = client.resource("http://" + host + ":" + port + PATH); + + executor.execute(new EventConsumer()); + } + + public void enqueue(OpentsdbEvent event) + { + if (!eventQueue.offer(event)) { + if (countLostEvents.getAndIncrement() % 1000 == 0) { + log.error( + "Lost total of [%s] events because of emitter queue is full. Please increase the capacity.", + countLostEvents.get() + ); + } + } + } + + public void flush() + { + sendEvents(); + } + + public void close() + { + flush(); + client.destroy(); + running = false; + executor.shutdown(); + } + + private void sendEvents() + { + if (!events.isEmpty()) { + try { + webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post(); + } + catch (Exception e) { + log.error(e, "send to opentsdb server failed"); + } + finally { + events.clear(); + } + } + } + + private class EventConsumer implements Runnable + { + @Override + public void run() + { + while (running) { + if (!eventQueue.isEmpty()) { + OpentsdbEvent event = eventQueue.poll(); + events.add(event); + if (events.size() >= flushThreshold) { + sendEvents(); + } + } + } + } + } + + @VisibleForTesting + WebResource getWebResource() + { + return webResource; + } +} diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/opentsdb-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..c092d720343 --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.emitter.opentsdb.OpentsdbEmitterModule diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json new file mode 100644 index 00000000000..f73541c17f3 --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json @@ -0,0 +1,205 @@ +{ + "query/time": [ + "dataSource", + "type" + ], + "query/bytes": [ + "dataSource", + "type" + ], + "query/node/time": [ + "server" + ], + "query/node/bytes": [ + "server" + ], + "query/node/ttfb": [ + "server" + ], + "query/intervalChunk/time": [ + "chunkInterval" + ], + "query/success/count": [], + "query/failed/count": [], + "query/interrupted/count": [], + "query/wait/time": [ + "segment" + ], + "query/cpu/time": [ + "dataSource", + "type" + ], + "jetty/numOpenConnections": [], + "query/cache/delta/numEntries": [], + "query/cache/delta/sizeBytes": [], + "query/cache/delta/hits": [], + "query/cache/delta/misses": [], + "query/cache/delta/evictions": [], + "query/cache/delta/hitRate": [], + "query/cache/delta/averageBytes": [], + "query/cache/delta/timeouts": [], + "query/cache/delta/errors": [], + "query/cache/total/numEntries": [], + "query/cache/total/sizeBytes": [], + "query/cache/total/hits": [], + "query/cache/total/misses": [], + "query/cache/total/evictions": [], + "query/cache/total/hitRate": [], + "query/cache/total/averageBytes": [], + "query/cache/total/timeouts": [], + "query/cache/total/errors": [], + "ingest/events/thrownAway": [ + "dataSource" + ], + "ingest/events/unparseable": [ + "dataSource" + ], + "ingest/events/processed": [ + "dataSource" + ], + "ingest/rows/output": [ + "dataSource" + ], + "ingest/persist/count": [ + "dataSource" + ], + "ingest/persist/time": [ + "dataSource" + ], + "ingest/persist/cpu": [ + "dataSource" + ], + "ingest/persist/backPressure": [ + "dataSource" + ], + "ingest/persist/failed": [ + "dataSource" + ], + "ingest/handoff/failed": [ + "dataSource" + ], + "ingest/merge/time": [ + "dataSource" + ], + "ingest/merge/cpu": [ + "dataSource" + ], + "ingest/handoff/count": [ + "dataSource" + ], + "ingest/sink/count": [ + "dataSource" + ], + "ingest/events/messageGap": [ + "dataSource" + ], + "ingest/kafka/lag": [ + "dataSource" + ], + "task/run/time": [ + "dataSource" + ], + "segment/added/bytes": [ + "dataSource" + ], + "segment/moved/bytes": [ + "dataSource" + ], + "segment/nuked/bytes": [ + "dataSource" + ], + "segment/assigned/count": [ + "tier" + ], + "segment/moved/count": [ + "tier" + ], + "segment/dropped/count": [ + "tier" + ], + "segment/deleted/count": [ + "tier" + ], + "segment/unneeded/count": [ + "tier" + ], + "segment/cost/raw": [ + "tier" + ], + "segment/cost/normalization": [ + "tier" + ], + "segment/cost/normalized": [ + "tier" + ], + "segment/loadQueue/size": [ + "server" + ], + "segment/loadQueue/failed": [ + "server" + ], + "segment/loadQueue/count": [ + "server" + ], + "segment/dropQueue/count": [ + "server" + ], + "segment/size": [ + "dataSource" + ], + "segment/count": [ + "dataSource" + ], + "segment/overShadowed/count": [], + "segment/unavailable/count": [ + "dataSource" + ], + "segment/underReplicated/count": [ + "dataSource" + ], + "segment/max": [], + "segment/used": [ + "dataSource" + ], + "segment/usedPercent": [ + "dataSource" + ], + "segment/pendingDelete": [], + "jvm/pool/committed": [], + "jvm/pool/init": [], + "jvm/pool/max": [], + "jvm/pool/used": [], + "jvm/bufferpool/count": [], + "jvm/bufferpool/used": [], + "jvm/bufferpool/capacity": [], + "jvm/mem/init": [], + "jvm/mem/max": [], + "jvm/mem/used": [], + "jvm/mem/committed": [], + "jvm/gc/count": [], + "jvm/gc/time": [], + "ingest/events/buffered": [ + "dataSource" + ], + "ingest/bytes/received": [ + "dataSource" + ], + "sys/swap/free": [], + "sys/swap/max": [], + "sys/swap/pageIn": [], + "sys/swap/pageOut": [], + "sys/disk/write/count": [], + "sys/disk/read/count": [], + "sys/disk/write/size": [], + "sys/disk/read/size": [], + "sys/net/write/size": [], + "sys/net/read/size": [], + "sys/fs/used": [], + "sys/fs/max": [], + "sys/mem/used": [], + "sys/mem/max": [], + "sys/storage/used": [], + "sys/cpu": [], + "coordinator-segment/count": [], + "historical-segment/count": [] +} \ No newline at end of file diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/EventConverterTest.java b/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/EventConverterTest.java new file mode 100644 index 00000000000..90d8ac745e4 --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/EventConverterTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.java.util.emitter.service.ServiceMetricEvent; +import io.druid.java.util.common.DateTimes; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class EventConverterTest +{ + private EventConverter converter; + + @Before + public void setUp() + { + converter = new EventConverter(new ObjectMapper(), null); + } + + @Test + public void testSanitize() + { + String metric = " foo bar/baz"; + Assert.assertEquals("foo_bar.baz", converter.sanitize(metric)); + } + + @Test + public void testConvert() throws Exception + { + DateTime dateTime = DateTimes.nowUtc(); + ServiceMetricEvent configuredEvent = new ServiceMetricEvent.Builder() + .setDimension("dataSource", "data-source") + .setDimension("type", "groupBy") + .build(dateTime, "query/time", 10) + .build("broker", "brokerHost1"); + + Map expectedTags = new HashMap<>(); + expectedTags.put("service", "broker"); + expectedTags.put("host", "brokerHost1"); + expectedTags.put("dataSource", "data-source"); + expectedTags.put("type", "groupBy"); + + OpentsdbEvent opentsdbEvent = converter.convert(configuredEvent); + assertEquals("query.time", opentsdbEvent.getMetric()); + assertEquals(dateTime.getMillis() / 1000L, opentsdbEvent.getTimestamp()); + assertEquals(10, opentsdbEvent.getValue()); + assertEquals(expectedTags, opentsdbEvent.getTags()); + + ServiceMetricEvent notConfiguredEvent = new ServiceMetricEvent.Builder() + .setDimension("dataSource", "data-source") + .setDimension("type", "groupBy") + .build(dateTime, "foo/bar", 10) + .build("broker", "brokerHost1"); + assertEquals(null, converter.convert(notConfiguredEvent)); + } +} diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java b/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java new file mode 100644 index 00000000000..729628f112a --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class OpentsdbEmitterConfigTest +{ + private ObjectMapper mapper = new DefaultObjectMapper(); + + @Before + public void setUp() + { + mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())); + } + + @Test + public void testSerDeserOpentsdbEmitterConfig() throws Exception + { + OpentsdbEmitterConfig opentsdbEmitterConfig = new OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, null); + String opentsdbEmitterConfigString = mapper.writeValueAsString(opentsdbEmitterConfig); + OpentsdbEmitterConfig expectedOpentsdbEmitterConfig = mapper.reader(OpentsdbEmitterConfig.class) + .readValue(opentsdbEmitterConfigString); + Assert.assertEquals(expectedOpentsdbEmitterConfig, opentsdbEmitterConfig); + } +} diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbEventTest.java b/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbEventTest.java new file mode 100644 index 00000000000..4a2801364d1 --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbEventTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class OpentsdbEventTest +{ + private ObjectMapper mapper = new DefaultObjectMapper(); + + @Before + public void setUp() + { + mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())); + } + + @Test + public void testSerDeserOpentsdbEvent() throws Exception + { + Map tags = new HashMap<>(); + tags.put("foo", "bar"); + tags.put("baz", 1); + OpentsdbEvent opentsdbEvent = new OpentsdbEvent("foo.bar", 1000L, 20, tags); + String opentsdbString = mapper.writeValueAsString(opentsdbEvent); + OpentsdbEvent expectedOpentsdbEvent = mapper.reader(OpentsdbEvent.class) + .readValue(opentsdbString); + Assert.assertEquals(expectedOpentsdbEvent, opentsdbEvent); + } +} diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbSenderTest.java b/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbSenderTest.java new file mode 100644 index 00000000000..84cd167e121 --- /dev/null +++ b/extensions-contrib/opentsdb-emitter/src/test/java/io/druid/emitter/opentsdb/OpentsdbSenderTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.opentsdb; + +import org.junit.Assert; +import org.junit.Test; + +public class OpentsdbSenderTest +{ + @Test + public void testUrl() throws Exception + { + OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 100, 1000); + String expectedUrl = "http://localhost:9999/api/put"; + Assert.assertEquals(expectedUrl, sender.getWebResource().getURI().toString()); + } +} diff --git a/pom.xml b/pom.xml index 3aa4cd472c6..9220865ca47 100644 --- a/pom.xml +++ b/pom.xml @@ -143,6 +143,7 @@ extensions-contrib/sqlserver-metadata-storage extensions-contrib/kafka-emitter extensions-contrib/redis-cache + extensions-contrib/opentsdb-emitter distribution