fixed ClusterStatsIT test

- `ClusterStatsIT` is now `ClusterStatsTests` (moving it to maven `test` phase)
- Simplified and restructured the test
- changed the `LocalExporter` to handle the case where the marvel template was deleted. Now every time the cluster state is updated, it'll make sure the template still exists and if not put it back.
- moved all the marvel template logic to a centrailzed place (`MarvelTemplateUtils`)
- moved all es/marvel version logic to a cernralized place (`VersionUtils`)
- now the `MarvelIntegTestCase` doesn't allow the marvel template to be deleted by the test infra
- improved logging output

Original commit: elastic/x-pack-elasticsearch@502532ddad
This commit is contained in:
uboness 2015-09-27 14:43:29 +02:00
parent 9e9b835213
commit 6ad26f1248
24 changed files with 404 additions and 303 deletions

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.marvel.agent; package org.elasticsearch.marvel.agent;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
@ -172,13 +173,15 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
if (bulk == null) { // exporters are either not ready or faulty if (bulk == null) { // exporters are either not ready or faulty
continue; continue;
} }
// long start = System.nanoTime(); //TODO remove
try { try {
if (logger.isTraceEnabled()) {
logger.trace("collecting data - collectors [{}]", Strings.collectionToCommaDelimitedString(collectors));
}
for (Collector collector : collectors) { for (Collector collector : collectors) {
if (collecting) { if (collecting) {
Collection<MarvelDoc> docs = collector.collect(); Collection<MarvelDoc> docs = collector.collect();
if (docs != null) { if (docs != null) {
logger.trace("bulk [{}] - adding collected docs from [{}] collector", bulk, collector.name()); logger.trace("bulk [{}] - adding [{}] collected docs from [{}] collector", bulk, docs.size(), collector.name());
bulk.add(docs); bulk.add(docs);
} else { } else {
logger.trace("bulk [{}] - skipping collected docs from [{}] collector", bulk, collector.name()); logger.trace("bulk [{}] - skipping collected docs from [{}] collector", bulk, collector.name());
@ -190,8 +193,6 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
} }
} }
} finally { } finally {
// long delta = System.nanoTime() - start; TODO remove
// logger.trace("closing bulk [{}] - collection took [{}] seconds", bulk, TimeValue.timeValueNanos(delta).format(PeriodType.seconds()));
bulk.close(!closed && collecting); bulk.close(!closed && collecting);
} }

View File

@ -39,6 +39,11 @@ public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T>
return name; return name;
} }
@Override
public String toString() {
return name;
}
@Override @Override
public T start() { public T start() {
logger.debug("starting collector [{}]", name()); logger.debug("starting collector [{}]", name());
@ -52,8 +57,12 @@ public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T>
/** /**
* Indicates if the current collector is allowed to collect data * Indicates if the current collector is allowed to collect data
*/ */
protected boolean canCollect() { protected boolean shouldCollect() {
return licenseService.enabled() || licenseService.inExpirationGracePeriod(); boolean validLicense = licenseService.enabled() || licenseService.inExpirationGracePeriod();
if (!validLicense) {
logger.trace("collector [{}] can not collect data due to invalid license", name());
}
return validLicense;
} }
protected boolean isLocalNodeMaster() { protected boolean isLocalNodeMaster() {
@ -63,11 +72,10 @@ public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T>
@Override @Override
public Collection<MarvelDoc> collect() { public Collection<MarvelDoc> collect() {
try { try {
if (canCollect()) { if (shouldCollect()) {
logger.trace("collector [{}] - collecting data...", name()); logger.trace("collector [{}] - collecting data...", name());
return doCollect(); return doCollect();
} }
logger.trace("collector [{}] can not collect data", name());
} catch (ElasticsearchTimeoutException e) { } catch (ElasticsearchTimeoutException e) {
logger.error("collector [{}] timed out when collecting data"); logger.error("collector [{}] timed out when collecting data");
} catch (Exception e) { } catch (Exception e) {

View File

@ -50,7 +50,7 @@ public class ClusterInfoCollector extends AbstractCollector<ClusterInfoMarvelDoc
} }
@Override @Override
protected boolean canCollect() { protected boolean shouldCollect() {
// This collector can always collect data on the master node // This collector can always collect data on the master node
return isLocalNodeMaster(); return isLocalNodeMaster();
} }

View File

@ -43,8 +43,8 @@ public class ClusterStateCollector extends AbstractCollector<ClusterStateCollect
} }
@Override @Override
protected boolean canCollect() { protected boolean shouldCollect() {
return super.canCollect() && isLocalNodeMaster(); return super.shouldCollect() && isLocalNodeMaster();
} }
@Override @Override

View File

@ -42,8 +42,8 @@ public class ClusterStatsCollector extends AbstractCollector<ClusterStatsCollect
} }
@Override @Override
protected boolean canCollect() { protected boolean shouldCollect() {
return super.canCollect() && isLocalNodeMaster(); return super.shouldCollect() && isLocalNodeMaster();
} }
@Override @Override

View File

@ -43,8 +43,8 @@ public class IndexRecoveryCollector extends AbstractCollector<IndexRecoveryColle
} }
@Override @Override
protected boolean canCollect() { protected boolean shouldCollect() {
return super.canCollect() && isLocalNodeMaster(); return super.shouldCollect() && isLocalNodeMaster();
} }
@Override @Override

View File

@ -44,8 +44,8 @@ public class IndexStatsCollector extends AbstractCollector<IndexStatsCollector>
} }
@Override @Override
protected boolean canCollect() { protected boolean shouldCollect() {
return super.canCollect() && isLocalNodeMaster(); return super.shouldCollect() && isLocalNodeMaster();
} }
@Override @Override

View File

@ -39,8 +39,8 @@ public class IndicesStatsCollector extends AbstractCollector<IndicesStatsCollect
} }
@Override @Override
protected boolean canCollect() { protected boolean shouldCollect() {
return super.canCollect() && isLocalNodeMaster(); return super.shouldCollect() && isLocalNodeMaster();
} }
@Override @Override

View File

@ -59,8 +59,8 @@ public class NodeStatsCollector extends AbstractCollector<NodeStatsCollector> {
} }
@Override @Override
protected boolean canCollect() { protected boolean shouldCollect() {
return super.canCollect() && nodeEnvironment.hasNodeFile(); return super.shouldCollect() && nodeEnvironment.hasNodeFile();
} }
@Override @Override

View File

@ -40,8 +40,8 @@ public class ShardsCollector extends AbstractCollector<ShardsCollector> {
} }
@Override @Override
protected boolean canCollect() { protected boolean shouldCollect() {
return super.canCollect() && isLocalNodeMaster(); return super.shouldCollect() && isLocalNodeMaster();
} }
@Override @Override

View File

@ -6,7 +6,9 @@
package org.elasticsearch.marvel.agent.exporter; package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -26,7 +28,6 @@ public abstract class Exporter {
public static final Version MIN_SUPPORTED_TEMPLATE_VERSION = Version.V_2_0_0_beta2; public static final Version MIN_SUPPORTED_TEMPLATE_VERSION = Version.V_2_0_0_beta2;
public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd"; public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd";
public static final String INDEX_TEMPLATE_NAME = "marvel";
protected final String type; protected final String type;
protected final Config config; protected final Config config;

View File

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.marvel.support.VersionUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
*
*/
public final class MarvelTemplateUtils {
public static final String MARVEL_TEMPLATE_FILE = "/marvel_index_template.json";
public static final String INDEX_TEMPLATE_NAME = ".marvel-es";
public static final String MARVEL_VERSION_FIELD = "marvel_version";
private MarvelTemplateUtils() {
}
/**
* Loads the default Marvel template
*/
public static byte[] loadDefaultTemplate() {
try (InputStream is = MarvelTemplateUtils.class.getResourceAsStream(MARVEL_TEMPLATE_FILE)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
return out.toByteArray();
} catch (IOException e) {
throw new IllegalStateException("unable to load marvel template", e);
}
}
public static Version templateVersion(IndexTemplateMetaData templateMetaData) {
String version = templateMetaData.settings().get("index." + MARVEL_VERSION_FIELD);
if (Strings.hasLength(version)) {
return Version.fromString(version);
}
return null;
}
public static IndexTemplateMetaData findMarvelTemplate(ClusterState state) {
MetaData metaData = state.getMetaData();
return metaData != null ? metaData.getTemplates().get(INDEX_TEMPLATE_NAME) : null;
}
public static Version parseTemplateVersion(byte[] template) {
return VersionUtils.parseVersion(MARVEL_VERSION_FIELD, template);
}
public static Version parseTemplateVersion(String template) {
return VersionUtils.parseVersion(MARVEL_VERSION_FIELD, template);
}
}

View File

@ -24,10 +24,12 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.marvel.agent.exporter.ExportBulk; import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc; import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.renderer.Renderer; import org.elasticsearch.marvel.agent.renderer.Renderer;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter; import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import org.elasticsearch.marvel.support.VersionUtils;
import javax.net.ssl.*; import javax.net.ssl.*;
import java.io.*; import java.io.*;
@ -121,7 +123,7 @@ public class HttpExporter extends Exporter {
hostnameVerification = config.settings().getAsBoolean(SSL_HOSTNAME_VERIFICATION_SETTING, true); hostnameVerification = config.settings().getAsBoolean(SSL_HOSTNAME_VERIFICATION_SETTING, true);
// Checks that the built-in template is versioned // Checks that the built-in template is versioned
templateVersion = HttpExporterUtils.parseTemplateVersion(HttpExporterUtils.loadDefaultTemplate()); templateVersion = MarvelTemplateUtils.parseTemplateVersion(MarvelTemplateUtils.loadDefaultTemplate());
if (templateVersion == null) { if (templateVersion == null) {
throw new IllegalStateException("unable to find built-in template version"); throw new IllegalStateException("unable to find built-in template version");
} }
@ -354,7 +356,7 @@ public class HttpExporter extends Exporter {
try (InputStream is = connection.getInputStream()) { try (InputStream is = connection.getInputStream()) {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out); Streams.copy(is, out);
return HttpExporterUtils.parseElasticsearchVersion(out.toByteArray()); return VersionUtils.parseVersion(out.toByteArray());
} }
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException("failed to verify the remote cluster version on host [" + host + "]:\n" + e.getMessage()); throw new ElasticsearchException("failed to verify the remote cluster version on host [" + host + "]:\n" + e.getMessage());
@ -408,7 +410,7 @@ public class HttpExporter extends Exporter {
return false; return false;
} }
Version remoteVersion = HttpExporterUtils.parseTemplateVersion(remoteTemplate); Version remoteVersion = MarvelTemplateUtils.parseTemplateVersion(remoteTemplate);
logger.debug("detected existing remote template in version [{}] on host [{}]", remoteVersion, host); logger.debug("detected existing remote template in version [{}] on host [{}]", remoteVersion, host);
if (remoteVersion == null) { if (remoteVersion == null) {
@ -461,7 +463,7 @@ public class HttpExporter extends Exporter {
} }
logger.debug("loading marvel pre-configured template"); logger.debug("loading marvel pre-configured template");
byte[] template = HttpExporterUtils.loadDefaultTemplate(); byte[] template = MarvelTemplateUtils.loadDefaultTemplate();
// Uploads the template and closes the outputstream // Uploads the template and closes the outputstream
Streams.copy(template, connection.getOutputStream()); Streams.copy(template, connection.getOutputStream());

View File

@ -5,26 +5,12 @@
*/ */
package org.elasticsearch.marvel.agent.exporter.http; package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.nio.charset.Charset;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class HttpExporterUtils { public class HttpExporterUtils {
public static final String MARVEL_TEMPLATE_FILE = "/marvel_index_template.json";
public static final String MARVEL_VERSION_FIELD = "marvel_version";
static final String VERSION_FIELD = "number";
public static URL parseHostWithPath(String host, String path) throws URISyntaxException, MalformedURLException { public static URL parseHostWithPath(String host, String path) throws URISyntaxException, MalformedURLException {
if (!host.contains("://")) { if (!host.contains("://")) {
@ -49,49 +35,4 @@ public class HttpExporterUtils {
} }
/**
* Loads the default Marvel template
*/
public static byte[] loadDefaultTemplate() {
try (InputStream is = HttpExporterUtils.class.getResourceAsStream(MARVEL_TEMPLATE_FILE)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
return out.toByteArray();
} catch (IOException e) {
throw new IllegalStateException("unable to load marvel template", e);
}
}
/**
* Extract &amp; parse the version contained in the given template
*/
public static Version parseTemplateVersion(byte[] template) {
return parseTemplateVersion(new String(template, Charset.forName("UTF-8")));
}
/**
* Extract &amp; parse the version contained in the given template
*/
public static Version parseTemplateVersion(String template) {
return parseVersion(MARVEL_VERSION_FIELD, template);
}
/**
* Extract &amp; parse the elasticsearch version, as returned by the REST API
*/
public static Version parseElasticsearchVersion(byte[] template) {
return parseVersion(VERSION_FIELD, new String(template, Charset.forName("UTF-8")));
}
static Version parseVersion(String field, String template) {
Pattern pattern = Pattern.compile(field + "\"\\s*:\\s*\"?([0-9a-zA-Z\\.\\-]+)\"?");
Matcher matcher = pattern.matcher(template);
if (matcher.find()) {
String parsedVersion = matcher.group(1);
if (Strings.hasText(parsedVersion)) {
return Version.fromString(parsedVersion);
}
}
return null;
}
} }

View File

@ -95,10 +95,7 @@ public class LocalBulk extends ExportBulk {
return; return;
} }
logger.trace("exporter [{}] - exporting data...", name); logger.trace("exporter [{}] - exporting data...", name);
// long start = System.nanoTime(); TODO remove
BulkResponse bulkResponse = requestBuilder.get(); BulkResponse bulkResponse = requestBuilder.get();
// TimeValue time = TimeValue.timeValueNanos(System.nanoTime() - start);
// logger.trace("exporter [{}] - data exported, took [{}] seconds", name, time.format(PeriodType.seconds()));
if (bulkResponse.hasFailures()) { if (bulkResponse.hasFailures()) {
throw new ElasticsearchException(bulkResponse.buildFailureMessage()); throw new ElasticsearchException(bulkResponse.buildFailureMessage());
} }

View File

@ -15,21 +15,19 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.marvel.agent.exporter.ExportBulk; import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.shield.SecuredClient; import org.elasticsearch.marvel.shield.SecuredClient;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.InputStream; import java.io.InputStream;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MARVEL_VERSION_FIELD;
/** /**
* *
*/ */
@ -42,19 +40,38 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
private final RendererRegistry renderers; private final RendererRegistry renderers;
private volatile LocalBulk bulk; private volatile LocalBulk bulk;
private volatile boolean active = true;
public LocalExporter(Exporter.Config config, Client client, ClusterService clusterService, RendererRegistry renderers) { public LocalExporter(Exporter.Config config, Client client, ClusterService clusterService, RendererRegistry renderers) {
super(TYPE, config); super(TYPE, config);
this.client = client; this.client = client;
this.clusterService = clusterService; this.clusterService = clusterService;
this.renderers = renderers; this.renderers = renderers;
bulk = start(clusterService.state()); bulk = resolveBulk(clusterService.state(), bulk);
clusterService.add(this); clusterService.add(this);
} }
@Override @Override
public void clusterChanged(ClusterChangedEvent event) { public void clusterChanged(ClusterChangedEvent event) {
bulk = start(event.state()); LocalBulk currentBulk = bulk;
LocalBulk newBulk = resolveBulk(event.state(), currentBulk);
// yes, this method will always be called by the cluster event loop thread
// but we need to sync with the {@code #close()} mechanism
synchronized (this) {
if (active) {
bulk = newBulk;
} else if (newBulk != null) {
newBulk.terminate();
}
if (currentBulk == null && bulk != null) {
logger.debug("local exporter [{}] - started!", name());
}
if (bulk != currentBulk && currentBulk != null) {
logger.debug("local exporter [{}] - stopped!", name());
currentBulk.terminate();
}
}
} }
@Override @Override
@ -62,31 +79,34 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
return bulk; return bulk;
} }
// requires synchronization due to cluster state update events (see above)
@Override @Override
public void close() { public synchronized void close() {
active = false;
clusterService.remove(this); clusterService.remove(this);
if (bulk != null) { if (bulk != null) {
try { try {
bulk.terminate(); bulk.terminate();
bulk = null;
} catch (Exception e) { } catch (Exception e) {
logger.error("failed to cleanly close open bulk for [{}] exporter", e, name()); logger.error("local exporter [{}] - failed to cleanly close bulk", e, name());
} }
} }
} }
LocalBulk start(ClusterState clusterState) { LocalBulk resolveBulk(ClusterState clusterState, LocalBulk currentBulk) {
if (clusterService.localNode() == null || clusterState == null || bulk != null) { if (clusterService.localNode() == null || clusterState == null) {
return bulk; return currentBulk;
} }
if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .marvel-es- // wait until the gateway has recovered from disk, otherwise we think may not have .marvel-es-
// indices but they may not have been restored from the cluster state on disk // indices but they may not have been restored from the cluster state on disk
logger.debug("exporter [{}] waiting until gateway has recovered from disk", name()); logger.debug("local exporter [{}] - waiting until gateway has recovered from disk", name());
return null; return null;
} }
IndexTemplateMetaData installedTemplate = clusterState.getMetaData().getTemplates().get(INDEX_TEMPLATE_NAME); IndexTemplateMetaData installedTemplate = MarvelTemplateUtils.findMarvelTemplate(clusterState);
// if this is not the master, we'll just look to see if the marvel template is already // if this is not the master, we'll just look to see if the marvel template is already
// installed and if so, if it has a compatible version. If it is (installed and compatible) // installed and if so, if it has a compatible version. If it is (installed and compatible)
@ -94,40 +114,41 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
if (!clusterService.localNode().masterNode()) { if (!clusterService.localNode().masterNode()) {
if (installedTemplate == null) { if (installedTemplate == null) {
// the marvel template is not yet installed in the given cluster state, we'll wait. // the marvel template is not yet installed in the given cluster state, we'll wait.
logger.debug("marvel index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME); logger.debug("local exporter [{}] - marvel index template [{}] does not exist, so service cannot start", name(), MarvelTemplateUtils.INDEX_TEMPLATE_NAME);
return null; return null;
} }
Version installedTemplateVersion = templateVersion(installedTemplate); Version installedTemplateVersion = MarvelTemplateUtils.templateVersion(installedTemplate);
if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) { if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) {
logger.debug("exporter cannot start. the currently installed marvel template (version [{}]) is incompatible with the " + logger.debug("local exporter [{}] - cannot start. the currently installed marvel template (version [{}]) is incompatible with the " +
"current elasticsearch version [{}]. waiting until the template is updated", installedTemplateVersion, Version.CURRENT); "current elasticsearch version [{}]. waiting until the template is updated", name(), installedTemplateVersion, Version.CURRENT);
return null; return null;
} }
// ok.. we have a compatible template... we can start // ok.. we have a compatible template... we can start
logger.debug("marvel [{}] exporter started!", name()); logger.debug("local exporter [{}] - started!", name());
return new LocalBulk(name(), logger, client, indexNameResolver, renderers); return currentBulk != null ? currentBulk : new LocalBulk(name(), logger, client, indexNameResolver, renderers);
} }
// we are on master // we are on master
// //
// if we cannot find a template or a compatible template, we'll install one in / update it. // if we cannot find a template or a compatible template, we'll install one in / update it.
if (installedTemplate == null) { if (installedTemplate == null) {
logger.debug("local exporter [{}] - could not find existing marvel template, installing a new one", name());
putTemplate(config.settings().getAsSettings("template.settings")); putTemplate(config.settings().getAsSettings("template.settings"));
// we'll get that template on the next cluster state update // we'll get that template on the next cluster state update
return null; return null;
} }
Version installedTemplateVersion = templateVersion(installedTemplate); Version installedTemplateVersion = MarvelTemplateUtils.templateVersion(installedTemplate);
if (installedTemplateVersionMandatesAnUpdate(Version.CURRENT, installedTemplateVersion)) { if (installedTemplateVersionMandatesAnUpdate(Version.CURRENT, installedTemplateVersion)) {
logger.debug("installing new marvel template [{}], replacing [{}]", Version.CURRENT, installedTemplateVersion); logger.debug("local exporter [{}] - installing new marvel template [{}], replacing [{}]", name(), Version.CURRENT, installedTemplateVersion);
putTemplate(config.settings().getAsSettings("template.settings")); putTemplate(config.settings().getAsSettings("template.settings"));
// we'll get that template on the next cluster state update // we'll get that template on the next cluster state update
return null; return null;
} else if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) { } else if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) {
logger.error("marvel template version [{}] is below the minimum compatible version [{}]. " logger.error("local exporter [{}] - marvel template version [{}] is below the minimum compatible version [{}]. "
+ "please manually update the marvel template to a more recent version" + "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)", + "and delete the current active marvel index (don't forget to back up it first if needed)",
installedTemplateVersion, MIN_SUPPORTED_TEMPLATE_VERSION); name(), installedTemplateVersion, MIN_SUPPORTED_TEMPLATE_VERSION);
// we're not going to do anything with the template.. it's too old, and the schema might // we're not going to do anything with the template.. it's too old, and the schema might
// be too different than what this version of marvel/es can work with. For this reason we're // be too different than what this version of marvel/es can work with. For this reason we're
// not going to export any data, to avoid mapping conflicts. // not going to export any data, to avoid mapping conflicts.
@ -135,16 +156,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
} }
// ok.. we have a compatible template... we can start // ok.. we have a compatible template... we can start
logger.debug("marvel [{}] exporter started!", name()); return currentBulk != null ? currentBulk : new LocalBulk(name(), logger, client, indexNameResolver, renderers);
return new LocalBulk(name(), logger, client, indexNameResolver, renderers);
}
static Version templateVersion(IndexTemplateMetaData templateMetaData) {
String version = templateMetaData.settings().get("index." + MARVEL_VERSION_FIELD);
if (Strings.hasLength(version)) {
return Version.fromString(version);
}
return null;
} }
boolean installedTemplateVersionIsSufficient(Version current, Version installed) { boolean installedTemplateVersionIsSufficient(Version current, Version installed) {
@ -165,6 +177,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
boolean installedTemplateVersionMandatesAnUpdate(Version current, Version installed) { boolean installedTemplateVersionMandatesAnUpdate(Version current, Version installed) {
if (installed == null) { if (installed == null) {
logger.debug("local exporter [{}] - currently installed marvel template is missing a version - installing a new one [{}]", name(), current);
return true; return true;
} }
// Never update a very old template // Never update a very old template
@ -173,15 +186,15 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
} }
// Always update a template to the last up-to-date version // Always update a template to the last up-to-date version
if (current.after(installed)) { if (current.after(installed)) {
logger.debug("the installed marvel template version [{}] will be updated to a newer version [{}]", installed, current); logger.debug("local exporter [{}] - currently installed marvel template version [{}] will be updated to a newer version [{}]", name(), installed, current);
return true; return true;
// When the template is up-to-date, force an update for snapshot versions only // When the template is up-to-date, force an update for snapshot versions only
} else if (current.equals(installed)) { } else if (current.equals(installed)) {
logger.debug("the installed marvel template version [{}] is up-to-date", installed); logger.debug("local exporter [{}] - currently installed marvel template version [{}] is up-to-date", name(), installed);
return installed.snapshot() && !current.snapshot(); return installed.snapshot() && !current.snapshot();
// Never update a template that is newer than the expected one // Never update a template that is newer than the expected one
} else { } else {
logger.debug("the installed marvel template version [{}] is newer than the one required [{}]... keeping it.", installed, current); logger.debug("local exporter [{}] - currently installed marvel template version [{}] is newer than the one required [{}]... keeping it.", name(), installed, current);
return false; return false;
} }
} }
@ -191,11 +204,13 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out); Streams.copy(is, out);
final byte[] template = out.toByteArray(); final byte[] template = out.toByteArray();
PutIndexTemplateRequest request = new PutIndexTemplateRequest(INDEX_TEMPLATE_NAME).source(template); PutIndexTemplateRequest request = new PutIndexTemplateRequest(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).source(template);
if (customSettings != null && customSettings.names().size() > 0) { if (customSettings != null && customSettings.names().size() > 0) {
Settings updatedSettings = Settings.builder() Settings updatedSettings = Settings.builder()
.put(request.settings()) .put(request.settings())
.put(customSettings) .put(customSettings)
// making sure we override any other template that may apply
.put("order", Integer.MAX_VALUE)
.build(); .build();
request.settings(updatedSettings); request.settings(updatedSettings);
} }
@ -206,14 +221,16 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() { client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
@Override @Override
public void onResponse(PutIndexTemplateResponse response) { public void onResponse(PutIndexTemplateResponse response) {
if (!response.isAcknowledged()) { if (response.isAcknowledged()) {
logger.error("failed to update marvel index template"); logger.trace("local exporter [{}] - successfully installed marvel template", name());
} else {
logger.error("local exporter [{}] - failed to update marvel index template", name());
} }
} }
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(Throwable throwable) {
logger.error("failed to update marvel index template", throwable); logger.error("local exporter [{}] - failed to update marvel index template", throwable, name());
} }
}); });

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.support;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import java.nio.charset.Charset;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
*/
public final class VersionUtils {
private VersionUtils() {
}
public static Version parseVersion(byte[] text) {
return parseVersion("", new String(text, Charset.forName("UTF-8")));
}
/**
* Extract &amp; parse the version contained in the given template
*/
public static Version parseVersion(String prefix, byte[] text) {
return parseVersion(prefix, new String(text, Charset.forName("UTF-8")));
}
public static Version parseVersion(String prefix, String text) {
Pattern pattern = Pattern.compile(prefix + "\"\\s*:\\s*\"?([0-9a-zA-Z\\.\\-]+)\"?");
Matcher matcher = pattern.matcher(text);
if (matcher.find()) {
String parsedVersion = matcher.group(1);
if (Strings.hasText(parsedVersion)) {
return Version.fromString(parsedVersion);
}
}
return null;
}
}

View File

@ -74,14 +74,14 @@ public class AbstractCollectorTestCase extends MarvelIntegTestCase {
protected void assertCanCollect(AbstractCollector collector) { protected void assertCanCollect(AbstractCollector collector) {
assertNotNull(collector); assertNotNull(collector);
assertTrue("collector [" + collector.name() + "] should be able to collect data", collector.canCollect()); assertTrue("collector [" + collector.name() + "] should be able to collect data", collector.shouldCollect());
Collection results = collector.collect(); Collection results = collector.collect();
assertNotNull(results); assertNotNull(results);
} }
protected void assertCannotCollect(AbstractCollector collector) { protected void assertCannotCollect(AbstractCollector collector) {
assertNotNull(collector); assertNotNull(collector);
assertFalse("collector [" + collector.name() + "] should not be able to collect data", collector.canCollect()); assertFalse("collector [" + collector.name() + "] should not be able to collect data", collector.shouldCollect());
Collection results = collector.collect(); Collection results = collector.collect();
assertTrue(results == null || results.isEmpty()); assertTrue(results == null || results.isEmpty());
} }

View File

@ -6,6 +6,8 @@
package org.elasticsearch.marvel.agent.exporter.http; package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.support.VersionUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Test; import org.junit.Test;
@ -17,6 +19,7 @@ import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.MARVEL_VERSION_FIELD;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
@ -24,17 +27,17 @@ public class HttpExporterUtilsTests extends ESTestCase {
@Test @Test
public void testLoadTemplate() { public void testLoadTemplate() {
byte[] template = HttpExporterUtils.loadDefaultTemplate(); byte[] template = MarvelTemplateUtils.loadDefaultTemplate();
assertNotNull(template); assertNotNull(template);
assertThat(template.length, Matchers.greaterThan(0)); assertThat(template.length, Matchers.greaterThan(0));
} }
@Test @Test
public void testParseTemplateVersionFromByteArrayTemplate() throws IOException { public void testParseTemplateVersionFromByteArrayTemplate() throws IOException {
byte[] template = HttpExporterUtils.loadDefaultTemplate(); byte[] template = MarvelTemplateUtils.loadDefaultTemplate();
assertNotNull(template); assertNotNull(template);
Version version = HttpExporterUtils.parseTemplateVersion(template); Version version = MarvelTemplateUtils.parseTemplateVersion(template);
assertNotNull(version); assertNotNull(version);
} }
@ -49,22 +52,22 @@ public class HttpExporterUtilsTests extends ESTestCase {
templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"); templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }");
for (String template : templates) { for (String template : templates) {
Version version = HttpExporterUtils.parseTemplateVersion(template); Version version = MarvelTemplateUtils.parseTemplateVersion(template);
assertNotNull(version); assertNotNull(version);
} }
Version version = HttpExporterUtils.parseTemplateVersion("{\"marvel.index_format\": \"7\"}"); Version version = MarvelTemplateUtils.parseTemplateVersion("{\"marvel.index_format\": \"7\"}");
assertNull(version); assertNull(version);
} }
@Test @Test
public void testParseVersion() throws IOException { public void testParseVersion() throws IOException {
assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}")); assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}"));
assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}")); assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}"));
assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}")); assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}"));
assertNotNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }")); assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"));
assertNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}")); assertNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}"));
assertNull(HttpExporterUtils.parseVersion(HttpExporterUtils.MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}")); assertNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}"));
} }

View File

@ -27,6 +27,7 @@ import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc; import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.marvel.test.MarvelIntegTestCase;
@ -47,7 +48,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.marvel.agent.exporter.Exporter.MIN_SUPPORTED_TEMPLATE_VERSION; import static org.elasticsearch.marvel.agent.exporter.Exporter.MIN_SUPPORTED_TEMPLATE_VERSION;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MARVEL_VERSION_FIELD;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -129,7 +129,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
awaitMarvelTemplateInstalled(); awaitMarvelTemplateInstalled();
// now lets update the template with an old one and then restart the cluster // now lets update the template with an old one and then restart the cluster
exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build()); exporter.putTemplate(Settings.builder().put(MarvelTemplateUtils.MARVEL_VERSION_FIELD, fakeVersion.toString()).build());
logger.debug("full cluster restart"); logger.debug("full cluster restart");
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
internalCluster().fullRestart(new InternalTestCluster.RestartCallback() { internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@ -168,7 +168,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
IndexTemplateMetaData template = mock(IndexTemplateMetaData.class); IndexTemplateMetaData template = mock(IndexTemplateMetaData.class);
when(template.settings()).thenReturn(Settings.builder().put("index.marvel_version", unsupportedVersion.toString()).build()); when(template.settings()).thenReturn(Settings.builder().put("index.marvel_version", unsupportedVersion.toString()).build());
MetaData metaData = mock(MetaData.class); MetaData metaData = mock(MetaData.class);
when(metaData.getTemplates()).thenReturn(ImmutableOpenMap.<String, IndexTemplateMetaData>builder().fPut(Exporter.INDEX_TEMPLATE_NAME, template).build()); when(metaData.getTemplates()).thenReturn(ImmutableOpenMap.<String, IndexTemplateMetaData>builder().fPut(MarvelTemplateUtils.INDEX_TEMPLATE_NAME, template).build());
ClusterBlocks blocks = mock(ClusterBlocks.class); ClusterBlocks blocks = mock(ClusterBlocks.class);
when(blocks.hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)).thenReturn(false); when(blocks.hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)).thenReturn(false);
ClusterState clusterState = mock(ClusterState.class); ClusterState clusterState = mock(ClusterState.class);
@ -176,7 +176,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
when(clusterState.blocks()).thenReturn(blocks); when(clusterState.blocks()).thenReturn(blocks);
when(clusterService.state()).thenReturn(clusterState); when(clusterService.state()).thenReturn(clusterState);
assertThat(exporter.start(clusterState), nullValue()); assertThat(exporter.resolveBulk(clusterState, null), nullValue());
verifyZeroInteractions(client); verifyZeroInteractions(client);
if (master) { if (master) {
verify(exporter, times(1)).installedTemplateVersionMandatesAnUpdate(Version.CURRENT, unsupportedVersion); verify(exporter, times(1)).installedTemplateVersionMandatesAnUpdate(Version.CURRENT, unsupportedVersion);
@ -239,43 +239,14 @@ public class LocalExporterTests extends MarvelIntegTestCase {
} }
} }
private void awaitMarvelTemplateInstalled() throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
assertMarvelTemplateInstalled();
}
}, 30, TimeUnit.SECONDS);
}
private void awaitMarvelTemplateInstalled(Version version) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
assertMarvelTemplateInstalled(version);
}
}, 30, TimeUnit.SECONDS);
}
protected void assertMarvelTemplateInstalled(Version version) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(Exporter.INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
if (template.getName().equals(Exporter.INDEX_TEMPLATE_NAME)) {
Version templateVersion = LocalExporter.templateVersion(template);
if (templateVersion != null && templateVersion.id == version.id) {
return;
}
fail("did not find marvel template with expected version [" + version + "]. found version [" + templateVersion + "]");
}
}
fail("marvel template could not be found");
}
private Version getCurrentlyInstalledTemplateVersion() { private Version getCurrentlyInstalledTemplateVersion() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(Exporter.INDEX_TEMPLATE_NAME).get(); GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).get();
assertThat(response, notNullValue()); assertThat(response, notNullValue());
assertThat(response.getIndexTemplates(), notNullValue()); assertThat(response.getIndexTemplates(), notNullValue());
assertThat(response.getIndexTemplates(), hasSize(1)); assertThat(response.getIndexTemplates(), hasSize(1));
assertThat(response.getIndexTemplates().get(0), notNullValue()); assertThat(response.getIndexTemplates().get(0), notNullValue());
return LocalExporter.templateVersion(response.getIndexTemplates().get(0)); return MarvelTemplateUtils.templateVersion(response.getIndexTemplates().get(0));
} }
} }

View File

@ -10,7 +10,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils; import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
@ -68,7 +68,7 @@ public class ClusterStateIT extends MarvelIntegTestCase {
ensureGreen(); ensureGreen();
logger.debug("--> forcing marvel's index template update"); logger.debug("--> forcing marvel's index template update");
assertAcked(client().admin().indices().preparePutTemplate("marvel").setSource(HttpExporterUtils.loadDefaultTemplate()).execute().actionGet()); assertAcked(client().admin().indices().preparePutTemplate("marvel").setSource(MarvelTemplateUtils.loadDefaultTemplate()).execute().actionGet());
logger.debug("--> deleting all marvel indices"); logger.debug("--> deleting all marvel indices");
deleteMarvelIndices(); deleteMarvelIndices();

View File

@ -1,100 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.renderer.cluster;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.search.SearchHit;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;
public class ClusterStatsIT extends MarvelIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MarvelSettings.INTERVAL, "3s")
.put(MarvelSettings.COLLECTORS, ClusterStatsCollector.NAME)
.build();
}
@Test
public void testClusterStats() throws Exception {
ensureGreen();
logger.debug("--> creating some indices so that every data nodes will at leats a shard");
ClusterStatsNodes.Counts counts = client().admin().cluster().prepareClusterStats().get().getNodesStats().getCounts();
assertThat(counts.getTotal(), greaterThan(0));
for (int i = 0; i < randomIntBetween(1, 5); i++) {
assertAcked(prepareCreate("test-" + i).setSettings(Settings.settingsBuilder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, counts.getDataOnly() + counts.getMasterData())
.build()));
index("test-" + i, "foo", "1", jsonBuilder().startObject().field("dummy_field", 1).endObject());
}
securedFlush();
securedRefresh();
logger.debug("--> waiting for cluster stats to report data for each node");
assertBusy(new Runnable() {
@Override
public void run() {
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get();
for (Map.Entry<String, NodeInfo> node : nodesInfoResponse.getNodesMap().entrySet()) {
// Checks that node has shards
ClusterStatsResponse clusterStatsResponse = client().admin().cluster().prepareClusterStats().setNodesIds(node.getKey()).get();
assertNotNull(clusterStatsResponse.getIndicesStats().getShards());
assertNotNull(clusterStatsResponse.getIndicesStats().getShards());
assertThat(clusterStatsResponse.getIndicesStats().getShards().getTotal(), greaterThan(0));
NodesStatsResponse nodeStatsResponse = client().admin().cluster().prepareNodesStats(node.getKey()).setFs(true).get();
for (NodeStats nodeStats : nodeStatsResponse) {
assertThat(nodeStats.getFs().total().getAvailable().bytes(), greaterThan(-1L));
}
}
}
}, 30L, TimeUnit.SECONDS);
logger.debug("--> delete all indices in case of cluster stats documents have been indexed with no shards data");
deleteMarvelIndices();
awaitMarvelDocsCount(greaterThan(0L), ClusterStatsCollector.TYPE);
logger.debug("--> searching for marvel documents of type [{}]", ClusterStatsCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ClusterStatsCollector.TYPE).get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that every document contains the expected fields");
String[] filters = ClusterStatsRenderer.FILTERS;
for (SearchHit searchHit : response.getHits().getHits()) {
Map<String, Object> fields = searchHit.sourceAsMap();
for (String filter : filters) {
assertContains(filter, fields);
}
}
logger.debug("--> cluster stats successfully collected");
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.renderer.cluster;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.junit.Test;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.hamcrest.Matchers.greaterThan;
@ClusterScope(scope = SUITE, maxNumDataNodes = 2)
public class ClusterStatsTests extends MarvelIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MarvelSettings.INTERVAL, "3s")
.put(MarvelSettings.COLLECTORS, ClusterStatsCollector.NAME)
.build();
}
@Test
public void testClusterStats() throws Exception {
// lets wait with the collection until all the shards started
stopCollection();
logger.debug("--> creating some indices so that every data nodes will at least a shard");
ClusterStatsNodes.Counts counts = client().admin().cluster().prepareClusterStats().get().getNodesStats().getCounts();
assertThat(counts.getTotal(), greaterThan(0));
String indexNameBase = randomAsciiOfLength(5).toLowerCase(Locale.ROOT);
int indicesCount = randomIntBetween(1, 5);
String[] indices = new String[indicesCount];
for (int i = 0; i < indicesCount; i++) {
indices[i] = indexNameBase + "-" + i;
index(indices[i], "foo", "1", jsonBuilder().startObject().field("dummy_field", 1).endObject());
}
securedFlush();
securedRefresh();
securedEnsureGreen();
// ok.. we'll start collecting now...
startCollection();
awaitMarvelTemplateInstalled();
assertBusy(new Runnable() {
@Override
public void run() {
logger.debug("--> searching for marvel [{}] documents", ClusterStatsCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ClusterStatsCollector.TYPE).get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that every document contains the expected fields");
String[] filters = ClusterStatsRenderer.FILTERS;
for (SearchHit searchHit : response.getHits().getHits()) {
Map<String, Object> fields = searchHit.sourceAsMap();
for (String filter : filters) {
assertContains(filter, fields);
}
}
logger.debug("--> cluster stats successfully collected");
}
});
}
}

View File

@ -6,7 +6,12 @@
package org.elasticsearch.marvel.test; package org.elasticsearch.marvel.test;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -15,6 +20,7 @@ import org.elasticsearch.index.cache.IndexCacheModule;
import org.elasticsearch.license.plugin.LicensePlugin; import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.marvel.MarvelPlugin; import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.agent.AgentService; import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.ShieldPlugin;
@ -33,12 +39,9 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.*;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.marvel.agent.exporter.Exporter.INDEX_TEMPLATE_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ -55,6 +58,11 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
return super.buildTestCluster(scope, seed); return super.buildTestCluster(scope, seed);
} }
@Override
protected Set<String> excludeTemplates() {
return Collections.singleton(MarvelTemplateUtils.INDEX_TEMPLATE_NAME);
}
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder() Settings.Builder builder = Settings.builder()
@ -114,15 +122,12 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
protected void deleteMarvelIndices() { protected void deleteMarvelIndices() {
if (shieldEnabled) { if (shieldEnabled) {
try { try {
assertAcked(client().admin().indices().prepareDelete(MarvelSettings.MARVEL_INDICES_PREFIX + "*")); assertAcked(client().admin().indices().prepareDelete(".marvel-es-*"));
} catch (Exception e) { } catch (IndexNotFoundException e) {
// if shield couldn't resolve any marvel index, it'll throw index not found exception. // if shield couldn't resolve any marvel index, it'll throw index not found exception.
if (!(e instanceof IndexNotFoundException)) {
throw e;
}
} }
} else { } else {
assertAcked(client().admin().indices().prepareDelete(MarvelSettings.MARVEL_INDICES_PREFIX + "*")); assertAcked(client().admin().indices().prepareDelete(".marvel-es-*"));
} }
} }
@ -137,10 +142,23 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
}, 30, TimeUnit.SECONDS); }, 30, TimeUnit.SECONDS);
} }
protected void ensureMarvelIndicesGreen() {
if (shieldEnabled) {
try {
ensureGreen(".marvel-es-*");
} catch (IndexNotFoundException e) {
// might happen with shield...
}
} else {
ensureGreen(".marvel-es-*");
}
}
protected void assertMarvelDocsCount(Matcher<Long> matcher, String... types) { protected void assertMarvelDocsCount(Matcher<Long> matcher, String... types) {
try { try {
long count = client().prepareCount(MarvelSettings.MARVEL_INDICES_PREFIX + "*") long count = client().prepareCount(MarvelSettings.MARVEL_INDICES_PREFIX + "*")
.setTypes(types).get().getCount(); .setTypes(types).get().getCount();
logger.trace("--> searched for [{}] documents, found [{}]", Strings.arrayToCommaDelimitedString(types), count);
assertThat(count, matcher); assertThat(count, matcher);
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
if (shieldEnabled) { if (shieldEnabled) {
@ -152,22 +170,59 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
} }
protected void assertMarvelTemplateInstalled() { protected void assertMarvelTemplateInstalled() {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(INDEX_TEMPLATE_NAME).get().getIndexTemplates()) { ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
if (template.getName().equals(INDEX_TEMPLATE_NAME)) { if (clusterStateResponse != null) {
ClusterState state = clusterStateResponse.getState();
MetaData md = state.getMetaData();
}
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates().get();
for (IndexTemplateMetaData template : response.getIndexTemplates()) {
if (template.getName().equals(MarvelTemplateUtils.INDEX_TEMPLATE_NAME)) {
return; return;
} }
} }
fail("marvel template should exists"); fail("marvel template should exist");
} }
protected void assertMarvelTemplateMissing() { protected void assertMarvelTemplateMissing() {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(INDEX_TEMPLATE_NAME).get().getIndexTemplates()) { for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
if (template.getName().equals(INDEX_TEMPLATE_NAME)) { if (template.getName().equals(MarvelTemplateUtils.INDEX_TEMPLATE_NAME)) {
fail("marvel template shouldn't exists"); fail("marvel template shouldn't exist");
} }
} }
} }
protected void awaitMarvelTemplateInstalled() throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
assertMarvelTemplateInstalled();
}
}, 30, TimeUnit.SECONDS);
}
protected void awaitMarvelTemplateInstalled(Version version) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
assertMarvelTemplateInstalled(version);
}
}, 30, TimeUnit.SECONDS);
}
protected void assertMarvelTemplateInstalled(Version version) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
if (template.getName().equals(MarvelTemplateUtils.INDEX_TEMPLATE_NAME)) {
Version templateVersion = MarvelTemplateUtils.templateVersion(template);
if (templateVersion != null && templateVersion.id == version.id) {
return;
}
fail("did not find marvel template with expected version [" + version + "]. found version [" + templateVersion + "]");
}
}
fail("marvel template could not be found");
}
protected void awaitIndexExists(final String... indices) throws Exception { protected void awaitIndexExists(final String... indices) throws Exception {
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override
@ -194,10 +249,8 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
if (shieldEnabled) { if (shieldEnabled) {
try { try {
refresh(); refresh();
} catch (Exception e) { } catch (IndexNotFoundException e) {
if (!(e instanceof IndexNotFoundException)) { // with shield we might get that if wildcards were resolved to no indices
throw e;
}
} }
} else { } else {
refresh(); refresh();
@ -208,16 +261,26 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
if (shieldEnabled) { if (shieldEnabled) {
try { try {
flush(indices); flush(indices);
} catch (Exception e) { } catch (IndexNotFoundException e) {
if (!(e instanceof IndexNotFoundException)) { // with shield we might get that if wildcards were resolved to no indices
throw e;
}
} }
} else { } else {
flush(indices); flush(indices);
} }
} }
protected void securedEnsureGreen(String... indices) {
if (shieldEnabled) {
try {
ensureGreen(indices);
} catch (IndexNotFoundException e) {
// with shield we might get that if wildcards were resolved to no indices
}
} else {
ensureGreen(indices);
}
}
/** /**
* Checks if a field exist in a map of values. If the field contains a dot like 'foo.bar' * Checks if a field exist in a map of values. If the field contains a dot like 'foo.bar'
* it checks that 'foo' exists in the map of values and that it points to a sub-map. Then * it checks that 'foo' exists in the map of values and that it points to a sub-map. Then