[Monitoring] Allow HTTP Exporter to be given custom HTTP Headers for every request

This allows X-Pack Monitoring 5.0 to be given custom headers for any request.

Original commit: elastic/x-pack-elasticsearch@04050181bf
This commit is contained in:
Chris Earle 2016-08-09 17:26:08 -04:00
parent 632871be84
commit c41550d36a
3 changed files with 315 additions and 54 deletions
elasticsearch/x-pack/monitoring/src
main/java/org/elasticsearch/xpack/monitoring/agent/exporter/http
test/java/org/elasticsearch/xpack/monitoring/agent/exporter/http

@ -12,12 +12,12 @@ import org.elasticsearch.SpecialPermission;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
@ -55,7 +55,10 @@ import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -70,6 +73,17 @@ public class HttpExporter extends Exporter {
public static final String AUTH_USERNAME_SETTING = "auth.username";
public static final String AUTH_PASSWORD_SETTING = "auth.password";
/**
* A parent setting to header key/value pairs, whose names are user defined.
*/
public static final String HEADERS = "headers";
/**
* Blacklist of headers that the user is not allowed to set.
* <p>
* Headers are blacklisted if they have the opportunity to break things and we won't be guaranteed to overwrite them.
*/
public static final Set<String> BLACKLISTED_HEADERS = Collections.unmodifiableSet(Sets.newHashSet("Content-Length", "Content-Type"));
// es level timeout used when checking and writing templates (used to speed up tests)
public static final String TEMPLATE_CHECK_TIMEOUT_SETTING = "index.template.master_timeout";
@ -103,6 +117,10 @@ public class HttpExporter extends Exporter {
@Nullable
final TimeValue templateCheckTimeout;
/**
* Headers supplied by the user to send (likely to a proxy for routing).
*/
private final @Nullable Map<String, String[]> headers;
volatile boolean checkedAndUploadedIndexTemplate = false;
volatile boolean supportedClusterVersion = false;
@ -113,18 +131,14 @@ public class HttpExporter extends Exporter {
public HttpExporter(Config config, Environment env) {
super(config);
this.env = env;
hosts = config.settings().getAsArray(HOST_SETTING, Strings.EMPTY_ARRAY);
if (hosts.length == 0) {
throw new SettingsException("missing required setting [" + settingFQN(HOST_SETTING) + "]");
}
validateHosts(hosts);
auth = resolveAuth(config.settings());
connectionTimeout = config.settings().getAsTime(CONNECTION_TIMEOUT_SETTING, TimeValue.timeValueMillis(6000));
connectionReadTimeout = config.settings().getAsTime(CONNECTION_READ_TIMEOUT_SETTING,
this.hosts = resolveHosts(config.settings());
this.auth = resolveAuth(config.settings());
// allow the user to configure headers
this.headers = configureHeaders(config.settings());
this.connectionTimeout = config.settings().getAsTime(CONNECTION_TIMEOUT_SETTING, TimeValue.timeValueMillis(6000));
this.connectionReadTimeout = config.settings().getAsTime(CONNECTION_READ_TIMEOUT_SETTING,
TimeValue.timeValueMillis(connectionTimeout.millis() * 10));
// HORRIBLE!!! We can't use settings.getAsTime(..) !!!
@ -151,6 +165,53 @@ public class HttpExporter extends Exporter {
Strings.arrayToCommaDelimitedString(hosts), MonitoringIndexNameResolver.PREFIX);
}
private String[] resolveHosts(Settings settings) {
final String[] hosts = settings.getAsArray(HOST_SETTING);
if (hosts.length == 0) {
throw new SettingsException("missing required setting [" + settingFQN(HOST_SETTING) + "]");
}
for (String host : hosts) {
try {
HttpExporterUtils.parseHostWithPath(host, "");
} catch (URISyntaxException | MalformedURLException e) {
throw new SettingsException("[" + settingFQN(HOST_SETTING) + "] invalid host: [" + host + "]", e);
}
}
return hosts;
}
private Map<String, String[]> configureHeaders(Settings settings) {
final Settings headerSettings = settings.getAsSettings(HEADERS);
final Set<String> names = headerSettings.names();
// Most users won't define headers
if (names.isEmpty()) {
return null;
}
final Map<String, String[]> headers = new HashMap<>();
// record and validate each header as best we can
for (final String name : names) {
if (BLACKLISTED_HEADERS.contains(name)) {
throw new SettingsException("[" + name + "] cannot be overwritten via [" + settingFQN("headers") + "]");
}
final String[] values = headerSettings.getAsArray(name);
if (values.length == 0) {
throw new SettingsException("headers must have values, missing for setting [" + settingFQN("headers." + name) + "]");
}
headers.put(name, values);
}
return Collections.unmodifiableMap(headers);
}
ResolversRegistry getResolvers() {
return resolvers;
}
@ -333,7 +394,17 @@ public class HttpExporter extends Exporter {
private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) {
try {
final URL url = HttpExporterUtils.parseHostWithPath(host, path);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
// Custom Headers must be set before we manually apply headers, so that our headers beat custom ones
if (headers != null) {
// Headers can technically be duplicated, although it's not expected to be used frequently
for (final Map.Entry<String, String[]> header : headers.entrySet()) {
for (final String value : header.getValue()) {
conn.addRequestProperty(header.getKey(), value);
}
}
}
if (conn instanceof HttpsURLConnection && sslSocketFactory != null) {
final HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
@ -531,17 +602,6 @@ public class HttpExporter extends Exporter {
}
}
private static void validateHosts(String[] hosts) {
for (String host : hosts) {
try {
HttpExporterUtils.parseHostWithPath(host, "");
} catch (URISyntaxException | MalformedURLException e) {
throw new SettingsException("[xpack.monitoring.exporters] invalid host: [" + host + "]." +
" error: [" + e.getMessage() + "]");
}
}
}
/**
* SSL Initialization *
*/

@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.agent.exporter.http;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporter;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link HttpExporter}.
*/
public class HttpExporterSimpleTests extends ESTestCase {
private final Environment environment = mock(Environment.class);
public void testExporterWithBlacklistedHeaders() {
final String blacklistedHeader = randomFrom(HttpExporter.BLACKLISTED_HEADERS);
final String expected = "[" + blacklistedHeader + "] cannot be overwritten via [xpack.monitoring.exporters._http.headers]";
final Settings.Builder builder = Settings.builder()
.put("xpack.monitoring.exporters._http.type", HttpExporter.TYPE)
.put("xpack.monitoring.exporters._http.host", "http://localhost:9200")
.put("xpack.monitoring.exporters._http.headers.abc", "xyz")
.put("xpack.monitoring.exporters._http.headers." + blacklistedHeader, "value should not matter");
if (randomBoolean()) {
builder.put("xpack.monitoring.exporters._http.headers.xyz", "abc");
}
final Exporter.Config config = createConfig("_http", builder.build());
final SettingsException exception = expectThrows(SettingsException.class, () -> {
new HttpExporter(config, environment);
});
assertThat(exception.getMessage(), equalTo(expected));
}
public void testExporterWithEmptyHeaders() {
final String name = randomFrom("abc", "ABC", "X-Flag");
final String expected = "headers must have values, missing for setting [xpack.monitoring.exporters._http.headers." + name + "]";
final Settings.Builder builder = Settings.builder()
.put("xpack.monitoring.exporters._http.type", HttpExporter.TYPE)
.put("xpack.monitoring.exporters._http.host", "localhost:9200")
.put("xpack.monitoring.exporters._http.headers." + name, "");
if (randomBoolean()) {
builder.put("xpack.monitoring.exporters._http.headers.xyz", "abc");
}
final Exporter.Config config = createConfig("_http", builder.build());
final SettingsException exception = expectThrows(SettingsException.class, () -> {
new HttpExporter(config, environment);
});
assertThat(exception.getMessage(), equalTo(expected));
}
public void testExporterWithMissingHost() {
// forgot host!
final Settings.Builder builder = Settings.builder()
.put("xpack.monitoring.exporters._http.type", HttpExporter.TYPE);
if (randomBoolean()) {
builder.put("xpack.monitoring.exporters._http.host", "");
} else if (randomBoolean()) {
builder.putArray("xpack.monitoring.exporters._http.host");
} else if (randomBoolean()) {
builder.putNull("xpack.monitoring.exporters._http.host");
}
final Exporter.Config config = createConfig("_http", builder.build());
final SettingsException exception = expectThrows(SettingsException.class, () -> {
new HttpExporter(config, environment);
});
assertThat(exception.getMessage(), equalTo("missing required setting [xpack.monitoring.exporters._http.host]"));
}
public void testExporterWithInvalidHost() {
final String invalidHost = randomFrom("://localhost:9200", "gopher!://xyz.my.com");
final Settings.Builder builder = Settings.builder()
.put("xpack.monitoring.exporters._http.type", HttpExporter.TYPE);
// sometimes add a valid URL with it
if (randomBoolean()) {
if (randomBoolean()) {
builder.putArray("xpack.monitoring.exporters._http.host", "localhost:9200", invalidHost);
} else {
builder.putArray("xpack.monitoring.exporters._http.host", invalidHost, "localhost:9200");
}
} else {
builder.put("xpack.monitoring.exporters._http.host", invalidHost);
}
final Exporter.Config config = createConfig("_http", builder.build());
final SettingsException exception = expectThrows(SettingsException.class, () -> {
new HttpExporter(config, environment);
});
assertThat(exception.getMessage(), equalTo("[xpack.monitoring.exporters._http.host] invalid host: [" + invalidHost + "]"));
}
public void testExporterWithHostOnly() {
final Settings.Builder builder = Settings.builder()
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", "http://localhost:9200");
final Exporter.Config config = createConfig("_http", builder.build());
new HttpExporter(config, environment);
}
/**
* Create the {@link Exporter.Config} with the given name, and select those settings from {@code settings}.
*
* @param name The name of the exporter.
* @param settings The settings to select the exporter's settings from
* @return Never {@code null}.
*/
private static Exporter.Config createConfig(String name, Settings settings) {
return new Exporter.Config(name, HttpExporter.TYPE, Settings.EMPTY, settings.getAsSettings("xpack.monitoring.exporters." + name));
}
}

@ -20,8 +20,10 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESIntegTestCase;
@ -35,7 +37,6 @@ import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.agent.resolver.bulk.MonitoringBulkTimestampedResolver;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.hamcrest.Matchers;
import org.joda.time.format.DateTimeFormat;
import org.junit.After;
import org.junit.Before;
@ -45,6 +46,7 @@ import java.net.BindException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -52,7 +54,10 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -85,6 +90,41 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
webServer.shutdown();
}
private void assertMonitorTemplates() throws InterruptedException {
assertMonitorTemplates(null);
}
private void assertMonitorTemplates(final @Nullable Map<String, String[]> customHeaders) throws InterruptedException {
RecordedRequest request;
for (Map.Entry<String, String> template : monitoringTemplates().entrySet()) {
request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("GET"));
assertThat(request.getPath(), equalTo("/_template/" + template.getKey()));
assertHeaders(request, customHeaders);
request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("PUT"));
assertThat(request.getPath(), equalTo("/_template/" + template.getKey()));
assertThat(request.getBody().readUtf8(), equalTo(template.getValue()));
assertHeaders(request, customHeaders);
}
}
private void assertHeaders(final RecordedRequest request, final Map<String, String[]> customHeaders) {
if (customHeaders != null) {
for (final Map.Entry<String, String[]> entry : customHeaders.entrySet()) {
final String header = entry.getKey();
final String[] values = entry.getValue();
final List<String> headerValues = request.getHeaders().values(header);
assertThat(header, headerValues, hasSize(values.length));
assertThat(header, headerValues, containsInAnyOrder(values));
}
}
}
public void testExport() throws Exception {
enqueueGetClusterVersionResponse(Version.CURRENT);
for (String template : monitoringTemplates().keySet()) {
@ -111,16 +151,7 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
for (Map.Entry<String, String> template : monitoringTemplates().entrySet()) {
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey()));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey()));
assertThat(recordedRequest.getBody().readUtf8(), equalTo(template.getValue()));
}
assertMonitorTemplates();
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST"));
@ -129,6 +160,55 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
assertBulkRequest(recordedRequest.getBody(), nbDocs);
}
public void testExportWithHeaders() throws Exception {
final String headerValue = randomAsciiOfLengthBetween(3, 9);
final String[] array = generateRandomStringArray(2, 4, false);
final Map<String, String[]> headers = new HashMap<>();
headers.put("X-Cloud-Cluster", new String[] { headerValue });
headers.put("X-Found-Cluster", new String[] { headerValue });
headers.put("Array-Check", array);
enqueueGetClusterVersionResponse(Version.CURRENT);
for (String template : monitoringTemplates().keySet()) {
enqueueResponse(404, "template [" + template + "] does not exist");
enqueueResponse(201, "template [" + template + "] created");
}
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
Settings.Builder builder = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
.put("xpack.monitoring.exporters._http.connection.keep_alive", false)
.put("xpack.monitoring.exporters._http.update_mappings", false)
.put("xpack.monitoring.exporters._http.headers.X-Cloud-Cluster", headerValue)
.put("xpack.monitoring.exporters._http.headers.X-Found-Cluster", headerValue)
.putArray("xpack.monitoring.exporters._http.headers.Array-Check", array);
internalCluster().startNode(builder);
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertThat(webServer.getRequestCount(), equalTo(2 + monitoringTemplates().size() * 2));
RecordedRequest recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
assertHeaders(recordedRequest, headers);
assertMonitorTemplates(headers);
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST"));
assertThat(recordedRequest.getPath(), equalTo("/_bulk"));
assertHeaders(recordedRequest, headers);
assertBulkRequest(recordedRequest.getBody(), nbDocs);
}
public void testDynamicHostChange() {
// disable exporting to be able to use non valid hosts
Settings.Builder builder = Settings.builder()
@ -140,17 +220,17 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putArray("xpack.monitoring.exporters._http.host", "test1")));
assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test1"));
assertThat(getExporter(nodeName).hosts, arrayContaining("test1"));
// wipes the non array settings
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putArray("xpack.monitoring.exporters._http.host", "test2")
.put("xpack.monitoring.exporters._http.host", "")));
assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test2"));
assertThat(getExporter(nodeName).hosts, arrayContaining("test2"));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putArray("xpack.monitoring.exporters._http.host", "test3")));
assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test3"));
assertThat(getExporter(nodeName).hosts, arrayContaining("test3"));
}
public void testHostChangeReChecksTemplate() throws Exception {
@ -162,8 +242,6 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
.put("xpack.monitoring.exporters._http.connection.keep_alive", false)
.put("xpack.monitoring.exporters._http.update_mappings", false);
logger.info("--> starting node");
enqueueGetClusterVersionResponse(Version.CURRENT);
for (String template : monitoringTemplates().keySet()) {
enqueueResponse(404, "template [" + template + "] does not exist");
@ -173,7 +251,6 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
String agentNode = internalCluster().startNode(builder);
logger.info("--> exporting data");
HttpExporter exporter = getExporter(agentNode);
assertThat(exporter.supportedClusterVersion, is(false));
export(Collections.singletonList(newRandomMonitoringDoc()));
@ -185,16 +262,7 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
for (Map.Entry<String, String> template : monitoringTemplates().entrySet()) {
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey()));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey()));
assertThat(recordedRequest.getBody().readUtf8(), equalTo(template.getValue()));
}
assertMonitorTemplates();
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST"));
@ -278,15 +346,12 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
.put("xpack.monitoring.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
.put("xpack.monitoring.exporters._http.connection.keep_alive", false);
logger.info("--> starting node");
// returning an unsupported cluster version
enqueueGetClusterVersionResponse(randomFrom(Version.fromString("0.18.0"), Version.fromString("1.0.0"),
Version.fromString("1.4.0")));
String agentNode = internalCluster().startNode(builder);
logger.info("--> exporting data");
HttpExporter exporter = getExporter(agentNode);
assertThat(exporter.supportedClusterVersion, is(false));
assertNull(exporter.openBulk());