Cleaned up local exporter

- remove state - the existing of the internal `bulk` is enough to determine the state of the exporter. When the exporter is started, the bulk is instantiated.
- simplified & structured the startup of the exporter - now it listens to cluster state events and acts on those, rather than fetching & checking the cluster state on demand.
- the master is responsible for "putting" the marvel template... everything else will just wait for the right template to appear in the cluster state.
- started to fix the local exporter tests

Original commit: elastic/x-pack-elasticsearch@7fae23d166
This commit is contained in:
uboness 2015-09-25 02:18:51 +02:00
parent 899f359946
commit 8fa83b9109
12 changed files with 262 additions and 204 deletions

View File

@ -172,13 +172,16 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
if (bulk == null) { // exporters are either not ready or faulty if (bulk == null) { // exporters are either not ready or faulty
continue; continue;
} }
// long start = System.nanoTime(); //TODO remove
try { try {
for (Collector collector : collectors) { for (Collector collector : collectors) {
logger.trace("collecting [{}]", collector.name());
if (collecting) { if (collecting) {
Collection<MarvelDoc> docs = collector.collect(); Collection<MarvelDoc> docs = collector.collect();
if (docs != null) { if (docs != null) {
logger.trace("bulk [{}] - adding collected docs from [{}] collector", bulk, collector.name());
bulk.add(docs); bulk.add(docs);
} else {
logger.trace("bulk [{}] - skipping collected docs from [{}] collector", bulk, collector.name());
} }
} }
if (closed) { if (closed) {
@ -187,13 +190,16 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
} }
} }
} finally { } finally {
// long delta = System.nanoTime() - start; TODO remove
// logger.trace("closing bulk [{}] - collection took [{}] seconds", bulk, TimeValue.timeValueNanos(delta).format(PeriodType.seconds()));
bulk.close(!closed && collecting); bulk.close(!closed && collecting);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.trace("interrupted");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} catch (Throwable t) { } catch (Throwable t) {
logger.error("Background thread had an uncaught exception:", t); logger.error("background thread had an uncaught exception", t);
} finally { } finally {
firstRun = false; firstRun = false;
} }

View File

@ -64,13 +64,14 @@ public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T>
public Collection<MarvelDoc> collect() { public Collection<MarvelDoc> collect() {
try { try {
if (canCollect()) { if (canCollect()) {
logger.trace("collector [{}] - collecting data...", name());
return doCollect(); return doCollect();
} }
logger.trace("collector [{}] can not collect data", name()); logger.trace("collector [{}] can not collect data", name());
} catch (ElasticsearchTimeoutException e) { } catch (ElasticsearchTimeoutException e) {
logger.error("collector [{}] timed out when collecting data"); logger.error("collector [{}] timed out when collecting data");
} catch (Exception e) { } catch (Exception e) {
logger.error("collector [{}] throws exception when collecting data", e, name()); logger.error("collector [{}] - failed collecting data", e, name());
} }
return null; return null;
} }

View File

@ -15,7 +15,7 @@ import java.util.Collection;
*/ */
public abstract class ExportBulk { public abstract class ExportBulk {
private final String name; protected final String name;
public ExportBulk(String name) { public ExportBulk(String name) {
this.name = name; this.name = name;
@ -25,6 +25,11 @@ public abstract class ExportBulk {
return add(Arrays.asList(docs)); return add(Arrays.asList(docs));
} }
@Override
public String toString() {
return name;
}
public abstract ExportBulk add(Collection<MarvelDoc> docs) throws Exception; public abstract ExportBulk add(Collection<MarvelDoc> docs) throws Exception;
public abstract void flush() throws Exception; public abstract void flush() throws Exception;

View File

@ -5,10 +5,12 @@
*/ */
package org.elasticsearch.marvel.agent.exporter; package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter; import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormat;
@ -19,18 +21,23 @@ import java.util.Collection;
public abstract class Exporter { public abstract class Exporter {
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format"; public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd"; public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd";
public static final String INDEX_TEMPLATE_NAME = "marvel";
protected final String type; protected final String type;
protected final Config config; protected final Config config;
protected final ESLogger logger; protected final ESLogger logger;
protected final IndexNameResolver indexNameResolver; protected final IndexNameResolver indexNameResolver;
protected final @Nullable TimeValue bulkTimeout;
public Exporter(String type, Config config) { public Exporter(String type, Config config) {
this.type = type; this.type = type;
this.config = config; this.config = config;
this.logger = config.logger(getClass()); this.logger = config.logger(getClass());
this.indexNameResolver = new DefaultIndexNameResolver(config.settings); this.indexNameResolver = new DefaultIndexNameResolver(config.settings);
bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null);
} }
public String type() { public String type() {

View File

@ -58,9 +58,6 @@ public class HttpExporter extends Exporter {
// es level timeout used when checking and writing templates (used to speed up tests) // es level timeout used when checking and writing templates (used to speed up tests)
public static final String TEMPLATE_CHECK_TIMEOUT_SETTING = "index.template.master_timeout"; public static final String TEMPLATE_CHECK_TIMEOUT_SETTING = "index.template.master_timeout";
// es level timeout used for bulk indexing (used to speed up tests)
public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
public static final String SSL_SETTING = "ssl"; public static final String SSL_SETTING = "ssl";
public static final String SSL_PROTOCOL_SETTING = SSL_SETTING + ".protocol"; public static final String SSL_PROTOCOL_SETTING = SSL_SETTING + ".protocol";
public static final String SSL_TRUSTSTORE_SETTING = SSL_SETTING + ".truststore.path"; public static final String SSL_TRUSTSTORE_SETTING = SSL_SETTING + ".truststore.path";
@ -87,7 +84,6 @@ public class HttpExporter extends Exporter {
final RendererRegistry rendererRegistry; final RendererRegistry rendererRegistry;
final @Nullable TimeValue templateCheckTimeout; final @Nullable TimeValue templateCheckTimeout;
final @Nullable TimeValue bulkTimeout;
volatile boolean checkedAndUploadedIndexTemplate = false; volatile boolean checkedAndUploadedIndexTemplate = false;
volatile boolean supportedClusterVersion = false; volatile boolean supportedClusterVersion = false;
@ -121,7 +117,6 @@ public class HttpExporter extends Exporter {
// TimeValue SHOULD NOT SELECTIVELY CHOOSE WHAT FIELDS TO PARSE BASED ON THEIR NAMES!!!! // TimeValue SHOULD NOT SELECTIVELY CHOOSE WHAT FIELDS TO PARSE BASED ON THEIR NAMES!!!!
String templateCheckTimeoutValue = config.settings().get(TEMPLATE_CHECK_TIMEOUT_SETTING, null); String templateCheckTimeoutValue = config.settings().get(TEMPLATE_CHECK_TIMEOUT_SETTING, null);
templateCheckTimeout = TimeValue.parseTimeValue(templateCheckTimeoutValue, null, settingFQN(TEMPLATE_CHECK_TIMEOUT_SETTING)); templateCheckTimeout = TimeValue.parseTimeValue(templateCheckTimeoutValue, null, settingFQN(TEMPLATE_CHECK_TIMEOUT_SETTING));
bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null);
keepAliveWorker = new ConnectionKeepAliveWorker(); keepAliveWorker = new ConnectionKeepAliveWorker();

View File

@ -94,7 +94,11 @@ public class LocalBulk extends ExportBulk {
if (state.get() != State.ACTIVE || requestBuilder == null) { if (state.get() != State.ACTIVE || requestBuilder == null) {
return; return;
} }
logger.trace("exporter [{}] - exporting data...", name);
// long start = System.nanoTime(); TODO remove
BulkResponse bulkResponse = requestBuilder.get(); BulkResponse bulkResponse = requestBuilder.get();
// TimeValue time = TimeValue.timeValueNanos(System.nanoTime() - start);
// logger.trace("exporter [{}] - data exported, took [{}] seconds", name, time.format(PeriodType.seconds()));
if (bulkResponse.hasFailures()) { if (bulkResponse.hasFailures()) {
throw new ElasticsearchException(bulkResponse.buildFailureMessage()); throw new ElasticsearchException(bulkResponse.buildFailureMessage());
} }

View File

@ -6,30 +6,28 @@
package org.elasticsearch.marvel.agent.exporter.local; package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.marvel.agent.exporter.ExportBulk; import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.shield.SecuredClient; import org.elasticsearch.marvel.shield.SecuredClient;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_CLUSTER_VERSION;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MARVEL_VERSION_FIELD; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MARVEL_VERSION_FIELD;
@ -40,24 +38,11 @@ public class LocalExporter extends Exporter {
public static final String TYPE = "local"; public static final String TYPE = "local";
public static final String INDEX_TEMPLATE_NAME = "marvel";
public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
private final Client client; private final Client client;
private final ClusterService clusterService; private final ClusterService clusterService;
private final RendererRegistry renderers; private final RendererRegistry renderers;
private final LocalBulk bulk; private volatile LocalBulk bulk;
final @Nullable TimeValue bulkTimeout;
private final AtomicReference<State> state = new AtomicReference<>();
/**
* Version of the built-in template
**/
private final Version builtInTemplateVersion;
public LocalExporter(Exporter.Config config, SecuredClient client, ClusterService clusterService, RendererRegistry renderers) { public LocalExporter(Exporter.Config config, SecuredClient client, ClusterService clusterService, RendererRegistry renderers) {
super(TYPE, config); super(TYPE, config);
@ -65,86 +50,132 @@ public class LocalExporter extends Exporter {
this.clusterService = clusterService; this.clusterService = clusterService;
this.renderers = renderers; this.renderers = renderers;
// Checks that the built-in template is versioned clusterService.add(new ClusterStateListener() {
builtInTemplateVersion = HttpExporterUtils.parseTemplateVersion(HttpExporterUtils.loadDefaultTemplate()); @Override
if (builtInTemplateVersion == null) { public void clusterChanged(ClusterChangedEvent event) {
throw new IllegalStateException("unable to find built-in template version"); bulk = start(event.state());
} }
});
bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null);
state.set(State.STARTING);
bulk = new LocalBulk(name(), logger, client, indexNameResolver, renderers);
} }
@Override @Override
public ExportBulk openBulk() { public ExportBulk openBulk() {
if (!canExport()) {
return null;
}
return bulk; return bulk;
} }
@Override @Override
public void close() { public void close() {
if (state.compareAndSet(State.STARTING, State.STOPPING) || state.compareAndSet(State.STARTED, State.STOPPING)) { if (bulk != null) {
try { try {
bulk.terminate(); bulk.terminate();
} catch (Exception e) { } catch (Exception e) {
logger.error("failed to cleanly close open bulk for [{}] exporter", e, name()); logger.error("failed to cleanly close open bulk for [{}] exporter", e, name());
} }
state.set(State.STOPPED);
} }
} }
ClusterState clusterState() { LocalBulk start(ClusterState clusterState) {
return client.admin().cluster().prepareState().get().getState(); if (bulk != null) {
return bulk;
} }
Version clusterVersion() { if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
return Version.CURRENT; // wait until the gateway has recovered from disk, otherwise we think may not have .marvel-es-
// indices but they may not have been restored from the cluster state on disk
logger.debug("exporter [{}] waiting until gateway has recovered from disk", name());
return null;
} }
Version templateVersion() { IndexTemplateMetaData installedTemplate = clusterState.getMetaData().getTemplates().get(INDEX_TEMPLATE_NAME);
for (IndexTemplateMetaData template : client.admin().indices().prepareGetTemplates(INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
if (template.getName().equals(INDEX_TEMPLATE_NAME)) { // if this is not the master, we'll just look to see if the marvel template is already
String version = template.settings().get("index." + MARVEL_VERSION_FIELD); // installed and if so, if it has a compatible version. If it is (installed and compatible)
// we'll be able to start this exporter. Otherwise, we'll just wait for a new cluster state.
if (!clusterService.localNode().masterNode()) {
if (installedTemplate == null) {
// the marvel template is not yet installed in the given cluster state, we'll wait.
logger.debug("marvel index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME);
return null;
}
Version installedTemplateVersion = templateVersion(installedTemplate);
if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) {
logger.debug("exporter cannot start. the currently installed marvel template (version [{}]) is incompatible with the " +
"current elasticsearch version [{}]. waiting until the template is updated", installedTemplateVersion, Version.CURRENT);
}
// ok.. we have a compatible template... we can start
logger.debug("marvel [{}] exporter started!", name());
return new LocalBulk(name(), logger, client, indexNameResolver, renderers);
}
// we are on master
//
// if we cannot find a template or a compatible template, we'll install one in / update it.
if (installedTemplate == null) {
putTemplate(config.settings().getAsSettings("template.settings"));
// we'll get that template on the next cluster state update
return null;
}
Version installedTemplateVersion = templateVersion(installedTemplate);
if (installedTemplateVersionMandatesAnUpdate(Version.CURRENT, installedTemplateVersion)) {
logger.debug("installing new marvel template [{}], replacing [{}]", Version.CURRENT, installedTemplateVersion);
putTemplate(config.settings().getAsSettings("template.settings"));
// we'll get that template on the next cluster state update
return null;
}
// ok.. we have a compatible template... we can start
logger.debug("marvel [{}] exporter started!", name());
return new LocalBulk(name(), logger, client, indexNameResolver, renderers);
}
static Version templateVersion(IndexTemplateMetaData templateMetaData) {
String version = templateMetaData.settings().get("index." + MARVEL_VERSION_FIELD);
if (Strings.hasLength(version)) { if (Strings.hasLength(version)) {
return Version.fromString(version); return Version.fromString(version);
} }
}
}
return null; return null;
} }
boolean shouldUpdateTemplate(Version current, Version expected) { boolean installedTemplateVersionIsSufficient(Version current, Version installed) {
// Always update a template even if its version is not found if (installed == null) {
if (current == null) {
return true;
}
// Never update a template in an unknown version
if (expected == null) {
return false; return false;
} }
if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
return false;
}
if (current.after(installed)) {
return true;
}
if (current.equals(installed)) {
return current.snapshot();
}
return false;
}
boolean installedTemplateVersionMandatesAnUpdate(Version current, Version installed) {
if (installed == null) {
return true;
}
// Never update a very old template // Never update a very old template
if (current.before(MIN_SUPPORTED_TEMPLATE_VERSION)) { if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
logger.error("marvel template version [{}] is below the minimum compatible version [{}]. " logger.error("marvel template version [{}] is below the minimum compatible version [{}]. "
+ "please manually update the marvel template to a more recent version" + "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)", + "and delete the current active marvel index (don't forget to back up it first if needed)",
current, MIN_SUPPORTED_TEMPLATE_VERSION); installed, MIN_SUPPORTED_TEMPLATE_VERSION);
return false; return false;
} }
// Always update a template to the last up-to-date version // Always update a template to the last up-to-date version
if (expected.after(current)) { if (current.after(installed)) {
logger.info("marvel template version will be updated to a newer version [current:{}, expected:{}]", current, expected); logger.debug("the installed marvel template version [{}] will be updated to a newer version [{}]", installed, current);
return true; return true;
// When the template is up-to-date, force an update for snapshot versions only // When the template is up-to-date, force an update for snapshot versions only
} else if (expected.equals(current)) { } else if (current.equals(installed)) {
logger.debug("marvel template version is up-to-date [current:{}, expected:{}]", current, expected); logger.debug("the installed marvel template version [{}] is up-to-date", installed);
return expected.snapshot(); return installed.snapshot() && !current.snapshot();
// Never update a template that is newer than the expected one // Never update a template that is newer than the expected one
} else { } else {
logger.debug("marvel template version is newer than the one required by the marvel agent [current:{}, expected:{}]", current, expected); logger.debug("the installed marvel template version [{}] is newer than the one required [{}]... keeping it.", installed, current);
return false; return false;
} }
} }
@ -165,69 +196,26 @@ public class LocalExporter extends Exporter {
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!"; assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
PutIndexTemplateResponse response = client.admin().indices().putTemplate(request).actionGet(); // async call, so we won't block cluster event thread
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
@Override
public void onResponse(PutIndexTemplateResponse response) {
if (!response.isAcknowledged()) { if (!response.isAcknowledged()) {
throw new IllegalStateException("failed to put marvel index template"); logger.error("failed to update marvel index template");
} }
}
@Override
public void onFailure(Throwable throwable) {
logger.error("failed to update marvel index template", throwable);
}
});
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException("failed to update marvel index template", e); throw new IllegalStateException("failed to update marvel index template", e);
} }
} }
boolean canExport() {
if (state.get() == State.STARTED) {
return true;
}
if (state.get() != State.STARTING) {
return false;
}
ClusterState clusterState = clusterState();
if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .marvel-es-
// indices but they may not have been restored from the cluster state on disk
logger.debug("exporter [{}] waiting until gateway has recovered from disk", name());
return false;
}
Version clusterVersion = clusterVersion();
if ((clusterVersion == null) || clusterVersion.before(MIN_SUPPORTED_CLUSTER_VERSION)) {
logger.error("cluster version [" + clusterVersion + "] is not supported, please use a cluster with minimum version [" + MIN_SUPPORTED_CLUSTER_VERSION + "]");
state.set(State.FAILED);
return false;
}
Version templateVersion = templateVersion();
if (!clusterService.state().nodes().localNodeMaster()) {
if (templateVersion == null) {
logger.debug("marvel index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME);
return false;
}
// TODO why do we need this check? the marvel indices are anyway auto-created
// String indexName = indexNameResolver.resolve(System.currentTimeMillis());
// if (!clusterState.routingTable().index(indexName).allPrimaryShardsActive()) {
// logger.debug("marvel index [{}] has some primary shards not yet started, so service cannot start", indexName);
// return false;
// }
}
//TODO this is erroneous
// the check may figure out that the existing version is too old and therefore
// it can't and won't update the template (prompting the user to delete the template).
// In this case, we shouldn't export data. But we do.. the "shouldUpdate" method
// needs to be "boolean ensureCompatibleTemplate". The boolean returned indicates whether
// the template is valid (either was valid or was updated to a valid one) or not. If
// not, the state of this exporter should not be set to STARTED.
if (shouldUpdateTemplate(templateVersion, builtInTemplateVersion)) {
putTemplate(config.settings().getAsSettings("template.settings"));
}
logger.debug("exporter [{}] can now export marvel data", name());
state.set(State.STARTED);
return true;
}
public enum State { public enum State {
STARTING, STARTING,
STARTED, STARTED,

View File

@ -1,5 +1,5 @@
{ {
"template": ".marvel*", "template": ".marvel-es-*",
"settings": { "settings": {
"marvel_version": "${project.version}", "marvel_version": "${project.version}",
"index.number_of_shards": 1, "index.number_of_shards": 1,

View File

@ -124,7 +124,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("verifying that template has been created"); logger.info("verifying that template has been created");
assertMarvelTemplateExists(); assertMarvelTemplateInstalled();
} }
@Test @Test
@ -172,7 +172,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
logger.info("removing the marvel template"); logger.info("removing the marvel template");
assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get()); assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
assertMarvelTemplateNotExists(); assertMarvelTemplateMissing();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().putArray("marvel.agent.exporters._http.host", exporter.hosts)).get()); Settings.builder().putArray("marvel.agent.exporters._http.host", exporter.hosts)).get());
@ -184,7 +184,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("verifying that template has been created"); logger.info("verifying that template has been created");
assertMarvelTemplateExists(); assertMarvelTemplateInstalled();
} }
@Test @Test
@ -207,11 +207,11 @@ public class HttpExporterTests extends MarvelIntegTestCase {
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
logger.info("verifying that template has been created"); logger.info("verifying that template has been created");
assertMarvelTemplateExists(); assertMarvelTemplateInstalled();
logger.info("--> removing the marvel template"); logger.info("--> removing the marvel template");
assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get()); assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
assertMarvelTemplateNotExists(); assertMarvelTemplateMissing();
logger.info("--> shutting down target0"); logger.info("--> shutting down target0");
assertThat(target0.name, is(internalCluster().getMasterName())); // just to be sure it's still the master assertThat(target0.name, is(internalCluster().getMasterName())); // just to be sure it's still the master
@ -229,7 +229,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
fail("failed to export event from node0"); fail("failed to export event from node0");
} }
logger.debug("--> checking for template"); logger.debug("--> checking for template");
assertMarvelTemplateExists(); assertMarvelTemplateInstalled();
logger.debug("--> template exists"); logger.debug("--> template exists");
} }
}, 30, TimeUnit.SECONDS); }, 30, TimeUnit.SECONDS);
@ -274,7 +274,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists()); assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists());
logger.info("verifying that template has been created"); logger.info("verifying that template has been created");
assertMarvelTemplateExists(); assertMarvelTemplateInstalled();
} }
@Test @Test

View File

@ -8,8 +8,11 @@ package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc;
@ -23,6 +26,8 @@ import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormat;
import org.junit.Test; import org.junit.Test;
@ -30,6 +35,8 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION;
@ -37,7 +44,7 @@ import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MAR
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) @ClusterScope(scope = Scope.SUITE, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class LocalExporterTests extends MarvelIntegTestCase { public class LocalExporterTests extends MarvelIntegTestCase {
private final static AtomicLong timeStampGenerator = new AtomicLong(); private final static AtomicLong timeStampGenerator = new AtomicLong();
@ -46,7 +53,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder() return Settings.builder()
.put(super.nodeSettings(nodeOrdinal)) .put(super.nodeSettings(nodeOrdinal))
.put(MarvelSettings.STARTUP_DELAY, "1h") // .put(MarvelSettings.STARTUP_DELAY, "1h")
.build(); .build();
} }
@ -91,16 +98,14 @@ public class LocalExporterTests extends MarvelIntegTestCase {
ensureGreen(); ensureGreen();
LocalExporter exporter = getLocalExporter("_local"); LocalExporter exporter = getLocalExporter("_local");
assertTrue(exporter.shouldUpdateTemplate(null, Version.CURRENT)); assertTrue(exporter.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, null));
assertMarvelTemplateNotExists(); // lets wait until the marvel template will be installed
awaitMarvelTemplateInstalled();
logger.debug("--> exporting when the marvel template does not exists: template should be created"); awaitMarvelDocsCount(greaterThan(0L));
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
awaitMarvelDocsCount(is(1L));
assertMarvelTemplateExists();
assertThat(exporter.templateVersion(), equalTo(Version.CURRENT)); assertThat(getCurrentlyInstalledTemplateVersion(), is(Version.CURRENT));
} }
@Test @Test
@ -112,50 +117,58 @@ public class LocalExporterTests extends MarvelIntegTestCase {
LocalExporter exporter = getLocalExporter("_local"); LocalExporter exporter = getLocalExporter("_local");
Version fakeVersion = MIN_SUPPORTED_TEMPLATE_VERSION; Version fakeVersion = MIN_SUPPORTED_TEMPLATE_VERSION;
assertTrue(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT)); assertThat(exporter.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, fakeVersion), is(true));
logger.debug("--> creating the marvel template with a fake version [{}]", fakeVersion); // first, lets wait for the marvel template to be installed
awaitMarvelTemplateInstalled();
// now lets update the template with an old one and then restart the cluster
exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build()); exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build());
assertMarvelTemplateExists(); logger.debug("full cluster restart");
final CountDownLatch latch = new CountDownLatch(1);
assertThat(exporter.templateVersion(), equalTo(fakeVersion)); internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
logger.debug("--> exporting when the marvel template must be updated: document is exported and the template is updated"); public void doAfterNodes(int n, Client client) throws Exception {
exporter.export(Collections.singletonList(newRandomMarvelDoc())); latch.countDown();
awaitMarvelDocsCount(is(1L)); }
assertMarvelTemplateExists(); });
if (!latch.await(30, TimeUnit.SECONDS)) {
assertThat(exporter.templateVersion(), equalTo(Version.CURRENT)); fail("waited too long (at least 30 seconds) for the cluster to restart");
} }
@Test @AwaitsFix(bugUrl = "LocalExporter#210") // now that the cluster is restarting, lets wait for the new template version to be installed
public void testUnsupportedTemplateVersion() throws Exception { awaitMarvelTemplateInstalled(Version.CURRENT);
internalCluster().startNode(Settings.builder()
.put("marvel.agent.exporters._local.type", LocalExporter.TYPE)
.build());
ensureGreen();
LocalExporter exporter = getLocalExporter("_local");
Version fakeVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0);
assertFalse(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT));
logger.debug("--> creating the marvel template with a fake version [{}]", fakeVersion);
exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build());
assertMarvelTemplateExists();
assertThat(exporter.templateVersion(), equalTo(fakeVersion));
logger.debug("--> exporting when the marvel template is tool old: no document is exported and the template is not updated");
awaitMarvelDocsCount(is(0L));
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
awaitMarvelDocsCount(is(0L));
assertMarvelTemplateExists();
assertThat(exporter.templateVersion(), equalTo(fakeVersion));
} }
@Test //TODO needs a rewrite, the `start(ClusterState)` should be unit tested
// @Test @AwaitsFix(bugUrl = "LocalExporter#210")
// public void testUnsupportedTemplateVersion() throws Exception {
// internalCluster().startNode(Settings.builder()
// .put("marvel.agent.exporters._local.type", LocalExporter.TYPE)
// .build());
// ensureGreen();
//
// LocalExporter exporter = getLocalExporter("_local");
//
// Version fakeVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0);
// assertFalse(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT));
//
// logger.debug("--> creating the marvel template with a fake version [{}]", fakeVersion);
// exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build());
// assertMarvelTemplateInstalled();
//
// assertThat(exporter.templateVersion(), equalTo(fakeVersion));
//
// logger.debug("--> exporting when the marvel template is tool old: no document is exported and the template is not updated");
// awaitMarvelDocsCount(is(0L));
// exporter.export(Collections.singletonList(newRandomMarvelDoc()));
// awaitMarvelDocsCount(is(0L));
// assertMarvelTemplateInstalled();
//
// assertThat(exporter.templateVersion(), equalTo(fakeVersion));
// }
@Test @TestLogging("marvel.agent:debug")
public void testIndexTimestampFormat() throws Exception { public void testIndexTimestampFormat() throws Exception {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
final String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM"); final String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
@ -178,7 +191,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp()); expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp());
logger.debug("--> check that the index [{}] has the correct timestamp [{}]", timeFormat, expectedIndexName); logger.debug("--> check that the index [{}] has the correct timestamp [{}]", timeFormat, expectedIndexName);
assertTrue(client().admin().indices().prepareExists(expectedIndexName).get().isExists()); assertThat(client().admin().indices().prepareExists(expectedIndexName).get().isExists(), is(true));
logger.debug("--> updates the timestamp"); logger.debug("--> updates the timestamp");
final String newTimeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM"); final String newTimeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM");
@ -214,4 +227,43 @@ public class LocalExporterTests extends MarvelIntegTestCase {
} }
} }
private void awaitMarvelTemplateInstalled() throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
assertMarvelTemplateInstalled();
}
}, 30, TimeUnit.SECONDS);
}
private void awaitMarvelTemplateInstalled(Version version) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
assertMarvelTemplateInstalled(version);
}
}, 30, TimeUnit.SECONDS);
}
protected void assertMarvelTemplateInstalled(Version version) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(Exporter.INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
if (template.getName().equals(Exporter.INDEX_TEMPLATE_NAME)) {
Version templateVersion = LocalExporter.templateVersion(template);
if (templateVersion != null && templateVersion.id == version.id) {
return;
}
fail("did not find marvel template with expected version [" + version + "]. found version [" + templateVersion + "]");
}
}
fail("marvel template could not be found");
}
private Version getCurrentlyInstalledTemplateVersion() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(Exporter.INDEX_TEMPLATE_NAME).get();
assertThat(response, notNullValue());
assertThat(response.getIndexTemplates(), notNullValue());
assertThat(response.getIndexTemplates(), hasSize(1));
assertThat(response.getIndexTemplates().get(0), notNullValue());
return LocalExporter.templateVersion(response.getIndexTemplates().get(0));
}
} }

View File

@ -11,7 +11,6 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils; import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils;
import org.elasticsearch.marvel.agent.renderer.AbstractRendererTestCase; import org.elasticsearch.marvel.agent.renderer.AbstractRendererTestCase;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.junit.Test; import org.junit.Test;
@ -67,7 +66,7 @@ public class ClusterStateIT extends AbstractRendererTestCase {
deleteMarvelIndices(); deleteMarvelIndices();
logger.debug("--> checking for template existence"); logger.debug("--> checking for template existence");
assertMarvelTemplateExists(); assertMarvelTemplateInstalled();
awaitMarvelDocsCount(greaterThan(0L), ClusterStateCollector.TYPE); awaitMarvelDocsCount(greaterThan(0L), ClusterStateCollector.TYPE);
logger.debug("--> searching for marvel documents of type [{}]", ClusterStateCollector.TYPE); logger.debug("--> searching for marvel documents of type [{}]", ClusterStateCollector.TYPE);

View File

@ -14,7 +14,6 @@ import org.elasticsearch.index.cache.IndexCacheModule;
import org.elasticsearch.license.plugin.LicensePlugin; import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.marvel.MarvelPlugin; import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.agent.AgentService; import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.ShieldPlugin;
@ -35,7 +34,9 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.marvel.agent.exporter.Exporter.INDEX_TEMPLATE_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
/** /**
@ -134,7 +135,7 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
public void run() { public void run() {
assertMarvelDocsCount(matcher, types); assertMarvelDocsCount(matcher, types);
} }
}); }, 30, TimeUnit.SECONDS);
} }
protected void assertMarvelDocsCount(Matcher<Long> matcher, String... types) { protected void assertMarvelDocsCount(Matcher<Long> matcher, String... types) {
@ -151,21 +152,21 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
} }
} }
protected void assertMarvelTemplateExists() { protected void assertMarvelTemplateInstalled() {
assertTrue("marvel template shouldn't exists", isTemplateExists(LocalExporter.INDEX_TEMPLATE_NAME)); for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
if (template.getName().equals(INDEX_TEMPLATE_NAME)) {
return;
}
}
fail("marvel template shouldn't exists");
} }
protected void assertMarvelTemplateNotExists() { protected void assertMarvelTemplateMissing() {
assertFalse("marvel template should exists", isTemplateExists(LocalExporter.INDEX_TEMPLATE_NAME)); for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
} if (template.getName().equals(INDEX_TEMPLATE_NAME)) {
fail("marvel template should exists");
private boolean isTemplateExists(String templateName) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(templateName).get().getIndexTemplates()) {
if (template.getName().equals(templateName)) {
return true;
} }
} }
return false;
} }
protected void securedRefresh() { protected void securedRefresh() {
@ -228,12 +229,12 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
" '*': all\n" + " '*': all\n" +
"\n" + "\n" +
"admin:\n" + "admin:\n" +
" cluster: manage_watcher, cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" + " cluster: cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" +
"transport_client:\n" + "transport_client:\n" +
" cluster: cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" + " cluster: cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" +
"\n" + "\n" +
"monitor:\n" + "monitor:\n" +
" cluster: monitor_watcher, cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" " cluster: cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n"
; ;