From a966de5319e833409c9513b6616b2d2e192f7adf Mon Sep 17 00:00:00 2001
From: awelsh93 <32643586+awelsh93@users.noreply.github.com>
Date: Tue, 27 Oct 2020 02:49:26 +0000
Subject: [PATCH] Add https to druid-influxdb-emitter extension (#9938)
* Add https to druid-influxdb-emitter extension
* address CI failures
* increase test coverage
* tests for being unable to load trustStore
* fix EqualsVerifier test
* fix intellij inspection error
* use try-with-resources when loading trustStore
---
.../extensions-contrib/influxdb-emitter.md | 4 +
extensions-contrib/influxdb-emitter/pom.xml | 5 +
.../emitter/influxdb/InfluxdbEmitter.java | 41 ++++++-
.../influxdb/InfluxdbEmitterConfig.java | 72 ++++++++++---
.../influxdb/InfluxdbEmitterConfigTest.java | 96 +++++++++++++++++
.../emitter/influxdb/InfluxdbEmitterTest.java | 101 ++++++++++++++++++
6 files changed, 304 insertions(+), 15 deletions(-)
diff --git a/docs/development/extensions-contrib/influxdb-emitter.md b/docs/development/extensions-contrib/influxdb-emitter.md
index 3b1c84c30ca..076b8fc70b8 100644
--- a/docs/development/extensions-contrib/influxdb-emitter.md
+++ b/docs/development/extensions-contrib/influxdb-emitter.md
@@ -41,6 +41,10 @@ All the configuration parameters for the influxdb emitter are under `druid.emitt
|--------|-----------|---------|-------|
|`druid.emitter.influxdb.hostname`|The hostname of the InfluxDB server.|Yes|N/A|
|`druid.emitter.influxdb.port`|The port of the InfluxDB server.|No|8086|
+|`druid.emitter.influxdb.protocol`|The protocol used to send metrics to InfluxDB. One of http/https|No|http|
+|`druid.emitter.influxdb.trustStorePath`|The path to the trustStore to be used for https|No|none|
+|`druid.emitter.influxdb.trustStoreType`|The trustStore type to be used for https|No|`jks`|
+|`druid.emitter.influxdb.trustStorePassword`|The trustStore password to be used for https|No|none|
|`druid.emitter.influxdb.databaseName`|The name of the database in InfluxDB.|Yes|N/A|
|`druid.emitter.influxdb.maxQueueSize`|The size of the queue that holds events.|No|Integer.MAX_VALUE(=2^31-1)|
|`druid.emitter.influxdb.flushPeriod`|How often (in milliseconds) the events queue is parsed into Line Protocol and POSTed to InfluxDB.|No|60000|
diff --git a/extensions-contrib/influxdb-emitter/pom.xml b/extensions-contrib/influxdb-emitter/pom.xml
index 2c449e6f624..3d579181243 100644
--- a/extensions-contrib/influxdb-emitter/pom.xml
+++ b/extensions-contrib/influxdb-emitter/pom.xml
@@ -91,5 +91,10 @@
JUnitParams
test
+
+ nl.jqno.equalsverifier
+ equalsverifier
+ test
+
diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java
index ee22917f8e3..bb1c72b0314 100644
--- a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java
+++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java
@@ -28,11 +28,18 @@ import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.security.KeyStore;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -56,7 +63,7 @@ public class InfluxdbEmitter implements Emitter
public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig)
{
this.influxdbEmitterConfig = influxdbEmitterConfig;
- this.influxdbClient = HttpClientBuilder.create().build();
+ this.influxdbClient = buildInfluxdbClient();
this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize());
this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist();
log.info("constructed influxdb emitter");
@@ -96,7 +103,8 @@ public class InfluxdbEmitter implements Emitter
public void postToInflux(String payload)
{
HttpPost post = new HttpPost(
- "http://" + influxdbEmitterConfig.getHostname()
+ influxdbEmitterConfig.getProtocol() + "://"
+ + influxdbEmitterConfig.getHostname()
+ ":" + influxdbEmitterConfig.getPort()
+ "/write?db=" + influxdbEmitterConfig.getDatabaseName()
+ "&u=" + influxdbEmitterConfig.getInfluxdbUserName()
@@ -211,4 +219,33 @@ public class InfluxdbEmitter implements Emitter
postToInflux(payload.toString());
}
+ private HttpClient buildInfluxdbClient()
+ {
+ if ("https".equals(influxdbEmitterConfig.getProtocol())) {
+ SSLContext sslContext;
+ if (influxdbEmitterConfig.getTrustStorePath() == null || influxdbEmitterConfig.getTrustStorePassword() == null) {
+ String msg = "Can't load TrustStore. Truststore path or password is not set.";
+ log.error(msg);
+ throw new IllegalStateException(msg);
+ }
+
+ try (FileInputStream in = new FileInputStream(new File(influxdbEmitterConfig.getTrustStorePath()))) {
+ KeyStore store = KeyStore.getInstance(influxdbEmitterConfig.getTrustStoreType());
+ store.load(in, influxdbEmitterConfig.getTrustStorePassword().toCharArray());
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(store);
+ sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(null, tmf.getTrustManagers(), null);
+ }
+ catch (Exception ex) {
+ String msg = "Unable to load TrustStore";
+ log.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ return HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).build();
+ } else {
+ return HttpClientBuilder.create().build();
+ }
+ }
+
}
diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java
index d96b07083e2..87b1a6922ce 100644
--- a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java
+++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java
@@ -23,10 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
-import org.apache.druid.java.util.common.logger.Logger;
+import java.security.KeyStore;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
public class InfluxdbEmitterConfig
@@ -36,12 +37,21 @@ public class InfluxdbEmitterConfig
private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE;
private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds
private static final List DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier");
+ private static final String DEFAULT_PROTOCOL = "http";
@JsonProperty
private final String hostname;
@JsonProperty
private final Integer port;
@JsonProperty
+ private final String protocol;
+ @JsonProperty
+ private final String trustStorePath;
+ @JsonProperty
+ private final String trustStoreType;
+ @JsonProperty
+ private final String trustStorePassword;
+ @JsonProperty
private final String databaseName;
@JsonProperty
private final Integer maxQueueSize;
@@ -56,12 +66,15 @@ public class InfluxdbEmitterConfig
@JsonProperty
private final ImmutableSet dimensionWhitelist;
- private static Logger log = new Logger(InfluxdbEmitterConfig.class);
@JsonCreator
public InfluxdbEmitterConfig(
@JsonProperty("hostname") String hostname,
@JsonProperty("port") Integer port,
+ @JsonProperty("protocol") String protocol,
+ @JsonProperty("trustStorePath") String trustStorePath,
+ @JsonProperty("trustStoreType") String trustStoreType,
+ @JsonProperty("trustStorePassword") String trustStorePassword,
@JsonProperty("databaseName") String databaseName,
@JsonProperty("maxQueueSize") Integer maxQueueSize,
@JsonProperty("flushPeriod") Integer flushPeriod,
@@ -73,6 +86,10 @@ public class InfluxdbEmitterConfig
{
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
this.port = port == null ? DEFAULT_PORT : port;
+ this.protocol = protocol == null ? DEFAULT_PROTOCOL : protocol;
+ this.trustStorePath = trustStorePath;
+ this.trustStoreType = trustStoreType == null ? KeyStore.getDefaultType() : trustStoreType;
+ this.trustStorePassword = trustStorePassword;
this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null");
this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize;
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
@@ -88,7 +105,7 @@ public class InfluxdbEmitterConfig
if (this == o) {
return true;
}
- if (!(o instanceof InfluxdbEmitterConfig)) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
@@ -121,6 +138,18 @@ public class InfluxdbEmitterConfig
if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) {
return false;
}
+ if (!getProtocol().equals(that.getProtocol())) {
+ return false;
+ }
+ if (getTrustStorePath() != null ? !getTrustStorePath().equals(that.getTrustStorePath()) : that.getTrustStorePath() != null) {
+ return false;
+ }
+ if (!getTrustStoreType().equals(that.getTrustStoreType())) {
+ return false;
+ }
+ if (getTrustStorePassword() != null ? !getTrustStorePassword().equals(that.getTrustStorePassword()) : that.getTrustStorePassword() != null) {
+ return false;
+ }
return true;
}
@@ -128,16 +157,9 @@ public class InfluxdbEmitterConfig
@Override
public int hashCode()
{
- int result = getHostname().hashCode();
- result = 31 * result + getPort();
- result = 31 * result + getDatabaseName().hashCode();
- result = 31 * result + getFlushPeriod();
- result = 31 * result + getMaxQueueSize();
- result = 31 * result + getFlushDelay();
- result = 31 * result + getInfluxdbUserName().hashCode();
- result = 31 * result + getInfluxdbPassword().hashCode();
- result = 31 * result + getDimensionWhitelist().hashCode();
- return result;
+ return Objects.hash(hostname, port, protocol, trustStorePath, trustStoreType,
+ trustStorePassword, databaseName, flushPeriod, maxQueueSize,
+ flushDelay, influxdbUserName, influxdbPassword, dimensionWhitelist);
}
@JsonProperty
@@ -152,6 +174,30 @@ public class InfluxdbEmitterConfig
return port;
}
+ @JsonProperty
+ public String getProtocol()
+ {
+ return protocol;
+ }
+
+ @JsonProperty
+ public String getTrustStorePath()
+ {
+ return trustStorePath;
+ }
+
+ @JsonProperty
+ public String getTrustStoreType()
+ {
+ return trustStoreType;
+ }
+
+ @JsonProperty
+ public String getTrustStorePassword()
+ {
+ return trustStorePassword;
+ }
+
@JsonProperty
public String getDatabaseName()
{
diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java
index 09e6e55c2f3..d6312115a0e 100644
--- a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java
+++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java
@@ -22,11 +22,13 @@ package org.apache.druid.emitter.influxdb;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.security.KeyStore;
import java.util.Arrays;
public class InfluxdbEmitterConfigTest
@@ -45,6 +47,10 @@ public class InfluxdbEmitterConfigTest
influxdbEmitterConfig = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -61,6 +67,10 @@ public class InfluxdbEmitterConfigTest
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
"localhost",
8080,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -78,6 +88,10 @@ public class InfluxdbEmitterConfigTest
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
null,
8080,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -94,6 +108,10 @@ public class InfluxdbEmitterConfigTest
InfluxdbEmitterConfig influxdbEmitterConfigWithNullPort = new InfluxdbEmitterConfig(
"localhost",
null,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -112,6 +130,10 @@ public class InfluxdbEmitterConfigTest
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -129,6 +151,10 @@ public class InfluxdbEmitterConfigTest
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
"localhost",
8086,
+ "https",
+ "/path",
+ "jks",
+ "password",
"dbname",
10000,
15000,
@@ -146,6 +172,10 @@ public class InfluxdbEmitterConfigTest
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -162,6 +192,10 @@ public class InfluxdbEmitterConfigTest
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -178,6 +212,10 @@ public class InfluxdbEmitterConfigTest
InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -196,6 +234,10 @@ public class InfluxdbEmitterConfigTest
InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -208,4 +250,58 @@ public class InfluxdbEmitterConfigTest
Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist());
}
+ @Test
+ public void testConfigWithNullProtocol()
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfigWithNullProtocol = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ null,
+ "path",
+ "jks",
+ "pass",
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ String expectedProtocol = "http";
+ Assert.assertEquals(expectedProtocol, influxdbEmitterConfigWithNullProtocol.getProtocol());
+ }
+
+ @Test
+ public void testConfigEquals()
+ {
+ EqualsVerifier.forClass(InfluxdbEmitterConfig.class).withNonnullFields(
+ "hostname", "port", "protocol", "trustStoreType", "databaseName",
+ "maxQueueSize", "flushPeriod", "flushDelay", "influxdbUserName",
+ "influxdbPassword", "dimensionWhitelist"
+ ).usingGetClass().verify();
+ }
+
+ @Test
+ public void testConfigWithNullTrustStoreType()
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfigWithNullTrustStoreType = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ null,
+ "path",
+ null,
+ "pass",
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ String expectedTrustStoreType = KeyStore.getDefaultType();
+ Assert.assertEquals(expectedTrustStoreType, influxdbEmitterConfigWithNullTrustStoreType.getTrustStoreType());
+ }
+
}
diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java
index 318f38f65c3..f39549250f4 100644
--- a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java
+++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java
@@ -66,6 +66,10 @@ public class InfluxdbEmitterTest
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -107,6 +111,10 @@ public class InfluxdbEmitterTest
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -149,6 +157,10 @@ public class InfluxdbEmitterTest
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -191,6 +203,10 @@ public class InfluxdbEmitterTest
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
"localhost",
8086,
+ null,
+ null,
+ null,
+ null,
"dbname",
10000,
15000,
@@ -211,4 +227,89 @@ public class InfluxdbEmitterTest
{
Assert.assertTrue(new InfluxdbEmitterModule().getJacksonModules().isEmpty());
}
+
+ @Test(expected = IllegalStateException.class)
+ public void testBuildInfluxdbClientWithHttpsProtocolAndNoTrustStore()
+ {
+ InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "https",
+ null,
+ null,
+ null,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testBuildInfluxdbClientWithHttpsProtocolAndNullTrustStorePath()
+ {
+ InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "https",
+ null,
+ null,
+ "pass",
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testBuildInfluxdbClientWithHttpsProtocolAndNullTrustStorePassword()
+ {
+ InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "https",
+ "path",
+ null,
+ null,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testUnableToLoadTrustStore()
+ {
+ InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "https",
+ "path",
+ null,
+ "pass",
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+ }
+
}