diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/InMemoryPersistedState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/InMemoryPersistedState.java similarity index 100% rename from test/framework/src/main/java/org/elasticsearch/cluster/coordination/InMemoryPersistedState.java rename to server/src/main/java/org/elasticsearch/cluster/coordination/InMemoryPersistedState.java diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index e63e03fd87c..1d01b2d3cf6 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -22,10 +22,13 @@ package org.elasticsearch.discovery; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; @@ -58,14 +61,19 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; + /** * A module for loading classes for node discovery. */ public class DiscoveryModule { private static final Logger logger = LogManager.getLogger(DiscoveryModule.class); + public static final String ZEN_DISCOVERY_TYPE = "zen"; + public static final String ZEN2_DISCOVERY_TYPE = "zen2"; + public static final Setting DISCOVERY_TYPE_SETTING = - new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope); + new Setting<>("discovery.type", ZEN_DISCOVERY_TYPE, Function.identity(), Property.NodeScope); public static final Setting> DISCOVERY_HOSTS_PROVIDER_SETTING = Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(), Property.NodeScope); @@ -75,14 +83,14 @@ public class DiscoveryModule { NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService, ClusterApplier clusterApplier, ClusterSettings clusterSettings, List plugins, AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState) { - final Collection> joinValidators = new ArrayList<>(); + final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService)); hostProviders.put("file", () -> new FileBasedUnicastHostsProvider(configFile)); for (DiscoveryPlugin plugin : plugins) { - plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> { - if (hostProviders.put(entry.getKey(), entry.getValue()) != null) { - throw new IllegalArgumentException("Cannot register zen hosts provider [" + entry.getKey() + "] twice"); + plugin.getZenHostsProviders(transportService, networkService).forEach((key, value) -> { + if (hostProviders.put(key, value) != null) { + throw new IllegalArgumentException("Cannot register zen hosts provider [" + key + "] twice"); } }); BiConsumer joinValidator = plugin.getJoinValidator(); @@ -117,18 +125,21 @@ public class DiscoveryModule { }; Map> discoveryTypes = new HashMap<>(); - discoveryTypes.put("zen", + discoveryTypes.put(ZEN_DISCOVERY_TYPE, () -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators), gatewayMetaState)); + discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings, + transportService, namedWriteableRegistry, allocationService, masterService, + () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider, clusterApplier, + Randomness.get())); discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier)); for (DiscoveryPlugin plugin : plugins) { - plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry, - masterService, clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState).entrySet() - .forEach(entry -> { - if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) { - throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice"); - } - }); + plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings, + hostsProvider, allocationService, gatewayMetaState).forEach((key, value) -> { + if (discoveryTypes.put(key, value) != null) { + throw new IllegalArgumentException("Cannot register discovery type [" + key + "] twice"); + } + }); } String discoveryType = DISCOVERY_TYPE_SETTING.get(settings); Supplier discoverySupplier = discoveryTypes.get(discoveryType); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index c5bfa922490..7d3a905d30e 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -30,6 +30,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.CoordinationState; +import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; +import org.elasticsearch.cluster.coordination.InMemoryPersistedState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; @@ -37,6 +39,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; @@ -109,6 +112,17 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState. incrementalWrite = false; } + public PersistedState getPersistedState(Settings settings, ClusterApplierService clusterApplierService) { + applyClusterStateUpdaters(); + if (DiscoveryNode.isMasterNode(settings) == false) { + // use Zen1 way of writing cluster state for non-master-eligible nodes + // this avoids concurrent manipulating of IndexMetadata with IndicesStore + clusterApplierService.addLowPriorityApplier(this); + return new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState()); + } + return this; + } + private void initializeClusterState(ClusterName clusterName) throws IOException { long startNS = System.nanoTime(); Tuple manifestAndMetaData = metaStateService.loadFullState(); diff --git a/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java b/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java index 1e18135f4eb..4f3a3a615da 100644 --- a/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java +++ b/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java @@ -40,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE; +import static org.elasticsearch.discovery.DiscoveryModule.ZEN_DISCOVERY_TYPE; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; @@ -101,7 +103,7 @@ public class BootstrapChecksTests extends ESTestCase { when(boundTransportAddress.boundAddresses()).thenReturn(transportAddresses.toArray(new TransportAddress[0])); when(boundTransportAddress.publishAddress()).thenReturn(publishAddress); - final String discoveryType = randomFrom("zen", "single-node"); + final String discoveryType = randomFrom(ZEN_DISCOVERY_TYPE, ZEN2_DISCOVERY_TYPE, "single-node"); assertEquals(BootstrapChecks.enforceLimits(boundTransportAddress, discoveryType), !"single-node".equals(discoveryType)); } @@ -119,7 +121,7 @@ public class BootstrapChecksTests extends ESTestCase { when(boundTransportAddress.boundAddresses()).thenReturn(transportAddresses.toArray(new TransportAddress[0])); when(boundTransportAddress.publishAddress()).thenReturn(publishAddress); - final String discoveryType = randomFrom("zen", "single-node"); + final String discoveryType = randomFrom(ZEN_DISCOVERY_TYPE, ZEN2_DISCOVERY_TYPE, "single-node"); assertEquals(BootstrapChecks.enforceLimits(boundTransportAddress, discoveryType), !"single-node".equals(discoveryType)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 41007b9e9fd..783dc6325c4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -19,10 +19,7 @@ package org.elasticsearch.test.discovery; -import org.elasticsearch.cluster.coordination.CoordinationState; import org.elasticsearch.cluster.coordination.Coordinator; -import org.elasticsearch.cluster.coordination.InMemoryPersistedState; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.ClusterApplierService; @@ -81,20 +78,10 @@ public class TestZenDiscovery extends ZenDiscovery { Settings fixedSettings = Settings.builder().put(settings).putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()).build(); return Collections.singletonMap("test-zen", () -> { if (USE_ZEN2.get(settings)) { - Supplier persistedStateSupplier = () -> { - gatewayMetaState.applyClusterStateUpdaters(); - if (DiscoveryNode.isMasterNode(settings) == false) { - // use Zen1 way of writing cluster state for non-master-eligible nodes - // this avoids concurrent manipulating of IndexMetadata with IndicesStore - ((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState); - return new InMemoryPersistedState(gatewayMetaState.getCurrentTerm(), gatewayMetaState.getLastAcceptedState()); - } - return gatewayMetaState; - }; - return new Coordinator("test_node", fixedSettings, clusterSettings, transportService, namedWriteableRegistry, - allocationService, masterService, persistedStateSupplier, hostsProvider, clusterApplier, - new Random(Randomness.get().nextLong())); + allocationService, masterService, + () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider, + clusterApplier, new Random(Randomness.get().nextLong())); } else { return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index ec0a20faf58..e9924b9d852 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -64,6 +64,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_FORMAT_SETTING; +import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE; +import static org.elasticsearch.discovery.DiscoveryModule.ZEN_DISCOVERY_TYPE; import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_INDEX_NAME; import static org.elasticsearch.xpack.security.support.SecurityIndexManager.INTERNAL_INDEX_FORMAT; import static org.hamcrest.Matchers.containsString; @@ -281,7 +283,7 @@ public class SecurityTests extends ESTestCase { int numIters = randomIntBetween(1, 10); for (int i = 0; i < numIters; i++) { boolean tlsOn = randomBoolean(); - String discoveryType = randomFrom("single-node", "zen", randomAlphaOfLength(4)); + String discoveryType = randomFrom("single-node", ZEN_DISCOVERY_TYPE, ZEN2_DISCOVERY_TYPE, randomAlphaOfLength(4)); Security.ValidateTLSOnJoin validator = new Security.ValidateTLSOnJoin(tlsOn, discoveryType); MetaData.Builder builder = MetaData.builder(); License license = TestUtils.generateSignedLicense(TimeValue.timeValueHours(24));