diff --git a/marvel/src/main/java/org/elasticsearch/marvel/MarvelPlugin.java b/marvel/src/main/java/org/elasticsearch/marvel/MarvelPlugin.java index 19b99276254..7e12b7cfef0 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/MarvelPlugin.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/MarvelPlugin.java @@ -7,7 +7,6 @@ package org.elasticsearch.marvel; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterModule; -import org.elasticsearch.cluster.settings.Validator; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.logging.ESLogger; @@ -16,7 +15,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.marvel.agent.AgentService; import org.elasticsearch.marvel.agent.collector.CollectorModule; import org.elasticsearch.marvel.agent.exporter.ExporterModule; -import org.elasticsearch.marvel.agent.exporter.HttpESExporter; +import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.renderer.RendererModule; import org.elasticsearch.marvel.agent.settings.MarvelModule; import org.elasticsearch.marvel.agent.settings.MarvelSetting; @@ -97,14 +96,7 @@ public class MarvelPlugin extends Plugin { } public void onModule(ClusterModule module) { - // HttpESExporter - module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_INDEX_TIME_FORMAT, Validator.EMPTY); - module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_HOSTS, Validator.EMPTY); - module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_HOSTS + ".*", Validator.EMPTY); - module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_TIMEOUT, Validator.EMPTY); - module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_READ_TIMEOUT, Validator.EMPTY); - module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_SSL_HOSTNAME_VERIFICATION, Validator.EMPTY); - + Exporters.registerDynamicSettings(module); // MarvelSettingsService for (MarvelSetting setting : MarvelSettings.dynamicSettings()) { module.registerClusterDynamicSetting(setting.dynamicSettingName(), setting.dynamicValidator()); diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java index 64f1f504567..fe3f0b18cb3 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.marvel.agent.collector.Collector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoCollector; import org.elasticsearch.marvel.agent.exporter.Exporter; +import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.MarvelDoc; import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.license.LicenseService; @@ -32,18 +33,18 @@ public class AgentService extends AbstractLifecycleComponent imple private final MarvelSettings marvelSettings; private final Collection collectors; - private final Collection exporters; + private final Exporters exporters; @Inject public AgentService(Settings settings, NodeSettingsService nodeSettingsService, LicenseService licenseService, MarvelSettings marvelSettings, - Set collectors, Set exporters) { + Set collectors, Exporters exporters) { super(settings); this.marvelSettings = marvelSettings; this.samplingInterval = marvelSettings.interval().millis(); this.collectors = Collections.unmodifiableSet(filterCollectors(collectors, marvelSettings.collectors())); - this.exporters = Collections.unmodifiableSet(exporters); + this.exporters = exporters; nodeSettingsService.addListener(this); logger.trace("marvel is running in [{}] mode", licenseService.mode()); @@ -88,26 +89,15 @@ public class AgentService extends AbstractLifecycleComponent imple @Override protected void doStart() { - if (exporters.size() == 0) { - return; - } - for (Collector collector : collectors) { collector.start(); } - - for (Exporter exporter : exporters) { - exporter.start(); - } - + exporters.start(); applyIntervalSettings(); } @Override protected void doStop() { - if (exporters.size() == 0) { - return; - } if (workerThread != null && workerThread.isAlive()) { exportingWorker.closed = true; workerThread.interrupt(); @@ -123,9 +113,7 @@ public class AgentService extends AbstractLifecycleComponent imple collector.stop(); } - for (Exporter exporter : exporters) { - exporter.stop(); - } + exporters.stop(); } @Override @@ -139,11 +127,6 @@ public class AgentService extends AbstractLifecycleComponent imple } } - // used for testing - public Collection getExporters() { - return exporters; - } - @Override public void onRefreshSettings(Settings settings) { TimeValue newSamplingInterval = settings.getAsTime(MarvelSettings.INTERVAL, null); diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/AbstractExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/AbstractExporter.java deleted file mode 100644 index 1002774d015..00000000000 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/AbstractExporter.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.marvel.agent.exporter; - - -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; - -import java.util.Collection; - -public abstract class AbstractExporter extends AbstractLifecycleComponent implements Exporter { - - private final String name; - - protected final ClusterService clusterService; - - @Inject - public AbstractExporter(Settings settings, String name, ClusterService clusterService) { - super(settings); - this.name = name; - this.clusterService = clusterService; - } - - @Override - public String name() { - return name; - } - - @Override - public T start() { - logger.debug("starting exporter [{}]", name()); - return super.start(); - } - - @Override - protected void doStart() { - } - - protected boolean masterOnly() { - return false; - } - - @Override - public void export(Collection marvelDocs) { - if (masterOnly() && !clusterService.state().nodes().localNodeMaster()) { - logger.trace("exporter [{}] runs on master only", name()); - return; - } - - if (marvelDocs == null) { - logger.debug("no objects to export for [{}]", name()); - return; - } - - try { - doExport(marvelDocs); - } catch (Exception e) { - logger.error("export [{}] throws exception when exporting data", e, name()); - } - } - - protected abstract void doExport(Collection marvelDocs) throws Exception; - - @Override - public T stop() { - logger.debug("stopping exporter [{}]", name()); - return super.stop(); - } - - @Override - protected void doStop() { - } - - @Override - public void close() { - logger.trace("closing exporter [{}]", name()); - super.close(); - } - - @Override - protected void doClose() { - } -} \ No newline at end of file diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java index c7688aa3ebb..46d111e8f38 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java @@ -5,13 +5,98 @@ */ package org.elasticsearch.marvel.agent.exporter; -import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.marvel.shield.MarvelSettingsFilter; import java.util.Collection; -public interface Exporter extends LifecycleComponent { +public abstract class Exporter { - String name(); + protected final String type; + protected final Config config; + protected final ESLogger logger; + + public Exporter(String type, Config config) { + this.type = type; + this.config = config; + this.logger = config.logger(getClass()); + } + + public String type() { + return type; + } + + public String name() { + return config.name; + } + + public boolean masterOnly() { + return false; + } + + public abstract void export(Collection marvelDocs) throws Exception; + + public abstract void close(); + + protected String settingFQN(String setting) { + return Exporters.EXPORTERS_SETTING + "." + config.name + "." + setting; + } + + public static class Config { + + private final String name; + private final boolean enabled; + private final Settings globalSettings; + private final Settings settings; + + public Config(String name, Settings globalSettings, Settings settings) { + this.name = name; + this.globalSettings = globalSettings; + this.settings = settings; + this.enabled = settings.getAsBoolean("enabled", true); + } + + public String name() { + return name; + } + + public boolean enabled() { + return enabled; + } + + public Settings settings() { + return settings; + } + + public ESLogger logger(Class clazz) { + return Loggers.getLogger(clazz, globalSettings); + } + } + + public static abstract class Factory { + + private final String type; + private final boolean singleton; + + public Factory(String type, boolean singleton) { + this.type = type; + this.singleton = singleton; + } + + public String type() { + return type; + } + + public boolean singleton() { + return singleton; + } + + public void filterOutSensitiveSettings(String prefix, MarvelSettingsFilter filter) { + } + + public abstract E create(Config config); + } - void export(Collection marvelDocs); } \ No newline at end of file diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExporterModule.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExporterModule.java index 7011009bdd9..4940a980181 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExporterModule.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExporterModule.java @@ -6,37 +6,38 @@ package org.elasticsearch.marvel.agent.exporter; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.multibindings.Multibinder; +import org.elasticsearch.common.inject.multibindings.MapBinder; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.marvel.agent.exporter.http.HttpExporter; +import org.elasticsearch.marvel.agent.exporter.local.LocalExporter; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; public class ExporterModule extends AbstractModule { - private final Set> exporters = new HashSet<>(); + private final Map>> exporterFactories = new HashMap<>(); private final Settings settings; public ExporterModule(Settings settings) { this.settings = settings; - - // TODO do we need to choose what exporters to bind based on settings? - - // Registers default exporter - registerExporter(HttpESExporter.class); + registerExporter(HttpExporter.TYPE, HttpExporter.Factory.class); + registerExporter(LocalExporter.TYPE, LocalExporter.Factory.class); } @Override protected void configure() { - Multibinder binder = Multibinder.newSetBinder(binder(), Exporter.class); - for (Class exporter : exporters) { - bind(exporter).asEagerSingleton(); - binder.addBinding().to(exporter); + bind(Exporters.class).asEagerSingleton(); + MapBinder factoryBinder = MapBinder.newMapBinder(binder(), String.class, Exporter.Factory.class); + for (Map.Entry>> entry : exporterFactories.entrySet()) { + bind(entry.getValue()).asEagerSingleton(); + factoryBinder.addBinding(entry.getKey()).to(entry.getValue()); } + } - public void registerExporter(Class exporter) { - exporters.add(exporter); + public void registerExporter(String type, Class> factory) { + exporterFactories.put(type, factory); } } diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java new file mode 100644 index 00000000000..606c1aed40a --- /dev/null +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java @@ -0,0 +1,197 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.agent.exporter; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.settings.Validator; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsException; +import org.elasticsearch.marvel.agent.exporter.local.LocalExporter; +import org.elasticsearch.marvel.shield.MarvelSettingsFilter; +import org.elasticsearch.node.settings.NodeSettingsService; + +import java.util.*; + +/** + * + */ +public class Exporters extends AbstractLifecycleComponent implements Iterable, NodeSettingsService.Listener { + + static final String EXPORTERS_SETTING = "marvel.agent.exporters"; + + private final Map factories; + private final MarvelSettingsFilter settingsFilter; + private final ClusterService clusterService; + + private volatile InternalExporters exporters = InternalExporters.EMPTY; + + @Inject + public Exporters(Settings settings, Map factories, + MarvelSettingsFilter settingsFilter, ClusterService clusterService, + NodeSettingsService nodeSettingsService) { + + super(settings); + this.factories = factories; + this.settingsFilter = settingsFilter; + this.clusterService = clusterService; + nodeSettingsService.addListener(this); + } + + @Override + protected void doStart() { + exporters = initExporters(settings.getAsSettings(EXPORTERS_SETTING)); + } + + @Override + protected void doStop() { + ElasticsearchException exception = null; + for (Exporter exporter : exporters) { + try { + exporter.close(); + } catch (Exception e) { + logger.error("exporter [{}] failed to close cleanly", e, exporter.name()); + if (exception == null) { + exception = new ElasticsearchException("failed to cleanly close exporters"); + } + exception.addSuppressed(e); + } + } + if (exception != null) { + throw exception; + } + } + + @Override + protected void doClose() { + } + + public Exporter getExporter(String name) { + return exporters.get(name); + } + + @Override + public Iterator iterator() { + return exporters.iterator(); + } + + public void export(Collection marvelDocs) { + for (Exporter exporter : exporters) { + if (exporter.masterOnly() && !clusterService.localNode().masterNode()) { + // the exporter is supposed to only run on the master node, but we're not + // the master node... so skipping + continue; + } + try { + exporter.export(marvelDocs); + } catch (Exception e) { + logger.error("exporter [{}] failed to export marvel data", e, exporter.name()); + } + } + } + + @Override + public void onRefreshSettings(Settings settings) { + InternalExporters existing = exporters; + Settings updatedSettings = settings.getAsSettings(EXPORTERS_SETTING); + if (updatedSettings.names().isEmpty()) { + return; + } + this.exporters = initExporters(Settings.builder() + .put(existing.settings) + .put(updatedSettings) + .build()); + existing.close(logger); + } + + InternalExporters initExporters(Settings settings) { + Set singletons = new HashSet<>(); + Map exporters = new HashMap<>(); + boolean hasDisabled = false; + for (String name : settings.names()) { + Settings exporterSettings = settings.getAsSettings(name); + String type = exporterSettings.get("type"); + if (type == null) { + throw new SettingsException("missing exporter type for [" + name + "] exporter"); + } + Exporter.Factory factory = factories.get(type); + if (factory == null) { + throw new SettingsException("unknown exporter type [" + type + "] set for exporter [" + name + "]"); + } + factory.filterOutSensitiveSettings(EXPORTERS_SETTING + ".*.", settingsFilter); + Exporter.Config config = new Exporter.Config(name, settings, exporterSettings); + if (!config.enabled()) { + hasDisabled = true; + if (logger.isDebugEnabled()) { + logger.debug("exporter [{}/{}] is disabled", type, name); + } + continue; + } + if (factory.singleton()) { + // this is a singleton exporter factory, let's make sure we didn't already registered one + // (there can only be one instance of a singleton exporter) + if (singletons.contains(type)) { + throw new SettingsException("multiple [" + type + "] exporters are configured. there can " + + "only be one [" + type + "] exporter configured"); + } + singletons.add(type); + } + exporters.put(config.name(), factory.create(config)); + } + + // no exporters are configured, lets create a default local one. + // + // NOTE: if there are exporters configured and they're all disabled, we don't + // fallback on the default + // + if (exporters.isEmpty() && !hasDisabled) { + Exporter.Config config = new Exporter.Config("default_" + LocalExporter.TYPE, settings, Settings.EMPTY); + exporters.put(config.name(), factories.get(LocalExporter.TYPE).create(config)); + } + + return new InternalExporters(settings, exporters); + } + + public static void registerDynamicSettings(ClusterModule clusterModule) { + clusterModule.registerClusterDynamicSetting(EXPORTERS_SETTING + "*", Validator.EMPTY); + } + + static class InternalExporters implements Iterable { + + static final InternalExporters EMPTY = new InternalExporters(Settings.EMPTY, Collections.emptyMap()); + + final Settings settings; + final Map exporters; + + public InternalExporters(Settings settings, Map exporters) { + this.settings = settings; + this.exporters = exporters; + } + + @Override + public Iterator iterator() { + return exporters.values().iterator(); + } + + public Exporter get(String name) { + return exporters.get(name); + } + + void close(ESLogger logger) { + for (Exporter exporter : exporters.values()) { + try { + exporter.close(); + } catch (Exception e) { + logger.error("failed to close exporter [{}]", e, exporter.name()); + } + } + } + } +} diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/LocalExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/LocalExporter.java deleted file mode 100644 index 185c31f031c..00000000000 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/LocalExporter.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.marvel.agent.exporter; - -import org.elasticsearch.client.Client; -import org.elasticsearch.common.component.Lifecycle; -import org.elasticsearch.common.component.LifecycleListener; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.marvel.shield.SecuredClient; - -import java.util.Collection; - -/** - * - */ -public class LocalExporter implements Exporter { - - public static final String NAME = "local"; - - private final Client client; - - @Inject - public LocalExporter(SecuredClient client) { - this.client = client; - } - - @Override - public String name() { - return NAME; - } - - @Override - public void export(Collection marvelDocs) { - - } - - @Override - public Lifecycle.State lifecycleState() { - return null; - } - - @Override - public void addLifecycleListener(LifecycleListener lifecycleListener) { - - } - - @Override - public void removeLifecycleListener(LifecycleListener lifecycleListener) { - - } - - @Override - public LocalExporter start() { - return null; - } - - @Override - public LocalExporter stop() { - return null; - } - - @Override - public void close() { - - } -} diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java similarity index 59% rename from marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java rename to marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java index 613188c13dd..c8ff91602a6 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java @@ -3,35 +3,30 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.marvel.agent.exporter; +package org.elasticsearch.marvel.agent.exporter.http; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.Base64; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; -import org.elasticsearch.http.HttpServer; +import org.elasticsearch.marvel.agent.exporter.Exporter; +import org.elasticsearch.marvel.agent.exporter.MarvelDoc; import org.elasticsearch.marvel.agent.renderer.Renderer; import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.settings.MarvelSettings; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.service.NodeService; -import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.marvel.shield.MarvelSettingsFilter; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -48,142 +43,158 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Map; -public class HttpESExporter extends AbstractExporter implements NodeSettingsService.Listener { +/** + * + */ +public class HttpExporter extends Exporter { - private static final String NAME = "es_exporter"; + public static final String TYPE = "http"; - private static final String SETTINGS_PREFIX = "marvel.agent.exporter.es."; - public static final String SETTINGS_HOSTS = SETTINGS_PREFIX + "hosts"; - public static final String SETTINGS_INDEX_TIME_FORMAT = SETTINGS_PREFIX + "index.timeformat"; - public static final String SETTINGS_TIMEOUT = SETTINGS_PREFIX + "timeout"; - public static final String SETTINGS_READ_TIMEOUT = SETTINGS_PREFIX + "read_timeout"; + public static final String HOST_SETTING = "host"; + public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format"; + public static final String CONNECTION_TIMEOUT_SETTING = "connection.timeout"; + public static final String CONNECTION_READ_TIMEOUT_SETTING = "connection.read_timeout"; + public static final String AUTH_USERNAME_SETTING = "auth.username"; + public static final String AUTH_PASSWORD_SETTING = "auth.password"; // es level timeout used when checking and writing templates (used to speed up tests) - public static final String SETTINGS_CHECK_TEMPLATE_TIMEOUT = SETTINGS_PREFIX + ".template.master_timeout"; + public static final String TEMPLATE_CHECK_TIMEOUT_SETTING = "index.template.master_timeout"; // es level timeout used for bulk indexing (used to speed up tests) - public static final String SETTINGS_BULK_TIMEOUT = SETTINGS_PREFIX + ".bulk.timeout"; + public static final String BULK_TIMEOUT_SETTING = "bulk.timeout"; - public static final String DEFAULT_INDEX_TIME_FORMAT = "YYYY.MM.dd"; + public static final String SSL_SETTING = "ssl"; + public static final String SSL_PROTOCOL_SETTING = SSL_SETTING + ".protocol"; + public static final String SSL_TRUSTSTORE_SETTING = SSL_SETTING + ".truststore.path"; + public static final String SSL_TRUSTSTORE_PASSWORD_SETTING = SSL_SETTING + ".truststore.password"; + public static final String SSL_TRUSTSTORE_ALGORITHM_SETTING = SSL_SETTING + ".truststore.algorithm"; + public static final String SSL_HOSTNAME_VERIFICATION_SETTING = SSL_SETTING + ".hostname_verification"; + + public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd"; + + /** Minimum supported version of the remote template **/ + static final Version MIN_SUPPORTED_TEMPLATE_VERSOIN = Version.V_2_0_0_beta2; + + /** Minimum supported version of the remote marvel cluster **/ + static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_2_0_0_beta2; volatile String[] hosts; - volatile boolean boundToLocalNode = false; - volatile DateTimeFormatter indexTimeFormatter; - volatile int timeoutInMillis; - volatile int readTimeoutInMillis; + final TimeValue connectionTimeout; + final TimeValue connectionReadTimeout; + final BasicAuth auth; + final DateTimeFormatter indexTimeFormatter; /** https support * */ final SSLSocketFactory sslSocketFactory; - volatile boolean hostnameVerification; + final boolean hostnameVerification; - final ClusterService clusterService; - final ClusterName clusterName; - final NodeService nodeService; - final Environment environment; - final RendererRegistry registry; + final Environment env; + final RendererRegistry rendererRegistry; - HttpServer httpServer; - final boolean httpEnabled; - - @Nullable - final TimeValue templateCheckTimeout; - @Nullable - final TimeValue bulkTimeout; + final @Nullable TimeValue templateCheckTimeout; + final @Nullable TimeValue bulkTimeout; volatile boolean checkedAndUploadedIndexTemplate = false; volatile boolean supportedClusterVersion = false; + + /** Version of the built-in template **/ final Version templateVersion; - /** Minimum supported version of the remote template **/ - final Version minCompatibleTemplateVersion = Version.V_2_0_0_beta2; - - /** Minimum supported version of the remote marvel cluster **/ - final Version minCompatibleClusterVersion = Version.V_2_0_0_beta2; - final ConnectionKeepAliveWorker keepAliveWorker; Thread keepAliveThread; - @Inject - public HttpESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName, - NodeSettingsService nodeSettingsService, - NodeService nodeService, Environment environment, - RendererRegistry registry) { - super(settings, NAME, clusterService); + public HttpExporter(Exporter.Config config, Environment env, RendererRegistry rendererRegistry) { - this.clusterService = clusterService; - - this.clusterName = clusterName; - this.nodeService = nodeService; - this.environment = environment; - this.registry = registry; - - httpEnabled = settings.getAsBoolean(Node.HTTP_ENABLED, true); - - hosts = settings.getAsArray(SETTINGS_HOSTS, Strings.EMPTY_ARRAY); + super(TYPE, config); + this.env = env; + this.rendererRegistry = rendererRegistry; + hosts = config.settings().getAsArray(HOST_SETTING, Strings.EMPTY_ARRAY); + if (hosts.length == 0) { + throw new SettingsException("missing required setting [" + settingFQN(HOST_SETTING) + "]"); + } validateHosts(hosts); - String indexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, DEFAULT_INDEX_TIME_FORMAT); + auth = resolveAuth(config.settings()); + + String indexTimeFormat = config.settings().get(INDEX_NAME_TIME_FORMAT_SETTING, DEFAULT_INDEX_NAME_TIME_FORMAT); try { - logger.debug("checking that index time format [{}] is correct", indexTimeFormat); indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC(); } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Invalid marvel index time format [" + indexTimeFormat + "] configured in setting [" + SETTINGS_INDEX_TIME_FORMAT + "]", e); + throw new IllegalArgumentException("invalid marvel index name time format [" + indexTimeFormat + "] set for [" + settingFQN(INDEX_NAME_TIME_FORMAT_SETTING) + "]", e); } - timeoutInMillis = (int) settings.getAsTime(SETTINGS_TIMEOUT, new TimeValue(6000)).millis(); - readTimeoutInMillis = (int) settings.getAsTime(SETTINGS_READ_TIMEOUT, new TimeValue(timeoutInMillis * 10)).millis(); + connectionTimeout = config.settings().getAsTime(CONNECTION_TIMEOUT_SETTING, TimeValue.timeValueMillis(6000)); + connectionReadTimeout = config.settings().getAsTime(CONNECTION_READ_TIMEOUT_SETTING, TimeValue.timeValueMillis(connectionTimeout.millis() * 10)); - templateCheckTimeout = settings.getAsTime(SETTINGS_CHECK_TEMPLATE_TIMEOUT, null); - bulkTimeout = settings.getAsTime(SETTINGS_BULK_TIMEOUT, null); + // HORRIBLE!!! We can't use settings.getAsTime(..) !!! + // WE MUST FIX THIS IN CORE... + // TimeValue SHOULD NOT SELECTIVELY CHOOSE WHAT FIELDS TO PARSE BASED ON THEIR NAMES!!!! + String templateCheckTimeoutValue = config.settings().get(TEMPLATE_CHECK_TIMEOUT_SETTING, null); + templateCheckTimeout = TimeValue.parseTimeValue(templateCheckTimeoutValue, null, settingFQN(TEMPLATE_CHECK_TIMEOUT_SETTING)); + bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null); keepAliveWorker = new ConnectionKeepAliveWorker(); - nodeSettingsService.addListener(this); - if (!settings.getByPrefix(SETTINGS_SSL_PREFIX).getAsMap().isEmpty()) { - sslSocketFactory = createSSLSocketFactory(settings); - } else { - logger.trace("no ssl context configured"); - sslSocketFactory = null; - } - hostnameVerification = settings.getAsBoolean(SETTINGS_SSL_HOSTNAME_VERIFICATION, true); + sslSocketFactory = createSSLSocketFactory(config.settings().getAsSettings(SSL_SETTING)); + hostnameVerification = config.settings().getAsBoolean(SSL_HOSTNAME_VERIFICATION_SETTING, true); // Checks that the built-in template is versioned - templateVersion = HttpESExporterUtils.parseTemplateVersion(HttpESExporterUtils.loadDefaultTemplate()); + templateVersion = HttpExporterUtils.parseTemplateVersion(HttpExporterUtils.loadDefaultTemplate()); if (templateVersion == null) { throw new IllegalStateException("unable to find built-in template version"); } - logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}], template version [{}]", - HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)), + logger.debug("initialized with hosts [{}], index prefix [{}], index time format [{}], template version [{}]", + Strings.arrayToCommaDelimitedString(hosts), MarvelSettings.MARVEL_INDICES_PREFIX, indexTimeFormat, templateVersion); } - static private void validateHosts(String[] hosts) { - for (String host : hosts) { - try { - HttpESExporterUtils.parseHostWithPath(host, ""); - } catch (URISyntaxException e) { - throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + HttpESExporterUtils.santizeUrlPwds(host) + "]." + - " error: [" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()) + "]"); - } catch (MalformedURLException e) { - throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + HttpESExporterUtils.santizeUrlPwds(host) + "]." + - " error: [" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()) + "]"); + @Override + public void export(Collection marvelDocs) throws Exception { + HttpURLConnection connection = openExportingConnection(); + if (connection == null) { + return; + } + + if ((marvelDocs != null) && (!marvelDocs.isEmpty())) { + OutputStream os = connection.getOutputStream(); + + // We need to use a buffer to render each Marvel document + // because the renderer might close the outputstream (ex: XContentBuilder) + try (BytesStreamOutput buffer = new BytesStreamOutput()) { + for (MarvelDoc marvelDoc : marvelDocs) { + render(marvelDoc, buffer); + + // write the result to the connection + os.write(buffer.bytes().toBytes()); + buffer.reset(); + } + } finally { + try { + sendCloseExportingConnection(connection); + } catch (IOException e) { + logger.error("failed sending data to [{}]: {}", connection.getURL(), ExceptionsHelper.detailedMessage(e)); + throw e; + } } } } @Override - public String name() { - return NAME; - } - - @Inject(optional = true) - public void setHttpServer(HttpServer httpServer) { - this.httpServer = httpServer; + public void close() { + if (keepAliveThread != null && keepAliveThread.isAlive()) { + keepAliveWorker.closed = true; + keepAliveThread.interrupt(); + try { + keepAliveThread.join(6000); + } catch (InterruptedException e) { + // don't care. + } + } } private HttpURLConnection openExportingConnection() { @@ -200,17 +211,17 @@ public class HttpESExporter extends AbstractExporter implements return conn; } - private void render(OutputStream os, MarvelDoc marvelDoc) throws IOException { + private void render(MarvelDoc marvelDoc, OutputStream out) throws IOException { final XContentType xContentType = XContentType.SMILE; // Get the appropriate renderer in order to render the MarvelDoc - Renderer renderer = registry.renderer(marvelDoc.type()); + Renderer renderer = rendererRegistry.renderer(marvelDoc.type()); if (renderer == null) { - logger.warn("unable to render marvel document of type [{}]: no renderer found in registry", marvelDoc.type()); + logger.warn("unable to render marvel document of type [{}]. no renderer found in registry", marvelDoc.type()); return; } - try (XContentBuilder builder = new XContentBuilder(xContentType.xContent(), os)) { + try (XContentBuilder builder = new XContentBuilder(xContentType.xContent(), out)) { // Builds the bulk action metadata line builder.startObject(); @@ -228,23 +239,18 @@ public class HttpESExporter extends AbstractExporter implements builder.endObject(); // Adds action metadata line bulk separator - renderBulkSeparator(builder); + builder.flush(); // Flush is needed here because the separator is written directly in the builder's stream + builder.stream().write(builder.contentType().xContent().streamSeparator()); // Render the MarvelDoc - renderer.render(marvelDoc,xContentType, os); + renderer.render(marvelDoc, xContentType, out); // Adds final bulk separator - renderBulkSeparator(builder); + builder.flush(); + builder.stream().write(builder.contentType().xContent().streamSeparator()); } } - private void renderBulkSeparator(XContentBuilder builder) throws IOException { - // Flush is needed here... - builder.flush(); - //... because the separator is written directly in the builder's stream - builder.stream().write(builder.contentType().xContent().streamSeparator()); - } - @SuppressWarnings("unchecked") private void sendCloseExportingConnection(HttpURLConnection conn) throws IOException { logger.trace("sending content"); @@ -273,66 +279,6 @@ public class HttpESExporter extends AbstractExporter implements } } - @Override - protected void doExport(Collection marvelDocs) throws Exception { - HttpURLConnection connection = openExportingConnection(); - if (connection == null) { - return; - } - - if ((marvelDocs != null) && (!marvelDocs.isEmpty())) { - OutputStream os = connection.getOutputStream(); - - // We need to use a buffer to render each Marvel document - // because the renderer might close the outputstream (ex: XContentBuilder) - try (BytesStreamOutput buffer = new BytesStreamOutput()) { - for (MarvelDoc marvelDoc : marvelDocs) { - render(buffer, marvelDoc); - - // write the result to the connection - os.write(buffer.bytes().toBytes()); - buffer.reset(); - } - } finally { - try { - sendCloseExportingConnection(connection); - } catch (IOException e) { - logger.error("error sending data to [{}]: {}", HttpESExporterUtils.santizeUrlPwds(connection.getURL()), HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(e))); - } - } - } - } - - @Override - protected void doStart() { - // not initializing keep alive worker here but rather upon first exporting. - // In the case we are sending metrics to the same ES as where the plugin is hosted - // we want to give it some time to start. - } - - - @Override - protected void doStop() { - if (keepAliveThread != null && keepAliveThread.isAlive()) { - keepAliveWorker.closed = true; - keepAliveThread.interrupt(); - try { - keepAliveThread.join(6000); - } catch (InterruptedException e) { - // don't care. - } - } - } - - @Override - protected void doClose() { - } - - // used for testing - String[] getHosts() { - return hosts; - } - String getIndexName() { return MarvelSettings.MARVEL_INDICES_PREFIX + indexTimeFormatter.print(System.currentTimeMillis()); @@ -344,41 +290,6 @@ public class HttpESExporter extends AbstractExporter implements * @return a url connection to the selected host or null if no current host is available. */ private HttpURLConnection openAndValidateConnection(String method, String path, String contentType) { - if (hosts.length == 0) { - // Due to how Guice injection works and because HttpServer can be optional, - // we can't be 100% sure that the HttpServer is created when the ESExporter - // instance is created. This is specially true in integration tests. - // So if HttpServer is enabled in settings we can safely use the NodeService - // to retrieve the bound address. - BoundTransportAddress boundAddress = null; - if (httpEnabled) { - if ((httpServer != null) && (httpServer.lifecycleState() == Lifecycle.State.STARTED)) { - logger.debug("deriving host setting from httpServer"); - boundAddress = httpServer.info().address(); - } else if (nodeService.info().getHttp() != null) { - logger.debug("deriving host setting from node info API"); - boundAddress = nodeService.info().getHttp().address(); - } - } else { - logger.warn("http server is not enabled no hosts are manually configured"); - return null; - } - - String[] extractedHosts = HttpESExporterUtils.extractHostsFromAddress(boundAddress, logger); - if (extractedHosts == null || extractedHosts.length == 0) { - return null; - } - hosts = extractedHosts; - logger.trace("auto-resolved hosts to {}", (Object)extractedHosts); - boundToLocalNode = true; - } - - // it's important to have boundToLocalNode persistent to prevent calls during shutdown (causing ugly exceptions) - if (boundToLocalNode && (httpServer != null) && (httpServer.lifecycleState() != Lifecycle.State.STARTED)) { - logger.debug("local node http server is not started. can't connect"); - return null; - } - // out of for to move faulty hosts to the end int hostIndex = 0; try { @@ -388,16 +299,16 @@ public class HttpESExporter extends AbstractExporter implements try { Version remoteVersion = loadRemoteClusterVersion(host); if (remoteVersion == null) { - logger.warn("unable to check remote cluster version: no version found on host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]"); + logger.warn("unable to check remote cluster version: no version found on host [" + host + "]"); continue; } - supportedClusterVersion = remoteVersion.onOrAfter(minCompatibleClusterVersion); + supportedClusterVersion = remoteVersion.onOrAfter(MIN_SUPPORTED_CLUSTER_VERSION); if (!supportedClusterVersion) { - logger.error("remote cluster version [" + remoteVersion + "] is not supported, please use a cluster with minimum version [" + minCompatibleClusterVersion + "]"); + logger.error("remote cluster version [" + remoteVersion + "] is not supported, please use a cluster with minimum version [" + MIN_SUPPORTED_CLUSTER_VERSION + "]"); continue; } } catch (ElasticsearchException e) { - logger.error("exception when checking remote cluster version on host [{}]", e, HttpESExporterUtils.santizeUrlPwds(host)); + logger.error("exception when checking remote cluster version on host [{}]", e, host); continue; } } @@ -425,11 +336,11 @@ public class HttpESExporter extends AbstractExporter implements System.arraycopy(hosts, hostIndex, newHosts, 0, hosts.length - hostIndex); System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex, hostIndex); hosts = newHosts; - logger.debug("preferred target host is now [{}]", HttpESExporterUtils.santizeUrlPwds(hosts[0])); + logger.debug("preferred target host is now [{}]", hosts[0]); } } - logger.error("could not connect to any configured elasticsearch instances: [{}]", HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts))); + logger.error("could not connect to any configured elasticsearch instances [{}]", Strings.arrayToCommaDelimitedString(hosts)); return null; @@ -438,7 +349,7 @@ public class HttpESExporter extends AbstractExporter implements /** open a connection to the given hosts, returning null when not successful * */ private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) { try { - final URL url = HttpESExporterUtils.parseHostWithPath(host, path); + final URL url = HttpExporterUtils.parseHostWithPath(host, path); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); if (conn instanceof HttpsURLConnection && sslSocketFactory != null) { @@ -450,14 +361,13 @@ public class HttpESExporter extends AbstractExporter implements } conn.setRequestMethod(method); - conn.setConnectTimeout(timeoutInMillis); - conn.setReadTimeout(readTimeoutInMillis); + conn.setConnectTimeout((int) connectionTimeout.getMillis()); + conn.setReadTimeout((int) connectionReadTimeout.getMillis()); if (contentType != null) { conn.setRequestProperty("Content-Type", contentType); } - if (url.getUserInfo() != null) { - String basicAuth = "Basic " + Base64.encodeBytes(url.getUserInfo().getBytes("ISO-8859-1")); - conn.setRequestProperty("Authorization", basicAuth); + if (auth != null) { + auth.apply(conn); } conn.setUseCaches(false); if (method.equalsIgnoreCase("POST") || method.equalsIgnoreCase("PUT")) { @@ -467,17 +377,17 @@ public class HttpESExporter extends AbstractExporter implements return conn; } catch (URISyntaxException e) { - logErrorBasedOnLevel(e, "error parsing host [{}]", HttpESExporterUtils.santizeUrlPwds(host)); + logErrorBasedOnLevel(e, "error parsing host [{}]", host); } catch (IOException e) { - logErrorBasedOnLevel(e, "error connecting to [{}]", HttpESExporterUtils.santizeUrlPwds(host)); + logErrorBasedOnLevel(e, "error connecting to [{}]", host); } return null; } private void logErrorBasedOnLevel(Throwable t, String msg, Object... params) { - logger.error(msg + " [" + HttpESExporterUtils.santizeUrlPwds(t.getMessage()) + "]", params); + logger.error(msg + " [" + t.getMessage() + "]", params); if (logger.isDebugEnabled()) { - logger.debug(msg + ". full error details:\n[{}]", params, HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t))); + logger.debug(msg + ". full error details:\n[{}]", params, ExceptionsHelper.detailedMessage(t)); } } @@ -489,16 +399,16 @@ public class HttpESExporter extends AbstractExporter implements try { connection = openConnection(host, "GET", "/", null); if (connection == null) { - throw new ElasticsearchException("unable to check remote cluster version: no available connection for host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]"); + throw new ElasticsearchException("unable to check remote cluster version: no available connection for host [" + host + "]"); } try (InputStream is = connection.getInputStream()) { ByteArrayOutputStream out = new ByteArrayOutputStream(); Streams.copy(is, out); - return HttpESExporterUtils.parseElasticsearchVersion(out.toByteArray()); + return HttpExporterUtils.parseElasticsearchVersion(out.toByteArray()); } } catch (IOException e) { - throw new ElasticsearchException("failed to verify the remote cluster version on host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]:\n" + HttpESExporterUtils.santizeUrlPwds(e.getMessage())); + throw new ElasticsearchException("failed to verify the remote cluster version on host [" + host + "]:\n" + e.getMessage()); } finally { if (connection != null) { try { @@ -545,22 +455,22 @@ public class HttpESExporter extends AbstractExporter implements } if ((remoteTemplate == null) || (remoteTemplate.length == 0)) { - logger.error("unable to load remote marvel template on host [{}]", HttpESExporterUtils.santizeUrlPwds(host)); + logger.error("unable to load remote marvel template on host [{}]", host); return false; } - Version remoteVersion = HttpESExporterUtils.parseTemplateVersion(remoteTemplate); - logger.debug("detected existing remote template in version [{}] on host [{}]", remoteVersion, HttpESExporterUtils.santizeUrlPwds(host)); + Version remoteVersion = HttpExporterUtils.parseTemplateVersion(remoteTemplate); + logger.debug("detected existing remote template in version [{}] on host [{}]", remoteVersion, host); if (remoteVersion == null) { logger.warn("marvel template version cannot be found: template will be updated to version [{}]", templateVersion); } else { - if (remoteVersion.before(minCompatibleTemplateVersion)) { + if (remoteVersion.before(MIN_SUPPORTED_TEMPLATE_VERSOIN)) { logger.error("marvel template version [{}] is below the minimum compatible version [{}] on host [{}]: " - + "please manually update the marvel template to a more recent version" - + "and delete the current active marvel index (don't forget to back up it first if needed)", - remoteVersion, minCompatibleTemplateVersion, HttpESExporterUtils.santizeUrlPwds(host)); + + "please manually update the marvel template to a more recent version" + + "and delete the current active marvel index (don't forget to back up it first if needed)", + remoteVersion, MIN_SUPPORTED_TEMPLATE_VERSOIN, host); return false; } @@ -580,7 +490,7 @@ public class HttpESExporter extends AbstractExporter implements } } } catch (IOException e) { - logger.error("failed to verify the marvel template to [{}]:\n{}", HttpESExporterUtils.santizeUrlPwds(host), HttpESExporterUtils.santizeUrlPwds(e.getMessage())); + logger.error("failed to verify the marvel template to [{}]:\n{}", host, e.getMessage()); return false; } finally { if (connection != null) { @@ -602,7 +512,7 @@ public class HttpESExporter extends AbstractExporter implements } logger.debug("loading marvel pre-configured template"); - byte[] template = HttpESExporterUtils.loadDefaultTemplate(); + byte[] template = HttpExporterUtils.loadDefaultTemplate(); // Uploads the template and closes the outputstream Streams.copy(template, connection.getOutputStream()); @@ -614,7 +524,7 @@ public class HttpESExporter extends AbstractExporter implements logger.info("marvel template updated to version [{}]", templateVersion); } catch (IOException e) { - logger.error("failed to update the marvel template to [{}]:\n{}", HttpESExporterUtils.santizeUrlPwds(host), HttpESExporterUtils.santizeUrlPwds(e.getMessage())); + logger.error("failed to update the marvel template to [{}]:\n{}", host, e.getMessage()); return false; } finally { @@ -641,59 +551,116 @@ public class HttpESExporter extends AbstractExporter implements try { logger.error("{} response code [{} {}]. content: [{}]", - HttpESExporterUtils.santizeUrlPwds(msg), conn.getResponseCode(), - HttpESExporterUtils.santizeUrlPwds(conn.getResponseMessage()), - HttpESExporterUtils.santizeUrlPwds(err)); + msg, conn.getResponseCode(), + conn.getResponseMessage(), + err); } catch (IOException e) { - logger.error("{}. connection had an error while reporting the error. tough life.", HttpESExporterUtils.santizeUrlPwds(msg)); - } - } - - @Override - public void onRefreshSettings(Settings settings) { - TimeValue newTimeout = settings.getAsTime(SETTINGS_TIMEOUT, null); - if (newTimeout != null) { - logger.info("connection timeout set to [{}]", newTimeout); - timeoutInMillis = (int) newTimeout.millis(); - } - - newTimeout = settings.getAsTime(SETTINGS_READ_TIMEOUT, null); - if (newTimeout != null) { - logger.info("connection read timeout set to [{}]", newTimeout); - readTimeoutInMillis = (int) newTimeout.millis(); - } - - String[] newHosts = settings.getAsArray(SETTINGS_HOSTS, null); - if (newHosts != null) { - logger.info("hosts set to [{}]", HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(newHosts))); - this.hosts = newHosts; - this.checkedAndUploadedIndexTemplate = false; - this.supportedClusterVersion = false; - this.boundToLocalNode = false; - } - - Boolean newHostnameVerification = settings.getAsBoolean(SETTINGS_SSL_HOSTNAME_VERIFICATION, null); - if (newHostnameVerification != null) { - logger.info("hostname verification set to [{}]", newHostnameVerification); - this.hostnameVerification = newHostnameVerification; - } - - String newIndexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, null); - if (newIndexTimeFormat != null) { - try { - indexTimeFormatter = DateTimeFormat.forPattern(newIndexTimeFormat).withZoneUTC(); - } catch (IllegalArgumentException e) { - logger.error("Unable to update marvel index time format: format [" + newIndexTimeFormat + "] is invalid", e); - } + logger.error("{}. connection had an error while reporting the error. tough life.", msg); } } protected void initKeepAliveThread() { - keepAliveThread = new Thread(keepAliveWorker, EsExecutors.threadName(settings, "keep_alive")); + keepAliveThread = new Thread(keepAliveWorker, "marvel-exporter[" + config.name() + "][keep_alive]"); keepAliveThread.setDaemon(true); keepAliveThread.start(); } + + static private void validateHosts(String[] hosts) { + for (String host : hosts) { + try { + HttpExporterUtils.parseHostWithPath(host, ""); + } catch (URISyntaxException e) { + throw new SettingsException("[marvel.agent.exporter] invalid host: [" + host + "]." + + " error: [" + e.getMessage() + "]"); + } catch (MalformedURLException e) { + throw new SettingsException("[marvel.agent.exporter] invalid host: [" + host + "]." + + " error: [" + e.getMessage() + "]"); + } + } + } + + /** SSL Initialization * */ + public SSLSocketFactory createSSLSocketFactory(Settings settings) { + if (settings.names().isEmpty()) { + logger.trace("no ssl context configured"); + return null; + } + SSLContext sslContext; + // Initialize sslContext + try { + String protocol = settings.get(SSL_PROTOCOL_SETTING, "TLS"); + String trustStore = settings.get(SSL_TRUSTSTORE_SETTING, System.getProperty("javax.net.ssl.trustStore")); + String trustStorePassword = settings.get(SSL_TRUSTSTORE_PASSWORD_SETTING, System.getProperty("javax.net.ssl.trustStorePassword")); + String trustStoreAlgorithm = settings.get(SSL_TRUSTSTORE_ALGORITHM_SETTING, System.getProperty("ssl.TrustManagerFactory.algorithm")); + + if (trustStore == null) { + throw new SettingsException("missing required setting [" + SSL_TRUSTSTORE_SETTING + "]"); + } + + if (trustStoreAlgorithm == null) { + trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm(); + } + + logger.debug("using ssl trust store [{}] with algorithm [{}]", trustStore, trustStoreAlgorithm); + + Path trustStorePath = env.configFile().resolve(trustStore); + if (!Files.exists(trustStorePath)) { + throw new SettingsException("could not find trust store file [" + trustStorePath + "]"); + } + + TrustManager[] trustManagers; + try (InputStream trustStoreStream = Files.newInputStream(trustStorePath)) { + // Load TrustStore + KeyStore ks = KeyStore.getInstance("jks"); + ks.load(trustStoreStream, trustStorePassword == null ? null : trustStorePassword.toCharArray()); + + // Initialize a trust manager factory with the trusted store + TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm); + trustFactory.init(ks); + + // Retrieve the trust managers from the factory + trustManagers = trustFactory.getTrustManagers(); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize a TrustManagerFactory", e); + } + + sslContext = SSLContext.getInstance(protocol); + sslContext.init(null, trustManagers, null); + + } catch (Exception e) { + throw new ElasticsearchException("failed to initialize ssl", e); + } + return sslContext.getSocketFactory(); + } + + BasicAuth resolveAuth(Settings setting) { + String username = setting.get(AUTH_USERNAME_SETTING, null); + String password = setting.get(AUTH_PASSWORD_SETTING, null); + if (username == null && password == null) { + return null; + } + if (username == null) { + throw new SettingsException("invalid auth setting. missing [" + settingFQN(AUTH_USERNAME_SETTING) + "]"); + } + return new BasicAuth(username, password); + } + + /** + * Trust all hostname verifier. This simply returns true to completely disable hostname verification + */ + static class TrustAllHostnameVerifier implements HostnameVerifier { + static final HostnameVerifier INSTANCE = new TrustAllHostnameVerifier(); + + private TrustAllHostnameVerifier() { + } + + @Override + public boolean verify(String s, SSLSession sslSession) { + return true; + } + } + /** * Sadly we need to make sure we keep the connection open to the target ES a * Java's connection pooling closes connections if idle for 5sec. @@ -717,7 +684,7 @@ public class HttpESExporter extends AbstractExporter implements } HttpURLConnection conn = openConnection(currentHosts[0], "GET", "", null); if (conn == null) { - logger.trace("keep alive thread shutting down. failed to open connection to current host [{}]", HttpESExporterUtils.santizeUrlPwds(currentHosts[0])); + logger.trace("keep alive thread shutting down. failed to open connection to current host [{}]", currentHosts[0]); return; } else { conn.getInputStream().close(); // close and release to connection pool. @@ -726,83 +693,50 @@ public class HttpESExporter extends AbstractExporter implements // ignore, if closed, good.... } catch (Throwable t) { logger.debug("error in keep alive thread, shutting down (will be restarted after a successful connection has been made) {}", - HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t))); + ExceptionsHelper.detailedMessage(t)); return; } } } } - private static final String SETTINGS_SSL_PREFIX = SETTINGS_PREFIX + "ssl."; + static class BasicAuth { - public static final String SETTINGS_SSL_PROTOCOL = SETTINGS_SSL_PREFIX + "protocol"; - public static final String SETTINGS_SSL_TRUSTSTORE = SETTINGS_SSL_PREFIX + "truststore.path"; - public static final String SETTINGS_SSL_TRUSTSTORE_PASSWORD = SETTINGS_SSL_PREFIX + "truststore.password"; - public static final String SETTINGS_SSL_TRUSTSTORE_ALGORITHM = SETTINGS_SSL_PREFIX + "truststore.algorithm"; - public static final String SETTINGS_SSL_HOSTNAME_VERIFICATION = SETTINGS_SSL_PREFIX + "hostname_verification"; + String username; + char[] password; - /** SSL Initialization * */ - public SSLSocketFactory createSSLSocketFactory(Settings settings) { - SSLContext sslContext; - // Initialize sslContext - try { - String sslContextProtocol = settings.get(SETTINGS_SSL_PROTOCOL, "TLS"); - String trustStore = settings.get(SETTINGS_SSL_TRUSTSTORE, System.getProperty("javax.net.ssl.trustStore")); - String trustStorePassword = settings.get(SETTINGS_SSL_TRUSTSTORE_PASSWORD, System.getProperty("javax.net.ssl.trustStorePassword")); - String trustStoreAlgorithm = settings.get(SETTINGS_SSL_TRUSTSTORE_ALGORITHM, System.getProperty("ssl.TrustManagerFactory.algorithm")); - - if (trustStore == null) { - throw new RuntimeException("truststore is not configured, use " + SETTINGS_SSL_TRUSTSTORE); - } - - if (trustStoreAlgorithm == null) { - trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm(); - } - - logger.debug("SSL: using trustStore[{}], trustAlgorithm[{}]", trustStore, trustStoreAlgorithm); - - Path trustStorePath = environment.configFile().resolve(trustStore); - if (!Files.exists(trustStorePath)) { - throw new FileNotFoundException("Truststore at path [" + trustStorePath + "] does not exist"); - } - - TrustManager[] trustManagers; - try (InputStream trustStoreStream = Files.newInputStream(trustStorePath)) { - // Load TrustStore - KeyStore ks = KeyStore.getInstance("jks"); - ks.load(trustStoreStream, trustStorePassword == null ? null : trustStorePassword.toCharArray()); - - // Initialize a trust manager factory with the trusted store - TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm); - trustFactory.init(ks); - - // Retrieve the trust managers from the factory - trustManagers = trustFactory.getTrustManagers(); - } catch (Exception e) { - throw new RuntimeException("Failed to initialize a TrustManagerFactory", e); - } - - sslContext = SSLContext.getInstance(sslContextProtocol); - sslContext.init(null, trustManagers, null); - } catch (Exception e) { - throw new RuntimeException("[marvel.agent.exporter] failed to initialize the SSLContext", e); + public BasicAuth(String username, String password) { + this.username = username; + this.password = password != null ? password.toCharArray() : null; + } + + void apply(HttpURLConnection connection) throws UnsupportedEncodingException { + String userInfo = username + ":" + (password != null ? new String(password) : ""); + String basicAuth = "Basic " + Base64.encodeBytes(userInfo.getBytes("ISO-8859-1")); + connection.setRequestProperty("Authorization", basicAuth); } - return sslContext.getSocketFactory(); } - /** - * Trust all hostname verifier. This simply returns true to completely disable hostname verification - */ - static class TrustAllHostnameVerifier implements HostnameVerifier { - static final HostnameVerifier INSTANCE = new TrustAllHostnameVerifier(); + public static class Factory extends Exporter.Factory { - private TrustAllHostnameVerifier() { + private final Environment env; + private final RendererRegistry rendererRegistry; + + @Inject + public Factory(Environment env, RendererRegistry rendererRegistry) { + super(TYPE, false); + this.env = env; + this.rendererRegistry = rendererRegistry; } @Override - public boolean verify(String s, SSLSession sslSession) { - return true; + public HttpExporter create(Config config) { + return new HttpExporter(config, env, rendererRegistry); + } + + @Override + public void filterOutSensitiveSettings(String prefix, MarvelSettingsFilter filter) { + filter.filterOut(prefix + AUTH_PASSWORD_SETTING); } } } - diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtils.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterUtils.java similarity index 87% rename from marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtils.java rename to marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterUtils.java index 29d428673ac..746227d2743 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtils.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterUtils.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.marvel.agent.exporter; +package org.elasticsearch.marvel.agent.exporter.http; import org.elasticsearch.Version; import org.elasticsearch.common.Strings; @@ -24,7 +24,7 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -public class HttpESExporterUtils { +public class HttpExporterUtils { public static final String MARVEL_TEMPLATE_FILE = "/marvel_index_template.json"; static final String MARVEL_VERSION_FIELD = "marvel_version"; @@ -77,11 +77,7 @@ public class HttpESExporterUtils { if (hostUrl.getPort() == -1) { // url has no port, default to 9200 - sadly we need to rebuild.. StringBuilder newUrl = new StringBuilder(hostUrl.getProtocol() + "://"); - if (hostUrl.getUserInfo() != null) { - newUrl.append(hostUrl.getUserInfo()).append("@"); - } newUrl.append(hostUrl.getHost()).append(":9200").append(hostUrl.toURI().getPath()); - hostUrl = new URL(newUrl.toString()); } @@ -93,7 +89,7 @@ public class HttpESExporterUtils { * Loads the default Marvel template */ public static byte[] loadDefaultTemplate() { - try (InputStream is = HttpESExporterUtils.class.getResourceAsStream(MARVEL_TEMPLATE_FILE)) { + try (InputStream is = HttpExporterUtils.class.getResourceAsStream(MARVEL_TEMPLATE_FILE)) { ByteArrayOutputStream out = new ByteArrayOutputStream(); Streams.copy(is, out); return out.toByteArray(); @@ -134,15 +130,4 @@ public class HttpESExporterUtils { } return null; } - - private static final String userInfoChars = "\\w-\\._~!$&\\'\\(\\)*+,;=%"; - private static Pattern urlPwdSanitizer = Pattern.compile("([" + userInfoChars + "]+?):[" + userInfoChars + "]+?@"); - - public static String santizeUrlPwds(Object text) { - if (text == null) { - return null; - } - Matcher matcher = urlPwdSanitizer.matcher(text.toString()); - return matcher.replaceAll("$1:XXXXXX@"); - } } diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java new file mode 100644 index 00000000000..739b57ebc24 --- /dev/null +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.agent.exporter.local; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.marvel.agent.exporter.Exporter; +import org.elasticsearch.marvel.agent.exporter.MarvelDoc; +import org.elasticsearch.marvel.shield.SecuredClient; + +import java.util.Collection; + +/** + * + */ +public class LocalExporter extends Exporter { + + public static final String TYPE = "local"; + + private final Client client; + + public LocalExporter(Exporter.Config config, SecuredClient client) { + super(TYPE, config); + this.client = client; + } + + @Override + public void export(Collection marvelDocs) { + } + + @Override + public void close() { + } + + public static class Factory extends Exporter.Factory { + + private final SecuredClient client; + + @Inject + public Factory(SecuredClient client) { + super(TYPE, true); + this.client = client; + } + + @Override + public LocalExporter create(Config config) { + return new LocalExporter(config, client); + } + } +} diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java new file mode 100644 index 00000000000..c61a7ab94a2 --- /dev/null +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.agent.exporter; + +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsException; +import org.elasticsearch.marvel.agent.exporter.local.LocalExporter; +import org.elasticsearch.marvel.shield.MarvelSettingsFilter; +import org.elasticsearch.marvel.shield.SecuredClient; +import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +/** + * + */ +public class ExportersTests extends ESTestCase { + + private Exporters exporters; + private Map factories; + private MarvelSettingsFilter settingsFilter; + private ClusterService clusterService; + private NodeSettingsService nodeSettingsService; + + @Before + public void init() throws Exception { + factories = new HashMap<>(); + + // we always need to have the local exporter as it serves as the default one + factories.put(LocalExporter.TYPE, new LocalExporter.Factory(mock(SecuredClient.class))); + + settingsFilter = mock(MarvelSettingsFilter.class); + clusterService = mock(ClusterService.class); + nodeSettingsService = mock(NodeSettingsService.class); + exporters = new Exporters(Settings.EMPTY, factories, settingsFilter, clusterService, nodeSettingsService); + } + + @Test + public void testInitExporters_Default() throws Exception { + Exporter.Factory factory = spy(new TestFactory("_type", true)); + factories.put("_type", factory); + Exporters.InternalExporters internalExporters = exporters.initExporters(Settings.builder() + .build()); + + assertThat(internalExporters, notNullValue()); + assertThat(internalExporters.settings.getAsMap().size(), is(0)); + assertThat(internalExporters.exporters.size(), is(1)); + assertThat(internalExporters.exporters, hasKey("default_" + LocalExporter.TYPE)); + assertThat(internalExporters.exporters.get("default_" + LocalExporter.TYPE), instanceOf(LocalExporter.class)); + } + + @Test + public void testInitExporters_Single() throws Exception { + Exporter.Factory factory = spy(new TestFactory("_type", true)); + factories.put("_type", factory); + Exporters.InternalExporters internalExporters = exporters.initExporters(Settings.builder() + .put("_name.type", "_type") + .build()); + + assertThat(internalExporters, notNullValue()); + assertThat(internalExporters.settings.getAsMap().size(), is(1)); + assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.type", "_type")); + assertThat(internalExporters.exporters.size(), is(1)); + assertThat(internalExporters.exporters, hasKey("_name")); + assertThat(internalExporters.exporters.get("_name"), instanceOf(TestFactory.TestExporter.class)); + assertThat(internalExporters.exporters.get("_name").type, is("_type")); + } + + @Test + public void testInitExporters_Single_Disabled() throws Exception { + Exporter.Factory factory = spy(new TestFactory("_type", true)); + factories.put("_type", factory); + Exporters.InternalExporters internalExporters = exporters.initExporters(Settings.builder() + .put("_name.type", "_type") + .put("_name.enabled", false) + .build()); + + assertThat(internalExporters, notNullValue()); + assertThat(internalExporters.settings.getAsMap().size(), is(2)); + assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.type", "_type")); + assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.enabled", "false")); + + // the only configured exporter is disabled... yet we intentionally don't fallback on the default + + assertThat(internalExporters.exporters.size(), is(0)); + } + + @Test(expected = SettingsException.class) + public void testInitExporters_Single_UnknownType() throws Exception { + exporters.initExporters(Settings.builder() + .put("_name.type", "unknown_type") + .build()); + } + + @Test(expected = SettingsException.class) + public void testInitExporters_Single_MissingExporterType() throws Exception { + exporters.initExporters(Settings.builder() + .put("_name.foo", "bar") + .build()); + } + + @Test + public void testInitExporters_Mutliple_SameType() throws Exception { + Exporter.Factory factory = spy(new TestFactory("_type", false)); + factories.put("_type", factory); + Exporters.InternalExporters internalExporters = exporters.initExporters(Settings.builder() + .put("_name0.type", "_type") + .put("_name1.type", "_type") + .build()); + + assertThat(internalExporters, notNullValue()); + assertThat(internalExporters.settings.getAsMap().size(), is(2)); + assertThat(internalExporters.settings.getAsMap(), hasEntry("_name0.type", "_type")); + assertThat(internalExporters.settings.getAsMap(), hasEntry("_name1.type", "_type")); + assertThat(internalExporters.exporters.size(), is(2)); + assertThat(internalExporters.exporters, hasKey("_name0")); + assertThat(internalExporters.exporters.get("_name0"), instanceOf(TestFactory.TestExporter.class)); + assertThat(internalExporters.exporters.get("_name0").type, is("_type")); + assertThat(internalExporters.exporters, hasKey("_name1")); + assertThat(internalExporters.exporters.get("_name1"), instanceOf(TestFactory.TestExporter.class)); + assertThat(internalExporters.exporters.get("_name1").type, is("_type")); + } + + @Test(expected = SettingsException.class) + public void testInitExporters_Mutliple_SameType_Singletons() throws Exception { + Exporter.Factory factory = spy(new TestFactory("_type", true)); + factories.put("_type", factory); + exporters.initExporters(Settings.builder() + .put("_name0.type", "_type") + .put("_name1.type", "_type") + .build()); + } + + static class TestFactory extends Exporter.Factory { + + public TestFactory(String type, boolean singleton) { + super(type, singleton); + } + + @Override + public TestExporter create(Exporter.Config config) { + return new TestExporter(type(), config); + } + + static class TestExporter extends Exporter { + + public TestExporter(String type, Config config) { + super(type, config); + } + + @Override + public void export(Collection marvelDocs) throws Exception { + } + + @Override + public void close() { + } + } + } +} diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterTests.java deleted file mode 100644 index d275120dd79..00000000000 --- a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterTests.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.marvel.agent.exporter; - -import org.elasticsearch.Version; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; -import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.license.plugin.LicensePlugin; -import org.elasticsearch.marvel.MarvelPlugin; -import org.elasticsearch.marvel.agent.AgentService; -import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; -import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc; -import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector; -import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc; -import org.elasticsearch.marvel.agent.settings.MarvelSettings; -import org.elasticsearch.node.Node; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.ESIntegTestCase.ClusterScope; -import org.hamcrest.Matchers; -import org.joda.time.format.DateTimeFormat; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.atomic.AtomicLong; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; - - -// Transport Client instantiation also calls the marvel plugin, which then fails to find modules -@ClusterScope(transportClientRatio = 0.0, scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) -@ESIntegTestCase.SuppressLocalMode -public class HttpESExporterTests extends ESIntegTestCase { - - final static AtomicLong timeStampGenerator = new AtomicLong(); - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(LicensePlugin.class, MarvelPlugin.class); - } - - @Override - protected Collection> transportClientPlugins() { - return nodePlugins(); - } - - @Test - public void testHttpServerOff() { - Settings.Builder builder = Settings.builder() - .put(MarvelSettings.STARTUP_DELAY, "200m") - .put(Node.HTTP_ENABLED, false); - internalCluster().startNode(builder); - HttpESExporter httpEsExporter = getEsExporter(); - logger.info("trying exporting despite of no target"); - httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); - } - - @Test - public void testTemplateAdditionDespiteOfLateClusterForming() { - Settings.Builder builder = Settings.builder() - .put(MarvelSettings.STARTUP_DELAY, "200m") - .put(Node.HTTP_ENABLED, true) - .put("discovery.type", "zen") - .put("discovery.zen.ping_timeout", "1s") - .put("discovery.initial_state_timeout", "100ms") - .put("discovery.zen.minimum_master_nodes", 2) - .put(HttpESExporter.SETTINGS_BULK_TIMEOUT, "1s") - .put(HttpESExporter.SETTINGS_CHECK_TEMPLATE_TIMEOUT, "1s"); - - internalCluster().startNode(builder); - - HttpESExporter httpEsExporter = getEsExporter(); - logger.info("exporting events while there is no cluster"); - httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); - - logger.info("bringing up a second node"); - internalCluster().startNode(builder); - ensureGreen(); - logger.info("exporting a second event"); - httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); - - logger.info("verifying that template has been created"); - assertMarvelTemplateExists(); - } - - @Test - public void testDynamicHostChange() { - // disable exporting to be able to use non valid hosts - Settings.Builder builder = Settings.builder() - .put(MarvelSettings.INTERVAL, "-1"); - internalCluster().startNode(builder); - - HttpESExporter httpEsExporter = getEsExporter(); - - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, "test1"))); - assertThat(httpEsExporter.getHosts(), Matchers.arrayContaining("test1")); - - // wipes the non array settings - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() - .putArray(HttpESExporter.SETTINGS_HOSTS, "test2").put(HttpESExporter.SETTINGS_HOSTS, ""))); - assertThat(httpEsExporter.getHosts(), Matchers.arrayContaining("test2")); - - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, "test3"))); - assertThat(httpEsExporter.getHosts(), Matchers.arrayContaining("test3")); - } - - @Test - public void testHostChangeReChecksTemplate() { - Settings.Builder builder = Settings.builder() - .put(MarvelSettings.STARTUP_DELAY, "200m") - .put(Node.HTTP_ENABLED, true); - internalCluster().startNode(builder); - - HttpESExporter httpEsExporter = getEsExporter(); - - logger.info("exporting an event"); - httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); - - logger.info("removing the marvel template"); - assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get()); - assertMarvelTemplateNotExists(); - - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( - Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, httpEsExporter.getHosts())).get()); - - logger.info("exporting a second event"); - httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); - - logger.info("verifying that template has been created"); - assertMarvelTemplateExists(); - } - - @Test - public void testHostFailureChecksTemplate() throws Exception { - Settings.Builder builder = Settings.builder() - .put(MarvelSettings.STARTUP_DELAY, "200m") - .put(Node.HTTP_ENABLED, true); - - final String node0 = internalCluster().startNode(builder); - final HttpESExporter httpEsExporter0 = getEsExporter(node0); - assertThat(node0, equalTo(internalCluster().getMasterName())); - - final String node1 = internalCluster().startNode(builder); - final HttpESExporter httpEsExporter1 = getEsExporter(node1); - - logger.info("--> exporting events to force host resolution"); - httpEsExporter0.export(Collections.singletonList(newRandomMarvelDoc())); - httpEsExporter1.export(Collections.singletonList(newRandomMarvelDoc())); - - logger.info("--> setting exporting hosts to {} + {}", httpEsExporter0.getHosts(), httpEsExporter1.getHosts()); - ArrayList mergedHosts = new ArrayList(); - mergedHosts.addAll(Arrays.asList(httpEsExporter0.getHosts())); - mergedHosts.addAll(Arrays.asList(httpEsExporter1.getHosts())); - - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( - Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, mergedHosts.toArray(Strings.EMPTY_ARRAY))).get()); - - logger.info("--> exporting events to have new settings take effect"); - httpEsExporter0.export(Collections.singletonList(newRandomMarvelDoc())); - httpEsExporter1.export(Collections.singletonList(newRandomMarvelDoc())); - - logger.info("verifying that template has been created"); - assertMarvelTemplateExists(); - - logger.info("--> removing the marvel template"); - assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get()); - assertMarvelTemplateNotExists(); - - logger.info("--> shutting down node0"); - internalCluster().stopCurrentMasterNode(); - - logger.info("--> exporting events from node1"); - // we use assert busy node because url caching may cause the node failure to be only detected while sending the event - assertBusy(new Runnable() { - @Override - public void run() { - httpEsExporter1.export(Collections.singletonList(newRandomMarvelDoc())); - logger.debug("--> checking for template"); - assertMarvelTemplateExists(); - } - }); - } - - @Test - public void testDynamicIndexFormatChange() { - Settings.Builder builder = Settings.builder() - .put(MarvelSettings.STARTUP_DELAY, "200m") - .put(Node.HTTP_ENABLED, true); - String nodeId = internalCluster().startNode(builder); - - logger.info("exporting a first event"); - HttpESExporter httpEsExporter = getEsExporter(nodeId); - httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); - - logger.info("checks that the index [{}] is created", httpEsExporter.getIndexName()); - assertTrue(client().admin().indices().prepareExists(httpEsExporter.getIndexName()).get().isExists()); - - String newTimeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM"); - logger.info("updating index time format setting to {}", newTimeFormat); - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(HttpESExporter.SETTINGS_INDEX_TIME_FORMAT, newTimeFormat))); - - logger.info("exporting a second event"); - httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); - - String expectedMarvelIndex = MarvelSettings.MARVEL_INDICES_PREFIX - + DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(System.currentTimeMillis()); - - logger.info("checks that the index [{}] is created", expectedMarvelIndex); - assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists()); - - logger.info("verifying that template has been created"); - assertMarvelTemplateExists(); - } - - @Test - public void testLoadRemoteClusterVersion() { - Settings.Builder builder = Settings.builder() - .put(MarvelSettings.STARTUP_DELAY, "200m") - .put(Node.HTTP_ENABLED, true); - String nodeId = internalCluster().startNode(builder); - - HttpESExporter httpEsExporter = getEsExporter(nodeId); - - logger.info("--> exporting events to force host resolution"); - httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); - - assertNotNull(httpEsExporter.getHosts()); - assertThat(httpEsExporter.getHosts().length, greaterThan(0)); - - logger.info("--> loading remote cluster version"); - Version resolved = httpEsExporter.loadRemoteClusterVersion(httpEsExporter.getHosts()[0]); - assertTrue(resolved.equals(Version.CURRENT)); - } - - private HttpESExporter getEsExporter() { - AgentService service = internalCluster().getInstance(AgentService.class); - return (HttpESExporter) service.getExporters().iterator().next(); - } - - private HttpESExporter getEsExporter(String node) { - AgentService service = internalCluster().getInstance(AgentService.class, node); - return (HttpESExporter) service.getExporters().iterator().next(); - } - - private MarvelDoc newRandomMarvelDoc() { - if (randomBoolean()) { - return new IndexRecoveryMarvelDoc(internalCluster().getClusterName(), - IndexRecoveryCollector.TYPE, timeStampGenerator.incrementAndGet(), new RecoveryResponse()); - } else { - return new ClusterStateMarvelDoc(internalCluster().getClusterName(), - ClusterStateCollector.TYPE, timeStampGenerator.incrementAndGet(), ClusterState.PROTO, ClusterHealthStatus.GREEN); - } - } - - private void assertMarvelTemplateExists() { - assertTrue("marvel template must exists", isTemplateExists("marvel")); - } - - private void assertMarvelTemplateNotExists() { - assertFalse("marvel template must not exists", isTemplateExists("marvel")); - } - - private boolean isTemplateExists(String templateName) { - for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(templateName).get().getIndexTemplates()) { - if (template.getName().equals(templateName)) { - return true; - } - } - return false; - } -} diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtilsTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtilsTests.java deleted file mode 100644 index 06ebe2ee30e..00000000000 --- a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtilsTests.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.marvel.agent.exporter; - -import org.elasticsearch.Version; -import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matchers; -import org.junit.Test; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.MalformedURLException; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.List; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.not; - - -public class HttpESExporterUtilsTests extends ESTestCase { - - @Test - public void testLoadTemplate() { - byte[] template = HttpESExporterUtils.loadDefaultTemplate(); - assertNotNull(template); - assertThat(template.length, Matchers.greaterThan(0)); - } - - @Test - public void testParseTemplateVersionFromByteArrayTemplate() throws IOException { - byte[] template = HttpESExporterUtils.loadDefaultTemplate(); - assertNotNull(template); - - Version version = HttpESExporterUtils.parseTemplateVersion(template); - assertNotNull(version); - } - - @Test - public void testParseTemplateVersionFromStringTemplate() throws IOException { - List templates = new ArrayList<>(); - templates.add("{\"marvel_version\": \"1.4.0.Beta1\"}"); - templates.add("{\"marvel_version\": \"1.6.2-SNAPSHOT\"}"); - templates.add("{\"marvel_version\": \"1.7.1\"}"); - templates.add("{\"marvel_version\": \"2.0.0-beta1\"}"); - templates.add("{\"marvel_version\": \"2.0.0\"}"); - templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"); - - for (String template : templates) { - Version version = HttpESExporterUtils.parseTemplateVersion(template); - assertNotNull(version); - } - - Version version = HttpESExporterUtils.parseTemplateVersion("{\"marvel.index_format\": \"7\"}"); - assertNull(version); - } - - @Test - public void testParseVersion() throws IOException { - assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}")); - assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}")); - assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}")); - assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }")); - assertNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}")); - assertNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}")); - } - - - @Test - public void testHostParsing() throws MalformedURLException, URISyntaxException { - URL url = HttpESExporterUtils.parseHostWithPath("localhost:9200", ""); - verifyUrl(url, "http", "localhost", 9200, "/"); - - url = HttpESExporterUtils.parseHostWithPath("localhost", "_bulk"); - verifyUrl(url, "http", "localhost", 9200, "/_bulk"); - - url = HttpESExporterUtils.parseHostWithPath("http://localhost:9200", "_bulk"); - verifyUrl(url, "http", "localhost", 9200, "/_bulk"); - - url = HttpESExporterUtils.parseHostWithPath("http://localhost", "_bulk"); - verifyUrl(url, "http", "localhost", 9200, "/_bulk"); - - url = HttpESExporterUtils.parseHostWithPath("https://localhost:9200", "_bulk"); - verifyUrl(url, "https", "localhost", 9200, "/_bulk"); - - url = HttpESExporterUtils.parseHostWithPath("https://boaz-air.local:9200", "_bulk"); - verifyUrl(url, "https", "boaz-air.local", 9200, "/_bulk"); - - url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200", ""); - verifyUrl(url, "http", "localhost", 9200, "/", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost", "_bulk"); - verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost:9200", "_bulk"); - verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost", "_bulk"); - verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("https://boaz:test@localhost:9200", "_bulk"); - verifyUrl(url, "https", "localhost", 9200, "/_bulk", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200/suburl", ""); - verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200/suburl/", ""); - verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("localhost/suburl", "_bulk"); - verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk"); - - url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost:9200/suburl/suburl1", "_bulk"); - verifyUrl(url, "http", "localhost", 9200, "/suburl/suburl1/_bulk", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost/suburl", "_bulk"); - verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("https://boaz:test@localhost:9200/suburl", "_bulk"); - verifyUrl(url, "https", "localhost", 9200, "/suburl/_bulk", "boaz:test"); - - url = HttpESExporterUtils.parseHostWithPath("https://user:test@server_with_underscore:9300", "_bulk"); - verifyUrl(url, "https", "server_with_underscore", 9300, "/_bulk", "user:test"); - - url = HttpESExporterUtils.parseHostWithPath("user:test@server_with_underscore:9300", "_bulk"); - verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk", "user:test"); - - url = HttpESExporterUtils.parseHostWithPath("server_with_underscore:9300", "_bulk"); - verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk"); - - url = HttpESExporterUtils.parseHostWithPath("server_with_underscore", "_bulk"); - verifyUrl(url, "http", "server_with_underscore", 9200, "/_bulk"); - - url = HttpESExporterUtils.parseHostWithPath("https://user:test@server-dash:9300", "_bulk"); - verifyUrl(url, "https", "server-dash", 9300, "/_bulk", "user:test"); - - url = HttpESExporterUtils.parseHostWithPath("user:test@server-dash:9300", "_bulk"); - verifyUrl(url, "http", "server-dash", 9300, "/_bulk", "user:test"); - - url = HttpESExporterUtils.parseHostWithPath("server-dash:9300", "_bulk"); - verifyUrl(url, "http", "server-dash", 9300, "/_bulk"); - - url = HttpESExporterUtils.parseHostWithPath("server-dash", "_bulk"); - verifyUrl(url, "http", "server-dash", 9200, "/_bulk"); - } - - @Test - public void sanitizeUrlPadTest() throws UnsupportedEncodingException { - String pwd = URLEncoder.encode(randomRealisticUnicodeOfCodepointLengthBetween(3, 20), "UTF-8"); - String[] inputs = new String[]{ - "https://boaz:" + pwd + "@hostname:9200", - "http://boaz:" + pwd + "@hostname:9200", - "boaz:" + pwd + "@hostname", - "boaz:" + pwd + "@hostname/hello", - "Parse exception in [boaz:" + pwd + "@hostname:9200,boaz1:" + pwd + "@hostname \n" + - "caused: by exception ,boaz1:" + pwd + "@hostname", - "failed to upload index template, stopping export\n" + - "java.lang.RuntimeException: failed to load/verify index template\n" + - " at org.elasticsearch.marvel.agent.exporter.ESExporter.checkAndUploadIndexTemplate(ESExporter.java:525)\n" + - " at org.elasticsearch.marvel.agent.exporter.ESExporter.openExportingConnection(ESExporter.java:213)\n" + - " at org.elasticsearch.marvel.agent.exporter.ESExporter.exportXContent(ESExporter.java:285)\n" + - " at org.elasticsearch.marvel.agent.exporter.ESExporter.exportClusterStats(ESExporter.java:206)\n" + - " at org.elasticsearch.marvel.agent.AgentService$ExportingWorker.exportClusterStats(AgentService.java:288)\n" + - " at org.elasticsearch.marvel.agent.AgentService$ExportingWorker.run(AgentService.java:245)\n" + - " at java.lang.Thread.run(Thread.java:745)\n" + - "Caused by: java.io.IOException: Server returned HTTP response code: 401 for URL: http://marvel_exporter:" + pwd + "@localhost:9200/_template/marvel\n" + - " at sun.reflect.GeneratedConstructorAccessor3.createMarvelDoc(Unknown Source)\n" + - " at sun.reflect.DelegatingConstructorAccessorImpl.createMarvelDoc(DelegatingConstructorAccessorImpl.java:45)\n" + - " at java.lang.reflect.Constructor.createMarvelDoc(Constructor.java:526)\n" + - " at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1675)\n" + - " at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1673)\n" + - " at java.security.AccessController.doPrivileged(Native Method)\n" + - " at sun.net.www.protocol.http.HttpURLConnection.getChainedException(HttpURLConnection.java:1671)\n" + - " at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1244)\n" + - " at org.elasticsearch.marvel.agent.exporter.ESExporter.checkAndUploadIndexTemplate(ESExporter.java:519)\n" + - " ... 6 more\n" + - "Caused by: java.io.IOException: Server returned HTTP response code: 401 for URL: http://marvel_exporter:" + pwd + "@localhost:9200/_template/marvel\n" + - " at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1626)\n" + - " at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)\n" + - " at org.elasticsearch.marvel.agent.exporter.ESExporter.checkAndUploadIndexTemplate(ESExporter.java:514)\n" + - " ... 6 more" - }; - - for (String input : inputs) { - String sanitized = HttpESExporterUtils.santizeUrlPwds(input); - assertThat(sanitized, not(containsString(pwd))); - } - } - - void verifyUrl(URL url, String protocol, String host, int port, String path) throws URISyntaxException { - assertThat(url.getProtocol(), equalTo(protocol)); - assertThat(url.getHost(), equalTo(host)); - assertThat(url.getPort(), equalTo(port)); - assertThat(url.toURI().getPath(), equalTo(path)); - } - - void verifyUrl(URL url, String protocol, String host, int port, String path, String userInfo) throws URISyntaxException { - verifyUrl(url, protocol, host, port, path); - assertThat(url.getUserInfo(), equalTo(userInfo)); - - } -} diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java new file mode 100644 index 00000000000..32c56587414 --- /dev/null +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java @@ -0,0 +1,355 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.agent.exporter.http; + +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.license.plugin.LicensePlugin; +import org.elasticsearch.marvel.MarvelPlugin; +import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; +import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc; +import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector; +import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc; +import org.elasticsearch.marvel.agent.exporter.Exporters; +import org.elasticsearch.marvel.agent.exporter.MarvelDoc; +import org.elasticsearch.marvel.agent.settings.MarvelSettings; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.InternalTestCluster; +import org.hamcrest.Matchers; +import org.joda.time.format.DateTimeFormat; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + + +// Transport Client instantiation also calls the marvel plugin, which then fails to find modules +@ClusterScope(transportClientRatio = 0.0, scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +@ESIntegTestCase.SuppressLocalMode +public class HttpExporterTests extends ESIntegTestCase { + + final static AtomicLong timeStampGenerator = new AtomicLong(); + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(LicensePlugin.class, MarvelPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(Node.HTTP_ENABLED, true) + .put("shield.enabled", false) + .build(); + } + + @Test + public void testSimpleExport() throws Exception { + TargetNode target = TargetNode.start(internalCluster()); + + Settings.Builder builder = Settings.builder() + .put("marvel.agent.exporters._http.type", "http") + .put("marvel.agent.exporters._http.host", target.httpAddress); + String agentNode = internalCluster().startNode(builder); + ensureGreen(); + HttpExporter exporter = getExporter(agentNode); + MarvelDoc doc = newRandomMarvelDoc(); + exporter.export(Collections.singletonList(doc)); + + flush(); + refresh(); + + SearchResponse response = client().prepareSearch(".marvel-es-*").setTypes(doc.type()).get(); + assertThat(response, notNullValue()); + assertThat(response.getHits().totalHits(), is(1L)); + } + +// TODO not sure what this tests... in any case... I'm pretty sure we can remove it +// @Test +// public void testHttpServerOff() { +// Settings.Builder builder = Settings.builder() +// .put(MarvelSettings.STARTUP_DELAY, "200m") +// .put(Node.HTTP_ENABLED, false); +// internalCluster().startNode(builder); +// HttpExporter httpEsExporter = getEsExporter(); +// logger.info("trying exporting despite of no target"); +// httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); +// } +// + @Test + public void testTemplateAdditionDespiteOfLateClusterForming() throws Exception { + + TargetNode target = TargetNode.start(internalCluster()); + + Settings.Builder builder = Settings.builder() + .put(MarvelSettings.STARTUP_DELAY, "200m") + .put(Node.HTTP_ENABLED, true) + .put("discovery.type", "zen") + .put("discovery.zen.ping_timeout", "1s") + .put("discovery.initial_state_timeout", "100ms") + .put("discovery.zen.minimum_master_nodes", 2) + .put("marvel.agent.exporters._http.type", "http") + .put("marvel.agent.exporters._http.host", target.httpAddress) + .put("marvel.agent.exporters._http." + HttpExporter.BULK_TIMEOUT_SETTING, "1s") + .put("marvel.agent.exporters._http." + HttpExporter.TEMPLATE_CHECK_TIMEOUT_SETTING, "1s"); + + String nodeName = internalCluster().startNode(builder); + + HttpExporter exporter = getExporter(nodeName); + logger.info("exporting events while there is no cluster"); + exporter.export(Collections.singletonList(newRandomMarvelDoc())); + + logger.info("bringing up a second node"); + internalCluster().startNode(builder); + ensureGreen(); + logger.info("exporting a second event"); + exporter.export(Collections.singletonList(newRandomMarvelDoc())); + + logger.info("verifying that template has been created"); + assertMarvelTemplateExists(); + } + + @Test + public void testDynamicHostChange() { + + // disable exporting to be able to use non valid hosts + Settings.Builder builder = Settings.builder() + .put(MarvelSettings.INTERVAL, "-1") + .put("marvel.agent.exporters._http.type", "http") + .put("marvel.agent.exporters._http.host", "test0"); + + String nodeName = internalCluster().startNode(builder); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .putArray("marvel.agent.exporters._http.host", "test1"))); + assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test1")); + + // wipes the non array settings + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .putArray("marvel.agent.exporters._http.host", "test2") + .put("marvel.agent.exporters._http.host", ""))); + assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test2")); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .putArray("marvel.agent.exporters._http.host", "test3"))); + assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test3")); + } + + @Test + public void testHostChangeReChecksTemplate() throws Exception { + + TargetNode targetNode = TargetNode.start(internalCluster()); + + Settings.Builder builder = Settings.builder() + .put(MarvelSettings.STARTUP_DELAY, "200m") + .put("marvel.agent.exporters._http.type", "http") + .put("marvel.agent.exporters._http.host", targetNode.httpAddress); + + String agentNode = internalCluster().startNode(builder); + + HttpExporter exporter = getExporter(agentNode); + + logger.info("exporting an event"); + exporter.export(Collections.singletonList(newRandomMarvelDoc())); + + logger.info("removing the marvel template"); + assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get()); + assertMarvelTemplateNotExists(); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( + Settings.builder().putArray("marvel.agent.exporters._http.host", exporter.hosts)).get()); + + // a new exporter is created on update, so we need to re-fetch it + exporter = getExporter(agentNode); + + logger.info("exporting a second event"); + exporter.export(Collections.singletonList(newRandomMarvelDoc())); + + logger.info("verifying that template has been created"); + assertMarvelTemplateExists(); + } + + @Test + public void testHostFailureChecksTemplate() throws Exception { + + TargetNode target0 = TargetNode.start(internalCluster()); + assertThat(target0.name, is(internalCluster().getMasterName())); + + TargetNode target1 = TargetNode.start(internalCluster()); + + // lets start node0 & node1 first, such that node0 will be the master (it's first to start) + final String node0 = internalCluster().startNode(Settings.builder() + .put(MarvelSettings.STARTUP_DELAY, "200m") + .put("marvel.agent.exporters._http.type", "http") + .putArray("marvel.agent.exporters._http.host", target0.httpAddress, target1.httpAddress)); + + HttpExporter exporter = getExporter(node0); + + logger.info("--> exporting events to have new settings take effect"); + exporter.export(Collections.singletonList(newRandomMarvelDoc())); + + logger.info("verifying that template has been created"); + assertMarvelTemplateExists(); + + logger.info("--> removing the marvel template"); + assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get()); + assertMarvelTemplateNotExists(); + + logger.info("--> shutting down target0"); + assertThat(target0.name, is(internalCluster().getMasterName())); // just to be sure it's still the master + internalCluster().stopCurrentMasterNode(); + + // we use assert busy node because url caching may cause the node failure to be only detected while sending the event + assertBusy(new Runnable() { + @Override + public void run() { + try { + logger.info("--> exporting events from node0"); + getExporter(node0).export(Collections.singletonList(newRandomMarvelDoc())); + } catch (Exception e) { + e.printStackTrace(); + fail("failed to export event from node0"); + } + logger.debug("--> checking for template"); + assertMarvelTemplateExists(); + logger.debug("--> template exists"); + } + }); + } + + @Test + public void testDynamicIndexFormatChange() throws Exception { + + TargetNode targetNode = TargetNode.start(internalCluster()); + + Settings.Builder builder = Settings.builder() + .put(MarvelSettings.STARTUP_DELAY, "200m") + .put("marvel.agent.exporters._http.type", "http") + .put("marvel.agent.exporters._http.host", targetNode.httpAddress); + + String agentNode = internalCluster().startNode(builder); + + logger.info("exporting a first event"); + HttpExporter exporter = getExporter(agentNode); + exporter.export(Collections.singletonList(newRandomMarvelDoc())); + + String indexName = exporter.getIndexName(); + logger.info("checks that the index [{}] is created", indexName); + assertTrue(client().admin().indices().prepareExists(indexName).get().isExists()); + + String newTimeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM"); + logger.info("updating index time format setting to {}", newTimeFormat); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put("marvel.agent.exporters._http.index.name.time_format", newTimeFormat))); + + exporter = getExporter(agentNode); + + logger.info("exporting a second event"); + exporter.export(Collections.singletonList(newRandomMarvelDoc())); + + String expectedMarvelIndex = MarvelSettings.MARVEL_INDICES_PREFIX + + DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(System.currentTimeMillis()); + + logger.info("checks that the index [{}] is created", expectedMarvelIndex); + assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists()); + + logger.info("verifying that template has been created"); + assertMarvelTemplateExists(); + } + + @Test + public void testLoadRemoteClusterVersion() { + + TargetNode targetNode = TargetNode.start(internalCluster()); + + Settings.Builder builder = Settings.builder() + .put(MarvelSettings.STARTUP_DELAY, "200m") + .put("marvel.agent.exporters._http.type", "http") + .put("marvel.agent.exporters._http.host", targetNode.httpAddress); + + String agentNode = internalCluster().startNode(builder); + + HttpExporter exporter = getExporter(agentNode); + + logger.info("--> loading remote cluster version"); + Version resolved = exporter.loadRemoteClusterVersion(targetNode.httpAddress); + assertTrue(resolved.equals(Version.CURRENT)); + } + + private HttpExporter getExporter(String nodeName) { + Exporters exporters = internalCluster().getInstance(Exporters.class, nodeName); + return (HttpExporter) exporters.iterator().next(); + } + + private MarvelDoc newRandomMarvelDoc() { + if (randomBoolean()) { + return new IndexRecoveryMarvelDoc(internalCluster().getClusterName(), + IndexRecoveryCollector.TYPE, timeStampGenerator.incrementAndGet(), new RecoveryResponse()); + } else { + return new ClusterStateMarvelDoc(internalCluster().getClusterName(), + ClusterStateCollector.TYPE, timeStampGenerator.incrementAndGet(), ClusterState.PROTO, ClusterHealthStatus.GREEN); + } + } + + private void assertMarvelTemplateExists() { + assertTrue("marvel template must exists", isTemplateExists("marvel")); + } + + private void assertMarvelTemplateNotExists() { + assertFalse("marvel template must not exists", isTemplateExists("marvel")); + } + + private boolean isTemplateExists(String templateName) { + for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(templateName).get().getIndexTemplates()) { + if (template.getName().equals(templateName)) { + return true; + } + } + return false; + } + + static class TargetNode { + + private final String name; + private final TransportAddress address; + private final String httpAddress; + private final Client client; + + private TargetNode(InternalTestCluster cluster) { + name = cluster.startNode(); + address = cluster.getInstance(HttpServerTransport.class, name).boundAddress().publishAddress(); + httpAddress = address.getHost() + ":" + address.getPort(); + this.client = cluster.client(name); + } + + static TargetNode start(InternalTestCluster cluster) { + return new TargetNode(cluster); + } + } +} diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterUtilsTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterUtilsTests.java new file mode 100644 index 00000000000..bc7e47945c9 --- /dev/null +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterUtilsTests.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.agent.exporter.http; + +import org.elasticsearch.Version; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; + + +public class HttpExporterUtilsTests extends ESTestCase { + + @Test + public void testLoadTemplate() { + byte[] template = HttpExporterUtils.loadDefaultTemplate(); + assertNotNull(template); + assertThat(template.length, Matchers.greaterThan(0)); + } + + @Test + public void testParseTemplateVersionFromByteArrayTemplate() throws IOException { + byte[] template = HttpExporterUtils.loadDefaultTemplate(); + assertNotNull(template); + + Version version = HttpExporterUtils.parseTemplateVersion(template); + assertNotNull(version); + } + + @Test + public void testParseTemplateVersionFromStringTemplate() throws IOException { + List templates = new ArrayList<>(); + templates.add("{\"marvel_version\": \"1.4.0.Beta1\"}"); + templates.add("{\"marvel_version\": \"1.6.2-SNAPSHOT\"}"); + templates.add("{\"marvel_version\": \"1.7.1\"}"); + templates.add("{\"marvel_version\": \"2.0.0-beta1\"}"); + templates.add("{\"marvel_version\": \"2.0.0\"}"); + templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"); + + for (String template : templates) { + Version version = HttpExporterUtils.parseTemplateVersion(template); + assertNotNull(version); + } + + Version version = HttpExporterUtils.parseTemplateVersion("{\"marvel.index_format\": \"7\"}"); + assertNull(version); + } + + @Test + public void testParseVersion() throws IOException { + assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}")); + assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}")); + assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}")); + assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }")); + assertNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}")); + assertNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}")); + } + + + @Test + public void testHostParsing() throws MalformedURLException, URISyntaxException { + URL url = HttpExporterUtils.parseHostWithPath("localhost:9200", ""); + verifyUrl(url, "http", "localhost", 9200, "/"); + + url = HttpExporterUtils.parseHostWithPath("localhost", "_bulk"); + verifyUrl(url, "http", "localhost", 9200, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("http://localhost:9200", "_bulk"); + verifyUrl(url, "http", "localhost", 9200, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("http://localhost", "_bulk"); + verifyUrl(url, "http", "localhost", 9200, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("https://localhost:9200", "_bulk"); + verifyUrl(url, "https", "localhost", 9200, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("https://boaz-air.local:9200", "_bulk"); + verifyUrl(url, "https", "boaz-air.local", 9200, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("localhost:9200/suburl", ""); + verifyUrl(url, "http", "localhost", 9200, "/suburl/"); + + url = HttpExporterUtils.parseHostWithPath("localhost/suburl", "_bulk"); + verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("http://localhost:9200/suburl/suburl1", "_bulk"); + verifyUrl(url, "http", "localhost", 9200, "/suburl/suburl1/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("https://localhost:9200/suburl", "_bulk"); + verifyUrl(url, "https", "localhost", 9200, "/suburl/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("https://server_with_underscore:9300", "_bulk"); + verifyUrl(url, "https", "server_with_underscore", 9300, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("server_with_underscore:9300", "_bulk"); + verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("server_with_underscore", "_bulk"); + verifyUrl(url, "http", "server_with_underscore", 9200, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("https://server-dash:9300", "_bulk"); + verifyUrl(url, "https", "server-dash", 9300, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("server-dash:9300", "_bulk"); + verifyUrl(url, "http", "server-dash", 9300, "/_bulk"); + + url = HttpExporterUtils.parseHostWithPath("server-dash", "_bulk"); + verifyUrl(url, "http", "server-dash", 9200, "/_bulk"); + } + + void verifyUrl(URL url, String protocol, String host, int port, String path) throws URISyntaxException { + assertThat(url.getProtocol(), equalTo(protocol)); + assertThat(url.getHost(), equalTo(host)); + assertThat(url.getPort(), equalTo(port)); + assertThat(url.toURI().getPath(), equalTo(path)); + } +} diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/renderer/cluster/ClusterStateIT.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/renderer/cluster/ClusterStateIT.java index 7e57614b2cd..4f9f131092f 100644 --- a/marvel/src/test/java/org/elasticsearch/marvel/agent/renderer/cluster/ClusterStateIT.java +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/renderer/cluster/ClusterStateIT.java @@ -9,7 +9,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; -import org.elasticsearch.marvel.agent.exporter.HttpESExporterUtils; +import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils; import org.elasticsearch.marvel.agent.renderer.AbstractRendererTestCase; import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.search.SearchHit; @@ -60,7 +60,7 @@ public class ClusterStateIT extends AbstractRendererTestCase { @Test public void testNoNodesIndexing() throws Exception { logger.debug("--> forcing marvel's index template update"); - assertAcked(client().admin().indices().preparePutTemplate("marvel").setSource(HttpESExporterUtils.loadDefaultTemplate()).execute().actionGet()); + assertAcked(client().admin().indices().preparePutTemplate("marvel").setSource(HttpExporterUtils.loadDefaultTemplate()).execute().actionGet()); logger.debug("--> deleting all marvel indices"); cluster().wipeIndices(MarvelSettings.MARVEL_INDICES_PREFIX + "*");