mirror of https://github.com/apache/druid.git
Add https to druid-influxdb-emitter extension (#9938)
* Add https to druid-influxdb-emitter extension * address CI failures * increase test coverage * tests for being unable to load trustStore * fix EqualsVerifier test * fix intellij inspection error * use try-with-resources when loading trustStore
This commit is contained in:
parent
f3a2903218
commit
a966de5319
|
@ -41,6 +41,10 @@ All the configuration parameters for the influxdb emitter are under `druid.emitt
|
|||
|--------|-----------|---------|-------|
|
||||
|`druid.emitter.influxdb.hostname`|The hostname of the InfluxDB server.|Yes|N/A|
|
||||
|`druid.emitter.influxdb.port`|The port of the InfluxDB server.|No|8086|
|
||||
|`druid.emitter.influxdb.protocol`|The protocol used to send metrics to InfluxDB. One of http/https|No|http|
|
||||
|`druid.emitter.influxdb.trustStorePath`|The path to the trustStore to be used for https|No|none|
|
||||
|`druid.emitter.influxdb.trustStoreType`|The trustStore type to be used for https|No|`jks`|
|
||||
|`druid.emitter.influxdb.trustStorePassword`|The trustStore password to be used for https|No|none|
|
||||
|`druid.emitter.influxdb.databaseName`|The name of the database in InfluxDB.|Yes|N/A|
|
||||
|`druid.emitter.influxdb.maxQueueSize`|The size of the queue that holds events.|No|Integer.MAX_VALUE(=2^31-1)|
|
||||
|`druid.emitter.influxdb.flushPeriod`|How often (in milliseconds) the events queue is parsed into Line Protocol and POSTed to InfluxDB.|No|60000|
|
||||
|
|
|
@ -91,5 +91,10 @@
|
|||
<artifactId>JUnitParams</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>nl.jqno.equalsverifier</groupId>
|
||||
<artifactId>equalsverifier</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -28,11 +28,18 @@ import org.apache.druid.java.util.emitter.core.Event;
|
|||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.conn.ssl.NoopHostnameVerifier;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.KeyStore;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -56,7 +63,7 @@ public class InfluxdbEmitter implements Emitter
|
|||
public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig)
|
||||
{
|
||||
this.influxdbEmitterConfig = influxdbEmitterConfig;
|
||||
this.influxdbClient = HttpClientBuilder.create().build();
|
||||
this.influxdbClient = buildInfluxdbClient();
|
||||
this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize());
|
||||
this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist();
|
||||
log.info("constructed influxdb emitter");
|
||||
|
@ -96,7 +103,8 @@ public class InfluxdbEmitter implements Emitter
|
|||
public void postToInflux(String payload)
|
||||
{
|
||||
HttpPost post = new HttpPost(
|
||||
"http://" + influxdbEmitterConfig.getHostname()
|
||||
influxdbEmitterConfig.getProtocol() + "://"
|
||||
+ influxdbEmitterConfig.getHostname()
|
||||
+ ":" + influxdbEmitterConfig.getPort()
|
||||
+ "/write?db=" + influxdbEmitterConfig.getDatabaseName()
|
||||
+ "&u=" + influxdbEmitterConfig.getInfluxdbUserName()
|
||||
|
@ -211,4 +219,33 @@ public class InfluxdbEmitter implements Emitter
|
|||
postToInflux(payload.toString());
|
||||
}
|
||||
|
||||
private HttpClient buildInfluxdbClient()
|
||||
{
|
||||
if ("https".equals(influxdbEmitterConfig.getProtocol())) {
|
||||
SSLContext sslContext;
|
||||
if (influxdbEmitterConfig.getTrustStorePath() == null || influxdbEmitterConfig.getTrustStorePassword() == null) {
|
||||
String msg = "Can't load TrustStore. Truststore path or password is not set.";
|
||||
log.error(msg);
|
||||
throw new IllegalStateException(msg);
|
||||
}
|
||||
|
||||
try (FileInputStream in = new FileInputStream(new File(influxdbEmitterConfig.getTrustStorePath()))) {
|
||||
KeyStore store = KeyStore.getInstance(influxdbEmitterConfig.getTrustStoreType());
|
||||
store.load(in, influxdbEmitterConfig.getTrustStorePassword().toCharArray());
|
||||
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
|
||||
tmf.init(store);
|
||||
sslContext = SSLContext.getInstance("TLS");
|
||||
sslContext.init(null, tmf.getTrustManagers(), null);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
String msg = "Unable to load TrustStore";
|
||||
log.error(msg);
|
||||
throw new IllegalStateException(msg);
|
||||
}
|
||||
return HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).build();
|
||||
} else {
|
||||
return HttpClientBuilder.create().build();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,10 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
|
||||
import java.security.KeyStore;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public class InfluxdbEmitterConfig
|
||||
|
@ -36,12 +37,21 @@ public class InfluxdbEmitterConfig
|
|||
private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE;
|
||||
private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds
|
||||
private static final List<String> DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier");
|
||||
private static final String DEFAULT_PROTOCOL = "http";
|
||||
|
||||
@JsonProperty
|
||||
private final String hostname;
|
||||
@JsonProperty
|
||||
private final Integer port;
|
||||
@JsonProperty
|
||||
private final String protocol;
|
||||
@JsonProperty
|
||||
private final String trustStorePath;
|
||||
@JsonProperty
|
||||
private final String trustStoreType;
|
||||
@JsonProperty
|
||||
private final String trustStorePassword;
|
||||
@JsonProperty
|
||||
private final String databaseName;
|
||||
@JsonProperty
|
||||
private final Integer maxQueueSize;
|
||||
|
@ -56,12 +66,15 @@ public class InfluxdbEmitterConfig
|
|||
@JsonProperty
|
||||
private final ImmutableSet<String> dimensionWhitelist;
|
||||
|
||||
private static Logger log = new Logger(InfluxdbEmitterConfig.class);
|
||||
|
||||
@JsonCreator
|
||||
public InfluxdbEmitterConfig(
|
||||
@JsonProperty("hostname") String hostname,
|
||||
@JsonProperty("port") Integer port,
|
||||
@JsonProperty("protocol") String protocol,
|
||||
@JsonProperty("trustStorePath") String trustStorePath,
|
||||
@JsonProperty("trustStoreType") String trustStoreType,
|
||||
@JsonProperty("trustStorePassword") String trustStorePassword,
|
||||
@JsonProperty("databaseName") String databaseName,
|
||||
@JsonProperty("maxQueueSize") Integer maxQueueSize,
|
||||
@JsonProperty("flushPeriod") Integer flushPeriod,
|
||||
|
@ -73,6 +86,10 @@ public class InfluxdbEmitterConfig
|
|||
{
|
||||
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
|
||||
this.port = port == null ? DEFAULT_PORT : port;
|
||||
this.protocol = protocol == null ? DEFAULT_PROTOCOL : protocol;
|
||||
this.trustStorePath = trustStorePath;
|
||||
this.trustStoreType = trustStoreType == null ? KeyStore.getDefaultType() : trustStoreType;
|
||||
this.trustStorePassword = trustStorePassword;
|
||||
this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null");
|
||||
this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize;
|
||||
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
|
||||
|
@ -88,7 +105,7 @@ public class InfluxdbEmitterConfig
|
|||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof InfluxdbEmitterConfig)) {
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -121,6 +138,18 @@ public class InfluxdbEmitterConfig
|
|||
if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) {
|
||||
return false;
|
||||
}
|
||||
if (!getProtocol().equals(that.getProtocol())) {
|
||||
return false;
|
||||
}
|
||||
if (getTrustStorePath() != null ? !getTrustStorePath().equals(that.getTrustStorePath()) : that.getTrustStorePath() != null) {
|
||||
return false;
|
||||
}
|
||||
if (!getTrustStoreType().equals(that.getTrustStoreType())) {
|
||||
return false;
|
||||
}
|
||||
if (getTrustStorePassword() != null ? !getTrustStorePassword().equals(that.getTrustStorePassword()) : that.getTrustStorePassword() != null) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
||||
}
|
||||
|
@ -128,16 +157,9 @@ public class InfluxdbEmitterConfig
|
|||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = getHostname().hashCode();
|
||||
result = 31 * result + getPort();
|
||||
result = 31 * result + getDatabaseName().hashCode();
|
||||
result = 31 * result + getFlushPeriod();
|
||||
result = 31 * result + getMaxQueueSize();
|
||||
result = 31 * result + getFlushDelay();
|
||||
result = 31 * result + getInfluxdbUserName().hashCode();
|
||||
result = 31 * result + getInfluxdbPassword().hashCode();
|
||||
result = 31 * result + getDimensionWhitelist().hashCode();
|
||||
return result;
|
||||
return Objects.hash(hostname, port, protocol, trustStorePath, trustStoreType,
|
||||
trustStorePassword, databaseName, flushPeriod, maxQueueSize,
|
||||
flushDelay, influxdbUserName, influxdbPassword, dimensionWhitelist);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -152,6 +174,30 @@ public class InfluxdbEmitterConfig
|
|||
return port;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getProtocol()
|
||||
{
|
||||
return protocol;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getTrustStorePath()
|
||||
{
|
||||
return trustStorePath;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getTrustStoreType()
|
||||
{
|
||||
return trustStoreType;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getTrustStorePassword()
|
||||
{
|
||||
return trustStorePassword;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getDatabaseName()
|
||||
{
|
||||
|
|
|
@ -22,11 +22,13 @@ package org.apache.druid.emitter.influxdb;
|
|||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.security.KeyStore;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class InfluxdbEmitterConfigTest
|
||||
|
@ -45,6 +47,10 @@ public class InfluxdbEmitterConfigTest
|
|||
influxdbEmitterConfig = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -61,6 +67,10 @@ public class InfluxdbEmitterConfigTest
|
|||
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8080,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -78,6 +88,10 @@ public class InfluxdbEmitterConfigTest
|
|||
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
|
||||
null,
|
||||
8080,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -94,6 +108,10 @@ public class InfluxdbEmitterConfigTest
|
|||
InfluxdbEmitterConfig influxdbEmitterConfigWithNullPort = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -112,6 +130,10 @@ public class InfluxdbEmitterConfigTest
|
|||
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -129,6 +151,10 @@ public class InfluxdbEmitterConfigTest
|
|||
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
"https",
|
||||
"/path",
|
||||
"jks",
|
||||
"password",
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -146,6 +172,10 @@ public class InfluxdbEmitterConfigTest
|
|||
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -162,6 +192,10 @@ public class InfluxdbEmitterConfigTest
|
|||
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -178,6 +212,10 @@ public class InfluxdbEmitterConfigTest
|
|||
InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -196,6 +234,10 @@ public class InfluxdbEmitterConfigTest
|
|||
InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -208,4 +250,58 @@ public class InfluxdbEmitterConfigTest
|
|||
Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigWithNullProtocol()
|
||||
{
|
||||
InfluxdbEmitterConfig influxdbEmitterConfigWithNullProtocol = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
"path",
|
||||
"jks",
|
||||
"pass",
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
30000,
|
||||
"adam",
|
||||
"password",
|
||||
null
|
||||
);
|
||||
String expectedProtocol = "http";
|
||||
Assert.assertEquals(expectedProtocol, influxdbEmitterConfigWithNullProtocol.getProtocol());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigEquals()
|
||||
{
|
||||
EqualsVerifier.forClass(InfluxdbEmitterConfig.class).withNonnullFields(
|
||||
"hostname", "port", "protocol", "trustStoreType", "databaseName",
|
||||
"maxQueueSize", "flushPeriod", "flushDelay", "influxdbUserName",
|
||||
"influxdbPassword", "dimensionWhitelist"
|
||||
).usingGetClass().verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigWithNullTrustStoreType()
|
||||
{
|
||||
InfluxdbEmitterConfig influxdbEmitterConfigWithNullTrustStoreType = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
"path",
|
||||
null,
|
||||
"pass",
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
30000,
|
||||
"adam",
|
||||
"password",
|
||||
null
|
||||
);
|
||||
String expectedTrustStoreType = KeyStore.getDefaultType();
|
||||
Assert.assertEquals(expectedTrustStoreType, influxdbEmitterConfigWithNullTrustStoreType.getTrustStoreType());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -66,6 +66,10 @@ public class InfluxdbEmitterTest
|
|||
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -107,6 +111,10 @@ public class InfluxdbEmitterTest
|
|||
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -149,6 +157,10 @@ public class InfluxdbEmitterTest
|
|||
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -191,6 +203,10 @@ public class InfluxdbEmitterTest
|
|||
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
|
@ -211,4 +227,89 @@ public class InfluxdbEmitterTest
|
|||
{
|
||||
Assert.assertTrue(new InfluxdbEmitterModule().getJacksonModules().isEmpty());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testBuildInfluxdbClientWithHttpsProtocolAndNoTrustStore()
|
||||
{
|
||||
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
"https",
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
30000,
|
||||
"adam",
|
||||
"password",
|
||||
null
|
||||
);
|
||||
InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testBuildInfluxdbClientWithHttpsProtocolAndNullTrustStorePath()
|
||||
{
|
||||
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
"https",
|
||||
null,
|
||||
null,
|
||||
"pass",
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
30000,
|
||||
"adam",
|
||||
"password",
|
||||
null
|
||||
);
|
||||
InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testBuildInfluxdbClientWithHttpsProtocolAndNullTrustStorePassword()
|
||||
{
|
||||
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
"https",
|
||||
"path",
|
||||
null,
|
||||
null,
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
30000,
|
||||
"adam",
|
||||
"password",
|
||||
null
|
||||
);
|
||||
InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testUnableToLoadTrustStore()
|
||||
{
|
||||
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
|
||||
"localhost",
|
||||
8086,
|
||||
"https",
|
||||
"path",
|
||||
null,
|
||||
"pass",
|
||||
"dbname",
|
||||
10000,
|
||||
15000,
|
||||
30000,
|
||||
"adam",
|
||||
"password",
|
||||
null
|
||||
);
|
||||
InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue