diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java index 5c2a2b166bc..9519f0f9fcf 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java @@ -32,7 +32,7 @@ public class DeleteIndexTemplateResponse extends AcknowledgedResponse { DeleteIndexTemplateResponse() { } - DeleteIndexTemplateResponse(boolean acknowledged) { + protected DeleteIndexTemplateResponse(boolean acknowledged) { super(acknowledged); } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java index b22106d9710..cae2042f52f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java @@ -387,6 +387,14 @@ public class IndexTemplateMetaData extends AbstractDiffable> indexTemplateMetaDataUpgraders; + + public final ClusterService clusterService; + + public final ThreadPool threadPool; + + public final Client client; + + private final AtomicInteger updatesInProgress = new AtomicInteger(); + + private ImmutableOpenMap lastTemplateMetaData; + + public TemplateUpgradeService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, + Collection>> indexTemplateMetaDataUpgraders) { + super(settings); + this.client = client; + this.clusterService = clusterService; + this.threadPool = threadPool; + this.indexTemplateMetaDataUpgraders = templates -> { + Map upgradedTemplates = new HashMap<>(templates); + for (UnaryOperator> upgrader : indexTemplateMetaDataUpgraders) { + upgradedTemplates = upgrader.apply(upgradedTemplates); + } + return upgradedTemplates; + }; + clusterService.addListener(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + ClusterState state = event.state(); + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // wait until the gateway has recovered from disk, otherwise we think may not have the index templates, + // while they actually do exist + return; + } + + if (updatesInProgress.get() > 0) { + // we are already running some updates - skip this cluster state update + return; + } + + ImmutableOpenMap templates = state.getMetaData().getTemplates(); + + if (templates == lastTemplateMetaData) { + // we already checked these sets of templates - no reason to check it again + // we can do identity check here because due to cluster state diffs the actual map will not change + // if there were no changes + return; + } + + if (shouldLocalNodeUpdateTemplates(state.nodes()) == false) { + return; + } + + + lastTemplateMetaData = templates; + Optional, Set>> changes = calculateTemplateChanges(templates); + if (changes.isPresent()) { + if (updatesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size())) { + threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2())); + } + } + } + + /** + * Checks if the current node should update the templates + * + * If the master has the newest verison in the cluster - it will be dedicated template updater. + * Otherwise the node with the highest id among nodes with the highest version should update the templates + */ + boolean shouldLocalNodeUpdateTemplates(DiscoveryNodes nodes) { + DiscoveryNode localNode = nodes.getLocalNode(); + // Only data and master nodes should update the template + if (localNode.isDataNode() || localNode.isMasterNode()) { + Version maxVersion = nodes.getLargestNonClientNodeVersion(); + if (maxVersion.equals(nodes.getMasterNode().getVersion())) { + // If the master has the latest version - we will allow it to handle the update + return nodes.isLocalNodeElectedMaster(); + } else { + if (maxVersion.equals(localNode.getVersion()) == false) { + // The localhost node doesn't have the latest version - not going to update + return false; + } + for (ObjectCursor node : nodes.getMasterAndDataNodes().values()) { + if (node.value.getVersion().equals(maxVersion) && node.value.getId().compareTo(localNode.getId()) > 0) { + // We have a node with higher id then mine - it should update + return false; + } + } + // We have the highest version and highest id - we should perform the update + return true; + } + } else { + return false; + } + } + + void updateTemplates(Map changes, Set deletions) { + for (Map.Entry change : changes.entrySet()) { + PutIndexTemplateRequest request = + new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + client.admin().indices().putTemplate(request, new ActionListener() { + @Override + public void onResponse(PutIndexTemplateResponse response) { + updatesInProgress.decrementAndGet(); + if (response.isAcknowledged() == false) { + logger.warn("Error updating template [{}], request was not acknowledged", change.getKey()); + } + } + + @Override + public void onFailure(Exception e) { + updatesInProgress.decrementAndGet(); + logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e); + } + }); + } + + for (String template : deletions) { + DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(template); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + client.admin().indices().deleteTemplate(request, new ActionListener() { + @Override + public void onResponse(DeleteIndexTemplateResponse response) { + updatesInProgress.decrementAndGet(); + if (response.isAcknowledged() == false) { + logger.warn("Error deleting template [{}], request was not acknowledged", template); + } + } + + @Override + public void onFailure(Exception e) { + updatesInProgress.decrementAndGet(); + if (e instanceof IndexTemplateMissingException == false) { + // we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist + // otherwise we need to warn + logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e); + } + } + }); + } + } + + int getUpdatesInProgress() { + return updatesInProgress.get(); + } + + Optional, Set>> calculateTemplateChanges( + ImmutableOpenMap templates) { + // collect current templates + Map existingMap = new HashMap<>(); + for (ObjectObjectCursor customCursor : templates) { + existingMap.put(customCursor.key, customCursor.value); + } + // upgrade global custom meta data + Map upgradedMap = indexTemplateMetaDataUpgraders.apply(existingMap); + if (upgradedMap.equals(existingMap) == false) { + Set deletes = new HashSet<>(); + Map changes = new HashMap<>(); + // remove templates if needed + existingMap.keySet().forEach(s -> { + if (upgradedMap.containsKey(s) == false) { + deletes.add(s); + } + }); + upgradedMap.forEach((key, value) -> { + if (value.equals(existingMap.get(key)) == false) { + changes.put(key, toBytesReference(value)); + } + }); + return Optional.of(new Tuple<>(changes, deletes)); + } + return Optional.empty(); + } + + private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true")); + + private BytesReference toBytesReference(IndexTemplateMetaData templateMetaData) { + try { + return XContentHelper.toXContent((builder, params) -> { + IndexTemplateMetaData.Builder.toInnerXContent(templateMetaData, builder, params); + return builder; + }, XContentType.JSON, PARAMS, false); + } catch (IOException ex) { + throw new IllegalStateException("Cannot serialize template [" + templateMetaData.getName() + "]", ex); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index c38f556a0a8..2ed88aa1127 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -56,13 +56,14 @@ public class DiscoveryNodes extends AbstractDiffable implements private final String masterNodeId; private final String localNodeId; private final Version minNonClientNodeVersion; + private final Version maxNonClientNodeVersion; private final Version maxNodeVersion; private final Version minNodeVersion; private DiscoveryNodes(ImmutableOpenMap nodes, ImmutableOpenMap dataNodes, ImmutableOpenMap masterNodes, ImmutableOpenMap ingestNodes, - String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion, - Version minNodeVersion) { + String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNonClientNodeVersion, + Version maxNodeVersion, Version minNodeVersion) { this.nodes = nodes; this.dataNodes = dataNodes; this.masterNodes = masterNodes; @@ -70,6 +71,7 @@ public class DiscoveryNodes extends AbstractDiffable implements this.masterNodeId = masterNodeId; this.localNodeId = localNodeId; this.minNonClientNodeVersion = minNonClientNodeVersion; + this.maxNonClientNodeVersion = maxNonClientNodeVersion; this.minNodeVersion = minNodeVersion; this.maxNodeVersion = maxNodeVersion; } @@ -234,12 +236,25 @@ public class DiscoveryNodes extends AbstractDiffable implements /** * Returns the version of the node with the oldest version in the cluster that is not a client node * + * If there are no non-client nodes, Version.CURRENT will be returned. + * * @return the oldest version in the cluster */ public Version getSmallestNonClientNodeVersion() { return minNonClientNodeVersion; } + /** + * Returns the version of the node with the youngest version in the cluster that is not a client node. + * + * If there are no non-client nodes, Version.CURRENT will be returned. + * + * @return the youngest version in the cluster + */ + public Version getLargestNonClientNodeVersion() { + return maxNonClientNodeVersion; + } + /** * Returns the version of the node with the oldest version in the cluster. * @@ -252,7 +267,7 @@ public class DiscoveryNodes extends AbstractDiffable implements /** * Returns the version of the node with the youngest version in the cluster * - * @return the oldest version in the cluster + * @return the youngest version in the cluster */ public Version getMaxNodeVersion() { return maxNodeVersion; @@ -654,15 +669,25 @@ public class DiscoveryNodes extends AbstractDiffable implements ImmutableOpenMap.Builder ingestNodesBuilder = ImmutableOpenMap.builder(); Version minNodeVersion = Version.CURRENT; Version maxNodeVersion = Version.CURRENT; - Version minNonClientNodeVersion = Version.CURRENT; + // The node where we are building this on might not be a master or a data node, so we cannot assume + // that there is a node with the current version as a part of the cluster. + Version minNonClientNodeVersion = null; + Version maxNonClientNodeVersion = null; for (ObjectObjectCursor nodeEntry : nodes) { if (nodeEntry.value.isDataNode()) { dataNodesBuilder.put(nodeEntry.key, nodeEntry.value); - minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); } if (nodeEntry.value.isMasterNode()) { masterNodesBuilder.put(nodeEntry.key, nodeEntry.value); - minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); + } + if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) { + if (minNonClientNodeVersion == null) { + minNonClientNodeVersion = nodeEntry.value.getVersion(); + maxNonClientNodeVersion = nodeEntry.value.getVersion(); + } else { + minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); + maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion()); + } } if (nodeEntry.value.isIngestNode()) { ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value); @@ -673,7 +698,8 @@ public class DiscoveryNodes extends AbstractDiffable implements return new DiscoveryNodes( nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(), - masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion + masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion, + maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion ); } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 13c829844e1..0945d58e453 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -47,6 +47,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; +import org.elasticsearch.cluster.metadata.TemplateUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.service.ClusterService; @@ -415,6 +416,7 @@ public class Node implements Closeable { Collection> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream() .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList()); final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders); + new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java new file mode 100644 index 00000000000..52e19711265 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java @@ -0,0 +1,186 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.UnaryOperator; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class TemplateUpgradeServiceIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(TestPlugin.class); + } + + public static class TestPlugin extends Plugin { + // This setting is used to simulate cluster state updates + static final Setting UPDATE_TEMPLATE_DUMMY_SETTING = + Setting.intSetting("tests.update_template_count", 0, Setting.Property.NodeScope, Setting.Property.Dynamic); + + protected final Logger logger; + protected final Settings settings; + + public TestPlugin(Settings settings) { + this.logger = Loggers.getLogger(getClass(), settings); + this.settings = settings; + } + + @Override + public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, ScriptService scriptService, + NamedXContentRegistry xContentRegistry) { + clusterService.getClusterSettings().addSettingsUpdateConsumer(UPDATE_TEMPLATE_DUMMY_SETTING, integer -> { + logger.debug("the template dummy setting was updated to {}", integer); + }); + return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry); + } + + @Override + public UnaryOperator> getIndexTemplateMetaDataUpgrader() { + return templates -> { + templates.put("test_added_template", IndexTemplateMetaData.builder("test_added_template") + .patterns(Collections.singletonList("*")).build()); + templates.remove("test_removed_template"); + templates.put("test_changed_template", IndexTemplateMetaData.builder("test_changed_template").order(10) + .patterns(Collections.singletonList("*")).build()); + return templates; + }; + } + + @Override + public List> getSettings() { + return Collections.singletonList(UPDATE_TEMPLATE_DUMMY_SETTING); + } + } + + + public void testTemplateUpdate() throws Exception { + assertTemplates(); + + // Change some templates + assertAcked(client().admin().indices().preparePutTemplate("test_dummy_template").setOrder(0) + .setPatterns(Collections.singletonList("*")).get()); + assertAcked(client().admin().indices().preparePutTemplate("test_changed_template").setOrder(0) + .setPatterns(Collections.singletonList("*")).get()); + assertAcked(client().admin().indices().preparePutTemplate("test_removed_template").setOrder(1) + .setPatterns(Collections.singletonList("*")).get()); + + // Wait for the templates to be updated back to normal + assertBusy(() -> { + List templates = client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates(); + assertThat(templates.size(), equalTo(3)); + boolean addedFound = false; + boolean changedFound = false; + boolean dummyFound = false; + for (int i = 0; i < 3; i++) { + IndexTemplateMetaData templateMetaData = templates.get(i); + switch (templateMetaData.getName()) { + case "test_added_template": + assertFalse(addedFound); + addedFound = true; + break; + case "test_changed_template": + assertFalse(changedFound); + changedFound = true; + assertThat(templateMetaData.getOrder(), equalTo(10)); + break; + case "test_dummy_template": + assertFalse(dummyFound); + dummyFound = true; + break; + default: + fail("unexpected template " + templateMetaData.getName()); + break; + } + } + + assertTrue(addedFound); + assertTrue(changedFound); + assertTrue(dummyFound); + }); + + // Wipe out all templates + assertAcked(client().admin().indices().prepareDeleteTemplate("test_*").get()); + + assertTemplates(); + + } + + private void assertTemplates() throws Exception { + AtomicInteger updateCount = new AtomicInteger(); + // Make sure all templates are recreated correctly + assertBusy(() -> { + // the updates only happen on cluster state updates, so we need to make sure that the cluster state updates are happening + // so we need to simulate updates to make sure the template upgrade kicks in + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( + Settings.builder().put(TestPlugin.UPDATE_TEMPLATE_DUMMY_SETTING.getKey(), updateCount.incrementAndGet()) + ).get()); + + List templates = client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates(); + assertThat(templates.size(), equalTo(2)); + boolean addedFound = false; + boolean changedFound = false; + for (int i = 0; i < 2; i++) { + IndexTemplateMetaData templateMetaData = templates.get(i); + switch (templateMetaData.getName()) { + case "test_added_template": + assertFalse(addedFound); + addedFound = true; + break; + case "test_changed_template": + assertFalse(changedFound); + changedFound = true; + assertThat(templateMetaData.getOrder(), equalTo(10)); + break; + default: + fail("unexpected template " + templateMetaData.getName()); + break; + } + } + + assertTrue(addedFound); + assertTrue(changedFound); + }); + } + +} diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java new file mode 100644 index 00000000000..f4e8ba21fc0 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java @@ -0,0 +1,438 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TemplateUpgradeServiceTests extends ESTestCase { + + private final ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null); + + public void testCalculateChangesAddChangeAndDelete() { + + boolean shouldAdd = randomBoolean(); + boolean shouldRemove = randomBoolean(); + boolean shouldChange = randomBoolean(); + + MetaData metaData = randomMetaData( + IndexTemplateMetaData.builder("user_template").build(), + IndexTemplateMetaData.builder("removed_test_template").build(), + IndexTemplateMetaData.builder("changed_test_template").build() + ); + + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null, + Arrays.asList( + templates -> { + if (shouldAdd) { + assertNull(templates.put("added_test_template", IndexTemplateMetaData.builder("added_test_template").build())); + } + return templates; + }, + templates -> { + if (shouldRemove) { + assertNotNull(templates.remove("removed_test_template")); + } + return templates; + }, + templates -> { + if (shouldChange) { + assertNotNull(templates.put("changed_test_template", + IndexTemplateMetaData.builder("changed_test_template").order(10).build())); + } + return templates; + } + )); + + Optional, Set>> optChanges = + service.calculateTemplateChanges(metaData.templates()); + + if (shouldAdd || shouldRemove || shouldChange) { + Tuple, Set> changes = optChanges.orElseThrow(() -> + new AssertionError("Should have non empty changes")); + if (shouldAdd) { + assertThat(changes.v1().get("added_test_template"), notNullValue()); + if (shouldChange) { + assertThat(changes.v1().keySet(), hasSize(2)); + assertThat(changes.v1().get("changed_test_template"), notNullValue()); + } else { + assertThat(changes.v1().keySet(), hasSize(1)); + } + } else { + if (shouldChange) { + assertThat(changes.v1().get("changed_test_template"), notNullValue()); + assertThat(changes.v1().keySet(), hasSize(1)); + } else { + assertThat(changes.v1().keySet(), empty()); + } + } + + if (shouldRemove) { + assertThat(changes.v2(), hasSize(1)); + assertThat(changes.v2().contains("removed_test_template"), equalTo(true)); + } else { + assertThat(changes.v2(), empty()); + } + } else { + assertThat(optChanges.isPresent(), equalTo(false)); + } + } + + + @SuppressWarnings("unchecked") + public void testUpdateTemplates() { + int additionsCount = randomIntBetween(0, 5); + int deletionsCount = randomIntBetween(0, 3); + + List> putTemplateListeners = new ArrayList<>(); + List> deleteTemplateListeners = new ArrayList<>(); + + Client mockClient = mock(Client.class); + AdminClient mockAdminClient = mock(AdminClient.class); + IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class); + when(mockClient.admin()).thenReturn(mockAdminClient); + when(mockAdminClient.indices()).thenReturn(mockIndicesAdminClient); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 2; + PutIndexTemplateRequest request = (PutIndexTemplateRequest) args[0]; + assertThat(request.name(), equalTo("add_template_" + request.order())); + putTemplateListeners.add((ActionListener) args[1]); + return null; + }).when(mockIndicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class), any(ActionListener.class)); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 2; + DeleteIndexTemplateRequest request = (DeleteIndexTemplateRequest) args[0]; + assertThat(request.name(), startsWith("remove_template_")); + deleteTemplateListeners.add((ActionListener) args[1]); + return null; + }).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class)); + + Set deletions = new HashSet<>(deletionsCount); + for (int i = 0; i < deletionsCount; i++) { + deletions.add("remove_template_" + i); + } + Map additions = new HashMap<>(additionsCount); + for (int i = 0; i < additionsCount; i++) { + additions.put("add_template_" + i, new BytesArray("{\"index_patterns\" : \"*\", \"order\" : " + i + "}")); + } + + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, null, + Collections.emptyList()); + + service.updateTemplates(additions, deletions); + int updatesInProgress = service.getUpdatesInProgress(); + + assertThat(putTemplateListeners, hasSize(additionsCount)); + assertThat(deleteTemplateListeners, hasSize(deletionsCount)); + + for (int i = 0; i < additionsCount; i++) { + if (randomBoolean()) { + putTemplateListeners.get(i).onFailure(new RuntimeException("test - ignore")); + } else { + putTemplateListeners.get(i).onResponse(new PutIndexTemplateResponse(randomBoolean()) { + + }); + } + } + + for (int i = 0; i < deletionsCount; i++) { + if (randomBoolean()) { + int prevUpdatesInProgress = service.getUpdatesInProgress(); + deleteTemplateListeners.get(i).onFailure(new RuntimeException("test - ignore")); + assertThat(prevUpdatesInProgress - service.getUpdatesInProgress(), equalTo(1)); + } else { + int prevUpdatesInProgress = service.getUpdatesInProgress(); + deleteTemplateListeners.get(i).onResponse(new DeleteIndexTemplateResponse(randomBoolean()) { + + }); + assertThat(prevUpdatesInProgress - service.getUpdatesInProgress(), equalTo(1)); + } + } + assertThat(updatesInProgress - service.getUpdatesInProgress(), equalTo(additionsCount + deletionsCount)); + } + + private static final Set MASTER_DATA_ROLES = + Collections.unmodifiableSet(EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA)); + + @SuppressWarnings("unchecked") + public void testClusterStateUpdate() { + + AtomicReference> addedListener = new AtomicReference<>(); + AtomicReference> changedListener = new AtomicReference<>(); + AtomicReference> removedListener = new AtomicReference<>(); + AtomicInteger updateInvocation = new AtomicInteger(); + + MetaData metaData = randomMetaData( + IndexTemplateMetaData.builder("user_template").build(), + IndexTemplateMetaData.builder("removed_test_template").build(), + IndexTemplateMetaData.builder("changed_test_template").build() + ); + + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + when(threadPool.generic()).thenReturn(executorService); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 1; + Runnable runnable = (Runnable) args[0]; + runnable.run(); + updateInvocation.incrementAndGet(); + return null; + }).when(executorService).execute(any(Runnable.class)); + + Client mockClient = mock(Client.class); + AdminClient mockAdminClient = mock(AdminClient.class); + IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class); + when(mockClient.admin()).thenReturn(mockAdminClient); + when(mockAdminClient.indices()).thenReturn(mockIndicesAdminClient); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 2; + PutIndexTemplateRequest request = (PutIndexTemplateRequest) args[0]; + if (request.name().equals("added_test_template")) { + assertThat(addedListener.getAndSet((ActionListener) args[1]), nullValue()); + } else if (request.name().equals("changed_test_template")) { + assertThat(changedListener.getAndSet((ActionListener) args[1]), nullValue()); + } else { + fail("unexpected put template call for " + request.name()); + } + return null; + }).when(mockIndicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class), any(ActionListener.class)); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 2; + DeleteIndexTemplateRequest request = (DeleteIndexTemplateRequest) args[0]; + assertThat(request.name(), startsWith("removed_test_template")); + assertThat(removedListener.getAndSet((ActionListener) args[1]), nullValue()); + return null; + }).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class)); + + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool, + Arrays.asList( + templates -> { + assertNull(templates.put("added_test_template", IndexTemplateMetaData.builder("added_test_template") + .patterns(Collections.singletonList("*")).build())); + return templates; + }, + templates -> { + assertNotNull(templates.remove("removed_test_template")); + return templates; + }, + templates -> { + assertNotNull(templates.put("changed_test_template", IndexTemplateMetaData.builder("changed_test_template") + .patterns(Collections.singletonList("*")).order(10).build())); + return templates; + } + )); + + ClusterState prevState = ClusterState.EMPTY_STATE; + ClusterState state = ClusterState.builder(prevState).nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", "node1", buildNewFakeTransportAddress(), emptyMap(), MASTER_DATA_ROLES, Version.CURRENT) + ).localNodeId("node1").masterNodeId("node1").build() + ).metaData(metaData).build(); + service.clusterChanged(new ClusterChangedEvent("test", state, prevState)); + + assertThat(updateInvocation.get(), equalTo(1)); + assertThat(addedListener.get(), notNullValue()); + assertThat(changedListener.get(), notNullValue()); + assertThat(removedListener.get(), notNullValue()); + + prevState = state; + state = ClusterState.builder(prevState).metaData(MetaData.builder(state.metaData()).removeTemplate("user_template")).build(); + service.clusterChanged(new ClusterChangedEvent("test 2", state, prevState)); + + // Make sure that update wasn't invoked since we are still running + assertThat(updateInvocation.get(), equalTo(1)); + + addedListener.getAndSet(null).onResponse(new PutIndexTemplateResponse(true) { + }); + changedListener.getAndSet(null).onResponse(new PutIndexTemplateResponse(true) { + }); + removedListener.getAndSet(null).onResponse(new DeleteIndexTemplateResponse(true) { + }); + + service.clusterChanged(new ClusterChangedEvent("test 3", state, prevState)); + + // Make sure that update was called this time since we are no longer running + assertThat(updateInvocation.get(), equalTo(2)); + + addedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore")); + changedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore")); + removedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore")); + + service.clusterChanged(new ClusterChangedEvent("test 3", state, prevState)); + + // Make sure that update wasn't called this time since the index template metadata didn't change + assertThat(updateInvocation.get(), equalTo(2)); + } + + private static final int NODE_TEST_ITERS = 100; + + public void testOnlyOneNodeRunsTemplateUpdates() { + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null, Collections.emptyList()); + for (int i = 0; i < NODE_TEST_ITERS; i++) { + int nodesCount = randomIntBetween(1, 10); + int clientNodesCount = randomIntBetween(0, 4); + DiscoveryNodes nodes = randomNodes(nodesCount, clientNodesCount); + int updaterNode = -1; + for (int j = 0; j < nodesCount; j++) { + DiscoveryNodes localNodes = DiscoveryNodes.builder(nodes).localNodeId(nodes.resolveNode("node_" + j).getId()).build(); + if (service.shouldLocalNodeUpdateTemplates(localNodes)) { + assertThat("Expected only one node to update template, found " + updaterNode + " and " + j, updaterNode, lessThan(0)); + updaterNode = j; + } + } + assertThat("Expected one node to update template", updaterNode, greaterThanOrEqualTo(0)); + } + } + + public void testIfMasterHasTheHighestVersionItShouldRunsTemplateUpdates() { + for (int i = 0; i < NODE_TEST_ITERS; i++) { + int nodesCount = randomIntBetween(1, 10); + int clientNodesCount = randomIntBetween(0, 4); + DiscoveryNodes nodes = randomNodes(nodesCount, clientNodesCount); + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(nodes).localNodeId(nodes.resolveNode("_master").getId()); + nodes = builder.build(); + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null, + Collections.emptyList()); + assertThat(service.shouldLocalNodeUpdateTemplates(nodes), + equalTo(nodes.getLargestNonClientNodeVersion().equals(nodes.getMasterNode().getVersion()))); + } + } + + public void testClientNodeDontRunTemplateUpdates() { + for (int i = 0; i < NODE_TEST_ITERS; i++) { + int nodesCount = randomIntBetween(1, 10); + int clientNodesCount = randomIntBetween(1, 4); + DiscoveryNodes nodes = randomNodes(nodesCount, clientNodesCount); + int testClient = randomIntBetween(0, clientNodesCount - 1); + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(nodes).localNodeId(nodes.resolveNode("client_" + testClient).getId()); + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null, + Collections.emptyList()); + assertThat(service.shouldLocalNodeUpdateTemplates(builder.build()), equalTo(false)); + } + } + + private DiscoveryNodes randomNodes(int dataAndMasterNodes, int clientNodes) { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + String masterNodeId = null; + for (int i = 0; i < dataAndMasterNodes; i++) { + String id = randomAlphaOfLength(10) + "_" + i; + Set roles; + if (i == 0) { + masterNodeId = id; + // The first node has to be master node + if (randomBoolean()) { + roles = EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA); + } else { + roles = EnumSet.of(DiscoveryNode.Role.MASTER); + } + } else { + if (randomBoolean()) { + roles = EnumSet.of(DiscoveryNode.Role.DATA); + } else { + roles = EnumSet.of(DiscoveryNode.Role.MASTER); + } + } + String node = "node_" + i; + builder.add(new DiscoveryNode(node, id, buildNewFakeTransportAddress(), emptyMap(), roles, randomVersion(random()))); + } + builder.masterNodeId(masterNodeId); // Node 0 is always a master node + + for (int i = 0; i < clientNodes; i++) { + String node = "client_" + i; + builder.add(new DiscoveryNode(node, randomAlphaOfLength(10) + "__" + i, buildNewFakeTransportAddress(), emptyMap(), + EnumSet.noneOf(DiscoveryNode.Role.class), randomVersion(random()))); + } + return builder.build(); + } + + public static MetaData randomMetaData(IndexTemplateMetaData... templates) { + MetaData.Builder builder = MetaData.builder(); + for (IndexTemplateMetaData template : templates) { + builder.put(template); + } + for (int i = 0; i < randomIntBetween(1, 5); i++) { + builder.put( + IndexMetaData.builder(randomAlphaOfLength(10)) + .settings(settings(Version.CURRENT)) + .numberOfReplicas(randomIntBetween(0, 3)) + .numberOfShards(randomIntBetween(1, 5)) + ); + } + return builder.build(); + } +}