Adding multiple exporter types

- an exporter is no longer fixed construct. Instead, this commit creates an `Exporter` abstraction. An exporter has a type and is responsible for exporting marvel documents.
- multiple exporters can be configured, each can be of a different type.
- an exporter can be of a "singleton" type, indicating that there can only be one exporter of that type at any point in time.
- exporters are configured in the settings. Each exporters is identified by a unique name/id and has its own settings (the type of the exporter defines what settings that exporter expects)
- exporters are loaded at start up time, but it is also possible to update the exporter settings at runtime (all exporter settings are registered as dynamic cluster settings). When updated, all the exporters will be rebuild & reloaded.
- there are two types of exporters: `http` and `local`
- `http` exporter exports documents to external elasticsearch clusters using the REST API
- the new `http` exporter doesn't support user info as part of the URL (it's pron to security info leak). Instead a new `auth.username` and `auth.password` settings were added that can be set per `http` exporter (when shield is installed, the passwords are filtered out from the node info API)
- `local` exporter exports documents to the same cluster the agent is installed on, using the internal node client.
- when no exporter is configured, a default `local` exporter is created (for best ootb experience)

Original commit: elastic/x-pack-elasticsearch@d1d7c8aefd
This commit is contained in:
uboness 2015-09-23 06:49:52 +02:00
parent 334e090902
commit 7206de9c2a
16 changed files with 1297 additions and 1059 deletions

View File

@ -7,7 +7,6 @@ package org.elasticsearch.marvel;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger;
@ -16,7 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.marvel.agent.collector.CollectorModule;
import org.elasticsearch.marvel.agent.exporter.ExporterModule;
import org.elasticsearch.marvel.agent.exporter.HttpESExporter;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.renderer.RendererModule;
import org.elasticsearch.marvel.agent.settings.MarvelModule;
import org.elasticsearch.marvel.agent.settings.MarvelSetting;
@ -97,14 +96,7 @@ public class MarvelPlugin extends Plugin {
}
public void onModule(ClusterModule module) {
// HttpESExporter
module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_INDEX_TIME_FORMAT, Validator.EMPTY);
module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_HOSTS, Validator.EMPTY);
module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_HOSTS + ".*", Validator.EMPTY);
module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_TIMEOUT, Validator.EMPTY);
module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_READ_TIMEOUT, Validator.EMPTY);
module.registerClusterDynamicSetting(HttpESExporter.SETTINGS_SSL_HOSTNAME_VERIFICATION, Validator.EMPTY);
Exporters.registerDynamicSettings(module);
// MarvelSettingsService
for (MarvelSetting setting : MarvelSettings.dynamicSettings()) {
module.registerClusterDynamicSetting(setting.dynamicSettingName(), setting.dynamicValidator());

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.marvel.agent.collector.Collector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoCollector;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.license.LicenseService;
@ -32,18 +33,18 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
private final MarvelSettings marvelSettings;
private final Collection<Collector> collectors;
private final Collection<Exporter> exporters;
private final Exporters exporters;
@Inject
public AgentService(Settings settings, NodeSettingsService nodeSettingsService,
LicenseService licenseService, MarvelSettings marvelSettings,
Set<Collector> collectors, Set<Exporter> exporters) {
Set<Collector> collectors, Exporters exporters) {
super(settings);
this.marvelSettings = marvelSettings;
this.samplingInterval = marvelSettings.interval().millis();
this.collectors = Collections.unmodifiableSet(filterCollectors(collectors, marvelSettings.collectors()));
this.exporters = Collections.unmodifiableSet(exporters);
this.exporters = exporters;
nodeSettingsService.addListener(this);
logger.trace("marvel is running in [{}] mode", licenseService.mode());
@ -88,26 +89,15 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
@Override
protected void doStart() {
if (exporters.size() == 0) {
return;
}
for (Collector collector : collectors) {
collector.start();
}
for (Exporter exporter : exporters) {
exporter.start();
}
exporters.start();
applyIntervalSettings();
}
@Override
protected void doStop() {
if (exporters.size() == 0) {
return;
}
if (workerThread != null && workerThread.isAlive()) {
exportingWorker.closed = true;
workerThread.interrupt();
@ -123,9 +113,7 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
collector.stop();
}
for (Exporter exporter : exporters) {
exporter.stop();
}
exporters.stop();
}
@Override
@ -139,11 +127,6 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
}
}
// used for testing
public Collection<Exporter> getExporters() {
return exporters;
}
@Override
public void onRefreshSettings(Settings settings) {
TimeValue newSamplingInterval = settings.getAsTime(MarvelSettings.INTERVAL, null);

View File

@ -1,88 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.Collection;
public abstract class AbstractExporter<T> extends AbstractLifecycleComponent<T> implements Exporter<T> {
private final String name;
protected final ClusterService clusterService;
@Inject
public AbstractExporter(Settings settings, String name, ClusterService clusterService) {
super(settings);
this.name = name;
this.clusterService = clusterService;
}
@Override
public String name() {
return name;
}
@Override
public T start() {
logger.debug("starting exporter [{}]", name());
return super.start();
}
@Override
protected void doStart() {
}
protected boolean masterOnly() {
return false;
}
@Override
public void export(Collection<MarvelDoc> marvelDocs) {
if (masterOnly() && !clusterService.state().nodes().localNodeMaster()) {
logger.trace("exporter [{}] runs on master only", name());
return;
}
if (marvelDocs == null) {
logger.debug("no objects to export for [{}]", name());
return;
}
try {
doExport(marvelDocs);
} catch (Exception e) {
logger.error("export [{}] throws exception when exporting data", e, name());
}
}
protected abstract void doExport(Collection<MarvelDoc> marvelDocs) throws Exception;
@Override
public T stop() {
logger.debug("stopping exporter [{}]", name());
return super.stop();
}
@Override
protected void doStop() {
}
@Override
public void close() {
logger.trace("closing exporter [{}]", name());
super.close();
}
@Override
protected void doClose() {
}
}

View File

@ -5,13 +5,98 @@
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import java.util.Collection;
public interface Exporter<T> extends LifecycleComponent<T> {
public abstract class Exporter {
String name();
protected final String type;
protected final Config config;
protected final ESLogger logger;
public Exporter(String type, Config config) {
this.type = type;
this.config = config;
this.logger = config.logger(getClass());
}
public String type() {
return type;
}
public String name() {
return config.name;
}
public boolean masterOnly() {
return false;
}
public abstract void export(Collection<MarvelDoc> marvelDocs) throws Exception;
public abstract void close();
protected String settingFQN(String setting) {
return Exporters.EXPORTERS_SETTING + "." + config.name + "." + setting;
}
public static class Config {
private final String name;
private final boolean enabled;
private final Settings globalSettings;
private final Settings settings;
public Config(String name, Settings globalSettings, Settings settings) {
this.name = name;
this.globalSettings = globalSettings;
this.settings = settings;
this.enabled = settings.getAsBoolean("enabled", true);
}
public String name() {
return name;
}
public boolean enabled() {
return enabled;
}
public Settings settings() {
return settings;
}
public ESLogger logger(Class clazz) {
return Loggers.getLogger(clazz, globalSettings);
}
}
public static abstract class Factory<E extends Exporter> {
private final String type;
private final boolean singleton;
public Factory(String type, boolean singleton) {
this.type = type;
this.singleton = singleton;
}
public String type() {
return type;
}
public boolean singleton() {
return singleton;
}
public void filterOutSensitiveSettings(String prefix, MarvelSettingsFilter filter) {
}
public abstract E create(Config config);
}
void export(Collection<MarvelDoc> marvelDocs);
}

View File

@ -6,37 +6,38 @@
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.exporter.http.HttpExporter;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import java.util.HashSet;
import java.util.Set;
import java.util.HashMap;
import java.util.Map;
public class ExporterModule extends AbstractModule {
private final Set<Class<? extends Exporter>> exporters = new HashSet<>();
private final Map<String, Class<? extends Exporter.Factory<? extends Exporter>>> exporterFactories = new HashMap<>();
private final Settings settings;
public ExporterModule(Settings settings) {
this.settings = settings;
// TODO do we need to choose what exporters to bind based on settings?
// Registers default exporter
registerExporter(HttpESExporter.class);
registerExporter(HttpExporter.TYPE, HttpExporter.Factory.class);
registerExporter(LocalExporter.TYPE, LocalExporter.Factory.class);
}
@Override
protected void configure() {
Multibinder<Exporter> binder = Multibinder.newSetBinder(binder(), Exporter.class);
for (Class<? extends Exporter> exporter : exporters) {
bind(exporter).asEagerSingleton();
binder.addBinding().to(exporter);
bind(Exporters.class).asEagerSingleton();
MapBinder<String, Exporter.Factory> factoryBinder = MapBinder.newMapBinder(binder(), String.class, Exporter.Factory.class);
for (Map.Entry<String, Class<? extends Exporter.Factory<? extends Exporter>>> entry : exporterFactories.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
factoryBinder.addBinding(entry.getKey()).to(entry.getValue());
}
}
public void registerExporter(Class<? extends Exporter> exporter) {
exporters.add(exporter);
public void registerExporter(String type, Class<? extends Exporter.Factory<? extends Exporter>> factory) {
exporterFactories.put(type, factory);
}
}

View File

@ -0,0 +1,197 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.*;
/**
*
*/
public class Exporters extends AbstractLifecycleComponent<Exporters> implements Iterable<Exporter>, NodeSettingsService.Listener {
static final String EXPORTERS_SETTING = "marvel.agent.exporters";
private final Map<String, Exporter.Factory> factories;
private final MarvelSettingsFilter settingsFilter;
private final ClusterService clusterService;
private volatile InternalExporters exporters = InternalExporters.EMPTY;
@Inject
public Exporters(Settings settings, Map<String, Exporter.Factory> factories,
MarvelSettingsFilter settingsFilter, ClusterService clusterService,
NodeSettingsService nodeSettingsService) {
super(settings);
this.factories = factories;
this.settingsFilter = settingsFilter;
this.clusterService = clusterService;
nodeSettingsService.addListener(this);
}
@Override
protected void doStart() {
exporters = initExporters(settings.getAsSettings(EXPORTERS_SETTING));
}
@Override
protected void doStop() {
ElasticsearchException exception = null;
for (Exporter exporter : exporters) {
try {
exporter.close();
} catch (Exception e) {
logger.error("exporter [{}] failed to close cleanly", e, exporter.name());
if (exception == null) {
exception = new ElasticsearchException("failed to cleanly close exporters");
}
exception.addSuppressed(e);
}
}
if (exception != null) {
throw exception;
}
}
@Override
protected void doClose() {
}
public Exporter getExporter(String name) {
return exporters.get(name);
}
@Override
public Iterator<Exporter> iterator() {
return exporters.iterator();
}
public void export(Collection<MarvelDoc> marvelDocs) {
for (Exporter exporter : exporters) {
if (exporter.masterOnly() && !clusterService.localNode().masterNode()) {
// the exporter is supposed to only run on the master node, but we're not
// the master node... so skipping
continue;
}
try {
exporter.export(marvelDocs);
} catch (Exception e) {
logger.error("exporter [{}] failed to export marvel data", e, exporter.name());
}
}
}
@Override
public void onRefreshSettings(Settings settings) {
InternalExporters existing = exporters;
Settings updatedSettings = settings.getAsSettings(EXPORTERS_SETTING);
if (updatedSettings.names().isEmpty()) {
return;
}
this.exporters = initExporters(Settings.builder()
.put(existing.settings)
.put(updatedSettings)
.build());
existing.close(logger);
}
InternalExporters initExporters(Settings settings) {
Set<String> singletons = new HashSet<>();
Map<String, Exporter> exporters = new HashMap<>();
boolean hasDisabled = false;
for (String name : settings.names()) {
Settings exporterSettings = settings.getAsSettings(name);
String type = exporterSettings.get("type");
if (type == null) {
throw new SettingsException("missing exporter type for [" + name + "] exporter");
}
Exporter.Factory factory = factories.get(type);
if (factory == null) {
throw new SettingsException("unknown exporter type [" + type + "] set for exporter [" + name + "]");
}
factory.filterOutSensitiveSettings(EXPORTERS_SETTING + ".*.", settingsFilter);
Exporter.Config config = new Exporter.Config(name, settings, exporterSettings);
if (!config.enabled()) {
hasDisabled = true;
if (logger.isDebugEnabled()) {
logger.debug("exporter [{}/{}] is disabled", type, name);
}
continue;
}
if (factory.singleton()) {
// this is a singleton exporter factory, let's make sure we didn't already registered one
// (there can only be one instance of a singleton exporter)
if (singletons.contains(type)) {
throw new SettingsException("multiple [" + type + "] exporters are configured. there can " +
"only be one [" + type + "] exporter configured");
}
singletons.add(type);
}
exporters.put(config.name(), factory.create(config));
}
// no exporters are configured, lets create a default local one.
//
// NOTE: if there are exporters configured and they're all disabled, we don't
// fallback on the default
//
if (exporters.isEmpty() && !hasDisabled) {
Exporter.Config config = new Exporter.Config("default_" + LocalExporter.TYPE, settings, Settings.EMPTY);
exporters.put(config.name(), factories.get(LocalExporter.TYPE).create(config));
}
return new InternalExporters(settings, exporters);
}
public static void registerDynamicSettings(ClusterModule clusterModule) {
clusterModule.registerClusterDynamicSetting(EXPORTERS_SETTING + "*", Validator.EMPTY);
}
static class InternalExporters implements Iterable<Exporter> {
static final InternalExporters EMPTY = new InternalExporters(Settings.EMPTY, Collections.emptyMap());
final Settings settings;
final Map<String, Exporter> exporters;
public InternalExporters(Settings settings, Map<String, Exporter> exporters) {
this.settings = settings;
this.exporters = exporters;
}
@Override
public Iterator<Exporter> iterator() {
return exporters.values().iterator();
}
public Exporter get(String name) {
return exporters.get(name);
}
void close(ESLogger logger) {
for (Exporter exporter : exporters.values()) {
try {
exporter.close();
} catch (Exception e) {
logger.error("failed to close exporter [{}]", e, exporter.name());
}
}
}
}
}

View File

@ -1,69 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.marvel.shield.SecuredClient;
import java.util.Collection;
/**
*
*/
public class LocalExporter implements Exporter<LocalExporter> {
public static final String NAME = "local";
private final Client client;
@Inject
public LocalExporter(SecuredClient client) {
this.client = client;
}
@Override
public String name() {
return NAME;
}
@Override
public void export(Collection<MarvelDoc> marvelDocs) {
}
@Override
public Lifecycle.State lifecycleState() {
return null;
}
@Override
public void addLifecycleListener(LifecycleListener lifecycleListener) {
}
@Override
public void removeLifecycleListener(LifecycleListener lifecycleListener) {
}
@Override
public LocalExporter start() {
return null;
}
@Override
public LocalExporter stop() {
return null;
}
@Override
public void close() {
}
}

View File

@ -3,35 +3,30 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.renderer.Renderer;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@ -48,142 +43,158 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
public class HttpESExporter extends AbstractExporter<HttpESExporter> implements NodeSettingsService.Listener {
/**
*
*/
public class HttpExporter extends Exporter {
private static final String NAME = "es_exporter";
public static final String TYPE = "http";
private static final String SETTINGS_PREFIX = "marvel.agent.exporter.es.";
public static final String SETTINGS_HOSTS = SETTINGS_PREFIX + "hosts";
public static final String SETTINGS_INDEX_TIME_FORMAT = SETTINGS_PREFIX + "index.timeformat";
public static final String SETTINGS_TIMEOUT = SETTINGS_PREFIX + "timeout";
public static final String SETTINGS_READ_TIMEOUT = SETTINGS_PREFIX + "read_timeout";
public static final String HOST_SETTING = "host";
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final String CONNECTION_TIMEOUT_SETTING = "connection.timeout";
public static final String CONNECTION_READ_TIMEOUT_SETTING = "connection.read_timeout";
public static final String AUTH_USERNAME_SETTING = "auth.username";
public static final String AUTH_PASSWORD_SETTING = "auth.password";
// es level timeout used when checking and writing templates (used to speed up tests)
public static final String SETTINGS_CHECK_TEMPLATE_TIMEOUT = SETTINGS_PREFIX + ".template.master_timeout";
public static final String TEMPLATE_CHECK_TIMEOUT_SETTING = "index.template.master_timeout";
// es level timeout used for bulk indexing (used to speed up tests)
public static final String SETTINGS_BULK_TIMEOUT = SETTINGS_PREFIX + ".bulk.timeout";
public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
public static final String DEFAULT_INDEX_TIME_FORMAT = "YYYY.MM.dd";
public static final String SSL_SETTING = "ssl";
public static final String SSL_PROTOCOL_SETTING = SSL_SETTING + ".protocol";
public static final String SSL_TRUSTSTORE_SETTING = SSL_SETTING + ".truststore.path";
public static final String SSL_TRUSTSTORE_PASSWORD_SETTING = SSL_SETTING + ".truststore.password";
public static final String SSL_TRUSTSTORE_ALGORITHM_SETTING = SSL_SETTING + ".truststore.algorithm";
public static final String SSL_HOSTNAME_VERIFICATION_SETTING = SSL_SETTING + ".hostname_verification";
public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd";
/** Minimum supported version of the remote template **/
static final Version MIN_SUPPORTED_TEMPLATE_VERSOIN = Version.V_2_0_0_beta2;
/** Minimum supported version of the remote marvel cluster **/
static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_2_0_0_beta2;
volatile String[] hosts;
volatile boolean boundToLocalNode = false;
volatile DateTimeFormatter indexTimeFormatter;
volatile int timeoutInMillis;
volatile int readTimeoutInMillis;
final TimeValue connectionTimeout;
final TimeValue connectionReadTimeout;
final BasicAuth auth;
final DateTimeFormatter indexTimeFormatter;
/** https support * */
final SSLSocketFactory sslSocketFactory;
volatile boolean hostnameVerification;
final boolean hostnameVerification;
final ClusterService clusterService;
final ClusterName clusterName;
final NodeService nodeService;
final Environment environment;
final RendererRegistry registry;
final Environment env;
final RendererRegistry rendererRegistry;
HttpServer httpServer;
final boolean httpEnabled;
@Nullable
final TimeValue templateCheckTimeout;
@Nullable
final TimeValue bulkTimeout;
final @Nullable TimeValue templateCheckTimeout;
final @Nullable TimeValue bulkTimeout;
volatile boolean checkedAndUploadedIndexTemplate = false;
volatile boolean supportedClusterVersion = false;
/** Version of the built-in template **/
final Version templateVersion;
/** Minimum supported version of the remote template **/
final Version minCompatibleTemplateVersion = Version.V_2_0_0_beta2;
/** Minimum supported version of the remote marvel cluster **/
final Version minCompatibleClusterVersion = Version.V_2_0_0_beta2;
final ConnectionKeepAliveWorker keepAliveWorker;
Thread keepAliveThread;
@Inject
public HttpESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName,
NodeSettingsService nodeSettingsService,
NodeService nodeService, Environment environment,
RendererRegistry registry) {
super(settings, NAME, clusterService);
public HttpExporter(Exporter.Config config, Environment env, RendererRegistry rendererRegistry) {
this.clusterService = clusterService;
this.clusterName = clusterName;
this.nodeService = nodeService;
this.environment = environment;
this.registry = registry;
httpEnabled = settings.getAsBoolean(Node.HTTP_ENABLED, true);
hosts = settings.getAsArray(SETTINGS_HOSTS, Strings.EMPTY_ARRAY);
super(TYPE, config);
this.env = env;
this.rendererRegistry = rendererRegistry;
hosts = config.settings().getAsArray(HOST_SETTING, Strings.EMPTY_ARRAY);
if (hosts.length == 0) {
throw new SettingsException("missing required setting [" + settingFQN(HOST_SETTING) + "]");
}
validateHosts(hosts);
String indexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, DEFAULT_INDEX_TIME_FORMAT);
auth = resolveAuth(config.settings());
String indexTimeFormat = config.settings().get(INDEX_NAME_TIME_FORMAT_SETTING, DEFAULT_INDEX_NAME_TIME_FORMAT);
try {
logger.debug("checking that index time format [{}] is correct", indexTimeFormat);
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid marvel index time format [" + indexTimeFormat + "] configured in setting [" + SETTINGS_INDEX_TIME_FORMAT + "]", e);
throw new IllegalArgumentException("invalid marvel index name time format [" + indexTimeFormat + "] set for [" + settingFQN(INDEX_NAME_TIME_FORMAT_SETTING) + "]", e);
}
timeoutInMillis = (int) settings.getAsTime(SETTINGS_TIMEOUT, new TimeValue(6000)).millis();
readTimeoutInMillis = (int) settings.getAsTime(SETTINGS_READ_TIMEOUT, new TimeValue(timeoutInMillis * 10)).millis();
connectionTimeout = config.settings().getAsTime(CONNECTION_TIMEOUT_SETTING, TimeValue.timeValueMillis(6000));
connectionReadTimeout = config.settings().getAsTime(CONNECTION_READ_TIMEOUT_SETTING, TimeValue.timeValueMillis(connectionTimeout.millis() * 10));
templateCheckTimeout = settings.getAsTime(SETTINGS_CHECK_TEMPLATE_TIMEOUT, null);
bulkTimeout = settings.getAsTime(SETTINGS_BULK_TIMEOUT, null);
// HORRIBLE!!! We can't use settings.getAsTime(..) !!!
// WE MUST FIX THIS IN CORE...
// TimeValue SHOULD NOT SELECTIVELY CHOOSE WHAT FIELDS TO PARSE BASED ON THEIR NAMES!!!!
String templateCheckTimeoutValue = config.settings().get(TEMPLATE_CHECK_TIMEOUT_SETTING, null);
templateCheckTimeout = TimeValue.parseTimeValue(templateCheckTimeoutValue, null, settingFQN(TEMPLATE_CHECK_TIMEOUT_SETTING));
bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null);
keepAliveWorker = new ConnectionKeepAliveWorker();
nodeSettingsService.addListener(this);
if (!settings.getByPrefix(SETTINGS_SSL_PREFIX).getAsMap().isEmpty()) {
sslSocketFactory = createSSLSocketFactory(settings);
} else {
logger.trace("no ssl context configured");
sslSocketFactory = null;
}
hostnameVerification = settings.getAsBoolean(SETTINGS_SSL_HOSTNAME_VERIFICATION, true);
sslSocketFactory = createSSLSocketFactory(config.settings().getAsSettings(SSL_SETTING));
hostnameVerification = config.settings().getAsBoolean(SSL_HOSTNAME_VERIFICATION_SETTING, true);
// Checks that the built-in template is versioned
templateVersion = HttpESExporterUtils.parseTemplateVersion(HttpESExporterUtils.loadDefaultTemplate());
templateVersion = HttpExporterUtils.parseTemplateVersion(HttpExporterUtils.loadDefaultTemplate());
if (templateVersion == null) {
throw new IllegalStateException("unable to find built-in template version");
}
logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}], template version [{}]",
HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)),
logger.debug("initialized with hosts [{}], index prefix [{}], index time format [{}], template version [{}]",
Strings.arrayToCommaDelimitedString(hosts),
MarvelSettings.MARVEL_INDICES_PREFIX, indexTimeFormat, templateVersion);
}
static private void validateHosts(String[] hosts) {
for (String host : hosts) {
try {
HttpESExporterUtils.parseHostWithPath(host, "");
} catch (URISyntaxException e) {
throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + HttpESExporterUtils.santizeUrlPwds(host) + "]." +
" error: [" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()) + "]");
} catch (MalformedURLException e) {
throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + HttpESExporterUtils.santizeUrlPwds(host) + "]." +
" error: [" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()) + "]");
@Override
public void export(Collection<MarvelDoc> marvelDocs) throws Exception {
HttpURLConnection connection = openExportingConnection();
if (connection == null) {
return;
}
if ((marvelDocs != null) && (!marvelDocs.isEmpty())) {
OutputStream os = connection.getOutputStream();
// We need to use a buffer to render each Marvel document
// because the renderer might close the outputstream (ex: XContentBuilder)
try (BytesStreamOutput buffer = new BytesStreamOutput()) {
for (MarvelDoc marvelDoc : marvelDocs) {
render(marvelDoc, buffer);
// write the result to the connection
os.write(buffer.bytes().toBytes());
buffer.reset();
}
} finally {
try {
sendCloseExportingConnection(connection);
} catch (IOException e) {
logger.error("failed sending data to [{}]: {}", connection.getURL(), ExceptionsHelper.detailedMessage(e));
throw e;
}
}
}
}
@Override
public String name() {
return NAME;
}
@Inject(optional = true)
public void setHttpServer(HttpServer httpServer) {
this.httpServer = httpServer;
public void close() {
if (keepAliveThread != null && keepAliveThread.isAlive()) {
keepAliveWorker.closed = true;
keepAliveThread.interrupt();
try {
keepAliveThread.join(6000);
} catch (InterruptedException e) {
// don't care.
}
}
}
private HttpURLConnection openExportingConnection() {
@ -200,17 +211,17 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
return conn;
}
private void render(OutputStream os, MarvelDoc marvelDoc) throws IOException {
private void render(MarvelDoc marvelDoc, OutputStream out) throws IOException {
final XContentType xContentType = XContentType.SMILE;
// Get the appropriate renderer in order to render the MarvelDoc
Renderer renderer = registry.renderer(marvelDoc.type());
Renderer renderer = rendererRegistry.renderer(marvelDoc.type());
if (renderer == null) {
logger.warn("unable to render marvel document of type [{}]: no renderer found in registry", marvelDoc.type());
logger.warn("unable to render marvel document of type [{}]. no renderer found in registry", marvelDoc.type());
return;
}
try (XContentBuilder builder = new XContentBuilder(xContentType.xContent(), os)) {
try (XContentBuilder builder = new XContentBuilder(xContentType.xContent(), out)) {
// Builds the bulk action metadata line
builder.startObject();
@ -228,23 +239,18 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
builder.endObject();
// Adds action metadata line bulk separator
renderBulkSeparator(builder);
builder.flush(); // Flush is needed here because the separator is written directly in the builder's stream
builder.stream().write(builder.contentType().xContent().streamSeparator());
// Render the MarvelDoc
renderer.render(marvelDoc,xContentType, os);
renderer.render(marvelDoc, xContentType, out);
// Adds final bulk separator
renderBulkSeparator(builder);
builder.flush();
builder.stream().write(builder.contentType().xContent().streamSeparator());
}
}
private void renderBulkSeparator(XContentBuilder builder) throws IOException {
// Flush is needed here...
builder.flush();
//... because the separator is written directly in the builder's stream
builder.stream().write(builder.contentType().xContent().streamSeparator());
}
@SuppressWarnings("unchecked")
private void sendCloseExportingConnection(HttpURLConnection conn) throws IOException {
logger.trace("sending content");
@ -273,66 +279,6 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
}
}
@Override
protected void doExport(Collection<MarvelDoc> marvelDocs) throws Exception {
HttpURLConnection connection = openExportingConnection();
if (connection == null) {
return;
}
if ((marvelDocs != null) && (!marvelDocs.isEmpty())) {
OutputStream os = connection.getOutputStream();
// We need to use a buffer to render each Marvel document
// because the renderer might close the outputstream (ex: XContentBuilder)
try (BytesStreamOutput buffer = new BytesStreamOutput()) {
for (MarvelDoc marvelDoc : marvelDocs) {
render(buffer, marvelDoc);
// write the result to the connection
os.write(buffer.bytes().toBytes());
buffer.reset();
}
} finally {
try {
sendCloseExportingConnection(connection);
} catch (IOException e) {
logger.error("error sending data to [{}]: {}", HttpESExporterUtils.santizeUrlPwds(connection.getURL()), HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(e)));
}
}
}
}
@Override
protected void doStart() {
// not initializing keep alive worker here but rather upon first exporting.
// In the case we are sending metrics to the same ES as where the plugin is hosted
// we want to give it some time to start.
}
@Override
protected void doStop() {
if (keepAliveThread != null && keepAliveThread.isAlive()) {
keepAliveWorker.closed = true;
keepAliveThread.interrupt();
try {
keepAliveThread.join(6000);
} catch (InterruptedException e) {
// don't care.
}
}
}
@Override
protected void doClose() {
}
// used for testing
String[] getHosts() {
return hosts;
}
String getIndexName() {
return MarvelSettings.MARVEL_INDICES_PREFIX + indexTimeFormatter.print(System.currentTimeMillis());
@ -344,41 +290,6 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
* @return a url connection to the selected host or null if no current host is available.
*/
private HttpURLConnection openAndValidateConnection(String method, String path, String contentType) {
if (hosts.length == 0) {
// Due to how Guice injection works and because HttpServer can be optional,
// we can't be 100% sure that the HttpServer is created when the ESExporter
// instance is created. This is specially true in integration tests.
// So if HttpServer is enabled in settings we can safely use the NodeService
// to retrieve the bound address.
BoundTransportAddress boundAddress = null;
if (httpEnabled) {
if ((httpServer != null) && (httpServer.lifecycleState() == Lifecycle.State.STARTED)) {
logger.debug("deriving host setting from httpServer");
boundAddress = httpServer.info().address();
} else if (nodeService.info().getHttp() != null) {
logger.debug("deriving host setting from node info API");
boundAddress = nodeService.info().getHttp().address();
}
} else {
logger.warn("http server is not enabled no hosts are manually configured");
return null;
}
String[] extractedHosts = HttpESExporterUtils.extractHostsFromAddress(boundAddress, logger);
if (extractedHosts == null || extractedHosts.length == 0) {
return null;
}
hosts = extractedHosts;
logger.trace("auto-resolved hosts to {}", (Object)extractedHosts);
boundToLocalNode = true;
}
// it's important to have boundToLocalNode persistent to prevent calls during shutdown (causing ugly exceptions)
if (boundToLocalNode && (httpServer != null) && (httpServer.lifecycleState() != Lifecycle.State.STARTED)) {
logger.debug("local node http server is not started. can't connect");
return null;
}
// out of for to move faulty hosts to the end
int hostIndex = 0;
try {
@ -388,16 +299,16 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
try {
Version remoteVersion = loadRemoteClusterVersion(host);
if (remoteVersion == null) {
logger.warn("unable to check remote cluster version: no version found on host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]");
logger.warn("unable to check remote cluster version: no version found on host [" + host + "]");
continue;
}
supportedClusterVersion = remoteVersion.onOrAfter(minCompatibleClusterVersion);
supportedClusterVersion = remoteVersion.onOrAfter(MIN_SUPPORTED_CLUSTER_VERSION);
if (!supportedClusterVersion) {
logger.error("remote cluster version [" + remoteVersion + "] is not supported, please use a cluster with minimum version [" + minCompatibleClusterVersion + "]");
logger.error("remote cluster version [" + remoteVersion + "] is not supported, please use a cluster with minimum version [" + MIN_SUPPORTED_CLUSTER_VERSION + "]");
continue;
}
} catch (ElasticsearchException e) {
logger.error("exception when checking remote cluster version on host [{}]", e, HttpESExporterUtils.santizeUrlPwds(host));
logger.error("exception when checking remote cluster version on host [{}]", e, host);
continue;
}
}
@ -425,11 +336,11 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
System.arraycopy(hosts, hostIndex, newHosts, 0, hosts.length - hostIndex);
System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex, hostIndex);
hosts = newHosts;
logger.debug("preferred target host is now [{}]", HttpESExporterUtils.santizeUrlPwds(hosts[0]));
logger.debug("preferred target host is now [{}]", hosts[0]);
}
}
logger.error("could not connect to any configured elasticsearch instances: [{}]", HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)));
logger.error("could not connect to any configured elasticsearch instances [{}]", Strings.arrayToCommaDelimitedString(hosts));
return null;
@ -438,7 +349,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
/** open a connection to the given hosts, returning null when not successful * */
private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) {
try {
final URL url = HttpESExporterUtils.parseHostWithPath(host, path);
final URL url = HttpExporterUtils.parseHostWithPath(host, path);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
if (conn instanceof HttpsURLConnection && sslSocketFactory != null) {
@ -450,14 +361,13 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
}
conn.setRequestMethod(method);
conn.setConnectTimeout(timeoutInMillis);
conn.setReadTimeout(readTimeoutInMillis);
conn.setConnectTimeout((int) connectionTimeout.getMillis());
conn.setReadTimeout((int) connectionReadTimeout.getMillis());
if (contentType != null) {
conn.setRequestProperty("Content-Type", contentType);
}
if (url.getUserInfo() != null) {
String basicAuth = "Basic " + Base64.encodeBytes(url.getUserInfo().getBytes("ISO-8859-1"));
conn.setRequestProperty("Authorization", basicAuth);
if (auth != null) {
auth.apply(conn);
}
conn.setUseCaches(false);
if (method.equalsIgnoreCase("POST") || method.equalsIgnoreCase("PUT")) {
@ -467,17 +377,17 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
return conn;
} catch (URISyntaxException e) {
logErrorBasedOnLevel(e, "error parsing host [{}]", HttpESExporterUtils.santizeUrlPwds(host));
logErrorBasedOnLevel(e, "error parsing host [{}]", host);
} catch (IOException e) {
logErrorBasedOnLevel(e, "error connecting to [{}]", HttpESExporterUtils.santizeUrlPwds(host));
logErrorBasedOnLevel(e, "error connecting to [{}]", host);
}
return null;
}
private void logErrorBasedOnLevel(Throwable t, String msg, Object... params) {
logger.error(msg + " [" + HttpESExporterUtils.santizeUrlPwds(t.getMessage()) + "]", params);
logger.error(msg + " [" + t.getMessage() + "]", params);
if (logger.isDebugEnabled()) {
logger.debug(msg + ". full error details:\n[{}]", params, HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t)));
logger.debug(msg + ". full error details:\n[{}]", params, ExceptionsHelper.detailedMessage(t));
}
}
@ -489,16 +399,16 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
try {
connection = openConnection(host, "GET", "/", null);
if (connection == null) {
throw new ElasticsearchException("unable to check remote cluster version: no available connection for host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]");
throw new ElasticsearchException("unable to check remote cluster version: no available connection for host [" + host + "]");
}
try (InputStream is = connection.getInputStream()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
return HttpESExporterUtils.parseElasticsearchVersion(out.toByteArray());
return HttpExporterUtils.parseElasticsearchVersion(out.toByteArray());
}
} catch (IOException e) {
throw new ElasticsearchException("failed to verify the remote cluster version on host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]:\n" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()));
throw new ElasticsearchException("failed to verify the remote cluster version on host [" + host + "]:\n" + e.getMessage());
} finally {
if (connection != null) {
try {
@ -545,22 +455,22 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
}
if ((remoteTemplate == null) || (remoteTemplate.length == 0)) {
logger.error("unable to load remote marvel template on host [{}]", HttpESExporterUtils.santizeUrlPwds(host));
logger.error("unable to load remote marvel template on host [{}]", host);
return false;
}
Version remoteVersion = HttpESExporterUtils.parseTemplateVersion(remoteTemplate);
logger.debug("detected existing remote template in version [{}] on host [{}]", remoteVersion, HttpESExporterUtils.santizeUrlPwds(host));
Version remoteVersion = HttpExporterUtils.parseTemplateVersion(remoteTemplate);
logger.debug("detected existing remote template in version [{}] on host [{}]", remoteVersion, host);
if (remoteVersion == null) {
logger.warn("marvel template version cannot be found: template will be updated to version [{}]", templateVersion);
} else {
if (remoteVersion.before(minCompatibleTemplateVersion)) {
if (remoteVersion.before(MIN_SUPPORTED_TEMPLATE_VERSOIN)) {
logger.error("marvel template version [{}] is below the minimum compatible version [{}] on host [{}]: "
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
remoteVersion, minCompatibleTemplateVersion, HttpESExporterUtils.santizeUrlPwds(host));
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
remoteVersion, MIN_SUPPORTED_TEMPLATE_VERSOIN, host);
return false;
}
@ -580,7 +490,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
}
}
} catch (IOException e) {
logger.error("failed to verify the marvel template to [{}]:\n{}", HttpESExporterUtils.santizeUrlPwds(host), HttpESExporterUtils.santizeUrlPwds(e.getMessage()));
logger.error("failed to verify the marvel template to [{}]:\n{}", host, e.getMessage());
return false;
} finally {
if (connection != null) {
@ -602,7 +512,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
}
logger.debug("loading marvel pre-configured template");
byte[] template = HttpESExporterUtils.loadDefaultTemplate();
byte[] template = HttpExporterUtils.loadDefaultTemplate();
// Uploads the template and closes the outputstream
Streams.copy(template, connection.getOutputStream());
@ -614,7 +524,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
logger.info("marvel template updated to version [{}]", templateVersion);
} catch (IOException e) {
logger.error("failed to update the marvel template to [{}]:\n{}", HttpESExporterUtils.santizeUrlPwds(host), HttpESExporterUtils.santizeUrlPwds(e.getMessage()));
logger.error("failed to update the marvel template to [{}]:\n{}", host, e.getMessage());
return false;
} finally {
@ -641,59 +551,116 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
try {
logger.error("{} response code [{} {}]. content: [{}]",
HttpESExporterUtils.santizeUrlPwds(msg), conn.getResponseCode(),
HttpESExporterUtils.santizeUrlPwds(conn.getResponseMessage()),
HttpESExporterUtils.santizeUrlPwds(err));
msg, conn.getResponseCode(),
conn.getResponseMessage(),
err);
} catch (IOException e) {
logger.error("{}. connection had an error while reporting the error. tough life.", HttpESExporterUtils.santizeUrlPwds(msg));
}
}
@Override
public void onRefreshSettings(Settings settings) {
TimeValue newTimeout = settings.getAsTime(SETTINGS_TIMEOUT, null);
if (newTimeout != null) {
logger.info("connection timeout set to [{}]", newTimeout);
timeoutInMillis = (int) newTimeout.millis();
}
newTimeout = settings.getAsTime(SETTINGS_READ_TIMEOUT, null);
if (newTimeout != null) {
logger.info("connection read timeout set to [{}]", newTimeout);
readTimeoutInMillis = (int) newTimeout.millis();
}
String[] newHosts = settings.getAsArray(SETTINGS_HOSTS, null);
if (newHosts != null) {
logger.info("hosts set to [{}]", HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(newHosts)));
this.hosts = newHosts;
this.checkedAndUploadedIndexTemplate = false;
this.supportedClusterVersion = false;
this.boundToLocalNode = false;
}
Boolean newHostnameVerification = settings.getAsBoolean(SETTINGS_SSL_HOSTNAME_VERIFICATION, null);
if (newHostnameVerification != null) {
logger.info("hostname verification set to [{}]", newHostnameVerification);
this.hostnameVerification = newHostnameVerification;
}
String newIndexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, null);
if (newIndexTimeFormat != null) {
try {
indexTimeFormatter = DateTimeFormat.forPattern(newIndexTimeFormat).withZoneUTC();
} catch (IllegalArgumentException e) {
logger.error("Unable to update marvel index time format: format [" + newIndexTimeFormat + "] is invalid", e);
}
logger.error("{}. connection had an error while reporting the error. tough life.", msg);
}
}
protected void initKeepAliveThread() {
keepAliveThread = new Thread(keepAliveWorker, EsExecutors.threadName(settings, "keep_alive"));
keepAliveThread = new Thread(keepAliveWorker, "marvel-exporter[" + config.name() + "][keep_alive]");
keepAliveThread.setDaemon(true);
keepAliveThread.start();
}
static private void validateHosts(String[] hosts) {
for (String host : hosts) {
try {
HttpExporterUtils.parseHostWithPath(host, "");
} catch (URISyntaxException e) {
throw new SettingsException("[marvel.agent.exporter] invalid host: [" + host + "]." +
" error: [" + e.getMessage() + "]");
} catch (MalformedURLException e) {
throw new SettingsException("[marvel.agent.exporter] invalid host: [" + host + "]." +
" error: [" + e.getMessage() + "]");
}
}
}
/** SSL Initialization * */
public SSLSocketFactory createSSLSocketFactory(Settings settings) {
if (settings.names().isEmpty()) {
logger.trace("no ssl context configured");
return null;
}
SSLContext sslContext;
// Initialize sslContext
try {
String protocol = settings.get(SSL_PROTOCOL_SETTING, "TLS");
String trustStore = settings.get(SSL_TRUSTSTORE_SETTING, System.getProperty("javax.net.ssl.trustStore"));
String trustStorePassword = settings.get(SSL_TRUSTSTORE_PASSWORD_SETTING, System.getProperty("javax.net.ssl.trustStorePassword"));
String trustStoreAlgorithm = settings.get(SSL_TRUSTSTORE_ALGORITHM_SETTING, System.getProperty("ssl.TrustManagerFactory.algorithm"));
if (trustStore == null) {
throw new SettingsException("missing required setting [" + SSL_TRUSTSTORE_SETTING + "]");
}
if (trustStoreAlgorithm == null) {
trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
}
logger.debug("using ssl trust store [{}] with algorithm [{}]", trustStore, trustStoreAlgorithm);
Path trustStorePath = env.configFile().resolve(trustStore);
if (!Files.exists(trustStorePath)) {
throw new SettingsException("could not find trust store file [" + trustStorePath + "]");
}
TrustManager[] trustManagers;
try (InputStream trustStoreStream = Files.newInputStream(trustStorePath)) {
// Load TrustStore
KeyStore ks = KeyStore.getInstance("jks");
ks.load(trustStoreStream, trustStorePassword == null ? null : trustStorePassword.toCharArray());
// Initialize a trust manager factory with the trusted store
TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm);
trustFactory.init(ks);
// Retrieve the trust managers from the factory
trustManagers = trustFactory.getTrustManagers();
} catch (Exception e) {
throw new RuntimeException("Failed to initialize a TrustManagerFactory", e);
}
sslContext = SSLContext.getInstance(protocol);
sslContext.init(null, trustManagers, null);
} catch (Exception e) {
throw new ElasticsearchException("failed to initialize ssl", e);
}
return sslContext.getSocketFactory();
}
BasicAuth resolveAuth(Settings setting) {
String username = setting.get(AUTH_USERNAME_SETTING, null);
String password = setting.get(AUTH_PASSWORD_SETTING, null);
if (username == null && password == null) {
return null;
}
if (username == null) {
throw new SettingsException("invalid auth setting. missing [" + settingFQN(AUTH_USERNAME_SETTING) + "]");
}
return new BasicAuth(username, password);
}
/**
* Trust all hostname verifier. This simply returns true to completely disable hostname verification
*/
static class TrustAllHostnameVerifier implements HostnameVerifier {
static final HostnameVerifier INSTANCE = new TrustAllHostnameVerifier();
private TrustAllHostnameVerifier() {
}
@Override
public boolean verify(String s, SSLSession sslSession) {
return true;
}
}
/**
* Sadly we need to make sure we keep the connection open to the target ES a
* Java's connection pooling closes connections if idle for 5sec.
@ -717,7 +684,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
}
HttpURLConnection conn = openConnection(currentHosts[0], "GET", "", null);
if (conn == null) {
logger.trace("keep alive thread shutting down. failed to open connection to current host [{}]", HttpESExporterUtils.santizeUrlPwds(currentHosts[0]));
logger.trace("keep alive thread shutting down. failed to open connection to current host [{}]", currentHosts[0]);
return;
} else {
conn.getInputStream().close(); // close and release to connection pool.
@ -726,83 +693,50 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
// ignore, if closed, good....
} catch (Throwable t) {
logger.debug("error in keep alive thread, shutting down (will be restarted after a successful connection has been made) {}",
HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t)));
ExceptionsHelper.detailedMessage(t));
return;
}
}
}
}
private static final String SETTINGS_SSL_PREFIX = SETTINGS_PREFIX + "ssl.";
static class BasicAuth {
public static final String SETTINGS_SSL_PROTOCOL = SETTINGS_SSL_PREFIX + "protocol";
public static final String SETTINGS_SSL_TRUSTSTORE = SETTINGS_SSL_PREFIX + "truststore.path";
public static final String SETTINGS_SSL_TRUSTSTORE_PASSWORD = SETTINGS_SSL_PREFIX + "truststore.password";
public static final String SETTINGS_SSL_TRUSTSTORE_ALGORITHM = SETTINGS_SSL_PREFIX + "truststore.algorithm";
public static final String SETTINGS_SSL_HOSTNAME_VERIFICATION = SETTINGS_SSL_PREFIX + "hostname_verification";
String username;
char[] password;
/** SSL Initialization * */
public SSLSocketFactory createSSLSocketFactory(Settings settings) {
SSLContext sslContext;
// Initialize sslContext
try {
String sslContextProtocol = settings.get(SETTINGS_SSL_PROTOCOL, "TLS");
String trustStore = settings.get(SETTINGS_SSL_TRUSTSTORE, System.getProperty("javax.net.ssl.trustStore"));
String trustStorePassword = settings.get(SETTINGS_SSL_TRUSTSTORE_PASSWORD, System.getProperty("javax.net.ssl.trustStorePassword"));
String trustStoreAlgorithm = settings.get(SETTINGS_SSL_TRUSTSTORE_ALGORITHM, System.getProperty("ssl.TrustManagerFactory.algorithm"));
if (trustStore == null) {
throw new RuntimeException("truststore is not configured, use " + SETTINGS_SSL_TRUSTSTORE);
}
if (trustStoreAlgorithm == null) {
trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
}
logger.debug("SSL: using trustStore[{}], trustAlgorithm[{}]", trustStore, trustStoreAlgorithm);
Path trustStorePath = environment.configFile().resolve(trustStore);
if (!Files.exists(trustStorePath)) {
throw new FileNotFoundException("Truststore at path [" + trustStorePath + "] does not exist");
}
TrustManager[] trustManagers;
try (InputStream trustStoreStream = Files.newInputStream(trustStorePath)) {
// Load TrustStore
KeyStore ks = KeyStore.getInstance("jks");
ks.load(trustStoreStream, trustStorePassword == null ? null : trustStorePassword.toCharArray());
// Initialize a trust manager factory with the trusted store
TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm);
trustFactory.init(ks);
// Retrieve the trust managers from the factory
trustManagers = trustFactory.getTrustManagers();
} catch (Exception e) {
throw new RuntimeException("Failed to initialize a TrustManagerFactory", e);
}
sslContext = SSLContext.getInstance(sslContextProtocol);
sslContext.init(null, trustManagers, null);
} catch (Exception e) {
throw new RuntimeException("[marvel.agent.exporter] failed to initialize the SSLContext", e);
public BasicAuth(String username, String password) {
this.username = username;
this.password = password != null ? password.toCharArray() : null;
}
void apply(HttpURLConnection connection) throws UnsupportedEncodingException {
String userInfo = username + ":" + (password != null ? new String(password) : "");
String basicAuth = "Basic " + Base64.encodeBytes(userInfo.getBytes("ISO-8859-1"));
connection.setRequestProperty("Authorization", basicAuth);
}
return sslContext.getSocketFactory();
}
/**
* Trust all hostname verifier. This simply returns true to completely disable hostname verification
*/
static class TrustAllHostnameVerifier implements HostnameVerifier {
static final HostnameVerifier INSTANCE = new TrustAllHostnameVerifier();
public static class Factory extends Exporter.Factory<HttpExporter> {
private TrustAllHostnameVerifier() {
private final Environment env;
private final RendererRegistry rendererRegistry;
@Inject
public Factory(Environment env, RendererRegistry rendererRegistry) {
super(TYPE, false);
this.env = env;
this.rendererRegistry = rendererRegistry;
}
@Override
public boolean verify(String s, SSLSession sslSession) {
return true;
public HttpExporter create(Config config) {
return new HttpExporter(config, env, rendererRegistry);
}
@Override
public void filterOutSensitiveSettings(String prefix, MarvelSettingsFilter filter) {
filter.filterOut(prefix + AUTH_PASSWORD_SETTING);
}
}
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
@ -24,7 +24,7 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class HttpESExporterUtils {
public class HttpExporterUtils {
public static final String MARVEL_TEMPLATE_FILE = "/marvel_index_template.json";
static final String MARVEL_VERSION_FIELD = "marvel_version";
@ -77,11 +77,7 @@ public class HttpESExporterUtils {
if (hostUrl.getPort() == -1) {
// url has no port, default to 9200 - sadly we need to rebuild..
StringBuilder newUrl = new StringBuilder(hostUrl.getProtocol() + "://");
if (hostUrl.getUserInfo() != null) {
newUrl.append(hostUrl.getUserInfo()).append("@");
}
newUrl.append(hostUrl.getHost()).append(":9200").append(hostUrl.toURI().getPath());
hostUrl = new URL(newUrl.toString());
}
@ -93,7 +89,7 @@ public class HttpESExporterUtils {
* Loads the default Marvel template
*/
public static byte[] loadDefaultTemplate() {
try (InputStream is = HttpESExporterUtils.class.getResourceAsStream(MARVEL_TEMPLATE_FILE)) {
try (InputStream is = HttpExporterUtils.class.getResourceAsStream(MARVEL_TEMPLATE_FILE)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
return out.toByteArray();
@ -134,15 +130,4 @@ public class HttpESExporterUtils {
}
return null;
}
private static final String userInfoChars = "\\w-\\._~!$&\\'\\(\\)*+,;=%";
private static Pattern urlPwdSanitizer = Pattern.compile("([" + userInfoChars + "]+?):[" + userInfoChars + "]+?@");
public static String santizeUrlPwds(Object text) {
if (text == null) {
return null;
}
Matcher matcher = urlPwdSanitizer.matcher(text.toString());
return matcher.replaceAll("$1:XXXXXX@");
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.shield.SecuredClient;
import java.util.Collection;
/**
*
*/
public class LocalExporter extends Exporter {
public static final String TYPE = "local";
private final Client client;
public LocalExporter(Exporter.Config config, SecuredClient client) {
super(TYPE, config);
this.client = client;
}
@Override
public void export(Collection<MarvelDoc> marvelDocs) {
}
@Override
public void close() {
}
public static class Factory extends Exporter.Factory<LocalExporter> {
private final SecuredClient client;
@Inject
public Factory(SecuredClient client) {
super(TYPE, true);
this.client = client;
}
@Override
public LocalExporter create(Config config) {
return new LocalExporter(config, client);
}
}
}

View File

@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import org.elasticsearch.marvel.shield.SecuredClient;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
/**
*
*/
public class ExportersTests extends ESTestCase {
private Exporters exporters;
private Map<String, Exporter.Factory> factories;
private MarvelSettingsFilter settingsFilter;
private ClusterService clusterService;
private NodeSettingsService nodeSettingsService;
@Before
public void init() throws Exception {
factories = new HashMap<>();
// we always need to have the local exporter as it serves as the default one
factories.put(LocalExporter.TYPE, new LocalExporter.Factory(mock(SecuredClient.class)));
settingsFilter = mock(MarvelSettingsFilter.class);
clusterService = mock(ClusterService.class);
nodeSettingsService = mock(NodeSettingsService.class);
exporters = new Exporters(Settings.EMPTY, factories, settingsFilter, clusterService, nodeSettingsService);
}
@Test
public void testInitExporters_Default() throws Exception {
Exporter.Factory factory = spy(new TestFactory("_type", true));
factories.put("_type", factory);
Exporters.InternalExporters internalExporters = exporters.initExporters(Settings.builder()
.build());
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.settings.getAsMap().size(), is(0));
assertThat(internalExporters.exporters.size(), is(1));
assertThat(internalExporters.exporters, hasKey("default_" + LocalExporter.TYPE));
assertThat(internalExporters.exporters.get("default_" + LocalExporter.TYPE), instanceOf(LocalExporter.class));
}
@Test
public void testInitExporters_Single() throws Exception {
Exporter.Factory factory = spy(new TestFactory("_type", true));
factories.put("_type", factory);
Exporters.InternalExporters internalExporters = exporters.initExporters(Settings.builder()
.put("_name.type", "_type")
.build());
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.settings.getAsMap().size(), is(1));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.type", "_type"));
assertThat(internalExporters.exporters.size(), is(1));
assertThat(internalExporters.exporters, hasKey("_name"));
assertThat(internalExporters.exporters.get("_name"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.exporters.get("_name").type, is("_type"));
}
@Test
public void testInitExporters_Single_Disabled() throws Exception {
Exporter.Factory factory = spy(new TestFactory("_type", true));
factories.put("_type", factory);
Exporters.InternalExporters internalExporters = exporters.initExporters(Settings.builder()
.put("_name.type", "_type")
.put("_name.enabled", false)
.build());
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.settings.getAsMap().size(), is(2));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.type", "_type"));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.enabled", "false"));
// the only configured exporter is disabled... yet we intentionally don't fallback on the default
assertThat(internalExporters.exporters.size(), is(0));
}
@Test(expected = SettingsException.class)
public void testInitExporters_Single_UnknownType() throws Exception {
exporters.initExporters(Settings.builder()
.put("_name.type", "unknown_type")
.build());
}
@Test(expected = SettingsException.class)
public void testInitExporters_Single_MissingExporterType() throws Exception {
exporters.initExporters(Settings.builder()
.put("_name.foo", "bar")
.build());
}
@Test
public void testInitExporters_Mutliple_SameType() throws Exception {
Exporter.Factory factory = spy(new TestFactory("_type", false));
factories.put("_type", factory);
Exporters.InternalExporters internalExporters = exporters.initExporters(Settings.builder()
.put("_name0.type", "_type")
.put("_name1.type", "_type")
.build());
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.settings.getAsMap().size(), is(2));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name0.type", "_type"));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name1.type", "_type"));
assertThat(internalExporters.exporters.size(), is(2));
assertThat(internalExporters.exporters, hasKey("_name0"));
assertThat(internalExporters.exporters.get("_name0"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.exporters.get("_name0").type, is("_type"));
assertThat(internalExporters.exporters, hasKey("_name1"));
assertThat(internalExporters.exporters.get("_name1"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.exporters.get("_name1").type, is("_type"));
}
@Test(expected = SettingsException.class)
public void testInitExporters_Mutliple_SameType_Singletons() throws Exception {
Exporter.Factory factory = spy(new TestFactory("_type", true));
factories.put("_type", factory);
exporters.initExporters(Settings.builder()
.put("_name0.type", "_type")
.put("_name1.type", "_type")
.build());
}
static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> {
public TestFactory(String type, boolean singleton) {
super(type, singleton);
}
@Override
public TestExporter create(Exporter.Config config) {
return new TestExporter(type(), config);
}
static class TestExporter extends Exporter {
public TestExporter(String type, Config config) {
super(type, config);
}
@Override
public void export(Collection<MarvelDoc> marvelDocs) throws Exception {
}
@Override
public void close() {
}
}
}
}

View File

@ -1,283 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.hamcrest.Matchers;
import org.joda.time.format.DateTimeFormat;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
// Transport Client instantiation also calls the marvel plugin, which then fails to find modules
@ClusterScope(transportClientRatio = 0.0, scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class HttpESExporterTests extends ESIntegTestCase {
final static AtomicLong timeStampGenerator = new AtomicLong();
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LicensePlugin.class, MarvelPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Test
public void testHttpServerOff() {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put(Node.HTTP_ENABLED, false);
internalCluster().startNode(builder);
HttpESExporter httpEsExporter = getEsExporter();
logger.info("trying exporting despite of no target");
httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
}
@Test
public void testTemplateAdditionDespiteOfLateClusterForming() {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put(Node.HTTP_ENABLED, true)
.put("discovery.type", "zen")
.put("discovery.zen.ping_timeout", "1s")
.put("discovery.initial_state_timeout", "100ms")
.put("discovery.zen.minimum_master_nodes", 2)
.put(HttpESExporter.SETTINGS_BULK_TIMEOUT, "1s")
.put(HttpESExporter.SETTINGS_CHECK_TEMPLATE_TIMEOUT, "1s");
internalCluster().startNode(builder);
HttpESExporter httpEsExporter = getEsExporter();
logger.info("exporting events while there is no cluster");
httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("bringing up a second node");
internalCluster().startNode(builder);
ensureGreen();
logger.info("exporting a second event");
httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("verifying that template has been created");
assertMarvelTemplateExists();
}
@Test
public void testDynamicHostChange() {
// disable exporting to be able to use non valid hosts
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.INTERVAL, "-1");
internalCluster().startNode(builder);
HttpESExporter httpEsExporter = getEsExporter();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, "test1")));
assertThat(httpEsExporter.getHosts(), Matchers.arrayContaining("test1"));
// wipes the non array settings
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putArray(HttpESExporter.SETTINGS_HOSTS, "test2").put(HttpESExporter.SETTINGS_HOSTS, "")));
assertThat(httpEsExporter.getHosts(), Matchers.arrayContaining("test2"));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, "test3")));
assertThat(httpEsExporter.getHosts(), Matchers.arrayContaining("test3"));
}
@Test
public void testHostChangeReChecksTemplate() {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put(Node.HTTP_ENABLED, true);
internalCluster().startNode(builder);
HttpESExporter httpEsExporter = getEsExporter();
logger.info("exporting an event");
httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("removing the marvel template");
assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
assertMarvelTemplateNotExists();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, httpEsExporter.getHosts())).get());
logger.info("exporting a second event");
httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("verifying that template has been created");
assertMarvelTemplateExists();
}
@Test
public void testHostFailureChecksTemplate() throws Exception {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put(Node.HTTP_ENABLED, true);
final String node0 = internalCluster().startNode(builder);
final HttpESExporter httpEsExporter0 = getEsExporter(node0);
assertThat(node0, equalTo(internalCluster().getMasterName()));
final String node1 = internalCluster().startNode(builder);
final HttpESExporter httpEsExporter1 = getEsExporter(node1);
logger.info("--> exporting events to force host resolution");
httpEsExporter0.export(Collections.singletonList(newRandomMarvelDoc()));
httpEsExporter1.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("--> setting exporting hosts to {} + {}", httpEsExporter0.getHosts(), httpEsExporter1.getHosts());
ArrayList<String> mergedHosts = new ArrayList<String>();
mergedHosts.addAll(Arrays.asList(httpEsExporter0.getHosts()));
mergedHosts.addAll(Arrays.asList(httpEsExporter1.getHosts()));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, mergedHosts.toArray(Strings.EMPTY_ARRAY))).get());
logger.info("--> exporting events to have new settings take effect");
httpEsExporter0.export(Collections.singletonList(newRandomMarvelDoc()));
httpEsExporter1.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("verifying that template has been created");
assertMarvelTemplateExists();
logger.info("--> removing the marvel template");
assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
assertMarvelTemplateNotExists();
logger.info("--> shutting down node0");
internalCluster().stopCurrentMasterNode();
logger.info("--> exporting events from node1");
// we use assert busy node because url caching may cause the node failure to be only detected while sending the event
assertBusy(new Runnable() {
@Override
public void run() {
httpEsExporter1.export(Collections.singletonList(newRandomMarvelDoc()));
logger.debug("--> checking for template");
assertMarvelTemplateExists();
}
});
}
@Test
public void testDynamicIndexFormatChange() {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put(Node.HTTP_ENABLED, true);
String nodeId = internalCluster().startNode(builder);
logger.info("exporting a first event");
HttpESExporter httpEsExporter = getEsExporter(nodeId);
httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("checks that the index [{}] is created", httpEsExporter.getIndexName());
assertTrue(client().admin().indices().prepareExists(httpEsExporter.getIndexName()).get().isExists());
String newTimeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
logger.info("updating index time format setting to {}", newTimeFormat);
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(HttpESExporter.SETTINGS_INDEX_TIME_FORMAT, newTimeFormat)));
logger.info("exporting a second event");
httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
String expectedMarvelIndex = MarvelSettings.MARVEL_INDICES_PREFIX
+ DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(System.currentTimeMillis());
logger.info("checks that the index [{}] is created", expectedMarvelIndex);
assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists());
logger.info("verifying that template has been created");
assertMarvelTemplateExists();
}
@Test
public void testLoadRemoteClusterVersion() {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put(Node.HTTP_ENABLED, true);
String nodeId = internalCluster().startNode(builder);
HttpESExporter httpEsExporter = getEsExporter(nodeId);
logger.info("--> exporting events to force host resolution");
httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
assertNotNull(httpEsExporter.getHosts());
assertThat(httpEsExporter.getHosts().length, greaterThan(0));
logger.info("--> loading remote cluster version");
Version resolved = httpEsExporter.loadRemoteClusterVersion(httpEsExporter.getHosts()[0]);
assertTrue(resolved.equals(Version.CURRENT));
}
private HttpESExporter getEsExporter() {
AgentService service = internalCluster().getInstance(AgentService.class);
return (HttpESExporter) service.getExporters().iterator().next();
}
private HttpESExporter getEsExporter(String node) {
AgentService service = internalCluster().getInstance(AgentService.class, node);
return (HttpESExporter) service.getExporters().iterator().next();
}
private MarvelDoc newRandomMarvelDoc() {
if (randomBoolean()) {
return new IndexRecoveryMarvelDoc(internalCluster().getClusterName(),
IndexRecoveryCollector.TYPE, timeStampGenerator.incrementAndGet(), new RecoveryResponse());
} else {
return new ClusterStateMarvelDoc(internalCluster().getClusterName(),
ClusterStateCollector.TYPE, timeStampGenerator.incrementAndGet(), ClusterState.PROTO, ClusterHealthStatus.GREEN);
}
}
private void assertMarvelTemplateExists() {
assertTrue("marvel template must exists", isTemplateExists("marvel"));
}
private void assertMarvelTemplateNotExists() {
assertFalse("marvel template must not exists", isTemplateExists("marvel"));
}
private boolean isTemplateExists(String templateName) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(templateName).get().getIndexTemplates()) {
if (template.getName().equals(templateName)) {
return true;
}
}
return false;
}
}

View File

@ -1,208 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
public class HttpESExporterUtilsTests extends ESTestCase {
@Test
public void testLoadTemplate() {
byte[] template = HttpESExporterUtils.loadDefaultTemplate();
assertNotNull(template);
assertThat(template.length, Matchers.greaterThan(0));
}
@Test
public void testParseTemplateVersionFromByteArrayTemplate() throws IOException {
byte[] template = HttpESExporterUtils.loadDefaultTemplate();
assertNotNull(template);
Version version = HttpESExporterUtils.parseTemplateVersion(template);
assertNotNull(version);
}
@Test
public void testParseTemplateVersionFromStringTemplate() throws IOException {
List<String> templates = new ArrayList<>();
templates.add("{\"marvel_version\": \"1.4.0.Beta1\"}");
templates.add("{\"marvel_version\": \"1.6.2-SNAPSHOT\"}");
templates.add("{\"marvel_version\": \"1.7.1\"}");
templates.add("{\"marvel_version\": \"2.0.0-beta1\"}");
templates.add("{\"marvel_version\": \"2.0.0\"}");
templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }");
for (String template : templates) {
Version version = HttpESExporterUtils.parseTemplateVersion(template);
assertNotNull(version);
}
Version version = HttpESExporterUtils.parseTemplateVersion("{\"marvel.index_format\": \"7\"}");
assertNull(version);
}
@Test
public void testParseVersion() throws IOException {
assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}"));
assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}"));
assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}"));
assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"));
assertNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}"));
assertNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}"));
}
@Test
public void testHostParsing() throws MalformedURLException, URISyntaxException {
URL url = HttpESExporterUtils.parseHostWithPath("localhost:9200", "");
verifyUrl(url, "http", "localhost", 9200, "/");
url = HttpESExporterUtils.parseHostWithPath("localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
url = HttpESExporterUtils.parseHostWithPath("http://localhost:9200", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
url = HttpESExporterUtils.parseHostWithPath("http://localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
url = HttpESExporterUtils.parseHostWithPath("https://localhost:9200", "_bulk");
verifyUrl(url, "https", "localhost", 9200, "/_bulk");
url = HttpESExporterUtils.parseHostWithPath("https://boaz-air.local:9200", "_bulk");
verifyUrl(url, "https", "boaz-air.local", 9200, "/_bulk");
url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200", "");
verifyUrl(url, "http", "localhost", 9200, "/", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost:9200", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("https://boaz:test@localhost:9200", "_bulk");
verifyUrl(url, "https", "localhost", 9200, "/_bulk", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200/suburl", "");
verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200/suburl/", "");
verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("localhost/suburl", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk");
url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost:9200/suburl/suburl1", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/suburl/suburl1/_bulk", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost/suburl", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("https://boaz:test@localhost:9200/suburl", "_bulk");
verifyUrl(url, "https", "localhost", 9200, "/suburl/_bulk", "boaz:test");
url = HttpESExporterUtils.parseHostWithPath("https://user:test@server_with_underscore:9300", "_bulk");
verifyUrl(url, "https", "server_with_underscore", 9300, "/_bulk", "user:test");
url = HttpESExporterUtils.parseHostWithPath("user:test@server_with_underscore:9300", "_bulk");
verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk", "user:test");
url = HttpESExporterUtils.parseHostWithPath("server_with_underscore:9300", "_bulk");
verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk");
url = HttpESExporterUtils.parseHostWithPath("server_with_underscore", "_bulk");
verifyUrl(url, "http", "server_with_underscore", 9200, "/_bulk");
url = HttpESExporterUtils.parseHostWithPath("https://user:test@server-dash:9300", "_bulk");
verifyUrl(url, "https", "server-dash", 9300, "/_bulk", "user:test");
url = HttpESExporterUtils.parseHostWithPath("user:test@server-dash:9300", "_bulk");
verifyUrl(url, "http", "server-dash", 9300, "/_bulk", "user:test");
url = HttpESExporterUtils.parseHostWithPath("server-dash:9300", "_bulk");
verifyUrl(url, "http", "server-dash", 9300, "/_bulk");
url = HttpESExporterUtils.parseHostWithPath("server-dash", "_bulk");
verifyUrl(url, "http", "server-dash", 9200, "/_bulk");
}
@Test
public void sanitizeUrlPadTest() throws UnsupportedEncodingException {
String pwd = URLEncoder.encode(randomRealisticUnicodeOfCodepointLengthBetween(3, 20), "UTF-8");
String[] inputs = new String[]{
"https://boaz:" + pwd + "@hostname:9200",
"http://boaz:" + pwd + "@hostname:9200",
"boaz:" + pwd + "@hostname",
"boaz:" + pwd + "@hostname/hello",
"Parse exception in [boaz:" + pwd + "@hostname:9200,boaz1:" + pwd + "@hostname \n" +
"caused: by exception ,boaz1:" + pwd + "@hostname",
"failed to upload index template, stopping export\n" +
"java.lang.RuntimeException: failed to load/verify index template\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.checkAndUploadIndexTemplate(ESExporter.java:525)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.openExportingConnection(ESExporter.java:213)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.exportXContent(ESExporter.java:285)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.exportClusterStats(ESExporter.java:206)\n" +
" at org.elasticsearch.marvel.agent.AgentService$ExportingWorker.exportClusterStats(AgentService.java:288)\n" +
" at org.elasticsearch.marvel.agent.AgentService$ExportingWorker.run(AgentService.java:245)\n" +
" at java.lang.Thread.run(Thread.java:745)\n" +
"Caused by: java.io.IOException: Server returned HTTP response code: 401 for URL: http://marvel_exporter:" + pwd + "@localhost:9200/_template/marvel\n" +
" at sun.reflect.GeneratedConstructorAccessor3.createMarvelDoc(Unknown Source)\n" +
" at sun.reflect.DelegatingConstructorAccessorImpl.createMarvelDoc(DelegatingConstructorAccessorImpl.java:45)\n" +
" at java.lang.reflect.Constructor.createMarvelDoc(Constructor.java:526)\n" +
" at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1675)\n" +
" at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1673)\n" +
" at java.security.AccessController.doPrivileged(Native Method)\n" +
" at sun.net.www.protocol.http.HttpURLConnection.getChainedException(HttpURLConnection.java:1671)\n" +
" at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1244)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.checkAndUploadIndexTemplate(ESExporter.java:519)\n" +
" ... 6 more\n" +
"Caused by: java.io.IOException: Server returned HTTP response code: 401 for URL: http://marvel_exporter:" + pwd + "@localhost:9200/_template/marvel\n" +
" at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1626)\n" +
" at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.checkAndUploadIndexTemplate(ESExporter.java:514)\n" +
" ... 6 more"
};
for (String input : inputs) {
String sanitized = HttpESExporterUtils.santizeUrlPwds(input);
assertThat(sanitized, not(containsString(pwd)));
}
}
void verifyUrl(URL url, String protocol, String host, int port, String path) throws URISyntaxException {
assertThat(url.getProtocol(), equalTo(protocol));
assertThat(url.getHost(), equalTo(host));
assertThat(url.getPort(), equalTo(port));
assertThat(url.toURI().getPath(), equalTo(path));
}
void verifyUrl(URL url, String protocol, String host, int port, String path, String userInfo) throws URISyntaxException {
verifyUrl(url, protocol, host, port, path);
assertThat(url.getUserInfo(), equalTo(userInfo));
}
}

View File

@ -0,0 +1,355 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.hamcrest.Matchers;
import org.joda.time.format.DateTimeFormat;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
// Transport Client instantiation also calls the marvel plugin, which then fails to find modules
@ClusterScope(transportClientRatio = 0.0, scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class HttpExporterTests extends ESIntegTestCase {
final static AtomicLong timeStampGenerator = new AtomicLong();
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LicensePlugin.class, MarvelPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(Node.HTTP_ENABLED, true)
.put("shield.enabled", false)
.build();
}
@Test
public void testSimpleExport() throws Exception {
TargetNode target = TargetNode.start(internalCluster());
Settings.Builder builder = Settings.builder()
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", target.httpAddress);
String agentNode = internalCluster().startNode(builder);
ensureGreen();
HttpExporter exporter = getExporter(agentNode);
MarvelDoc doc = newRandomMarvelDoc();
exporter.export(Collections.singletonList(doc));
flush();
refresh();
SearchResponse response = client().prepareSearch(".marvel-es-*").setTypes(doc.type()).get();
assertThat(response, notNullValue());
assertThat(response.getHits().totalHits(), is(1L));
}
// TODO not sure what this tests... in any case... I'm pretty sure we can remove it
// @Test
// public void testHttpServerOff() {
// Settings.Builder builder = Settings.builder()
// .put(MarvelSettings.STARTUP_DELAY, "200m")
// .put(Node.HTTP_ENABLED, false);
// internalCluster().startNode(builder);
// HttpExporter httpEsExporter = getEsExporter();
// logger.info("trying exporting despite of no target");
// httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
// }
//
@Test
public void testTemplateAdditionDespiteOfLateClusterForming() throws Exception {
TargetNode target = TargetNode.start(internalCluster());
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put(Node.HTTP_ENABLED, true)
.put("discovery.type", "zen")
.put("discovery.zen.ping_timeout", "1s")
.put("discovery.initial_state_timeout", "100ms")
.put("discovery.zen.minimum_master_nodes", 2)
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", target.httpAddress)
.put("marvel.agent.exporters._http." + HttpExporter.BULK_TIMEOUT_SETTING, "1s")
.put("marvel.agent.exporters._http." + HttpExporter.TEMPLATE_CHECK_TIMEOUT_SETTING, "1s");
String nodeName = internalCluster().startNode(builder);
HttpExporter exporter = getExporter(nodeName);
logger.info("exporting events while there is no cluster");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("bringing up a second node");
internalCluster().startNode(builder);
ensureGreen();
logger.info("exporting a second event");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("verifying that template has been created");
assertMarvelTemplateExists();
}
@Test
public void testDynamicHostChange() {
// disable exporting to be able to use non valid hosts
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.INTERVAL, "-1")
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", "test0");
String nodeName = internalCluster().startNode(builder);
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putArray("marvel.agent.exporters._http.host", "test1")));
assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test1"));
// wipes the non array settings
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putArray("marvel.agent.exporters._http.host", "test2")
.put("marvel.agent.exporters._http.host", "")));
assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test2"));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putArray("marvel.agent.exporters._http.host", "test3")));
assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test3"));
}
@Test
public void testHostChangeReChecksTemplate() throws Exception {
TargetNode targetNode = TargetNode.start(internalCluster());
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", targetNode.httpAddress);
String agentNode = internalCluster().startNode(builder);
HttpExporter exporter = getExporter(agentNode);
logger.info("exporting an event");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("removing the marvel template");
assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
assertMarvelTemplateNotExists();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().putArray("marvel.agent.exporters._http.host", exporter.hosts)).get());
// a new exporter is created on update, so we need to re-fetch it
exporter = getExporter(agentNode);
logger.info("exporting a second event");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("verifying that template has been created");
assertMarvelTemplateExists();
}
@Test
public void testHostFailureChecksTemplate() throws Exception {
TargetNode target0 = TargetNode.start(internalCluster());
assertThat(target0.name, is(internalCluster().getMasterName()));
TargetNode target1 = TargetNode.start(internalCluster());
// lets start node0 & node1 first, such that node0 will be the master (it's first to start)
final String node0 = internalCluster().startNode(Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put("marvel.agent.exporters._http.type", "http")
.putArray("marvel.agent.exporters._http.host", target0.httpAddress, target1.httpAddress));
HttpExporter exporter = getExporter(node0);
logger.info("--> exporting events to have new settings take effect");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("verifying that template has been created");
assertMarvelTemplateExists();
logger.info("--> removing the marvel template");
assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
assertMarvelTemplateNotExists();
logger.info("--> shutting down target0");
assertThat(target0.name, is(internalCluster().getMasterName())); // just to be sure it's still the master
internalCluster().stopCurrentMasterNode();
// we use assert busy node because url caching may cause the node failure to be only detected while sending the event
assertBusy(new Runnable() {
@Override
public void run() {
try {
logger.info("--> exporting events from node0");
getExporter(node0).export(Collections.singletonList(newRandomMarvelDoc()));
} catch (Exception e) {
e.printStackTrace();
fail("failed to export event from node0");
}
logger.debug("--> checking for template");
assertMarvelTemplateExists();
logger.debug("--> template exists");
}
});
}
@Test
public void testDynamicIndexFormatChange() throws Exception {
TargetNode targetNode = TargetNode.start(internalCluster());
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", targetNode.httpAddress);
String agentNode = internalCluster().startNode(builder);
logger.info("exporting a first event");
HttpExporter exporter = getExporter(agentNode);
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
String indexName = exporter.getIndexName();
logger.info("checks that the index [{}] is created", indexName);
assertTrue(client().admin().indices().prepareExists(indexName).get().isExists());
String newTimeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
logger.info("updating index time format setting to {}", newTimeFormat);
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put("marvel.agent.exporters._http.index.name.time_format", newTimeFormat)));
exporter = getExporter(agentNode);
logger.info("exporting a second event");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
String expectedMarvelIndex = MarvelSettings.MARVEL_INDICES_PREFIX
+ DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(System.currentTimeMillis());
logger.info("checks that the index [{}] is created", expectedMarvelIndex);
assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists());
logger.info("verifying that template has been created");
assertMarvelTemplateExists();
}
@Test
public void testLoadRemoteClusterVersion() {
TargetNode targetNode = TargetNode.start(internalCluster());
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.STARTUP_DELAY, "200m")
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", targetNode.httpAddress);
String agentNode = internalCluster().startNode(builder);
HttpExporter exporter = getExporter(agentNode);
logger.info("--> loading remote cluster version");
Version resolved = exporter.loadRemoteClusterVersion(targetNode.httpAddress);
assertTrue(resolved.equals(Version.CURRENT));
}
private HttpExporter getExporter(String nodeName) {
Exporters exporters = internalCluster().getInstance(Exporters.class, nodeName);
return (HttpExporter) exporters.iterator().next();
}
private MarvelDoc newRandomMarvelDoc() {
if (randomBoolean()) {
return new IndexRecoveryMarvelDoc(internalCluster().getClusterName(),
IndexRecoveryCollector.TYPE, timeStampGenerator.incrementAndGet(), new RecoveryResponse());
} else {
return new ClusterStateMarvelDoc(internalCluster().getClusterName(),
ClusterStateCollector.TYPE, timeStampGenerator.incrementAndGet(), ClusterState.PROTO, ClusterHealthStatus.GREEN);
}
}
private void assertMarvelTemplateExists() {
assertTrue("marvel template must exists", isTemplateExists("marvel"));
}
private void assertMarvelTemplateNotExists() {
assertFalse("marvel template must not exists", isTemplateExists("marvel"));
}
private boolean isTemplateExists(String templateName) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(templateName).get().getIndexTemplates()) {
if (template.getName().equals(templateName)) {
return true;
}
}
return false;
}
static class TargetNode {
private final String name;
private final TransportAddress address;
private final String httpAddress;
private final Client client;
private TargetNode(InternalTestCluster cluster) {
name = cluster.startNode();
address = cluster.getInstance(HttpServerTransport.class, name).boundAddress().publishAddress();
httpAddress = address.getHost() + ":" + address.getPort();
this.client = cluster.client(name);
}
static TargetNode start(InternalTestCluster cluster) {
return new TargetNode(cluster);
}
}
}

View File

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.Version;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.CoreMatchers.equalTo;
public class HttpExporterUtilsTests extends ESTestCase {
@Test
public void testLoadTemplate() {
byte[] template = HttpExporterUtils.loadDefaultTemplate();
assertNotNull(template);
assertThat(template.length, Matchers.greaterThan(0));
}
@Test
public void testParseTemplateVersionFromByteArrayTemplate() throws IOException {
byte[] template = HttpExporterUtils.loadDefaultTemplate();
assertNotNull(template);
Version version = HttpExporterUtils.parseTemplateVersion(template);
assertNotNull(version);
}
@Test
public void testParseTemplateVersionFromStringTemplate() throws IOException {
List<String> templates = new ArrayList<>();
templates.add("{\"marvel_version\": \"1.4.0.Beta1\"}");
templates.add("{\"marvel_version\": \"1.6.2-SNAPSHOT\"}");
templates.add("{\"marvel_version\": \"1.7.1\"}");
templates.add("{\"marvel_version\": \"2.0.0-beta1\"}");
templates.add("{\"marvel_version\": \"2.0.0\"}");
templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }");
for (String template : templates) {
Version version = HttpExporterUtils.parseTemplateVersion(template);
assertNotNull(version);
}
Version version = HttpExporterUtils.parseTemplateVersion("{\"marvel.index_format\": \"7\"}");
assertNull(version);
}
@Test
public void testParseVersion() throws IOException {
assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}"));
assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}"));
assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}"));
assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"));
assertNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}"));
assertNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}"));
}
@Test
public void testHostParsing() throws MalformedURLException, URISyntaxException {
URL url = HttpExporterUtils.parseHostWithPath("localhost:9200", "");
verifyUrl(url, "http", "localhost", 9200, "/");
url = HttpExporterUtils.parseHostWithPath("localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("http://localhost:9200", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("http://localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("https://localhost:9200", "_bulk");
verifyUrl(url, "https", "localhost", 9200, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("https://boaz-air.local:9200", "_bulk");
verifyUrl(url, "https", "boaz-air.local", 9200, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("localhost:9200/suburl", "");
verifyUrl(url, "http", "localhost", 9200, "/suburl/");
url = HttpExporterUtils.parseHostWithPath("localhost/suburl", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk");
url = HttpExporterUtils.parseHostWithPath("http://localhost:9200/suburl/suburl1", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/suburl/suburl1/_bulk");
url = HttpExporterUtils.parseHostWithPath("https://localhost:9200/suburl", "_bulk");
verifyUrl(url, "https", "localhost", 9200, "/suburl/_bulk");
url = HttpExporterUtils.parseHostWithPath("https://server_with_underscore:9300", "_bulk");
verifyUrl(url, "https", "server_with_underscore", 9300, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("server_with_underscore:9300", "_bulk");
verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("server_with_underscore", "_bulk");
verifyUrl(url, "http", "server_with_underscore", 9200, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("https://server-dash:9300", "_bulk");
verifyUrl(url, "https", "server-dash", 9300, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("server-dash:9300", "_bulk");
verifyUrl(url, "http", "server-dash", 9300, "/_bulk");
url = HttpExporterUtils.parseHostWithPath("server-dash", "_bulk");
verifyUrl(url, "http", "server-dash", 9200, "/_bulk");
}
void verifyUrl(URL url, String protocol, String host, int port, String path) throws URISyntaxException {
assertThat(url.getProtocol(), equalTo(protocol));
assertThat(url.getHost(), equalTo(host));
assertThat(url.getPort(), equalTo(port));
assertThat(url.toURI().getPath(), equalTo(path));
}
}

View File

@ -9,7 +9,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.exporter.HttpESExporterUtils;
import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils;
import org.elasticsearch.marvel.agent.renderer.AbstractRendererTestCase;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.search.SearchHit;
@ -60,7 +60,7 @@ public class ClusterStateIT extends AbstractRendererTestCase {
@Test
public void testNoNodesIndexing() throws Exception {
logger.debug("--> forcing marvel's index template update");
assertAcked(client().admin().indices().preparePutTemplate("marvel").setSource(HttpESExporterUtils.loadDefaultTemplate()).execute().actionGet());
assertAcked(client().admin().indices().preparePutTemplate("marvel").setSource(HttpExporterUtils.loadDefaultTemplate()).execute().actionGet());
logger.debug("--> deleting all marvel indices");
cluster().wipeIndices(MarvelSettings.MARVEL_INDICES_PREFIX + "*");