Fixed the cluster state handling in local exporter

- addded additional tests for local exporter

Original commit: elastic/x-pack-elasticsearch@b188394078
This commit is contained in:
uboness 2015-09-25 09:45:21 +02:00
parent 8fa83b9109
commit 71b0b121f7
3 changed files with 108 additions and 73 deletions

View File

@ -34,7 +34,7 @@ import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MAR
/**
*
*/
public class LocalExporter extends Exporter {
public class LocalExporter extends Exporter implements ClusterStateListener {
public static final String TYPE = "local";
@ -44,18 +44,18 @@ public class LocalExporter extends Exporter {
private volatile LocalBulk bulk;
public LocalExporter(Exporter.Config config, SecuredClient client, ClusterService clusterService, RendererRegistry renderers) {
public LocalExporter(Exporter.Config config, Client client, ClusterService clusterService, RendererRegistry renderers) {
super(TYPE, config);
this.client = client;
this.clusterService = clusterService;
this.renderers = renderers;
bulk = start(clusterService.state());
clusterService.add(this);
}
clusterService.add(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
bulk = start(event.state());
}
});
@Override
public void clusterChanged(ClusterChangedEvent event) {
bulk = start(event.state());
}
@Override
@ -65,6 +65,7 @@ public class LocalExporter extends Exporter {
@Override
public void close() {
clusterService.remove(this);
if (bulk != null) {
try {
bulk.terminate();
@ -75,7 +76,7 @@ public class LocalExporter extends Exporter {
}
LocalBulk start(ClusterState clusterState) {
if (bulk != null) {
if (clusterService.localNode() == null || clusterState == null || bulk != null) {
return bulk;
}
@ -101,6 +102,7 @@ public class LocalExporter extends Exporter {
if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) {
logger.debug("exporter cannot start. the currently installed marvel template (version [{}]) is incompatible with the " +
"current elasticsearch version [{}]. waiting until the template is updated", installedTemplateVersion, Version.CURRENT);
return null;
}
// ok.. we have a compatible template... we can start
@ -122,6 +124,15 @@ public class LocalExporter extends Exporter {
putTemplate(config.settings().getAsSettings("template.settings"));
// we'll get that template on the next cluster state update
return null;
} else if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) {
logger.error("marvel template version [{}] is below the minimum compatible version [{}]. "
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
installedTemplateVersion, MIN_SUPPORTED_TEMPLATE_VERSION);
// we're not going to do anything with the template.. it's too old, and the schema might
// be too different than what this version of marvel/es can work with. For this reason we're
// not going to export any data, to avoid mapping conflicts.
return null;
}
// ok.. we have a compatible template... we can start
@ -159,10 +170,6 @@ public class LocalExporter extends Exporter {
}
// Never update a very old template
if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
logger.error("marvel template version [{}] is below the minimum compatible version [{}]. "
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
installed, MIN_SUPPORTED_TEMPLATE_VERSION);
return false;
}
// Always update a template to the last up-to-date version
@ -216,14 +223,6 @@ public class LocalExporter extends Exporter {
}
}
public enum State {
STARTING,
STARTED,
STOPPING,
STOPPED,
FAILED
}
public static class Factory extends Exporter.Factory<LocalExporter> {
private final SecuredClient client;

View File

@ -11,9 +11,15 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector;
@ -21,6 +27,7 @@ import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.search.SearchHit;
@ -41,10 +48,10 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MARVEL_VERSION_FIELD;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.*;
@ClusterScope(scope = Scope.SUITE, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class LocalExporterTests extends MarvelIntegTestCase {
private final static AtomicLong timeStampGenerator = new AtomicLong();
@ -53,7 +60,6 @@ public class LocalExporterTests extends MarvelIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// .put(MarvelSettings.STARTUP_DELAY, "1h")
.build();
}
@ -140,39 +146,48 @@ public class LocalExporterTests extends MarvelIntegTestCase {
awaitMarvelTemplateInstalled(Version.CURRENT);
}
//TODO needs a rewrite, the `start(ClusterState)` should be unit tested
// @Test @AwaitsFix(bugUrl = "LocalExporter#210")
// public void testUnsupportedTemplateVersion() throws Exception {
// internalCluster().startNode(Settings.builder()
// .put("marvel.agent.exporters._local.type", LocalExporter.TYPE)
// .build());
// ensureGreen();
//
// LocalExporter exporter = getLocalExporter("_local");
//
// Version fakeVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0);
// assertFalse(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT));
//
// logger.debug("--> creating the marvel template with a fake version [{}]", fakeVersion);
// exporter.putTemplate(Settings.builder().put(MARVEL_VERSION_FIELD, fakeVersion.toString()).build());
// assertMarvelTemplateInstalled();
//
// assertThat(exporter.templateVersion(), equalTo(fakeVersion));
//
// logger.debug("--> exporting when the marvel template is tool old: no document is exported and the template is not updated");
// awaitMarvelDocsCount(is(0L));
// exporter.export(Collections.singletonList(newRandomMarvelDoc()));
// awaitMarvelDocsCount(is(0L));
// assertMarvelTemplateInstalled();
//
// assertThat(exporter.templateVersion(), equalTo(fakeVersion));
// }
@Test
public void testUnsupportedTemplateVersion() throws Exception {
@Test @TestLogging("marvel.agent:debug")
Exporter.Config config = new Exporter.Config("_name", Settings.EMPTY, Settings.builder()
.put("type", "local").build());
Client client = mock(Client.class);
ClusterService clusterService = mock(ClusterService.class);
boolean master = randomBoolean();
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.masterNode()).thenReturn(master);
when(clusterService.localNode()).thenReturn(localNode);
RendererRegistry renderers = mock(RendererRegistry.class);
LocalExporter exporter = spy(new LocalExporter(config, client, clusterService, renderers));
// creating a cluster state mock that holds unsupported template version
Version unsupportedVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0);
IndexTemplateMetaData template = mock(IndexTemplateMetaData.class);
when(template.settings()).thenReturn(Settings.builder().put("index.marvel_version", unsupportedVersion.toString()).build());
MetaData metaData = mock(MetaData.class);
when(metaData.getTemplates()).thenReturn(ImmutableOpenMap.<String, IndexTemplateMetaData>builder().fPut(Exporter.INDEX_TEMPLATE_NAME, template).build());
ClusterBlocks blocks = mock(ClusterBlocks.class);
when(blocks.hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)).thenReturn(false);
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getMetaData()).thenReturn(metaData);
when(clusterState.blocks()).thenReturn(blocks);
when(clusterService.state()).thenReturn(clusterState);
assertThat(exporter.start(clusterState), nullValue());
verifyZeroInteractions(client);
if (master) {
verify(exporter, times(1)).installedTemplateVersionMandatesAnUpdate(Version.CURRENT, unsupportedVersion);
}
verify(exporter, times(1)).installedTemplateVersionIsSufficient(Version.CURRENT, unsupportedVersion);
}
@Test @TestLogging("marvel.agent:trace")
public void testIndexTimestampFormat() throws Exception {
long time = System.currentTimeMillis();
final String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
String expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(time);
String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
internalCluster().startNode(Settings.builder()
.put("marvel.agent.exporters._local.type", LocalExporter.TYPE)
@ -182,32 +197,29 @@ public class LocalExporterTests extends MarvelIntegTestCase {
LocalExporter exporter = getLocalExporter("_local");
assertThat(exporter.indexNameResolver().resolve(time), equalTo(expectedIndexName));
// first lets test that the index resolver works with time
String indexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(time);
assertThat(exporter.indexNameResolver().resolve(time), equalTo(indexName));
// now lets test that the index name resolver works with a doc
MarvelDoc doc = newRandomMarvelDoc();
indexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp());
assertThat(exporter.indexNameResolver().resolve(doc), equalTo(indexName));
logger.debug("--> exporting a random marvel document");
MarvelDoc doc = newRandomMarvelDoc();
exporter.export(Collections.singletonList(doc));
awaitMarvelDocsCount(is(1L));
expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp());
logger.debug("--> check that the index [{}] has the correct timestamp [{}]", timeFormat, expectedIndexName);
assertThat(client().admin().indices().prepareExists(expectedIndexName).get().isExists(), is(true));
awaitIndexExists(indexName);
logger.debug("--> updates the timestamp");
final String newTimeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM");
timeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM");
updateClusterSettings(Settings.builder().put("marvel.agent.exporters._local.index.name.time_format", timeFormat));
exporter = getLocalExporter("_local"); // we need to get it again.. as it was rebuilt
indexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp());
assertThat(exporter.indexNameResolver().resolve(doc), equalTo(indexName));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put("marvel.agent.exporters._local.index.name.time_format", newTimeFormat)));
logger.debug("--> exporting a random marvel document");
doc = newRandomMarvelDoc();
logger.debug("--> exporting the document again (this time with the the new index name time format [{}], expecting index name [{}]", timeFormat, indexName);
exporter.export(Collections.singletonList(doc));
awaitMarvelDocsCount(is(1L));
String newExpectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp());
logger.debug("--> check that the index [{}] has the correct timestamp [{}]", newTimeFormat, newExpectedIndexName);
assertThat(exporter.indexNameResolver().resolve(doc.timestamp()), equalTo(newExpectedIndexName));
assertTrue(client().admin().indices().prepareExists(newExpectedIndexName).get().isExists());
awaitIndexExists(indexName);
}
private LocalExporter getLocalExporter(String name) throws Exception {

View File

@ -7,6 +7,7 @@ package org.elasticsearch.marvel.test;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
@ -38,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.marvel.agent.exporter.Exporter.INDEX_TEMPLATE_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
/**
*
@ -169,6 +171,28 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
}
}
protected void awaitIndexExists(final String... indices) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
assertIndicesExists(indices);
}
}, 30, TimeUnit.SECONDS);
}
protected void assertIndicesExists(String... indices) {
logger.trace("checking if index exists [{}]", Strings.arrayToCommaDelimitedString(indices));
assertThat(client().admin().indices().prepareExists(indices).get().isExists(), is(true));
}
protected void updateClusterSettings(Settings.Builder settings) {
updateClusterSettings(settings.build());
}
protected void updateClusterSettings(Settings settings) {
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings));
}
protected void securedRefresh() {
if (shieldEnabled) {
try {