diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java index dd5a1d6aeb9..6dfaf619dbc 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java @@ -172,13 +172,16 @@ public class AgentService extends AbstractLifecycleComponent imple if (bulk == null) { // exporters are either not ready or faulty continue; } +// long start = System.nanoTime(); //TODO remove try { for (Collector collector : collectors) { - logger.trace("collecting [{}]", collector.name()); if (collecting) { Collection docs = collector.collect(); if (docs != null) { + logger.trace("bulk [{}] - adding collected docs from [{}] collector", bulk, collector.name()); bulk.add(docs); + } else { + logger.trace("bulk [{}] - skipping collected docs from [{}] collector", bulk, collector.name()); } } if (closed) { @@ -187,13 +190,16 @@ public class AgentService extends AbstractLifecycleComponent imple } } } finally { +// long delta = System.nanoTime() - start; TODO remove +// logger.trace("closing bulk [{}] - collection took [{}] seconds", bulk, TimeValue.timeValueNanos(delta).format(PeriodType.seconds())); bulk.close(!closed && collecting); } } catch (InterruptedException e) { + logger.trace("interrupted"); Thread.currentThread().interrupt(); } catch (Throwable t) { - logger.error("Background thread had an uncaught exception:", t); + logger.error("background thread had an uncaught exception", t); } finally { firstRun = false; } diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/AbstractCollector.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/AbstractCollector.java index f12a8a6f85c..b61470e3c2d 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/AbstractCollector.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/AbstractCollector.java @@ -64,13 +64,14 @@ public abstract class AbstractCollector extends AbstractLifecycleComponent public Collection collect() { try { if (canCollect()) { + logger.trace("collector [{}] - collecting data...", name()); return doCollect(); } logger.trace("collector [{}] can not collect data", name()); } catch (ElasticsearchTimeoutException e) { logger.error("collector [{}] timed out when collecting data"); } catch (Exception e) { - logger.error("collector [{}] throws exception when collecting data", e, name()); + logger.error("collector [{}] - failed collecting data", e, name()); } return null; } diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java index 7ef58af956e..5e2632e9667 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java @@ -15,7 +15,7 @@ import java.util.Collection; */ public abstract class ExportBulk { - private final String name; + protected final String name; public ExportBulk(String name) { this.name = name; @@ -25,6 +25,11 @@ public abstract class ExportBulk { return add(Arrays.asList(docs)); } + @Override + public String toString() { + return name; + } + public abstract ExportBulk add(Collection docs) throws Exception; public abstract void flush() throws Exception; diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java index d11e9f75786..24014208593 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java @@ -5,10 +5,12 @@ */ package org.elasticsearch.marvel.agent.exporter; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.shield.MarvelSettingsFilter; import org.joda.time.format.DateTimeFormat; @@ -19,18 +21,23 @@ import java.util.Collection; public abstract class Exporter { public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format"; + public static final String BULK_TIMEOUT_SETTING = "bulk.timeout"; + public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd"; + public static final String INDEX_TEMPLATE_NAME = "marvel"; protected final String type; protected final Config config; protected final ESLogger logger; protected final IndexNameResolver indexNameResolver; + protected final @Nullable TimeValue bulkTimeout; public Exporter(String type, Config config) { this.type = type; this.config = config; this.logger = config.logger(getClass()); this.indexNameResolver = new DefaultIndexNameResolver(config.settings); + bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null); } public String type() { diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java index 3f05589b34c..0f440dcf610 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java @@ -58,9 +58,6 @@ public class HttpExporter extends Exporter { // es level timeout used when checking and writing templates (used to speed up tests) public static final String TEMPLATE_CHECK_TIMEOUT_SETTING = "index.template.master_timeout"; - // es level timeout used for bulk indexing (used to speed up tests) - public static final String BULK_TIMEOUT_SETTING = "bulk.timeout"; - public static final String SSL_SETTING = "ssl"; public static final String SSL_PROTOCOL_SETTING = SSL_SETTING + ".protocol"; public static final String SSL_TRUSTSTORE_SETTING = SSL_SETTING + ".truststore.path"; @@ -87,7 +84,6 @@ public class HttpExporter extends Exporter { final RendererRegistry rendererRegistry; final @Nullable TimeValue templateCheckTimeout; - final @Nullable TimeValue bulkTimeout; volatile boolean checkedAndUploadedIndexTemplate = false; volatile boolean supportedClusterVersion = false; @@ -121,7 +117,6 @@ public class HttpExporter extends Exporter { // TimeValue SHOULD NOT SELECTIVELY CHOOSE WHAT FIELDS TO PARSE BASED ON THEIR NAMES!!!! String templateCheckTimeoutValue = config.settings().get(TEMPLATE_CHECK_TIMEOUT_SETTING, null); templateCheckTimeout = TimeValue.parseTimeValue(templateCheckTimeoutValue, null, settingFQN(TEMPLATE_CHECK_TIMEOUT_SETTING)); - bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null); keepAliveWorker = new ConnectionKeepAliveWorker(); diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java index 1bc18daebc5..2327fefde80 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java @@ -94,7 +94,11 @@ public class LocalBulk extends ExportBulk { if (state.get() != State.ACTIVE || requestBuilder == null) { return; } + logger.trace("exporter [{}] - exporting data...", name); +// long start = System.nanoTime(); TODO remove BulkResponse bulkResponse = requestBuilder.get(); +// TimeValue time = TimeValue.timeValueNanos(System.nanoTime() - start); +// logger.trace("exporter [{}] - data exported, took [{}] seconds", name, time.format(PeriodType.seconds())); if (bulkResponse.hasFailures()) { throw new ElasticsearchException(bulkResponse.buildFailureMessage()); } diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java index 58f16d253d2..5a9c1fc8641 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java @@ -6,30 +6,28 @@ package org.elasticsearch.marvel.agent.exporter.local; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.marvel.agent.exporter.ExportBulk; import org.elasticsearch.marvel.agent.exporter.Exporter; -import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils; import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.shield.SecuredClient; import java.io.ByteArrayOutputStream; import java.io.InputStream; -import java.util.concurrent.atomic.AtomicReference; -import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_CLUSTER_VERSION; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MARVEL_VERSION_FIELD; @@ -40,24 +38,11 @@ public class LocalExporter extends Exporter { public static final String TYPE = "local"; - public static final String INDEX_TEMPLATE_NAME = "marvel"; - - public static final String BULK_TIMEOUT_SETTING = "bulk.timeout"; - private final Client client; private final ClusterService clusterService; private final RendererRegistry renderers; - private final LocalBulk bulk; - - final @Nullable TimeValue bulkTimeout; - - private final AtomicReference state = new AtomicReference<>(); - - /** - * Version of the built-in template - **/ - private final Version builtInTemplateVersion; + private volatile LocalBulk bulk; public LocalExporter(Exporter.Config config, SecuredClient client, ClusterService clusterService, RendererRegistry renderers) { super(TYPE, config); @@ -65,86 +50,132 @@ public class LocalExporter extends Exporter { this.clusterService = clusterService; this.renderers = renderers; - // Checks that the built-in template is versioned - builtInTemplateVersion = HttpExporterUtils.parseTemplateVersion(HttpExporterUtils.loadDefaultTemplate()); - if (builtInTemplateVersion == null) { - throw new IllegalStateException("unable to find built-in template version"); - } - - bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null); - - state.set(State.STARTING); - bulk = new LocalBulk(name(), logger, client, indexNameResolver, renderers); + clusterService.add(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + bulk = start(event.state()); + } + }); } @Override public ExportBulk openBulk() { - if (!canExport()) { - return null; - } return bulk; } @Override public void close() { - if (state.compareAndSet(State.STARTING, State.STOPPING) || state.compareAndSet(State.STARTED, State.STOPPING)) { + if (bulk != null) { try { bulk.terminate(); } catch (Exception e) { logger.error("failed to cleanly close open bulk for [{}] exporter", e, name()); } - state.set(State.STOPPED); } } - ClusterState clusterState() { - return client.admin().cluster().prepareState().get().getState(); - } + LocalBulk start(ClusterState clusterState) { + if (bulk != null) { + return bulk; + } - Version clusterVersion() { - return Version.CURRENT; - } + if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // wait until the gateway has recovered from disk, otherwise we think may not have .marvel-es- + // indices but they may not have been restored from the cluster state on disk + logger.debug("exporter [{}] waiting until gateway has recovered from disk", name()); + return null; + } - Version templateVersion() { - for (IndexTemplateMetaData template : client.admin().indices().prepareGetTemplates(INDEX_TEMPLATE_NAME).get().getIndexTemplates()) { - if (template.getName().equals(INDEX_TEMPLATE_NAME)) { - String version = template.settings().get("index." + MARVEL_VERSION_FIELD); - if (Strings.hasLength(version)) { - return Version.fromString(version); - } + IndexTemplateMetaData installedTemplate = clusterState.getMetaData().getTemplates().get(INDEX_TEMPLATE_NAME); + + // if this is not the master, we'll just look to see if the marvel template is already + // installed and if so, if it has a compatible version. If it is (installed and compatible) + // we'll be able to start this exporter. Otherwise, we'll just wait for a new cluster state. + if (!clusterService.localNode().masterNode()) { + if (installedTemplate == null) { + // the marvel template is not yet installed in the given cluster state, we'll wait. + logger.debug("marvel index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME); + return null; } + Version installedTemplateVersion = templateVersion(installedTemplate); + if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) { + logger.debug("exporter cannot start. the currently installed marvel template (version [{}]) is incompatible with the " + + "current elasticsearch version [{}]. waiting until the template is updated", installedTemplateVersion, Version.CURRENT); + } + + // ok.. we have a compatible template... we can start + logger.debug("marvel [{}] exporter started!", name()); + return new LocalBulk(name(), logger, client, indexNameResolver, renderers); + } + + // we are on master + // + // if we cannot find a template or a compatible template, we'll install one in / update it. + if (installedTemplate == null) { + putTemplate(config.settings().getAsSettings("template.settings")); + // we'll get that template on the next cluster state update + return null; + } + Version installedTemplateVersion = templateVersion(installedTemplate); + if (installedTemplateVersionMandatesAnUpdate(Version.CURRENT, installedTemplateVersion)) { + logger.debug("installing new marvel template [{}], replacing [{}]", Version.CURRENT, installedTemplateVersion); + putTemplate(config.settings().getAsSettings("template.settings")); + // we'll get that template on the next cluster state update + return null; + } + + // ok.. we have a compatible template... we can start + logger.debug("marvel [{}] exporter started!", name()); + return new LocalBulk(name(), logger, client, indexNameResolver, renderers); + } + + static Version templateVersion(IndexTemplateMetaData templateMetaData) { + String version = templateMetaData.settings().get("index." + MARVEL_VERSION_FIELD); + if (Strings.hasLength(version)) { + return Version.fromString(version); } return null; } - boolean shouldUpdateTemplate(Version current, Version expected) { - // Always update a template even if its version is not found - if (current == null) { - return true; - } - // Never update a template in an unknown version - if (expected == null) { + boolean installedTemplateVersionIsSufficient(Version current, Version installed) { + if (installed == null) { return false; } + if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) { + return false; + } + if (current.after(installed)) { + return true; + } + if (current.equals(installed)) { + return current.snapshot(); + } + return false; + } + + boolean installedTemplateVersionMandatesAnUpdate(Version current, Version installed) { + if (installed == null) { + return true; + } // Never update a very old template - if (current.before(MIN_SUPPORTED_TEMPLATE_VERSION)) { + if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) { logger.error("marvel template version [{}] is below the minimum compatible version [{}]. " + "please manually update the marvel template to a more recent version" + "and delete the current active marvel index (don't forget to back up it first if needed)", - current, MIN_SUPPORTED_TEMPLATE_VERSION); + installed, MIN_SUPPORTED_TEMPLATE_VERSION); return false; } // Always update a template to the last up-to-date version - if (expected.after(current)) { - logger.info("marvel template version will be updated to a newer version [current:{}, expected:{}]", current, expected); + if (current.after(installed)) { + logger.debug("the installed marvel template version [{}] will be updated to a newer version [{}]", installed, current); return true; // When the template is up-to-date, force an update for snapshot versions only - } else if (expected.equals(current)) { - logger.debug("marvel template version is up-to-date [current:{}, expected:{}]", current, expected); - return expected.snapshot(); + } else if (current.equals(installed)) { + logger.debug("the installed marvel template version [{}] is up-to-date", installed); + return installed.snapshot() && !current.snapshot(); // Never update a template that is newer than the expected one } else { - logger.debug("marvel template version is newer than the one required by the marvel agent [current:{}, expected:{}]", current, expected); + logger.debug("the installed marvel template version [{}] is newer than the one required [{}]... keeping it.", installed, current); return false; } } @@ -165,69 +196,26 @@ public class LocalExporter extends Exporter { assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!"; - PutIndexTemplateResponse response = client.admin().indices().putTemplate(request).actionGet(); - if (!response.isAcknowledged()) { - throw new IllegalStateException("failed to put marvel index template"); - } + // async call, so we won't block cluster event thread + client.admin().indices().putTemplate(request, new ActionListener() { + @Override + public void onResponse(PutIndexTemplateResponse response) { + if (!response.isAcknowledged()) { + logger.error("failed to update marvel index template"); + } + } + + @Override + public void onFailure(Throwable throwable) { + logger.error("failed to update marvel index template", throwable); + } + }); + } catch (Exception e) { throw new IllegalStateException("failed to update marvel index template", e); } } - boolean canExport() { - if (state.get() == State.STARTED) { - return true; - } - - if (state.get() != State.STARTING) { - return false; - } - - ClusterState clusterState = clusterState(); - if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - // wait until the gateway has recovered from disk, otherwise we think may not have .marvel-es- - // indices but they may not have been restored from the cluster state on disk - logger.debug("exporter [{}] waiting until gateway has recovered from disk", name()); - return false; - } - - Version clusterVersion = clusterVersion(); - if ((clusterVersion == null) || clusterVersion.before(MIN_SUPPORTED_CLUSTER_VERSION)) { - logger.error("cluster version [" + clusterVersion + "] is not supported, please use a cluster with minimum version [" + MIN_SUPPORTED_CLUSTER_VERSION + "]"); - state.set(State.FAILED); - return false; - } - - Version templateVersion = templateVersion(); - if (!clusterService.state().nodes().localNodeMaster()) { - if (templateVersion == null) { - logger.debug("marvel index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME); - return false; - } - - // TODO why do we need this check? the marvel indices are anyway auto-created -// String indexName = indexNameResolver.resolve(System.currentTimeMillis()); -// if (!clusterState.routingTable().index(indexName).allPrimaryShardsActive()) { -// logger.debug("marvel index [{}] has some primary shards not yet started, so service cannot start", indexName); -// return false; -// } - } - //TODO this is erroneous - // the check may figure out that the existing version is too old and therefore - // it can't and won't update the template (prompting the user to delete the template). - // In this case, we shouldn't export data. But we do.. the "shouldUpdate" method - // needs to be "boolean ensureCompatibleTemplate". The boolean returned indicates whether - // the template is valid (either was valid or was updated to a valid one) or not. If - // not, the state of this exporter should not be set to STARTED. - if (shouldUpdateTemplate(templateVersion, builtInTemplateVersion)) { - putTemplate(config.settings().getAsSettings("template.settings")); - } - - logger.debug("exporter [{}] can now export marvel data", name()); - state.set(State.STARTED); - return true; - } - public enum State { STARTING, STARTED, diff --git a/marvel/src/main/resources/marvel_index_template.json b/marvel/src/main/resources/marvel_index_template.json index 56464478a2b..9fc37552123 100644 --- a/marvel/src/main/resources/marvel_index_template.json +++ b/marvel/src/main/resources/marvel_index_template.json @@ -1,5 +1,5 @@ { - "template": ".marvel*", + "template": ".marvel-es-*", "settings": { "marvel_version": "${project.version}", "index.number_of_shards": 1, diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java index 80f8ec65bac..3741eb1bf7c 100644 --- a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java @@ -124,7 +124,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { exporter.export(Collections.singletonList(newRandomMarvelDoc())); logger.info("verifying that template has been created"); - assertMarvelTemplateExists(); + assertMarvelTemplateInstalled(); } @Test @@ -172,7 +172,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { logger.info("removing the marvel template"); assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get()); - assertMarvelTemplateNotExists(); + assertMarvelTemplateMissing(); assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( Settings.builder().putArray("marvel.agent.exporters._http.host", exporter.hosts)).get()); @@ -184,7 +184,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { exporter.export(Collections.singletonList(newRandomMarvelDoc())); logger.info("verifying that template has been created"); - assertMarvelTemplateExists(); + assertMarvelTemplateInstalled(); } @Test @@ -207,11 +207,11 @@ public class HttpExporterTests extends MarvelIntegTestCase { exporter.export(Collections.singletonList(newRandomMarvelDoc())); logger.info("verifying that template has been created"); - assertMarvelTemplateExists(); + assertMarvelTemplateInstalled(); logger.info("--> removing the marvel template"); assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get()); - assertMarvelTemplateNotExists(); + assertMarvelTemplateMissing(); logger.info("--> shutting down target0"); assertThat(target0.name, is(internalCluster().getMasterName())); // just to be sure it's still the master @@ -229,7 +229,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { fail("failed to export event from node0"); } logger.debug("--> checking for template"); - assertMarvelTemplateExists(); + assertMarvelTemplateInstalled(); logger.debug("--> template exists"); } }, 30, TimeUnit.SECONDS); @@ -274,7 +274,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists()); logger.info("verifying that template has been created"); - assertMarvelTemplateExists(); + assertMarvelTemplateInstalled(); } @Test diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java index 2ca2c1ff4ba..0cd170109a8 100644 --- a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java @@ -8,8 +8,11 @@ package org.elasticsearch.marvel.agent.exporter.local; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc; @@ -23,6 +26,8 @@ import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.joda.time.format.DateTimeFormat; import org.junit.Test; @@ -30,6 +35,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION; @@ -37,7 +44,7 @@ import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MAR import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.*; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) +@ClusterScope(scope = Scope.SUITE, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) public class LocalExporterTests extends MarvelIntegTestCase { private final static AtomicLong timeStampGenerator = new AtomicLong(); @@ -46,7 +53,7 @@ public class LocalExporterTests extends MarvelIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(MarvelSettings.STARTUP_DELAY, "1h") +// .put(MarvelSettings.STARTUP_DELAY, "1h") .build(); } @@ -91,16 +98,14 @@ public class LocalExporterTests extends MarvelIntegTestCase { ensureGreen(); LocalExporter exporter = getLocalExporter("_local"); - assertTrue(exporter.shouldUpdateTemplate(null, Version.CURRENT)); + assertTrue(exporter.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, null)); - assertMarvelTemplateNotExists(); + // lets wait until the marvel template will be installed + awaitMarvelTemplateInstalled(); - logger.debug("--> exporting when the marvel template does not exists: template should be created"); - exporter.export(Collections.singletonList(newRandomMarvelDoc())); - awaitMarvelDocsCount(is(1L)); - assertMarvelTemplateExists(); + awaitMarvelDocsCount(greaterThan(0L)); - assertThat(exporter.templateVersion(), equalTo(Version.CURRENT)); + assertThat(getCurrentlyInstalledTemplateVersion(), is(Version.CURRENT)); } @Test @@ -112,50 +117,58 @@ public class LocalExporterTests extends MarvelIntegTestCase { LocalExporter exporter = getLocalExporter("_local"); Version fakeVersion = MIN_SUPPORTED_TEMPLATE_VERSION; - assertTrue(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT)); + assertThat(exporter.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, fakeVersion), is(true)); - logger.debug("--> creating the marvel template with a fake version [{}]", fakeVersion); + // first, lets wait for the marvel template to be installed + awaitMarvelTemplateInstalled(); + + // now lets update the template with an old one and then restart the cluster exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build()); - assertMarvelTemplateExists(); + logger.debug("full cluster restart"); + final CountDownLatch latch = new CountDownLatch(1); + internalCluster().fullRestart(new InternalTestCluster.RestartCallback() { + @Override + public void doAfterNodes(int n, Client client) throws Exception { + latch.countDown(); + } + }); + if (!latch.await(30, TimeUnit.SECONDS)) { + fail("waited too long (at least 30 seconds) for the cluster to restart"); + } - assertThat(exporter.templateVersion(), equalTo(fakeVersion)); - - logger.debug("--> exporting when the marvel template must be updated: document is exported and the template is updated"); - exporter.export(Collections.singletonList(newRandomMarvelDoc())); - awaitMarvelDocsCount(is(1L)); - assertMarvelTemplateExists(); - - assertThat(exporter.templateVersion(), equalTo(Version.CURRENT)); + // now that the cluster is restarting, lets wait for the new template version to be installed + awaitMarvelTemplateInstalled(Version.CURRENT); } - @Test @AwaitsFix(bugUrl = "LocalExporter#210") - public void testUnsupportedTemplateVersion() throws Exception { - internalCluster().startNode(Settings.builder() - .put("marvel.agent.exporters._local.type", LocalExporter.TYPE) - .build()); - ensureGreen(); + //TODO needs a rewrite, the `start(ClusterState)` should be unit tested +// @Test @AwaitsFix(bugUrl = "LocalExporter#210") +// public void testUnsupportedTemplateVersion() throws Exception { +// internalCluster().startNode(Settings.builder() +// .put("marvel.agent.exporters._local.type", LocalExporter.TYPE) +// .build()); +// ensureGreen(); +// +// LocalExporter exporter = getLocalExporter("_local"); +// +// Version fakeVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0); +// assertFalse(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT)); +// +// logger.debug("--> creating the marvel template with a fake version [{}]", fakeVersion); +// exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build()); +// assertMarvelTemplateInstalled(); +// +// assertThat(exporter.templateVersion(), equalTo(fakeVersion)); +// +// logger.debug("--> exporting when the marvel template is tool old: no document is exported and the template is not updated"); +// awaitMarvelDocsCount(is(0L)); +// exporter.export(Collections.singletonList(newRandomMarvelDoc())); +// awaitMarvelDocsCount(is(0L)); +// assertMarvelTemplateInstalled(); +// +// assertThat(exporter.templateVersion(), equalTo(fakeVersion)); +// } - LocalExporter exporter = getLocalExporter("_local"); - - Version fakeVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0); - assertFalse(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT)); - - logger.debug("--> creating the marvel template with a fake version [{}]", fakeVersion); - exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build()); - assertMarvelTemplateExists(); - - assertThat(exporter.templateVersion(), equalTo(fakeVersion)); - - logger.debug("--> exporting when the marvel template is tool old: no document is exported and the template is not updated"); - awaitMarvelDocsCount(is(0L)); - exporter.export(Collections.singletonList(newRandomMarvelDoc())); - awaitMarvelDocsCount(is(0L)); - assertMarvelTemplateExists(); - - assertThat(exporter.templateVersion(), equalTo(fakeVersion)); - } - - @Test + @Test @TestLogging("marvel.agent:debug") public void testIndexTimestampFormat() throws Exception { long time = System.currentTimeMillis(); final String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM"); @@ -178,7 +191,7 @@ public class LocalExporterTests extends MarvelIntegTestCase { expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp()); logger.debug("--> check that the index [{}] has the correct timestamp [{}]", timeFormat, expectedIndexName); - assertTrue(client().admin().indices().prepareExists(expectedIndexName).get().isExists()); + assertThat(client().admin().indices().prepareExists(expectedIndexName).get().isExists(), is(true)); logger.debug("--> updates the timestamp"); final String newTimeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM"); @@ -214,4 +227,43 @@ public class LocalExporterTests extends MarvelIntegTestCase { } } + private void awaitMarvelTemplateInstalled() throws Exception { + assertBusy(new Runnable() { + @Override + public void run() { + assertMarvelTemplateInstalled(); + } + }, 30, TimeUnit.SECONDS); + } + + private void awaitMarvelTemplateInstalled(Version version) throws Exception { + assertBusy(new Runnable() { + @Override + public void run() { + assertMarvelTemplateInstalled(version); + } + }, 30, TimeUnit.SECONDS); + } + + protected void assertMarvelTemplateInstalled(Version version) { + for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(Exporter.INDEX_TEMPLATE_NAME).get().getIndexTemplates()) { + if (template.getName().equals(Exporter.INDEX_TEMPLATE_NAME)) { + Version templateVersion = LocalExporter.templateVersion(template); + if (templateVersion != null && templateVersion.id == version.id) { + return; + } + fail("did not find marvel template with expected version [" + version + "]. found version [" + templateVersion + "]"); + } + } + fail("marvel template could not be found"); + } + + private Version getCurrentlyInstalledTemplateVersion() { + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(Exporter.INDEX_TEMPLATE_NAME).get(); + assertThat(response, notNullValue()); + assertThat(response.getIndexTemplates(), notNullValue()); + assertThat(response.getIndexTemplates(), hasSize(1)); + assertThat(response.getIndexTemplates().get(0), notNullValue()); + return LocalExporter.templateVersion(response.getIndexTemplates().get(0)); + } } diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/renderer/cluster/ClusterStateIT.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/renderer/cluster/ClusterStateIT.java index 8fcf4c26a75..4aa708dfcc6 100644 --- a/marvel/src/test/java/org/elasticsearch/marvel/agent/renderer/cluster/ClusterStateIT.java +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/renderer/cluster/ClusterStateIT.java @@ -11,7 +11,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils; import org.elasticsearch.marvel.agent.renderer.AbstractRendererTestCase; -import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.search.SearchHit; import org.junit.Test; @@ -67,7 +66,7 @@ public class ClusterStateIT extends AbstractRendererTestCase { deleteMarvelIndices(); logger.debug("--> checking for template existence"); - assertMarvelTemplateExists(); + assertMarvelTemplateInstalled(); awaitMarvelDocsCount(greaterThan(0L), ClusterStateCollector.TYPE); logger.debug("--> searching for marvel documents of type [{}]", ClusterStateCollector.TYPE); diff --git a/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java b/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java index 24adc2e5a9c..e1b9c5c809a 100644 --- a/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java +++ b/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java @@ -14,7 +14,6 @@ import org.elasticsearch.index.cache.IndexCacheModule; import org.elasticsearch.license.plugin.LicensePlugin; import org.elasticsearch.marvel.MarvelPlugin; import org.elasticsearch.marvel.agent.AgentService; -import org.elasticsearch.marvel.agent.exporter.local.LocalExporter; import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.shield.ShieldPlugin; @@ -35,7 +34,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.TimeUnit; +import static org.elasticsearch.marvel.agent.exporter.Exporter.INDEX_TEMPLATE_NAME; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; /** @@ -134,7 +135,7 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase { public void run() { assertMarvelDocsCount(matcher, types); } - }); + }, 30, TimeUnit.SECONDS); } protected void assertMarvelDocsCount(Matcher matcher, String... types) { @@ -151,21 +152,21 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase { } } - protected void assertMarvelTemplateExists() { - assertTrue("marvel template shouldn't exists", isTemplateExists(LocalExporter.INDEX_TEMPLATE_NAME)); - } - - protected void assertMarvelTemplateNotExists() { - assertFalse("marvel template should exists", isTemplateExists(LocalExporter.INDEX_TEMPLATE_NAME)); - } - - private boolean isTemplateExists(String templateName) { - for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(templateName).get().getIndexTemplates()) { - if (template.getName().equals(templateName)) { - return true; + protected void assertMarvelTemplateInstalled() { + for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(INDEX_TEMPLATE_NAME).get().getIndexTemplates()) { + if (template.getName().equals(INDEX_TEMPLATE_NAME)) { + return; + } + } + fail("marvel template shouldn't exists"); + } + + protected void assertMarvelTemplateMissing() { + for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(INDEX_TEMPLATE_NAME).get().getIndexTemplates()) { + if (template.getName().equals(INDEX_TEMPLATE_NAME)) { + fail("marvel template should exists"); } } - return false; } protected void securedRefresh() { @@ -228,12 +229,12 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase { " '*': all\n" + "\n" + "admin:\n" + - " cluster: manage_watcher, cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" + + " cluster: cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" + "transport_client:\n" + " cluster: cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" + "\n" + "monitor:\n" + - " cluster: monitor_watcher, cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" + " cluster: cluster:monitor/nodes/info, cluster:monitor/nodes/liveness\n" ;