From 6f82b0c6e22675a831ed9818ac146f92511d0c43 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 11 Aug 2017 09:51:49 +0200 Subject: [PATCH] Allow `ClusterState.Custom` to be created on initial cluster states (#26144) Today we have a `null` invariant on all `ClusterState.Custom`. This makes several code paths complicated and requires complex state handling in some cases. This change allows to register a custom supplier that is used to initialize the initial clusterstate with these transient customs. --- .../elasticsearch/cluster/ClusterModule.java | 23 +++++-- .../cluster/RestoreInProgress.java | 9 --- .../cluster/SnapshotDeletionsInProgress.java | 4 ++ .../SnapshotInProgressAllocationDecider.java | 2 +- .../cluster/service/ClusterApplier.java | 6 ++ .../service/ClusterApplierService.java | 10 ++- .../cluster/service/ClusterService.java | 19 +++++- .../discovery/single/SingleNodeDiscovery.java | 4 +- .../discovery/zen/ZenDiscovery.java | 6 +- .../org/elasticsearch/gateway/Gateway.java | 2 +- .../java/org/elasticsearch/node/Node.java | 8 ++- .../elasticsearch/plugins/ClusterPlugin.java | 9 ++- .../cluster/ClusterModuleTests.java | 46 +++++++++++++- .../metadata/TemplateUpgradeServiceTests.java | 2 +- .../service/ClusterApplierServiceTests.java | 2 +- .../cluster/service/ClusterSerivceTests.java | 62 +++++++++++++++++++ .../single/SingleNodeDiscoveryTests.java | 6 ++ .../discovery/zen/ZenDiscoveryUnitTests.java | 6 ++ .../gateway/GatewayServiceTests.java | 3 +- .../node/ResponseCollectorServiceTests.java | 4 +- .../DedicatedClusterSnapshotRestoreIT.java | 21 +++++++ .../test/ClusterServiceUtils.java | 3 +- 22 files changed, 220 insertions(+), 37 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 0d7d13bab77..a5b7f422a93 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -73,6 +73,7 @@ import org.elasticsearch.tasks.TaskResultsService; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -94,7 +95,6 @@ public class ClusterModule extends AbstractModule { private final IndexNameExpressionResolver indexNameExpressionResolver; private final AllocationDeciders allocationDeciders; private final AllocationService allocationService; - private final Runnable onStarted; // pkg private for tests final Collection deciderList; final ShardsAllocator shardsAllocator; @@ -107,9 +107,24 @@ public class ClusterModule extends AbstractModule { this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(settings); this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService); - this.onStarted = () -> clusterPlugins.forEach(plugin -> plugin.onNodeStarted()); } + public static Map> getClusterStateCustomSuppliers(List clusterPlugins) { + final Map> customSupplier = new HashMap<>(); + customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new); + customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new); + customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new); + for (ClusterPlugin plugin : clusterPlugins) { + Map> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier(); + for (String key : initialCustomSupplier.keySet()) { + if (customSupplier.containsKey(key)) { + throw new IllegalStateException("custom supplier key [" + key + "] is registered more than once"); + } + } + customSupplier.putAll(initialCustomSupplier); + } + return Collections.unmodifiableMap(customSupplier); + } public static List getNamedWriteables() { List entries = new ArrayList<>(); @@ -243,8 +258,4 @@ public class ClusterModule extends AbstractModule { bind(AllocationDeciders.class).toInstance(allocationDeciders); bind(ShardsAllocator.class).toInstance(shardsAllocator); } - - public Runnable onStarted() { - return onStarted; - } } diff --git a/core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java b/core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java index 8f6527ccaa7..5c036f94285 100644 --- a/core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java +++ b/core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java @@ -45,15 +45,6 @@ public class RestoreInProgress extends AbstractNamedDiffable implements private final List entries; - /** - * Constructs new restore metadata - * - * @param entries list of currently running restore processes - */ - public RestoreInProgress(List entries) { - this.entries = entries; - } - /** * Constructs new restore metadata * diff --git a/core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java index e0336f61e39..981d6128419 100644 --- a/core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java +++ b/core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -45,6 +45,10 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable i // the list of snapshot deletion request entries private final List entries; + public SnapshotDeletionsInProgress() { + this(Collections.emptyList()); + } + private SnapshotDeletionsInProgress(List entries) { this.entries = Collections.unmodifiableList(entries); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java index 18ee6395bd4..eb4cc0c4420 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java @@ -67,7 +67,7 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider { // Only primary shards are snapshotted SnapshotsInProgress snapshotsInProgress = allocation.custom(SnapshotsInProgress.TYPE); - if (snapshotsInProgress == null) { + if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) { // Snapshots are not running return allocation.decision(Decision.YES, NAME, "no snapshots are currently running"); } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java index aa5c74e15e8..0a2ef347d06 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java @@ -39,4 +39,10 @@ public interface ClusterApplier { * @param listener callback that is invoked after cluster state is applied */ void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterStateTaskListener listener); + + /** + * Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs. + */ + ClusterState.Builder newClusterStateBuilder(); + } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index b029f10f5f0..13c2e50eba2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -97,14 +97,17 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private final AtomicReference state; // last applied state private NodeConnectionsService nodeConnectionsService; + private Supplier stateBuilderSupplier; - public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Supplier stateBuilderSupplier) { super(settings); this.clusterSettings = clusterSettings; this.threadPool = threadPool; this.state = new AtomicReference<>(); this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool); + this.stateBuilderSupplier = stateBuilderSupplier; } public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { @@ -653,4 +656,9 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements protected long currentTimeInNanos() { return System.nanoTime(); } + + @Override + public ClusterState.Builder newClusterStateBuilder() { + return stateBuilderSupplier.get(); + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 18445e62c7e..7610d75f677 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -42,6 +42,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; import java.util.Map; +import java.util.function.Supplier; public class ClusterService extends AbstractLifecycleComponent { @@ -58,16 +59,30 @@ public class ClusterService extends AbstractLifecycleComponent { private final OperationRouting operationRouting; private final ClusterSettings clusterSettings; + private final Map> initialClusterStateCustoms; - public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, + Map> initialClusterStateCustoms) { super(settings); - this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool); this.masterService = new MasterService(settings, threadPool); this.operationRouting = new OperationRouting(settings, clusterSettings); this.clusterSettings = clusterSettings; this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, this::setSlowTaskLoggingThreshold); + this.initialClusterStateCustoms = initialClusterStateCustoms; + this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder); + } + + /** + * Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs. + */ + public ClusterState.Builder newClusterStateBuilder() { + ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)); + for (Map.Entry> entry : initialClusterStateCustoms.entrySet()) { + builder.putCustom(entry.getKey(), entry.getValue().get()); + } + return builder; } private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { diff --git a/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java index a61253b7c2a..2f3124010cc 100644 --- a/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java @@ -117,8 +117,8 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D } protected ClusterState createInitialState(DiscoveryNode localNode) { - return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)) - .nodes(DiscoveryNodes.builder().add(localNode) + ClusterState.Builder builder = clusterApplier.newClusterStateBuilder(); + return builder.nodes(DiscoveryNodes.builder().add(localNode) .localNodeId(localNode.getId()) .masterNodeId(localNode.getId()) .build()) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 72276386a2c..a4817fada36 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -113,7 +113,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover private final TransportService transportService; private final MasterService masterService; - private final ClusterName clusterName; private final DiscoverySettings discoverySettings; protected final ZenPing zenPing; // protected to allow tests access private final MasterFaultDetection masterFD; @@ -169,7 +168,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings); this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings); this.threadPool = threadPool; - this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); + ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); this.committedState = new AtomicReference<>(); this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings); @@ -238,7 +237,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover // set initial state assert committedState.get() == null; assert localNode != null; - ClusterState initialState = ClusterState.builder(clusterName) + ClusterState.Builder builder = clusterApplier.newClusterStateBuilder(); + ClusterState initialState = builder .blocks(ClusterBlocks.builder() .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK) .addGlobalBlock(discoverySettings.getNoMasterBlock())) diff --git a/core/src/main/java/org/elasticsearch/gateway/Gateway.java b/core/src/main/java/org/elasticsearch/gateway/Gateway.java index 4407e97d5a1..2e258ca54de 100644 --- a/core/src/main/java/org/elasticsearch/gateway/Gateway.java +++ b/core/src/main/java/org/elasticsearch/gateway/Gateway.java @@ -155,7 +155,7 @@ public class Gateway extends AbstractComponent implements ClusterStateApplier { metaDataBuilder.transientSettings(), e -> logUnknownSetting("transient", e), (e, ex) -> logInvalidSetting("transient", e, ex))); - ClusterState.Builder builder = ClusterState.builder(clusterService.getClusterName()); + ClusterState.Builder builder = clusterService.newClusterStateBuilder(); builder.metaData(metaDataBuilder); listener.onSuccess(builder.build()); } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 9752cc4c2dd..10d8ddcf210 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -329,7 +329,10 @@ public class Node implements Closeable { resourcesToClose.add(resourceWatcherService); final NetworkService networkService = new NetworkService( getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))); - final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool); + + List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); + final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool, + ClusterModule.getClusterStateCustomSuppliers(clusterPlugins)); clusterService.addListener(scriptModule.getScriptService()); resourcesToClose.add(clusterService); final IngestService ingestService = new IngestService(settings, threadPool, this.environment, @@ -346,8 +349,7 @@ public class Node implements Closeable { modules.add(pluginModule); } final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService); - ClusterModule clusterModule = new ClusterModule(settings, clusterService, - pluginsService.filterPlugins(ClusterPlugin.class), clusterInfoService); + ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService); modules.add(clusterModule); IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); modules.add(indicesModule); diff --git a/core/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java b/core/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java index 612a0f2f9fc..5e58aa5a3a9 100644 --- a/core/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Map; import java.util.function.Supplier; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.common.settings.ClusterSettings; @@ -63,6 +64,12 @@ public interface ClusterPlugin { * Called when the node is started */ default void onNodeStarted() { - } + + /** + * Returns a map of {@link ClusterState.Custom} supplier that should be invoked to initialize the initial clusterstate. + * This allows custom clusterstate extensions to be always present and prevents invariants where clusterstates are published + * but customs are not initialized. + */ + default Map> getInitialClusterStateCustomSupplier() { return Collections.emptyMap(); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 67d04c3c235..7ec086d4d68 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -59,7 +59,7 @@ import java.util.function.Supplier; public class ClusterModuleTests extends ModuleTestCase { private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE; private ClusterService clusterService = new ClusterService(Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap()); static class FakeAllocationDecider extends AllocationDecider { protected FakeAllocationDecider(Settings settings) { super(settings); @@ -196,4 +196,48 @@ public class ClusterModuleTests extends ModuleTestCase { assertSame(decider.getClass(), expectedDeciders.get(idx++)); } } + + public void testCustomSuppliers() { + Map> customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.emptyList()); + assertEquals(3, customSuppliers.size()); + assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE)); + assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE)); + assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE)); + + customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() { + @Override + public Map> getInitialClusterStateCustomSupplier() { + return Collections.singletonMap("foo", () -> null); + } + })); + assertEquals(4, customSuppliers.size()); + assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE)); + assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE)); + assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE)); + assertTrue(customSuppliers.containsKey("foo")); + + + IllegalStateException ise = expectThrows(IllegalStateException.class, + () -> ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() { + @Override + public Map> getInitialClusterStateCustomSupplier() { + return Collections.singletonMap(SnapshotsInProgress.TYPE, () -> null); + } + }))); + assertEquals(ise.getMessage(), "custom supplier key [snapshots] is registered more than once"); + + ise = expectThrows(IllegalStateException.class, + () -> ClusterModule.getClusterStateCustomSuppliers(Arrays.asList(new ClusterPlugin() { + @Override + public Map> getInitialClusterStateCustomSupplier() { + return Collections.singletonMap("foo", () -> null); + } + }, new ClusterPlugin() { + @Override + public Map> getInitialClusterStateCustomSupplier() { + return Collections.singletonMap("foo", () -> null); + } + }))); + assertEquals(ise.getMessage(), "custom supplier key [foo] is registered more than once"); + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java index f4e8ba21fc0..0b32ae2eb99 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java @@ -74,7 +74,7 @@ import static org.mockito.Mockito.when; public class TemplateUpgradeServiceTests extends ESTestCase { private final ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, - ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null); + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap()); public void testCalculateChangesAddChangeAndDelete() { diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 9782dcdc858..34750180ff1 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -413,7 +413,7 @@ public class ClusterApplierServiceTests extends ESTestCase { public volatile Long currentTimeOverride = null; TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { - super(settings, clusterSettings, threadPool); + super(settings, clusterSettings, threadPool, () -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))); } @Override diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java new file mode 100644 index 00000000000..e7cbd04ce4b --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.service; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Collections; + +public class ClusterSerivceTests extends ESTestCase { + + public void testNewBuilderContainsCustoms() { + ClusterState.Custom custom = new ClusterState.Custom() { + @Override + public Diff diff(ClusterState.Custom previousState) { + return null; + } + + @Override + public String getWriteableName() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } + }; + ClusterService service = new ClusterService(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.singletonMap("foo", () -> + custom)); + ClusterState.Builder builder = service.newClusterStateBuilder(); + assertSame(builder.build().custom("foo"), custom); + } +} diff --git a/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java b/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java index 135692fd645..f5dabf705fd 100644 --- a/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.discovery.single; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -64,6 +65,11 @@ public class SingleNodeDiscoveryTests extends ESTestCase { clusterState.set(initialState); } + @Override + public ClusterState.Builder newClusterStateBuilder() { + return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)); + } + @Override public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterStateTaskListener listener) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index cb88213cfe3..bc653e14e32 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -308,6 +309,11 @@ public class ZenDiscoveryUnitTests extends ESTestCase { } + @Override + public ClusterState.Builder newClusterStateBuilder() { + return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)); + } + @Override public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterStateTaskListener listener) { listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get()); diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/core/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index 3e4a3dce091..bd3d5beeaf5 100644 --- a/core/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -27,13 +27,14 @@ import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; import java.io.IOException; +import java.util.Collections; public class GatewayServiceTests extends ESTestCase { private GatewayService createService(Settings.Builder settings) { ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "GatewayServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null); + null, Collections.emptyMap()); return new GatewayService(settings.build(), null, clusterService, null, null, null, null); } diff --git a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java index 2ff3d156247..d620007d2cd 100644 --- a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java +++ b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.node; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -35,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import java.util.Collections; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -53,7 +53,7 @@ public class ResponseCollectorServiceTests extends ESTestCase { threadpool = new TestThreadPool("response_collector_tests"); clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadpool); + threadpool, Collections.emptyMap()); collector = new ResponseCollectorService(Settings.EMPTY, clusterService); } diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 3cf3c3a2363..1529b050baa 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; @@ -36,6 +37,8 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.MetaData; @@ -148,6 +151,24 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest return Arrays.asList(MockRepository.Plugin.class, TestCustomMetaDataPlugin.class); } + public void testClusterStateHasCustoms() throws Exception { + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().all().get(); + assertNotNull(clusterStateResponse.getState().custom(SnapshotsInProgress.TYPE)); + assertNotNull(clusterStateResponse.getState().custom(RestoreInProgress.TYPE)); + assertNotNull(clusterStateResponse.getState().custom(SnapshotDeletionsInProgress.TYPE)); + internalCluster().ensureAtLeastNumDataNodes(2); + if (randomBoolean()) { + internalCluster().fullRestart(); + } else { + internalCluster().rollingRestart(); + } + + clusterStateResponse = client().admin().cluster().prepareState().all().get(); + assertNotNull(clusterStateResponse.getState().custom(SnapshotsInProgress.TYPE)); + assertNotNull(clusterStateResponse.getState().custom(RestoreInProgress.TYPE)); + assertNotNull(clusterStateResponse.getState().custom(SnapshotDeletionsInProgress.TYPE)); + } + public void testRestorePersistentSettings() throws Exception { logger.info("--> start 2 nodes"); internalCluster().startNode(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index f6c595b9b97..df1e216f4bb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -39,7 +39,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery.AckListener; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.concurrent.CountDownLatch; @@ -133,7 +132,7 @@ public class ClusterServiceUtils { public static ClusterService createClusterService(ThreadPool threadPool, DiscoveryNode localNode, ClusterSettings clusterSettings) { ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "ClusterServiceTests").build(), - clusterSettings, threadPool); + clusterSettings, threadPool, Collections.emptyMap()); clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override public void connectToNodes(DiscoveryNodes discoveryNodes) {