From 71b0b121f70376ad34c690331e28487ea2a72698 Mon Sep 17 00:00:00 2001 From: uboness Date: Fri, 25 Sep 2015 09:45:21 +0200 Subject: [PATCH] Fixed the cluster state handling in local exporter - addded additional tests for local exporter Original commit: elastic/x-pack-elasticsearch@b18839407886d3b0079f7dcd7908e807826bc415 --- .../agent/exporter/local/LocalExporter.java | 41 +++---- .../exporter/local/LocalExporterTests.java | 116 ++++++++++-------- .../marvel/test/MarvelIntegTestCase.java | 24 ++++ 3 files changed, 108 insertions(+), 73 deletions(-) diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java index 5a9c1fc8641..7368ab28f2f 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java @@ -34,7 +34,7 @@ import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MAR /** * */ -public class LocalExporter extends Exporter { +public class LocalExporter extends Exporter implements ClusterStateListener { public static final String TYPE = "local"; @@ -44,18 +44,18 @@ public class LocalExporter extends Exporter { private volatile LocalBulk bulk; - public LocalExporter(Exporter.Config config, SecuredClient client, ClusterService clusterService, RendererRegistry renderers) { + public LocalExporter(Exporter.Config config, Client client, ClusterService clusterService, RendererRegistry renderers) { super(TYPE, config); this.client = client; this.clusterService = clusterService; this.renderers = renderers; + bulk = start(clusterService.state()); + clusterService.add(this); + } - clusterService.add(new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - bulk = start(event.state()); - } - }); + @Override + public void clusterChanged(ClusterChangedEvent event) { + bulk = start(event.state()); } @Override @@ -65,6 +65,7 @@ public class LocalExporter extends Exporter { @Override public void close() { + clusterService.remove(this); if (bulk != null) { try { bulk.terminate(); @@ -75,7 +76,7 @@ public class LocalExporter extends Exporter { } LocalBulk start(ClusterState clusterState) { - if (bulk != null) { + if (clusterService.localNode() == null || clusterState == null || bulk != null) { return bulk; } @@ -101,6 +102,7 @@ public class LocalExporter extends Exporter { if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) { logger.debug("exporter cannot start. the currently installed marvel template (version [{}]) is incompatible with the " + "current elasticsearch version [{}]. waiting until the template is updated", installedTemplateVersion, Version.CURRENT); + return null; } // ok.. we have a compatible template... we can start @@ -122,6 +124,15 @@ public class LocalExporter extends Exporter { putTemplate(config.settings().getAsSettings("template.settings")); // we'll get that template on the next cluster state update return null; + } else if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) { + logger.error("marvel template version [{}] is below the minimum compatible version [{}]. " + + "please manually update the marvel template to a more recent version" + + "and delete the current active marvel index (don't forget to back up it first if needed)", + installedTemplateVersion, MIN_SUPPORTED_TEMPLATE_VERSION); + // we're not going to do anything with the template.. it's too old, and the schema might + // be too different than what this version of marvel/es can work with. For this reason we're + // not going to export any data, to avoid mapping conflicts. + return null; } // ok.. we have a compatible template... we can start @@ -159,10 +170,6 @@ public class LocalExporter extends Exporter { } // Never update a very old template if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) { - logger.error("marvel template version [{}] is below the minimum compatible version [{}]. " - + "please manually update the marvel template to a more recent version" - + "and delete the current active marvel index (don't forget to back up it first if needed)", - installed, MIN_SUPPORTED_TEMPLATE_VERSION); return false; } // Always update a template to the last up-to-date version @@ -216,14 +223,6 @@ public class LocalExporter extends Exporter { } } - public enum State { - STARTING, - STARTED, - STOPPING, - STOPPED, - FAILED - } - public static class Factory extends Exporter.Factory { private final SecuredClient client; diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java index 0cd170109a8..96ad9a6d99e 100644 --- a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java @@ -11,9 +11,15 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc; import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector; @@ -21,6 +27,7 @@ import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc; import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.MarvelDoc; +import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.search.SearchHit; @@ -41,10 +48,10 @@ import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MARVEL_VERSION_FIELD; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.*; +import static org.mockito.Mockito.*; -@ClusterScope(scope = Scope.SUITE, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) public class LocalExporterTests extends MarvelIntegTestCase { private final static AtomicLong timeStampGenerator = new AtomicLong(); @@ -53,7 +60,6 @@ public class LocalExporterTests extends MarvelIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) -// .put(MarvelSettings.STARTUP_DELAY, "1h") .build(); } @@ -140,39 +146,48 @@ public class LocalExporterTests extends MarvelIntegTestCase { awaitMarvelTemplateInstalled(Version.CURRENT); } - //TODO needs a rewrite, the `start(ClusterState)` should be unit tested -// @Test @AwaitsFix(bugUrl = "LocalExporter#210") -// public void testUnsupportedTemplateVersion() throws Exception { -// internalCluster().startNode(Settings.builder() -// .put("marvel.agent.exporters._local.type", LocalExporter.TYPE) -// .build()); -// ensureGreen(); -// -// LocalExporter exporter = getLocalExporter("_local"); -// -// Version fakeVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0); -// assertFalse(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT)); -// -// logger.debug("--> creating the marvel template with a fake version [{}]", fakeVersion); -// exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build()); -// assertMarvelTemplateInstalled(); -// -// assertThat(exporter.templateVersion(), equalTo(fakeVersion)); -// -// logger.debug("--> exporting when the marvel template is tool old: no document is exported and the template is not updated"); -// awaitMarvelDocsCount(is(0L)); -// exporter.export(Collections.singletonList(newRandomMarvelDoc())); -// awaitMarvelDocsCount(is(0L)); -// assertMarvelTemplateInstalled(); -// -// assertThat(exporter.templateVersion(), equalTo(fakeVersion)); -// } + @Test + public void testUnsupportedTemplateVersion() throws Exception { - @Test @TestLogging("marvel.agent:debug") + Exporter.Config config = new Exporter.Config("_name", Settings.EMPTY, Settings.builder() + .put("type", "local").build()); + Client client = mock(Client.class); + + ClusterService clusterService = mock(ClusterService.class); + boolean master = randomBoolean(); + DiscoveryNode localNode = mock(DiscoveryNode.class); + when(localNode.masterNode()).thenReturn(master); + when(clusterService.localNode()).thenReturn(localNode); + + RendererRegistry renderers = mock(RendererRegistry.class); + + LocalExporter exporter = spy(new LocalExporter(config, client, clusterService, renderers)); + + // creating a cluster state mock that holds unsupported template version + Version unsupportedVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0); + IndexTemplateMetaData template = mock(IndexTemplateMetaData.class); + when(template.settings()).thenReturn(Settings.builder().put("index.marvel_version", unsupportedVersion.toString()).build()); + MetaData metaData = mock(MetaData.class); + when(metaData.getTemplates()).thenReturn(ImmutableOpenMap.builder().fPut(Exporter.INDEX_TEMPLATE_NAME, template).build()); + ClusterBlocks blocks = mock(ClusterBlocks.class); + when(blocks.hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)).thenReturn(false); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.getMetaData()).thenReturn(metaData); + when(clusterState.blocks()).thenReturn(blocks); + when(clusterService.state()).thenReturn(clusterState); + + assertThat(exporter.start(clusterState), nullValue()); + verifyZeroInteractions(client); + if (master) { + verify(exporter, times(1)).installedTemplateVersionMandatesAnUpdate(Version.CURRENT, unsupportedVersion); + } + verify(exporter, times(1)).installedTemplateVersionIsSufficient(Version.CURRENT, unsupportedVersion); + } + + @Test @TestLogging("marvel.agent:trace") public void testIndexTimestampFormat() throws Exception { long time = System.currentTimeMillis(); - final String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM"); - String expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(time); + String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM"); internalCluster().startNode(Settings.builder() .put("marvel.agent.exporters._local.type", LocalExporter.TYPE) @@ -182,32 +197,29 @@ public class LocalExporterTests extends MarvelIntegTestCase { LocalExporter exporter = getLocalExporter("_local"); - assertThat(exporter.indexNameResolver().resolve(time), equalTo(expectedIndexName)); + // first lets test that the index resolver works with time + String indexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(time); + assertThat(exporter.indexNameResolver().resolve(time), equalTo(indexName)); + + // now lets test that the index name resolver works with a doc + MarvelDoc doc = newRandomMarvelDoc(); + indexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp()); + assertThat(exporter.indexNameResolver().resolve(doc), equalTo(indexName)); logger.debug("--> exporting a random marvel document"); - MarvelDoc doc = newRandomMarvelDoc(); exporter.export(Collections.singletonList(doc)); - awaitMarvelDocsCount(is(1L)); - expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp()); - - logger.debug("--> check that the index [{}] has the correct timestamp [{}]", timeFormat, expectedIndexName); - assertThat(client().admin().indices().prepareExists(expectedIndexName).get().isExists(), is(true)); + awaitIndexExists(indexName); logger.debug("--> updates the timestamp"); - final String newTimeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM"); + timeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM"); + updateClusterSettings(Settings.builder().put("marvel.agent.exporters._local.index.name.time_format", timeFormat)); + exporter = getLocalExporter("_local"); // we need to get it again.. as it was rebuilt + indexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp()); + assertThat(exporter.indexNameResolver().resolve(doc), equalTo(indexName)); - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() - .put("marvel.agent.exporters._local.index.name.time_format", newTimeFormat))); - - logger.debug("--> exporting a random marvel document"); - doc = newRandomMarvelDoc(); + logger.debug("--> exporting the document again (this time with the the new index name time format [{}], expecting index name [{}]", timeFormat, indexName); exporter.export(Collections.singletonList(doc)); - awaitMarvelDocsCount(is(1L)); - String newExpectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp()); - - logger.debug("--> check that the index [{}] has the correct timestamp [{}]", newTimeFormat, newExpectedIndexName); - assertThat(exporter.indexNameResolver().resolve(doc.timestamp()), equalTo(newExpectedIndexName)); - assertTrue(client().admin().indices().prepareExists(newExpectedIndexName).get().isExists()); + awaitIndexExists(indexName); } private LocalExporter getLocalExporter(String name) throws Exception { diff --git a/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java b/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java index e1b9c5c809a..82f53fde4a7 100644 --- a/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java +++ b/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java @@ -7,6 +7,7 @@ package org.elasticsearch.marvel.test; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; @@ -38,6 +39,7 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.marvel.agent.exporter.Exporter.INDEX_TEMPLATE_NAME; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; /** * @@ -169,6 +171,28 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase { } } + protected void awaitIndexExists(final String... indices) throws Exception { + assertBusy(new Runnable() { + @Override + public void run() { + assertIndicesExists(indices); + } + }, 30, TimeUnit.SECONDS); + } + + protected void assertIndicesExists(String... indices) { + logger.trace("checking if index exists [{}]", Strings.arrayToCommaDelimitedString(indices)); + assertThat(client().admin().indices().prepareExists(indices).get().isExists(), is(true)); + } + + protected void updateClusterSettings(Settings.Builder settings) { + updateClusterSettings(settings.build()); + } + + protected void updateClusterSettings(Settings settings) { + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings)); + } + protected void securedRefresh() { if (shieldEnabled) { try {