TemplateUpgraders should be called during rolling restart (#25263)

In #24379 we added ability to upgrade templates on full cluster startup. This PR invokes the same update procedure also when a new node first joins the cluster allowing to update templates on a rolling cluster restart as well.

Closes #24680
This commit is contained in:
Igor Motov 2017-06-22 14:55:28 -04:00 committed by GitHub
parent 8dcb1f5c7c
commit e6e5ae6202
7 changed files with 925 additions and 10 deletions

View File

@ -32,7 +32,7 @@ public class DeleteIndexTemplateResponse extends AcknowledgedResponse {
DeleteIndexTemplateResponse() {
}
DeleteIndexTemplateResponse(boolean acknowledged) {
protected DeleteIndexTemplateResponse(boolean acknowledged) {
super(acknowledged);
}

View File

@ -387,6 +387,14 @@ public class IndexTemplateMetaData extends AbstractDiffable<IndexTemplateMetaDat
throws IOException {
builder.startObject(indexTemplateMetaData.name());
toInnerXContent(indexTemplateMetaData, builder, params);
builder.endObject();
}
public static void toInnerXContent(IndexTemplateMetaData indexTemplateMetaData, XContentBuilder builder, ToXContent.Params params)
throws IOException {
builder.field("order", indexTemplateMetaData.order());
if (indexTemplateMetaData.version() != null) {
builder.field("version", indexTemplateMetaData.version());
@ -430,8 +438,6 @@ public class IndexTemplateMetaData extends AbstractDiffable<IndexTemplateMetaDat
AliasMetaData.Builder.toXContent(cursor.value, builder, params);
}
builder.endObject();
builder.endObject();
}
public static IndexTemplateMetaData fromXContent(XContentParser parser, String templateName) throws IOException {

View File

@ -0,0 +1,257 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
import static java.util.Collections.singletonMap;
/**
* Upgrades Templates on behalf of installed {@link Plugin}s when a node joins the cluster
*/
public class TemplateUpgradeService extends AbstractComponent implements ClusterStateListener {
private final UnaryOperator<Map<String, IndexTemplateMetaData>> indexTemplateMetaDataUpgraders;
public final ClusterService clusterService;
public final ThreadPool threadPool;
public final Client client;
private final AtomicInteger updatesInProgress = new AtomicInteger();
private ImmutableOpenMap<String, IndexTemplateMetaData> lastTemplateMetaData;
public TemplateUpgradeService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.indexTemplateMetaDataUpgraders = templates -> {
Map<String, IndexTemplateMetaData> upgradedTemplates = new HashMap<>(templates);
for (UnaryOperator<Map<String, IndexTemplateMetaData>> upgrader : indexTemplateMetaDataUpgraders) {
upgradedTemplates = upgrader.apply(upgradedTemplates);
}
return upgradedTemplates;
};
clusterService.addListener(this);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
// while they actually do exist
return;
}
if (updatesInProgress.get() > 0) {
// we are already running some updates - skip this cluster state update
return;
}
ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates();
if (templates == lastTemplateMetaData) {
// we already checked these sets of templates - no reason to check it again
// we can do identity check here because due to cluster state diffs the actual map will not change
// if there were no changes
return;
}
if (shouldLocalNodeUpdateTemplates(state.nodes()) == false) {
return;
}
lastTemplateMetaData = templates;
Optional<Tuple<Map<String, BytesReference>, Set<String>>> changes = calculateTemplateChanges(templates);
if (changes.isPresent()) {
if (updatesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size())) {
threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2()));
}
}
}
/**
* Checks if the current node should update the templates
*
* If the master has the newest verison in the cluster - it will be dedicated template updater.
* Otherwise the node with the highest id among nodes with the highest version should update the templates
*/
boolean shouldLocalNodeUpdateTemplates(DiscoveryNodes nodes) {
DiscoveryNode localNode = nodes.getLocalNode();
// Only data and master nodes should update the template
if (localNode.isDataNode() || localNode.isMasterNode()) {
Version maxVersion = nodes.getLargestNonClientNodeVersion();
if (maxVersion.equals(nodes.getMasterNode().getVersion())) {
// If the master has the latest version - we will allow it to handle the update
return nodes.isLocalNodeElectedMaster();
} else {
if (maxVersion.equals(localNode.getVersion()) == false) {
// The localhost node doesn't have the latest version - not going to update
return false;
}
for (ObjectCursor<DiscoveryNode> node : nodes.getMasterAndDataNodes().values()) {
if (node.value.getVersion().equals(maxVersion) && node.value.getId().compareTo(localNode.getId()) > 0) {
// We have a node with higher id then mine - it should update
return false;
}
}
// We have the highest version and highest id - we should perform the update
return true;
}
} else {
return false;
}
}
void updateTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
for (Map.Entry<String, BytesReference> change : changes.entrySet()) {
PutIndexTemplateRequest request =
new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
@Override
public void onResponse(PutIndexTemplateResponse response) {
updatesInProgress.decrementAndGet();
if (response.isAcknowledged() == false) {
logger.warn("Error updating template [{}], request was not acknowledged", change.getKey());
}
}
@Override
public void onFailure(Exception e) {
updatesInProgress.decrementAndGet();
logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e);
}
});
}
for (String template : deletions) {
DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(template);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
client.admin().indices().deleteTemplate(request, new ActionListener<DeleteIndexTemplateResponse>() {
@Override
public void onResponse(DeleteIndexTemplateResponse response) {
updatesInProgress.decrementAndGet();
if (response.isAcknowledged() == false) {
logger.warn("Error deleting template [{}], request was not acknowledged", template);
}
}
@Override
public void onFailure(Exception e) {
updatesInProgress.decrementAndGet();
if (e instanceof IndexTemplateMissingException == false) {
// we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist
// otherwise we need to warn
logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e);
}
}
});
}
}
int getUpdatesInProgress() {
return updatesInProgress.get();
}
Optional<Tuple<Map<String, BytesReference>, Set<String>>> calculateTemplateChanges(
ImmutableOpenMap<String, IndexTemplateMetaData> templates) {
// collect current templates
Map<String, IndexTemplateMetaData> existingMap = new HashMap<>();
for (ObjectObjectCursor<String, IndexTemplateMetaData> customCursor : templates) {
existingMap.put(customCursor.key, customCursor.value);
}
// upgrade global custom meta data
Map<String, IndexTemplateMetaData> upgradedMap = indexTemplateMetaDataUpgraders.apply(existingMap);
if (upgradedMap.equals(existingMap) == false) {
Set<String> deletes = new HashSet<>();
Map<String, BytesReference> changes = new HashMap<>();
// remove templates if needed
existingMap.keySet().forEach(s -> {
if (upgradedMap.containsKey(s) == false) {
deletes.add(s);
}
});
upgradedMap.forEach((key, value) -> {
if (value.equals(existingMap.get(key)) == false) {
changes.put(key, toBytesReference(value));
}
});
return Optional.of(new Tuple<>(changes, deletes));
}
return Optional.empty();
}
private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true"));
private BytesReference toBytesReference(IndexTemplateMetaData templateMetaData) {
try {
return XContentHelper.toXContent((builder, params) -> {
IndexTemplateMetaData.Builder.toInnerXContent(templateMetaData, builder, params);
return builder;
}, XContentType.JSON, PARAMS, false);
} catch (IOException ex) {
throw new IllegalStateException("Cannot serialize template [" + templateMetaData.getName() + "]", ex);
}
}
}

View File

@ -56,13 +56,14 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
private final String masterNodeId;
private final String localNodeId;
private final Version minNonClientNodeVersion;
private final Version maxNonClientNodeVersion;
private final Version maxNodeVersion;
private final Version minNodeVersion;
private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes,
ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes,
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion,
Version minNodeVersion) {
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNonClientNodeVersion,
Version maxNodeVersion, Version minNodeVersion) {
this.nodes = nodes;
this.dataNodes = dataNodes;
this.masterNodes = masterNodes;
@ -70,6 +71,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
this.masterNodeId = masterNodeId;
this.localNodeId = localNodeId;
this.minNonClientNodeVersion = minNonClientNodeVersion;
this.maxNonClientNodeVersion = maxNonClientNodeVersion;
this.minNodeVersion = minNodeVersion;
this.maxNodeVersion = maxNodeVersion;
}
@ -234,12 +236,25 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
/**
* Returns the version of the node with the oldest version in the cluster that is not a client node
*
* If there are no non-client nodes, Version.CURRENT will be returned.
*
* @return the oldest version in the cluster
*/
public Version getSmallestNonClientNodeVersion() {
return minNonClientNodeVersion;
}
/**
* Returns the version of the node with the youngest version in the cluster that is not a client node.
*
* If there are no non-client nodes, Version.CURRENT will be returned.
*
* @return the youngest version in the cluster
*/
public Version getLargestNonClientNodeVersion() {
return maxNonClientNodeVersion;
}
/**
* Returns the version of the node with the oldest version in the cluster.
*
@ -252,7 +267,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
/**
* Returns the version of the node with the youngest version in the cluster
*
* @return the oldest version in the cluster
* @return the youngest version in the cluster
*/
public Version getMaxNodeVersion() {
return maxNodeVersion;
@ -654,15 +669,25 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
Version minNodeVersion = Version.CURRENT;
Version maxNodeVersion = Version.CURRENT;
Version minNonClientNodeVersion = Version.CURRENT;
// The node where we are building this on might not be a master or a data node, so we cannot assume
// that there is a node with the current version as a part of the cluster.
Version minNonClientNodeVersion = null;
Version maxNonClientNodeVersion = null;
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
if (nodeEntry.value.isDataNode()) {
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isMasterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
}
if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) {
if (minNonClientNodeVersion == null) {
minNonClientNodeVersion = nodeEntry.value.getVersion();
maxNonClientNodeVersion = nodeEntry.value.getVersion();
} else {
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion());
}
}
if (nodeEntry.value.isIngestNode()) {
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
@ -673,7 +698,8 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
return new DiscoveryNodes(
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion
masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion,
maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion
);
}

View File

@ -47,6 +47,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.service.ClusterService;
@ -415,6 +416,7 @@ public class Node implements Closeable {
Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());

View File

@ -0,0 +1,186 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class TemplateUpgradeServiceIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(TestPlugin.class);
}
public static class TestPlugin extends Plugin {
// This setting is used to simulate cluster state updates
static final Setting<Integer> UPDATE_TEMPLATE_DUMMY_SETTING =
Setting.intSetting("tests.update_template_count", 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
protected final Logger logger;
protected final Settings settings;
public TestPlugin(Settings settings) {
this.logger = Loggers.getLogger(getClass(), settings);
this.settings = settings;
}
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(UPDATE_TEMPLATE_DUMMY_SETTING, integer -> {
logger.debug("the template dummy setting was updated to {}", integer);
});
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry);
}
@Override
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
return templates -> {
templates.put("test_added_template", IndexTemplateMetaData.builder("test_added_template")
.patterns(Collections.singletonList("*")).build());
templates.remove("test_removed_template");
templates.put("test_changed_template", IndexTemplateMetaData.builder("test_changed_template").order(10)
.patterns(Collections.singletonList("*")).build());
return templates;
};
}
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(UPDATE_TEMPLATE_DUMMY_SETTING);
}
}
public void testTemplateUpdate() throws Exception {
assertTemplates();
// Change some templates
assertAcked(client().admin().indices().preparePutTemplate("test_dummy_template").setOrder(0)
.setPatterns(Collections.singletonList("*")).get());
assertAcked(client().admin().indices().preparePutTemplate("test_changed_template").setOrder(0)
.setPatterns(Collections.singletonList("*")).get());
assertAcked(client().admin().indices().preparePutTemplate("test_removed_template").setOrder(1)
.setPatterns(Collections.singletonList("*")).get());
// Wait for the templates to be updated back to normal
assertBusy(() -> {
List<IndexTemplateMetaData> templates = client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates();
assertThat(templates.size(), equalTo(3));
boolean addedFound = false;
boolean changedFound = false;
boolean dummyFound = false;
for (int i = 0; i < 3; i++) {
IndexTemplateMetaData templateMetaData = templates.get(i);
switch (templateMetaData.getName()) {
case "test_added_template":
assertFalse(addedFound);
addedFound = true;
break;
case "test_changed_template":
assertFalse(changedFound);
changedFound = true;
assertThat(templateMetaData.getOrder(), equalTo(10));
break;
case "test_dummy_template":
assertFalse(dummyFound);
dummyFound = true;
break;
default:
fail("unexpected template " + templateMetaData.getName());
break;
}
}
assertTrue(addedFound);
assertTrue(changedFound);
assertTrue(dummyFound);
});
// Wipe out all templates
assertAcked(client().admin().indices().prepareDeleteTemplate("test_*").get());
assertTemplates();
}
private void assertTemplates() throws Exception {
AtomicInteger updateCount = new AtomicInteger();
// Make sure all templates are recreated correctly
assertBusy(() -> {
// the updates only happen on cluster state updates, so we need to make sure that the cluster state updates are happening
// so we need to simulate updates to make sure the template upgrade kicks in
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().put(TestPlugin.UPDATE_TEMPLATE_DUMMY_SETTING.getKey(), updateCount.incrementAndGet())
).get());
List<IndexTemplateMetaData> templates = client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates();
assertThat(templates.size(), equalTo(2));
boolean addedFound = false;
boolean changedFound = false;
for (int i = 0; i < 2; i++) {
IndexTemplateMetaData templateMetaData = templates.get(i);
switch (templateMetaData.getName()) {
case "test_added_template":
assertFalse(addedFound);
addedFound = true;
break;
case "test_changed_template":
assertFalse(changedFound);
changedFound = true;
assertThat(templateMetaData.getOrder(), equalTo(10));
break;
default:
fail("unexpected template " + templateMetaData.getName());
break;
}
}
assertTrue(addedFound);
assertTrue(changedFound);
});
}
}

View File

@ -0,0 +1,438 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TemplateUpgradeServiceTests extends ESTestCase {
private final ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
public void testCalculateChangesAddChangeAndDelete() {
boolean shouldAdd = randomBoolean();
boolean shouldRemove = randomBoolean();
boolean shouldChange = randomBoolean();
MetaData metaData = randomMetaData(
IndexTemplateMetaData.builder("user_template").build(),
IndexTemplateMetaData.builder("removed_test_template").build(),
IndexTemplateMetaData.builder("changed_test_template").build()
);
TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null,
Arrays.asList(
templates -> {
if (shouldAdd) {
assertNull(templates.put("added_test_template", IndexTemplateMetaData.builder("added_test_template").build()));
}
return templates;
},
templates -> {
if (shouldRemove) {
assertNotNull(templates.remove("removed_test_template"));
}
return templates;
},
templates -> {
if (shouldChange) {
assertNotNull(templates.put("changed_test_template",
IndexTemplateMetaData.builder("changed_test_template").order(10).build()));
}
return templates;
}
));
Optional<Tuple<Map<String, BytesReference>, Set<String>>> optChanges =
service.calculateTemplateChanges(metaData.templates());
if (shouldAdd || shouldRemove || shouldChange) {
Tuple<Map<String, BytesReference>, Set<String>> changes = optChanges.orElseThrow(() ->
new AssertionError("Should have non empty changes"));
if (shouldAdd) {
assertThat(changes.v1().get("added_test_template"), notNullValue());
if (shouldChange) {
assertThat(changes.v1().keySet(), hasSize(2));
assertThat(changes.v1().get("changed_test_template"), notNullValue());
} else {
assertThat(changes.v1().keySet(), hasSize(1));
}
} else {
if (shouldChange) {
assertThat(changes.v1().get("changed_test_template"), notNullValue());
assertThat(changes.v1().keySet(), hasSize(1));
} else {
assertThat(changes.v1().keySet(), empty());
}
}
if (shouldRemove) {
assertThat(changes.v2(), hasSize(1));
assertThat(changes.v2().contains("removed_test_template"), equalTo(true));
} else {
assertThat(changes.v2(), empty());
}
} else {
assertThat(optChanges.isPresent(), equalTo(false));
}
}
@SuppressWarnings("unchecked")
public void testUpdateTemplates() {
int additionsCount = randomIntBetween(0, 5);
int deletionsCount = randomIntBetween(0, 3);
List<ActionListener<PutIndexTemplateResponse>> putTemplateListeners = new ArrayList<>();
List<ActionListener<DeleteIndexTemplateResponse>> deleteTemplateListeners = new ArrayList<>();
Client mockClient = mock(Client.class);
AdminClient mockAdminClient = mock(AdminClient.class);
IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class);
when(mockClient.admin()).thenReturn(mockAdminClient);
when(mockAdminClient.indices()).thenReturn(mockIndicesAdminClient);
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 2;
PutIndexTemplateRequest request = (PutIndexTemplateRequest) args[0];
assertThat(request.name(), equalTo("add_template_" + request.order()));
putTemplateListeners.add((ActionListener) args[1]);
return null;
}).when(mockIndicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class), any(ActionListener.class));
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 2;
DeleteIndexTemplateRequest request = (DeleteIndexTemplateRequest) args[0];
assertThat(request.name(), startsWith("remove_template_"));
deleteTemplateListeners.add((ActionListener) args[1]);
return null;
}).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class));
Set<String> deletions = new HashSet<>(deletionsCount);
for (int i = 0; i < deletionsCount; i++) {
deletions.add("remove_template_" + i);
}
Map<String, BytesReference> additions = new HashMap<>(additionsCount);
for (int i = 0; i < additionsCount; i++) {
additions.put("add_template_" + i, new BytesArray("{\"index_patterns\" : \"*\", \"order\" : " + i + "}"));
}
TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, null,
Collections.emptyList());
service.updateTemplates(additions, deletions);
int updatesInProgress = service.getUpdatesInProgress();
assertThat(putTemplateListeners, hasSize(additionsCount));
assertThat(deleteTemplateListeners, hasSize(deletionsCount));
for (int i = 0; i < additionsCount; i++) {
if (randomBoolean()) {
putTemplateListeners.get(i).onFailure(new RuntimeException("test - ignore"));
} else {
putTemplateListeners.get(i).onResponse(new PutIndexTemplateResponse(randomBoolean()) {
});
}
}
for (int i = 0; i < deletionsCount; i++) {
if (randomBoolean()) {
int prevUpdatesInProgress = service.getUpdatesInProgress();
deleteTemplateListeners.get(i).onFailure(new RuntimeException("test - ignore"));
assertThat(prevUpdatesInProgress - service.getUpdatesInProgress(), equalTo(1));
} else {
int prevUpdatesInProgress = service.getUpdatesInProgress();
deleteTemplateListeners.get(i).onResponse(new DeleteIndexTemplateResponse(randomBoolean()) {
});
assertThat(prevUpdatesInProgress - service.getUpdatesInProgress(), equalTo(1));
}
}
assertThat(updatesInProgress - service.getUpdatesInProgress(), equalTo(additionsCount + deletionsCount));
}
private static final Set<DiscoveryNode.Role> MASTER_DATA_ROLES =
Collections.unmodifiableSet(EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA));
@SuppressWarnings("unchecked")
public void testClusterStateUpdate() {
AtomicReference<ActionListener<PutIndexTemplateResponse>> addedListener = new AtomicReference<>();
AtomicReference<ActionListener<PutIndexTemplateResponse>> changedListener = new AtomicReference<>();
AtomicReference<ActionListener<DeleteIndexTemplateResponse>> removedListener = new AtomicReference<>();
AtomicInteger updateInvocation = new AtomicInteger();
MetaData metaData = randomMetaData(
IndexTemplateMetaData.builder("user_template").build(),
IndexTemplateMetaData.builder("removed_test_template").build(),
IndexTemplateMetaData.builder("changed_test_template").build()
);
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
when(threadPool.generic()).thenReturn(executorService);
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 1;
Runnable runnable = (Runnable) args[0];
runnable.run();
updateInvocation.incrementAndGet();
return null;
}).when(executorService).execute(any(Runnable.class));
Client mockClient = mock(Client.class);
AdminClient mockAdminClient = mock(AdminClient.class);
IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class);
when(mockClient.admin()).thenReturn(mockAdminClient);
when(mockAdminClient.indices()).thenReturn(mockIndicesAdminClient);
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 2;
PutIndexTemplateRequest request = (PutIndexTemplateRequest) args[0];
if (request.name().equals("added_test_template")) {
assertThat(addedListener.getAndSet((ActionListener) args[1]), nullValue());
} else if (request.name().equals("changed_test_template")) {
assertThat(changedListener.getAndSet((ActionListener) args[1]), nullValue());
} else {
fail("unexpected put template call for " + request.name());
}
return null;
}).when(mockIndicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class), any(ActionListener.class));
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 2;
DeleteIndexTemplateRequest request = (DeleteIndexTemplateRequest) args[0];
assertThat(request.name(), startsWith("removed_test_template"));
assertThat(removedListener.getAndSet((ActionListener) args[1]), nullValue());
return null;
}).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class));
TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool,
Arrays.asList(
templates -> {
assertNull(templates.put("added_test_template", IndexTemplateMetaData.builder("added_test_template")
.patterns(Collections.singletonList("*")).build()));
return templates;
},
templates -> {
assertNotNull(templates.remove("removed_test_template"));
return templates;
},
templates -> {
assertNotNull(templates.put("changed_test_template", IndexTemplateMetaData.builder("changed_test_template")
.patterns(Collections.singletonList("*")).order(10).build()));
return templates;
}
));
ClusterState prevState = ClusterState.EMPTY_STATE;
ClusterState state = ClusterState.builder(prevState).nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("node1", "node1", buildNewFakeTransportAddress(), emptyMap(), MASTER_DATA_ROLES, Version.CURRENT)
).localNodeId("node1").masterNodeId("node1").build()
).metaData(metaData).build();
service.clusterChanged(new ClusterChangedEvent("test", state, prevState));
assertThat(updateInvocation.get(), equalTo(1));
assertThat(addedListener.get(), notNullValue());
assertThat(changedListener.get(), notNullValue());
assertThat(removedListener.get(), notNullValue());
prevState = state;
state = ClusterState.builder(prevState).metaData(MetaData.builder(state.metaData()).removeTemplate("user_template")).build();
service.clusterChanged(new ClusterChangedEvent("test 2", state, prevState));
// Make sure that update wasn't invoked since we are still running
assertThat(updateInvocation.get(), equalTo(1));
addedListener.getAndSet(null).onResponse(new PutIndexTemplateResponse(true) {
});
changedListener.getAndSet(null).onResponse(new PutIndexTemplateResponse(true) {
});
removedListener.getAndSet(null).onResponse(new DeleteIndexTemplateResponse(true) {
});
service.clusterChanged(new ClusterChangedEvent("test 3", state, prevState));
// Make sure that update was called this time since we are no longer running
assertThat(updateInvocation.get(), equalTo(2));
addedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore"));
changedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore"));
removedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore"));
service.clusterChanged(new ClusterChangedEvent("test 3", state, prevState));
// Make sure that update wasn't called this time since the index template metadata didn't change
assertThat(updateInvocation.get(), equalTo(2));
}
private static final int NODE_TEST_ITERS = 100;
public void testOnlyOneNodeRunsTemplateUpdates() {
TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null, Collections.emptyList());
for (int i = 0; i < NODE_TEST_ITERS; i++) {
int nodesCount = randomIntBetween(1, 10);
int clientNodesCount = randomIntBetween(0, 4);
DiscoveryNodes nodes = randomNodes(nodesCount, clientNodesCount);
int updaterNode = -1;
for (int j = 0; j < nodesCount; j++) {
DiscoveryNodes localNodes = DiscoveryNodes.builder(nodes).localNodeId(nodes.resolveNode("node_" + j).getId()).build();
if (service.shouldLocalNodeUpdateTemplates(localNodes)) {
assertThat("Expected only one node to update template, found " + updaterNode + " and " + j, updaterNode, lessThan(0));
updaterNode = j;
}
}
assertThat("Expected one node to update template", updaterNode, greaterThanOrEqualTo(0));
}
}
public void testIfMasterHasTheHighestVersionItShouldRunsTemplateUpdates() {
for (int i = 0; i < NODE_TEST_ITERS; i++) {
int nodesCount = randomIntBetween(1, 10);
int clientNodesCount = randomIntBetween(0, 4);
DiscoveryNodes nodes = randomNodes(nodesCount, clientNodesCount);
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(nodes).localNodeId(nodes.resolveNode("_master").getId());
nodes = builder.build();
TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null,
Collections.emptyList());
assertThat(service.shouldLocalNodeUpdateTemplates(nodes),
equalTo(nodes.getLargestNonClientNodeVersion().equals(nodes.getMasterNode().getVersion())));
}
}
public void testClientNodeDontRunTemplateUpdates() {
for (int i = 0; i < NODE_TEST_ITERS; i++) {
int nodesCount = randomIntBetween(1, 10);
int clientNodesCount = randomIntBetween(1, 4);
DiscoveryNodes nodes = randomNodes(nodesCount, clientNodesCount);
int testClient = randomIntBetween(0, clientNodesCount - 1);
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(nodes).localNodeId(nodes.resolveNode("client_" + testClient).getId());
TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null,
Collections.emptyList());
assertThat(service.shouldLocalNodeUpdateTemplates(builder.build()), equalTo(false));
}
}
private DiscoveryNodes randomNodes(int dataAndMasterNodes, int clientNodes) {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
String masterNodeId = null;
for (int i = 0; i < dataAndMasterNodes; i++) {
String id = randomAlphaOfLength(10) + "_" + i;
Set<DiscoveryNode.Role> roles;
if (i == 0) {
masterNodeId = id;
// The first node has to be master node
if (randomBoolean()) {
roles = EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA);
} else {
roles = EnumSet.of(DiscoveryNode.Role.MASTER);
}
} else {
if (randomBoolean()) {
roles = EnumSet.of(DiscoveryNode.Role.DATA);
} else {
roles = EnumSet.of(DiscoveryNode.Role.MASTER);
}
}
String node = "node_" + i;
builder.add(new DiscoveryNode(node, id, buildNewFakeTransportAddress(), emptyMap(), roles, randomVersion(random())));
}
builder.masterNodeId(masterNodeId); // Node 0 is always a master node
for (int i = 0; i < clientNodes; i++) {
String node = "client_" + i;
builder.add(new DiscoveryNode(node, randomAlphaOfLength(10) + "__" + i, buildNewFakeTransportAddress(), emptyMap(),
EnumSet.noneOf(DiscoveryNode.Role.class), randomVersion(random())));
}
return builder.build();
}
public static MetaData randomMetaData(IndexTemplateMetaData... templates) {
MetaData.Builder builder = MetaData.builder();
for (IndexTemplateMetaData template : templates) {
builder.put(template);
}
for (int i = 0; i < randomIntBetween(1, 5); i++) {
builder.put(
IndexMetaData.builder(randomAlphaOfLength(10))
.settings(settings(Version.CURRENT))
.numberOfReplicas(randomIntBetween(0, 3))
.numberOfShards(randomIntBetween(1, 5))
);
}
return builder.build();
}
}