mirror of https://github.com/apache/druid.git
Contrib Extension for Ambari Metrics Emitter (#3767)
* Contrib Extension for Ambari Metrics Emitter extension to enable druid to send metrics to ambari metrics server (https://cwiki.apache.org/confluence/display/AMBARI/Metrics) review comments switch to public repo * review comments * add docs * fix pom version * Add link for doc page in extensions.md * remove unused imports * review comments review comments remove unused dependency review comment
This commit is contained in:
parent
35160e5595
commit
f576a0ff14
|
@ -0,0 +1,80 @@
|
||||||
|
---
|
||||||
|
layout: doc_page
|
||||||
|
---
|
||||||
|
|
||||||
|
# Ambari Metrics Emitter
|
||||||
|
|
||||||
|
To use this extension, make sure to [include](../../operations/including-extensions.html) `ambari-metrics-emitter` extension.
|
||||||
|
|
||||||
|
## Introduction
|
||||||
|
|
||||||
|
This extension emits druid metrics to a ambari-metrics carbon server.
|
||||||
|
Events are sent after been [pickled](http://ambari-metrics.readthedocs.org/en/latest/feeding-carbon.html#the-pickle-protocol); the size of the batch is configurable.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
All the configuration parameters for ambari-metrics emitter are under `druid.emitter.ambari-metrics`.
|
||||||
|
|
||||||
|
|property|description|required?|default|
|
||||||
|
|--------|-----------|---------|-------|
|
||||||
|
|`druid.emitter.ambari-metrics.hostname`|The hostname of the ambari-metrics server.|yes|none|
|
||||||
|
|`druid.emitter.ambari-metrics.port`|The port of the ambari-metrics server.|yes|none|
|
||||||
|
|`druid.emitter.ambari-metrics.protocol`|The protocol used to send metrics to ambari metrics collector. One of http/https|no|http|
|
||||||
|
|`druid.emitter.ambari-metrics.trustStorePath`|Path to trustStore to be used for https|no|none|
|
||||||
|
|`druid.emitter.ambari-metrics.trustStoreType`|trustStore type to be used for https|no|none|
|
||||||
|
|`druid.emitter.ambari-metrics.trustStoreType`|trustStore password to be used for https|no|none|
|
||||||
|
|`druid.emitter.ambari-metrics.batchSize`|Number of events to send as one batch.|no|100|
|
||||||
|
|`druid.emitter.ambari-metrics.eventConverter`| Filter and converter of druid events to ambari-metrics timeline event(please see next section). |yes|none|
|
||||||
|
|`druid.emitter.ambari-metrics.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute|
|
||||||
|
|`druid.emitter.ambari-metrics.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`|
|
||||||
|
|`druid.emitter.ambari-metrics.alertEmitters`| List of emitters where alerts will be forwarded to. |no| empty list (no forwarding)|
|
||||||
|
|`druid.emitter.ambari-metrics.emitWaitTime` | wait time in milliseconds to try to send the event otherwise emitter will throwing event. |no|0|
|
||||||
|
|`druid.emitter.ambari-metrics.waitForEventTime` | waiting time in milliseconds if necessary for an event to become available. |no|1000 (1 sec)|
|
||||||
|
|
||||||
|
### Druid to Ambari Metrics Timeline Event Converter
|
||||||
|
|
||||||
|
Ambari Metrics Timeline Event Converter defines a mapping between druid metrics name plus dimensions to a timeline event metricName.
|
||||||
|
ambari-metrics metric path is organized using the following schema:
|
||||||
|
`<namespacePrefix>.[<druid service name>].[<druid hostname>].<druid metrics dimensions>.<druid metrics name>`
|
||||||
|
Properly naming the metrics is critical to avoid conflicts, confusing data and potentially wrong interpretation later on.
|
||||||
|
|
||||||
|
Example `druid.historical.hist-host1:8080.MyDataSourceName.GroupBy.query/time`:
|
||||||
|
|
||||||
|
* `druid` -> namespace prefix
|
||||||
|
* `historical` -> service name
|
||||||
|
* `hist-host1:8080` -> druid hostname
|
||||||
|
* `MyDataSourceName` -> dimension value
|
||||||
|
* `GroupBy` -> dimension value
|
||||||
|
* `query/time` -> metric name
|
||||||
|
|
||||||
|
We have two different implementation of event converter:
|
||||||
|
|
||||||
|
#### Send-All converter
|
||||||
|
|
||||||
|
The first implementation called `all`, will send all the druid service metrics events.
|
||||||
|
The path will be in the form `<namespacePrefix>.[<druid service name>].[<druid hostname>].<dimensions values ordered by dimension's name>.<metric>`
|
||||||
|
User has control of `<namespacePrefix>.[<druid service name>].[<druid hostname>].`
|
||||||
|
|
||||||
|
```json
|
||||||
|
|
||||||
|
druid.emitter.ambari-metrics.eventConverter={"type":"all", "namespacePrefix": "druid.test", "appName":"druid"}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
#### White-list based converter
|
||||||
|
|
||||||
|
The second implementation called `whiteList`, will send only the white listed metrics and dimensions.
|
||||||
|
Same as for the `all` converter user has control of `<namespacePrefix>.[<druid service name>].[<druid hostname>].`
|
||||||
|
White-list based converter comes with the following default white list map located under resources in `./src/main/resources/defaultWhiteListMap.json`
|
||||||
|
|
||||||
|
Although user can override the default white list map by supplying a property called `mapPath`.
|
||||||
|
This property is a String containing the path for the file containing **white list map Json object**.
|
||||||
|
For example the following converter will read the map from the file `/pathPrefix/fileName.json`.
|
||||||
|
|
||||||
|
```json
|
||||||
|
|
||||||
|
druid.emitter.ambari-metrics.eventConverter={"type":"whiteList", "namespacePrefix": "druid.test", "ignoreHostname":true, "appName":"druid", "mapPath":"/pathPrefix/fileName.json"}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
**Druid emits a huge number of metrics we highly recommend to use the `whiteList` converter**
|
|
@ -49,6 +49,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|
||||||
|
|
||||||
|Name|Description|Docs|
|
|Name|Description|Docs|
|
||||||
|----|-----------|----|
|
|----|-----------|----|
|
||||||
|
|ambari-metrics-emitter|Ambari Metrics Emitter |[link](../development/extensions-contrib/ambari-metrics-emitter.html)|
|
||||||
|druid-azure-extensions|Microsoft Azure deep storage.|[link](../development/extensions-contrib/azure.html)|
|
|druid-azure-extensions|Microsoft Azure deep storage.|[link](../development/extensions-contrib/azure.html)|
|
||||||
|druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.html)|
|
|druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.html)|
|
||||||
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!--
|
||||||
|
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
~ or more contributor license agreements. See the NOTICE file
|
||||||
|
~ distributed with this work for additional information
|
||||||
|
~ regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
~ to you under the Apache License, Version 2.0 (the
|
||||||
|
~ "License"); you may not use this file except in compliance
|
||||||
|
~ with the License. You may obtain a copy of the License at
|
||||||
|
~
|
||||||
|
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
~
|
||||||
|
~ Unless required by applicable law or agreed to in writing,
|
||||||
|
~ software distributed under the License is distributed on an
|
||||||
|
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
~ KIND, either express or implied. See the License for the
|
||||||
|
~ specific language governing permissions and limitations
|
||||||
|
~ under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid</artifactId>
|
||||||
|
<version>0.9.3-SNAPSHOT</version>
|
||||||
|
<relativePath>../../pom.xml</relativePath>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<groupId>io.druid.extensions.contrib</groupId>
|
||||||
|
<artifactId>ambari-metrics-emitter</artifactId>
|
||||||
|
<name>ambari-metrics-emitter</name>
|
||||||
|
<description>Druid emitter extension to support ambari metrics server</description>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid-common</artifactId>
|
||||||
|
<version>${project.parent.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid-api</artifactId>
|
||||||
|
<version>${project.parent.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.metamx</groupId>
|
||||||
|
<artifactId>emitter</artifactId>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid-server</artifactId>
|
||||||
|
<version>${project.parent.version}</version>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.ambari</groupId>
|
||||||
|
<artifactId>ambari-metrics-common</artifactId>
|
||||||
|
<version>2.4.1.0.22</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.easymock</groupId>
|
||||||
|
<artifactId>easymock</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>pl.pragmatists</groupId>
|
||||||
|
<artifactId>JUnitParams</artifactId>
|
||||||
|
<version>1.0.4</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
<repositories>
|
||||||
|
<repository>
|
||||||
|
<id>hortonworks</id>
|
||||||
|
<name>hortonworks</name>
|
||||||
|
<url>http://repo.hortonworks.com/content/repositories/releases</url>
|
||||||
|
</repository>
|
||||||
|
</repositories>
|
||||||
|
</project>
|
|
@ -0,0 +1,236 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.ambari.metrics;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import com.metamx.common.ISE;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.emitter.core.Emitter;
|
||||||
|
import com.metamx.emitter.core.Event;
|
||||||
|
import com.metamx.emitter.service.AlertEvent;
|
||||||
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
|
||||||
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
||||||
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
|
||||||
|
public class AmbariMetricsEmitter extends AbstractTimelineMetricsSink implements Emitter
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(AmbariMetricsEmitter.class);
|
||||||
|
|
||||||
|
private final DruidToTimelineMetricConverter timelineMetricConverter;
|
||||||
|
private final List<Emitter> emitterList;
|
||||||
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
|
private final LinkedBlockingQueue<TimelineMetric> eventsQueue;
|
||||||
|
private final AmbariMetricsEmitterConfig config;
|
||||||
|
private final String collectorURI;
|
||||||
|
private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = 60000; // default flush wait 1 min
|
||||||
|
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
|
||||||
|
.setDaemon(true)
|
||||||
|
.setNameFormat("AmbariMetricsEmitter-%s")
|
||||||
|
.build()); // Thread pool of two in order to schedule flush runnable
|
||||||
|
private final AtomicLong countLostEvents = new AtomicLong(0);
|
||||||
|
|
||||||
|
public AmbariMetricsEmitter(
|
||||||
|
AmbariMetricsEmitterConfig config,
|
||||||
|
List<Emitter> emitterList
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.config = config;
|
||||||
|
this.emitterList = emitterList;
|
||||||
|
this.timelineMetricConverter = config.getDruidToTimelineEventConverter();
|
||||||
|
this.eventsQueue = new LinkedBlockingQueue<>(config.getMaxQueueSize());
|
||||||
|
this.collectorURI = String.format(
|
||||||
|
"%s://%s:%s%s",
|
||||||
|
config.getProtocol(),
|
||||||
|
config.getHostname(),
|
||||||
|
config.getPort(),
|
||||||
|
WS_V1_TIMELINE_METRICS
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start()
|
||||||
|
{
|
||||||
|
synchronized (started) {
|
||||||
|
log.info("Starting Ambari Metrics Emitter.");
|
||||||
|
if (!started.get()) {
|
||||||
|
if (config.getProtocol().equals("https")) {
|
||||||
|
loadTruststore(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword());
|
||||||
|
}
|
||||||
|
exec.scheduleAtFixedRate(
|
||||||
|
new ConsumerRunnable(),
|
||||||
|
config.getFlushPeriod(),
|
||||||
|
config.getFlushPeriod(),
|
||||||
|
TimeUnit.MILLISECONDS
|
||||||
|
);
|
||||||
|
started.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void emit(Event event)
|
||||||
|
{
|
||||||
|
if (!started.get()) {
|
||||||
|
throw new ISE("WTF emit was called while service is not started yet");
|
||||||
|
}
|
||||||
|
if (event instanceof ServiceMetricEvent) {
|
||||||
|
final TimelineMetric timelineEvent = timelineMetricConverter.druidEventToTimelineMetric((ServiceMetricEvent) event);
|
||||||
|
if (timelineEvent == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
final boolean isSuccessful = eventsQueue.offer(
|
||||||
|
timelineEvent,
|
||||||
|
config.getEmitWaitTime(),
|
||||||
|
TimeUnit.MILLISECONDS
|
||||||
|
);
|
||||||
|
if (!isSuccessful) {
|
||||||
|
if (countLostEvents.getAndIncrement() % 1000 == 0) {
|
||||||
|
log.error(
|
||||||
|
"Lost total of [%s] events because of emitter queue is full. Please increase the capacity or/and the consumer frequency",
|
||||||
|
countLostEvents.get()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
log.error(e, "got interrupted with message [%s]", e.getMessage());
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
} else if (event instanceof AlertEvent) {
|
||||||
|
for (Emitter emitter : emitterList) {
|
||||||
|
emitter.emit(event);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new ISE("unknown event type [%s]", event.getClass());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getCollectorUri()
|
||||||
|
{
|
||||||
|
return collectorURI;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getTimeoutSeconds()
|
||||||
|
{
|
||||||
|
return (int) (DEFAULT_FLUSH_TIMEOUT_MILLIS / 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ConsumerRunnable implements Runnable
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
int batchSize = config.getBatchSize();
|
||||||
|
TimelineMetrics metrics = new TimelineMetrics();
|
||||||
|
while (eventsQueue.size() > 0 && !exec.isShutdown()) {
|
||||||
|
try {
|
||||||
|
final TimelineMetric metricEvent = eventsQueue.poll(
|
||||||
|
config.getWaitForEventTime(),
|
||||||
|
TimeUnit.MILLISECONDS
|
||||||
|
);
|
||||||
|
if (metricEvent != null) {
|
||||||
|
metrics.addOrMergeTimelineMetric(metricEvent);
|
||||||
|
if (metrics.getMetrics().size() == batchSize) {
|
||||||
|
emitMetrics(metrics);
|
||||||
|
log.debug(
|
||||||
|
"sent [%d] events",
|
||||||
|
metrics.getMetrics().size()
|
||||||
|
);
|
||||||
|
metrics = new TimelineMetrics();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
log.error(e, e.getMessage());
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (metrics.getMetrics().size() > 0) {
|
||||||
|
emitMetrics(metrics);
|
||||||
|
log.debug(
|
||||||
|
"sent [%d] events",
|
||||||
|
metrics.getMetrics().size()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error(e, e.getMessage());
|
||||||
|
if (e instanceof InterruptedException) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush() throws IOException
|
||||||
|
{
|
||||||
|
synchronized (started) {
|
||||||
|
if (started.get()) {
|
||||||
|
Future future = exec.schedule(new ConsumerRunnable(), 0, TimeUnit.MILLISECONDS);
|
||||||
|
try {
|
||||||
|
future.get(DEFAULT_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||||
|
if (e instanceof InterruptedException) {
|
||||||
|
throw new RuntimeException("interrupted flushing elements from queue", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
synchronized (started) {
|
||||||
|
flush();
|
||||||
|
exec.shutdown();
|
||||||
|
started.set(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String sanitize(String namespace)
|
||||||
|
{
|
||||||
|
Pattern DOT_OR_WHITESPACE = Pattern.compile("[\\s]+|[.]+");
|
||||||
|
return DOT_OR_WHITESPACE.matcher(namespace).replaceAll("_");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,262 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.ambari.metrics;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
|
public class AmbariMetricsEmitterConfig
|
||||||
|
{
|
||||||
|
private final static int DEFAULT_BATCH_SIZE = 100;
|
||||||
|
private final static Long DEFAULT_FLUSH_PERIOD_MILLIS = (long) (60 * 1000); // flush every one minute
|
||||||
|
private final static long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
|
||||||
|
private final static String DEFAULT_PROTOCOL = "http";
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String hostname;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final int port;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String protocol;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String trustStorePath;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String trustStoreType;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String trustStorePassword;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final int batchSize;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final long flushPeriod;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final int maxQueueSize;
|
||||||
|
|
||||||
|
@JsonProperty("eventConverter")
|
||||||
|
private final DruidToTimelineMetricConverter druidToTimelineEventConverter;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final List<String> alertEmitters;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final long emitWaitTime;
|
||||||
|
|
||||||
|
//waiting up to the specified wait time if necessary for an event to become available.
|
||||||
|
@JsonProperty
|
||||||
|
private final long waitForEventTime;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public AmbariMetricsEmitterConfig(
|
||||||
|
@JsonProperty("hostname") String hostname,
|
||||||
|
@JsonProperty("port") Integer port,
|
||||||
|
@JsonProperty("protocol") String protocol,
|
||||||
|
@JsonProperty("trustStorePath") String trustStorePath,
|
||||||
|
@JsonProperty("trustStoreType") String trustStoretype,
|
||||||
|
@JsonProperty("trustStorePassword") String trustStorePassword,
|
||||||
|
@JsonProperty("batchSize") Integer batchSize,
|
||||||
|
@JsonProperty("flushPeriod") Long flushPeriod,
|
||||||
|
@JsonProperty("maxQueueSize") Integer maxQueueSize,
|
||||||
|
@JsonProperty("eventConverter") DruidToTimelineMetricConverter druidToTimelineEventConverter,
|
||||||
|
@JsonProperty("alertEmitters") List<String> alertEmitters,
|
||||||
|
@JsonProperty("emitWaitTime") Long emitWaitTime,
|
||||||
|
@JsonProperty("waitForEventTime") Long waitForEventTime
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
|
||||||
|
this.port = Preconditions.checkNotNull(port, "port can not be null");
|
||||||
|
this.protocol = protocol == null ? DEFAULT_PROTOCOL : protocol;
|
||||||
|
this.trustStorePath = trustStorePath;
|
||||||
|
this.trustStoreType = trustStoretype;
|
||||||
|
this.trustStorePassword = trustStorePassword;
|
||||||
|
this.batchSize = (batchSize == null) ? DEFAULT_BATCH_SIZE : batchSize;
|
||||||
|
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD_MILLIS : flushPeriod;
|
||||||
|
this.maxQueueSize = maxQueueSize == null ? Integer.MAX_VALUE : maxQueueSize;
|
||||||
|
this.druidToTimelineEventConverter = Preconditions.checkNotNull(
|
||||||
|
druidToTimelineEventConverter,
|
||||||
|
"Event converter can not be null"
|
||||||
|
);
|
||||||
|
this.alertEmitters = alertEmitters == null ? Collections.<String>emptyList() : alertEmitters;
|
||||||
|
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
|
||||||
|
this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getHostname()
|
||||||
|
{
|
||||||
|
return hostname;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public int getPort()
|
||||||
|
{
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getProtocol()
|
||||||
|
{
|
||||||
|
return protocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getTrustStorePath()
|
||||||
|
{
|
||||||
|
return trustStorePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getTrustStoreType()
|
||||||
|
{
|
||||||
|
return trustStoreType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getTrustStorePassword()
|
||||||
|
{
|
||||||
|
return trustStorePassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public int getBatchSize()
|
||||||
|
{
|
||||||
|
return batchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public int getMaxQueueSize()
|
||||||
|
{
|
||||||
|
return maxQueueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public long getFlushPeriod()
|
||||||
|
{
|
||||||
|
return flushPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public DruidToTimelineMetricConverter getDruidToTimelineEventConverter()
|
||||||
|
{
|
||||||
|
return druidToTimelineEventConverter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public List<String> getAlertEmitters()
|
||||||
|
{
|
||||||
|
return alertEmitters;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public long getEmitWaitTime()
|
||||||
|
{
|
||||||
|
return emitWaitTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public long getWaitForEventTime()
|
||||||
|
{
|
||||||
|
return waitForEventTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
AmbariMetricsEmitterConfig that = (AmbariMetricsEmitterConfig) o;
|
||||||
|
|
||||||
|
if (port != that.port) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (batchSize != that.batchSize) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (flushPeriod != that.flushPeriod) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (maxQueueSize != that.maxQueueSize) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (emitWaitTime != that.emitWaitTime) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (waitForEventTime != that.waitForEventTime) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!hostname.equals(that.hostname)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!protocol.equals(that.protocol)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (trustStorePath != null ? !trustStorePath.equals(that.trustStorePath) : that.trustStorePath != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (trustStoreType != null ? !trustStoreType.equals(that.trustStoreType) : that.trustStoreType != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (trustStorePassword != null
|
||||||
|
? !trustStorePassword.equals(that.trustStorePassword)
|
||||||
|
: that.trustStorePassword != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!druidToTimelineEventConverter.equals(that.druidToTimelineEventConverter)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return alertEmitters.equals(that.alertEmitters);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
int result = hostname.hashCode();
|
||||||
|
result = 31 * result + port;
|
||||||
|
result = 31 * result + protocol.hashCode();
|
||||||
|
result = 31 * result + (trustStorePath != null ? trustStorePath.hashCode() : 0);
|
||||||
|
result = 31 * result + (trustStoreType != null ? trustStoreType.hashCode() : 0);
|
||||||
|
result = 31 * result + (trustStorePassword != null ? trustStorePassword.hashCode() : 0);
|
||||||
|
result = 31 * result + batchSize;
|
||||||
|
result = 31 * result + (int) (flushPeriod ^ (flushPeriod >>> 32));
|
||||||
|
result = 31 * result + maxQueueSize;
|
||||||
|
result = 31 * result + druidToTimelineEventConverter.hashCode();
|
||||||
|
result = 31 * result + alertEmitters.hashCode();
|
||||||
|
result = 31 * result + (int) (emitWaitTime ^ (emitWaitTime >>> 32));
|
||||||
|
result = 31 * result + (int) (waitForEventTime ^ (waitForEventTime >>> 32));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.ambari.metrics;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.Module;
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Provides;
|
||||||
|
import com.google.inject.name.Named;
|
||||||
|
import com.google.inject.name.Names;
|
||||||
|
import com.metamx.emitter.core.Emitter;
|
||||||
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
import io.druid.guice.ManageLifecycle;
|
||||||
|
import io.druid.initialization.DruidModule;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class AmbariMetricsEmitterModule implements DruidModule
|
||||||
|
{
|
||||||
|
private static final String EMITTER_TYPE = "ambari-metrics";
|
||||||
|
@Override
|
||||||
|
public List<? extends Module> getJacksonModules()
|
||||||
|
{
|
||||||
|
return Collections.EMPTY_LIST;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, AmbariMetricsEmitterConfig.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Provides
|
||||||
|
@ManageLifecycle
|
||||||
|
@Named(EMITTER_TYPE)
|
||||||
|
public Emitter getEmitter(AmbariMetricsEmitterConfig emitterConfig, final Injector injector){
|
||||||
|
List<Emitter> emitters = Lists.transform(
|
||||||
|
emitterConfig.getAlertEmitters(),
|
||||||
|
new Function<String, Emitter>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Emitter apply(String s)
|
||||||
|
{
|
||||||
|
return injector.getInstance(Key.get(Emitter.class, Names.named(s)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return new AmbariMetricsEmitter(emitterConfig, emitters);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.ambari.metrics;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
||||||
|
|
||||||
|
|
||||||
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = WhiteListBasedDruidToTimelineEventConverter.class)
|
||||||
|
@JsonSubTypes(value= {
|
||||||
|
@JsonSubTypes.Type(name="all", value = SendAllTimelineEventConverter.class),
|
||||||
|
@JsonSubTypes.Type(name="whiteList", value = WhiteListBasedDruidToTimelineEventConverter.class)
|
||||||
|
})
|
||||||
|
|
||||||
|
public interface DruidToTimelineMetricConverter
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* This function acts as a filter. It returns <tt>null</tt> if the event is not suppose to be emitted to Ambari Server
|
||||||
|
* Also This function will define the mapping between the druid event dimension's values and Ambari Metric Name
|
||||||
|
*
|
||||||
|
* @param serviceMetricEvent Druid event ot type {@link ServiceMetricEvent}
|
||||||
|
*
|
||||||
|
* @return {@link TimelineMetric} or <tt>null</tt>
|
||||||
|
*/
|
||||||
|
TimelineMetric druidEventToTimelineMetric(ServiceMetricEvent serviceMetricEvent);
|
||||||
|
}
|
|
@ -0,0 +1,129 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.ambari.metrics;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.common.base.Strings;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableSortedSet;
|
||||||
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Emits all the events instance of {@link com.metamx.emitter.service.ServiceMetricEvent}.
|
||||||
|
* <p>
|
||||||
|
* All the dimensions will be retained and lexicographically order using dimensions name.
|
||||||
|
* <p>
|
||||||
|
* The metric name of the timeline event is:
|
||||||
|
* <namespacePrefix>.<druid service name>.<dimensions values ordered by dimension's name>.<metric>
|
||||||
|
* <p>
|
||||||
|
* Note that this name will be sanitized by replacing all the `.` or `space` to `_` {@link AmbariMetricsEmitter#sanitize(String)}
|
||||||
|
*/
|
||||||
|
|
||||||
|
@JsonTypeName("all")
|
||||||
|
public class SendAllTimelineEventConverter implements DruidToTimelineMetricConverter
|
||||||
|
{
|
||||||
|
public static final String DEFAULT_APP_NAME = "druid";
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String namespacePrefix;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String appName;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getNamespacePrefix()
|
||||||
|
{
|
||||||
|
return namespacePrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public SendAllTimelineEventConverter(
|
||||||
|
@JsonProperty("namespacePrefix") String namespacePrefix,
|
||||||
|
@JsonProperty("appName") String appName
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.namespacePrefix = namespacePrefix;
|
||||||
|
this.appName = appName == null ? DEFAULT_APP_NAME : appName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineMetric druidEventToTimelineMetric(ServiceMetricEvent serviceMetricEvent)
|
||||||
|
{
|
||||||
|
ImmutableList.Builder metricNameBuilder = new ImmutableList.Builder<>();
|
||||||
|
if(!Strings.isNullOrEmpty(namespacePrefix)) {
|
||||||
|
metricNameBuilder.add(namespacePrefix);
|
||||||
|
}
|
||||||
|
metricNameBuilder.add(AmbariMetricsEmitter.sanitize(serviceMetricEvent.getService()));
|
||||||
|
ImmutableSortedSet<String> dimNames = ImmutableSortedSet.copyOf(serviceMetricEvent.getUserDims().keySet());
|
||||||
|
for (String dimName : dimNames) {
|
||||||
|
metricNameBuilder.add(
|
||||||
|
AmbariMetricsEmitter.sanitize(
|
||||||
|
String.valueOf(
|
||||||
|
serviceMetricEvent.getUserDims().get(dimName)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
metricNameBuilder.add(AmbariMetricsEmitter.sanitize(serviceMetricEvent.getMetric()));
|
||||||
|
|
||||||
|
TimelineMetric metric = new TimelineMetric();
|
||||||
|
metric.setMetricName(Joiner.on(".").join(metricNameBuilder.build()));
|
||||||
|
metric.setAppId(appName);
|
||||||
|
metric.setHostName(serviceMetricEvent.getHost());
|
||||||
|
metric.setType(serviceMetricEvent.getFeed());
|
||||||
|
metric.setInstanceId(serviceMetricEvent.getService());
|
||||||
|
long ts = serviceMetricEvent.getCreatedTime().getMillis();
|
||||||
|
metric.setStartTime(ts);
|
||||||
|
metric.setTimestamp(ts);
|
||||||
|
metric.getMetricValues().put(ts, serviceMetricEvent.getValue().doubleValue());
|
||||||
|
return metric;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SendAllTimelineEventConverter that = (SendAllTimelineEventConverter) o;
|
||||||
|
|
||||||
|
if (namespacePrefix != null ? !namespacePrefix.equals(that.namespacePrefix) : that.namespacePrefix != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return appName.equals(that.appName);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
int result = namespacePrefix != null ? namespacePrefix.hashCode() : 0;
|
||||||
|
result = 31 * result + appName.hashCode();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,247 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.ambari.metrics;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Strings;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableSortedMap;
|
||||||
|
import com.google.common.io.CharStreams;
|
||||||
|
import com.google.common.io.Files;
|
||||||
|
import com.metamx.common.ISE;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.SortedMap;
|
||||||
|
|
||||||
|
@JsonTypeName("whiteList")
|
||||||
|
public class WhiteListBasedDruidToTimelineEventConverter implements DruidToTimelineMetricConverter
|
||||||
|
{
|
||||||
|
private static final Logger LOGGER = new Logger(WhiteListBasedDruidToTimelineEventConverter.class);
|
||||||
|
/**
|
||||||
|
* @code whiteListDimsMapper is a white list of metric->dimensions mappings.
|
||||||
|
* Key is the druid metric name or the metric's prefix.
|
||||||
|
* Value is a list of metric's dimensions names.
|
||||||
|
* The order of the dimension name is important, it will be used to build the ambari metrics name.
|
||||||
|
* For instance if we have dimensions dataSource and queryType for metric query/time
|
||||||
|
* the final metric name will be -> prefix.dataSource.queryType.metricName
|
||||||
|
* Note that this name will be sanitized by replacing all the `.` or `space` to `_` {@link AmbariMetricsEmitter#sanitize(String)}
|
||||||
|
*/
|
||||||
|
private final ImmutableSortedMap<String, ImmutableList<String>> whiteListDimsMapper;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String namespacePrefix;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String appName;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String mapPath;
|
||||||
|
|
||||||
|
private final ObjectMapper mapper;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public WhiteListBasedDruidToTimelineEventConverter(
|
||||||
|
@JsonProperty("namespacePrefix") String namespacePrefix,
|
||||||
|
@JsonProperty("appName") String appName,
|
||||||
|
@JsonProperty("mapPath") String mapPath,
|
||||||
|
@JacksonInject ObjectMapper mapper
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.mapper = Preconditions.checkNotNull(mapper);
|
||||||
|
this.mapPath = mapPath;
|
||||||
|
this.whiteListDimsMapper = readMap(this.mapPath);
|
||||||
|
this.namespacePrefix = namespacePrefix;
|
||||||
|
this.appName = appName == null ? SendAllTimelineEventConverter.DEFAULT_APP_NAME : appName;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public String getNamespacePrefix()
|
||||||
|
{
|
||||||
|
return namespacePrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param event Event subject to filtering
|
||||||
|
*
|
||||||
|
* @return true if and only if the event prefix key is in the {@code whiteListDimsMapper}
|
||||||
|
*/
|
||||||
|
private boolean isInWhiteList(ServiceMetricEvent event)
|
||||||
|
{
|
||||||
|
return getPrefixKey(event.getMetric(), whiteListDimsMapper) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param key the metric name to lookup
|
||||||
|
* @param whiteList
|
||||||
|
*
|
||||||
|
* @return <tt>null</tt> if the key does not match with any of the prefixes keys in @code metricsWhiteList,
|
||||||
|
* or the prefix in @code whiteListDimsMapper
|
||||||
|
*/
|
||||||
|
private String getPrefixKey(String key, SortedMap<String, ?> whiteList)
|
||||||
|
{
|
||||||
|
String prefixKey = null;
|
||||||
|
if (whiteList.containsKey(key)) {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
SortedMap<String, ?> headMap = whiteList.headMap(key);
|
||||||
|
if (!headMap.isEmpty() && key.startsWith(headMap.lastKey())) {
|
||||||
|
prefixKey = headMap.lastKey();
|
||||||
|
}
|
||||||
|
return prefixKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link List} of the white-listed dimension's values to send.
|
||||||
|
* The list is order is the same as the order of dimensions {@code whiteListDimsMapper}
|
||||||
|
*
|
||||||
|
* @param event the event for which will filter dimensions
|
||||||
|
*
|
||||||
|
* @return {@link List} of the filtered dimension values to send or <tt>null<tt/> if the event is not in the white list
|
||||||
|
*/
|
||||||
|
private List<String> getOrderedDimValues(ServiceMetricEvent event)
|
||||||
|
{
|
||||||
|
String prefixKey = getPrefixKey(event.getMetric(), whiteListDimsMapper);
|
||||||
|
if (prefixKey == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
ImmutableList.Builder<String> outputList = new ImmutableList.Builder();
|
||||||
|
List<String> dimensions = whiteListDimsMapper.get(prefixKey);
|
||||||
|
if (dimensions == null) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
for (String dimKey : dimensions) {
|
||||||
|
String dimValue = (String) event.getUserDims().get(dimKey);
|
||||||
|
if (dimValue != null) {
|
||||||
|
outputList.add(AmbariMetricsEmitter.sanitize(dimValue));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return outputList.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param serviceMetricEvent druid metric event to convert
|
||||||
|
*
|
||||||
|
* @return <tt>null</tt> if the event is not white listed, otherwise return {@link TimelineMetric}
|
||||||
|
* <p>
|
||||||
|
* The metric name of the ambari timeline metric event is:
|
||||||
|
* <namespacePrefix>.<druid service name>.<white-listed dimensions>.<metric>
|
||||||
|
* <p/>
|
||||||
|
* The order of the dimension is the order returned by {@code getOrderedDimValues()}
|
||||||
|
* Note that this name will be sanitized by replacing all the `.` or space by `_` {@link AmbariMetricsEmitter#sanitize(String)}
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineMetric druidEventToTimelineMetric(ServiceMetricEvent serviceMetricEvent)
|
||||||
|
{
|
||||||
|
if (!this.isInWhiteList(serviceMetricEvent)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final ImmutableList.Builder<String> metricNameBuilder = new ImmutableList.Builder<>();
|
||||||
|
if(!Strings.isNullOrEmpty(namespacePrefix)) {
|
||||||
|
metricNameBuilder.add(namespacePrefix);
|
||||||
|
}
|
||||||
|
metricNameBuilder.add(AmbariMetricsEmitter.sanitize(serviceMetricEvent.getService()));
|
||||||
|
metricNameBuilder.addAll(this.getOrderedDimValues(serviceMetricEvent));
|
||||||
|
metricNameBuilder.add(AmbariMetricsEmitter.sanitize(serviceMetricEvent.getMetric()));
|
||||||
|
|
||||||
|
TimelineMetric metric = new TimelineMetric();
|
||||||
|
metric.setMetricName(Joiner.on(".").join(metricNameBuilder.build()));
|
||||||
|
metric.setAppId(appName);
|
||||||
|
metric.setHostName(serviceMetricEvent.getHost());
|
||||||
|
metric.setType(serviceMetricEvent.getFeed());
|
||||||
|
metric.setInstanceId(serviceMetricEvent.getService());
|
||||||
|
long ts = serviceMetricEvent.getCreatedTime().getMillis();
|
||||||
|
metric.setStartTime(ts);
|
||||||
|
metric.setTimestamp(ts);
|
||||||
|
metric.getMetricValues().put(ts, serviceMetricEvent.getValue().doubleValue());
|
||||||
|
return metric;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ImmutableSortedMap<String, ImmutableList<String>> readMap(final String mapPath)
|
||||||
|
{
|
||||||
|
String fileContent;
|
||||||
|
String actualPath = mapPath;
|
||||||
|
try {
|
||||||
|
if (Strings.isNullOrEmpty(mapPath)) {
|
||||||
|
actualPath = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json").getFile();
|
||||||
|
LOGGER.info("using default whiteList map located at [%s]", actualPath);
|
||||||
|
fileContent = CharStreams.toString(new InputStreamReader(this.getClass()
|
||||||
|
.getClassLoader()
|
||||||
|
.getResourceAsStream("defaultWhiteListMap.json")));
|
||||||
|
} else {
|
||||||
|
fileContent = Files.asCharSource(new File(mapPath), Charset.forName("UTF-8")).read();
|
||||||
|
}
|
||||||
|
return mapper.reader(new TypeReference<ImmutableSortedMap<String, ImmutableList<String>>>()
|
||||||
|
{
|
||||||
|
}).readValue(fileContent);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new ISE(e, "Got an exception while parsing file [%s]", actualPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
WhiteListBasedDruidToTimelineEventConverter that = (WhiteListBasedDruidToTimelineEventConverter) o;
|
||||||
|
|
||||||
|
if (namespacePrefix != null ? !namespacePrefix.equals(that.namespacePrefix) : that.namespacePrefix != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!appName.equals(that.appName)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return mapPath != null ? mapPath.equals(that.mapPath) : that.mapPath == null;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
int result = namespacePrefix != null ? namespacePrefix.hashCode() : 0;
|
||||||
|
result = 31 * result + appName.hashCode();
|
||||||
|
result = 31 * result + (mapPath != null ? mapPath.hashCode() : 0);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
io.druid.emitter.ambari.metrics.AmbariMetricsEmitterModule
|
|
@ -0,0 +1,73 @@
|
||||||
|
{
|
||||||
|
"ingest/events": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/handoff/failed": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/persists": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/rows/output": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/merge": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"jvm/gc": [],
|
||||||
|
"jvm/mem": [
|
||||||
|
"memKind"
|
||||||
|
],
|
||||||
|
"query/cpu/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"query/node/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"query/node/ttfb": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"query/partial/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"query/segment/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"query/segmentAndCache/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"query/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"query/wait/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"segment/count": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/dropQueue/count": [],
|
||||||
|
"segment/loadQueue/count": [],
|
||||||
|
"segment/loadQueue/failed": [],
|
||||||
|
"segment/loadQueue/size": [],
|
||||||
|
"segment/scan/pending": [],
|
||||||
|
"segment/size": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/usedPercent": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/added/bytes" : [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/nuked/bytes" : [
|
||||||
|
"dataSource"
|
||||||
|
]
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
package io.druid.emitter.ambari.metrics;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.InjectableValues;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
public class AmbariMetricsEmitterConfigTest
|
||||||
|
{
|
||||||
|
private final ObjectMapper mapper = new DefaultObjectMapper();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
mapper.setInjectableValues(new InjectableValues.Std().addValue(
|
||||||
|
ObjectMapper.class,
|
||||||
|
new DefaultObjectMapper()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerDeAmbariMetricsEmitterConfig() throws IOException
|
||||||
|
{
|
||||||
|
AmbariMetricsEmitterConfig config = new AmbariMetricsEmitterConfig(
|
||||||
|
"hostname",
|
||||||
|
8080,
|
||||||
|
"http",
|
||||||
|
"truststore.path",
|
||||||
|
"truststore.type",
|
||||||
|
"truststore.password",
|
||||||
|
1000,
|
||||||
|
1000L,
|
||||||
|
100,
|
||||||
|
new SendAllTimelineEventConverter("prefix", "druid"),
|
||||||
|
Collections.EMPTY_LIST,
|
||||||
|
500L,
|
||||||
|
400L
|
||||||
|
);
|
||||||
|
AmbariMetricsEmitterConfig serde = mapper.reader(AmbariMetricsEmitterConfig.class).readValue(
|
||||||
|
mapper.writeValueAsBytes(config)
|
||||||
|
);
|
||||||
|
Assert.assertEquals(config, serde);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerDeDruidToTimelineEventConverter() throws IOException
|
||||||
|
{
|
||||||
|
SendAllTimelineEventConverter sendAllConverter = new SendAllTimelineEventConverter("prefix", "druid");
|
||||||
|
DruidToTimelineMetricConverter serde = mapper.reader(DruidToTimelineMetricConverter.class)
|
||||||
|
.readValue( mapper.writeValueAsBytes(sendAllConverter));
|
||||||
|
Assert.assertEquals(sendAllConverter, serde);
|
||||||
|
|
||||||
|
WhiteListBasedDruidToTimelineEventConverter whiteListBasedDruidToTimelineEventConverter = new WhiteListBasedDruidToTimelineEventConverter(
|
||||||
|
"prefix",
|
||||||
|
"druid",
|
||||||
|
"",
|
||||||
|
new DefaultObjectMapper()
|
||||||
|
);
|
||||||
|
serde = mapper.reader(DruidToTimelineMetricConverter.class)
|
||||||
|
.readValue(mapper.writeValueAsBytes(
|
||||||
|
whiteListBasedDruidToTimelineEventConverter));
|
||||||
|
Assert.assertEquals(whiteListBasedDruidToTimelineEventConverter, serde);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.ambari.metrics;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class DruidToWhiteListBasedConverterTest
|
||||||
|
{
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSanitize()
|
||||||
|
{
|
||||||
|
String test = "host name.yahoo.com:8080";
|
||||||
|
Assert.assertEquals("host_name_yahoo_com:8080", AmbariMetricsEmitter.sanitize(test));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,141 @@
|
||||||
|
package io.druid.emitter.ambari.metrics;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import junitparams.JUnitParamsRunner;
|
||||||
|
import junitparams.Parameters;
|
||||||
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
|
|
||||||
|
@RunWith(JUnitParamsRunner.class)
|
||||||
|
public class WhiteListBasedDruidToTimelineEventConverterTest
|
||||||
|
{
|
||||||
|
final private String prefix = "druid";
|
||||||
|
final private WhiteListBasedDruidToTimelineEventConverter defaultWhiteListBasedDruidToTimelineEventConverter = new WhiteListBasedDruidToTimelineEventConverter(
|
||||||
|
prefix,
|
||||||
|
"druid",
|
||||||
|
null,
|
||||||
|
new DefaultObjectMapper()
|
||||||
|
);
|
||||||
|
private ServiceMetricEvent event;
|
||||||
|
private final DateTime createdTime = new DateTime();
|
||||||
|
private final String hostname = "testHost:8080";
|
||||||
|
private final String serviceName = "historical";
|
||||||
|
private final String defaultNamespace = prefix + "." + serviceName;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
event = EasyMock.createMock(ServiceMetricEvent.class);
|
||||||
|
EasyMock.expect(event.getHost()).andReturn(hostname).anyTimes();
|
||||||
|
EasyMock.expect(event.getService()).andReturn(serviceName).anyTimes();
|
||||||
|
EasyMock.expect(event.getCreatedTime()).andReturn(createdTime).anyTimes();
|
||||||
|
EasyMock.expect(event.getUserDims()).andReturn(Maps.<String, Object>newHashMap()).anyTimes();
|
||||||
|
EasyMock.expect(event.getValue()).andReturn(10).anyTimes();
|
||||||
|
EasyMock.expect(event.getFeed()).andReturn("metrics").anyTimes();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Parameters(
|
||||||
|
{
|
||||||
|
"query/time, true",
|
||||||
|
"query/node/ttfb, true",
|
||||||
|
"query/segmentAndCache/time, true",
|
||||||
|
"query/intervalChunk/time, false",
|
||||||
|
"query/time/balaba, true",
|
||||||
|
"query/tim, false",
|
||||||
|
"segment/added/bytes, true",
|
||||||
|
"segment/count, true",
|
||||||
|
"segment/size, true",
|
||||||
|
"segment/cost/raw, false",
|
||||||
|
"coordinator/TIER_1 /cost/raw, false",
|
||||||
|
"segment/Kost/raw, false",
|
||||||
|
", false",
|
||||||
|
"word, false",
|
||||||
|
"coordinator, false",
|
||||||
|
"server/, false",
|
||||||
|
"ingest/persists/time, true",
|
||||||
|
"jvm/mem/init, true",
|
||||||
|
"jvm/gc/count, true"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
public void testDefaultIsInWhiteList(String key, boolean expectedValue)
|
||||||
|
{
|
||||||
|
EasyMock.expect(event.getMetric()).andReturn(key).anyTimes();
|
||||||
|
EasyMock.replay(event);
|
||||||
|
boolean isIn = defaultWhiteListBasedDruidToTimelineEventConverter.druidEventToTimelineMetric(event) != null;
|
||||||
|
Assert.assertEquals(expectedValue, isIn);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Parameters
|
||||||
|
public void testGetName(ServiceMetricEvent serviceMetricEvent, String expectedPath)
|
||||||
|
{
|
||||||
|
TimelineMetric metric = defaultWhiteListBasedDruidToTimelineEventConverter.druidEventToTimelineMetric(serviceMetricEvent);
|
||||||
|
String path = null;
|
||||||
|
if (metric != null) {
|
||||||
|
path = metric.getMetricName();
|
||||||
|
}
|
||||||
|
Assert.assertEquals(expectedPath, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object[] parametersForTestGetName()
|
||||||
|
{
|
||||||
|
return new Object[]{
|
||||||
|
new Object[]{
|
||||||
|
new ServiceMetricEvent.Builder().setDimension("id", "dummy_id")
|
||||||
|
.setDimension("status", "some_status")
|
||||||
|
.setDimension("numDimensions", "1")
|
||||||
|
.setDimension("segment", "dummy_segment")
|
||||||
|
.build(createdTime, "query/segment/time/balabla/more", 10)
|
||||||
|
.build(serviceName, hostname),
|
||||||
|
defaultNamespace + ".query/segment/time/balabla/more"
|
||||||
|
},
|
||||||
|
new Object[]{
|
||||||
|
new ServiceMetricEvent.Builder().setDimension("dataSource", "some_data_source")
|
||||||
|
.setDimension("tier", "_default_tier")
|
||||||
|
.build(createdTime, "segment/max", 10)
|
||||||
|
.build(serviceName, hostname),
|
||||||
|
null
|
||||||
|
},
|
||||||
|
new Object[]{
|
||||||
|
new ServiceMetricEvent.Builder().setDimension("dataSource", "data-source")
|
||||||
|
.setDimension("type", "groupBy")
|
||||||
|
.setDimension("interval", "2013/2015")
|
||||||
|
.setDimension("some_random_dim1", "random_dim_value1")
|
||||||
|
.setDimension("some_random_dim2", "random_dim_value2")
|
||||||
|
.setDimension("hasFilters", "no")
|
||||||
|
.setDimension("duration", "P1D")
|
||||||
|
.setDimension("remoteAddress", "194.0.90.2")
|
||||||
|
.setDimension("id", "ID")
|
||||||
|
.setDimension("context", "{context}")
|
||||||
|
.build(createdTime, "query/time", 10)
|
||||||
|
.build(serviceName, hostname),
|
||||||
|
defaultNamespace + ".data-source.groupBy.query/time"
|
||||||
|
},
|
||||||
|
new Object[]{
|
||||||
|
new ServiceMetricEvent.Builder().setDimension("dataSource", "data-source")
|
||||||
|
.setDimension("type", "groupBy")
|
||||||
|
.setDimension("some_random_dim1", "random_dim_value1")
|
||||||
|
.build(createdTime, "ingest/persists/count", 10)
|
||||||
|
.build(serviceName, hostname),
|
||||||
|
defaultNamespace + ".data-source.ingest/persists/count"
|
||||||
|
},
|
||||||
|
new Object[]{
|
||||||
|
new ServiceMetricEvent.Builder().setDimension("bufferPoolName", "BufferPool")
|
||||||
|
.setDimension("type", "groupBy")
|
||||||
|
.setDimension("some_random_dim1", "random_dim_value1")
|
||||||
|
.build(createdTime, "jvm/bufferpool/capacity", 10)
|
||||||
|
.build(serviceName, hostname),
|
||||||
|
null
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
1
pom.xml
1
pom.xml
|
@ -120,6 +120,7 @@
|
||||||
<module>extensions-contrib/google-extensions</module>
|
<module>extensions-contrib/google-extensions</module>
|
||||||
<module>extensions-contrib/virtual-columns</module>
|
<module>extensions-contrib/virtual-columns</module>
|
||||||
<module>extensions-contrib/thrift-extensions</module>
|
<module>extensions-contrib/thrift-extensions</module>
|
||||||
|
<module>extensions-contrib/ambari-metrics-emitter</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
<dependencyManagement>
|
<dependencyManagement>
|
||||||
|
|
Loading…
Reference in New Issue