mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Introduce zen2
discovery type (#36298)
With this change it is now possible to start a node running Zen2.
This commit is contained in:
parent
f0340d6d32
commit
ed1c5a0241
@ -22,10 +22,13 @@ package org.elasticsearch.discovery;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.coordination.Coordinator;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
@ -58,14 +61,19 @@ import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
||||
|
||||
/**
|
||||
* A module for loading classes for node discovery.
|
||||
*/
|
||||
public class DiscoveryModule {
|
||||
private static final Logger logger = LogManager.getLogger(DiscoveryModule.class);
|
||||
|
||||
public static final String ZEN_DISCOVERY_TYPE = "zen";
|
||||
public static final String ZEN2_DISCOVERY_TYPE = "zen2";
|
||||
|
||||
public static final Setting<String> DISCOVERY_TYPE_SETTING =
|
||||
new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope);
|
||||
new Setting<>("discovery.type", ZEN_DISCOVERY_TYPE, Function.identity(), Property.NodeScope);
|
||||
public static final Setting<List<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
|
||||
Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(), Property.NodeScope);
|
||||
|
||||
@ -75,14 +83,14 @@ public class DiscoveryModule {
|
||||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
|
||||
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
|
||||
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState) {
|
||||
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
|
||||
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
|
||||
final Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
|
||||
hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService));
|
||||
hostProviders.put("file", () -> new FileBasedUnicastHostsProvider(configFile));
|
||||
for (DiscoveryPlugin plugin : plugins) {
|
||||
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
|
||||
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
|
||||
throw new IllegalArgumentException("Cannot register zen hosts provider [" + entry.getKey() + "] twice");
|
||||
plugin.getZenHostsProviders(transportService, networkService).forEach((key, value) -> {
|
||||
if (hostProviders.put(key, value) != null) {
|
||||
throw new IllegalArgumentException("Cannot register zen hosts provider [" + key + "] twice");
|
||||
}
|
||||
});
|
||||
BiConsumer<DiscoveryNode, ClusterState> joinValidator = plugin.getJoinValidator();
|
||||
@ -117,18 +125,21 @@ public class DiscoveryModule {
|
||||
};
|
||||
|
||||
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
|
||||
discoveryTypes.put("zen",
|
||||
discoveryTypes.put(ZEN_DISCOVERY_TYPE,
|
||||
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
|
||||
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators), gatewayMetaState));
|
||||
discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings,
|
||||
transportService, namedWriteableRegistry, allocationService, masterService,
|
||||
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider, clusterApplier,
|
||||
Randomness.get()));
|
||||
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
|
||||
for (DiscoveryPlugin plugin : plugins) {
|
||||
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
|
||||
masterService, clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState).entrySet()
|
||||
.forEach(entry -> {
|
||||
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
|
||||
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
|
||||
}
|
||||
});
|
||||
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings,
|
||||
hostsProvider, allocationService, gatewayMetaState).forEach((key, value) -> {
|
||||
if (discoveryTypes.put(key, value) != null) {
|
||||
throw new IllegalArgumentException("Cannot register discovery type [" + key + "] twice");
|
||||
}
|
||||
});
|
||||
}
|
||||
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
|
||||
Supplier<Discovery> discoverySupplier = discoveryTypes.get(discoveryType);
|
||||
|
@ -30,6 +30,8 @@ import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationState;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
|
||||
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.Manifest;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -37,6 +39,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
@ -109,6 +112,17 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
|
||||
incrementalWrite = false;
|
||||
}
|
||||
|
||||
public PersistedState getPersistedState(Settings settings, ClusterApplierService clusterApplierService) {
|
||||
applyClusterStateUpdaters();
|
||||
if (DiscoveryNode.isMasterNode(settings) == false) {
|
||||
// use Zen1 way of writing cluster state for non-master-eligible nodes
|
||||
// this avoids concurrent manipulating of IndexMetadata with IndicesStore
|
||||
clusterApplierService.addLowPriorityApplier(this);
|
||||
return new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState());
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private void initializeClusterState(ClusterName clusterName) throws IOException {
|
||||
long startNS = System.nanoTime();
|
||||
Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
|
||||
|
@ -40,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
|
||||
import static org.elasticsearch.discovery.DiscoveryModule.ZEN_DISCOVERY_TYPE;
|
||||
import static org.hamcrest.CoreMatchers.allOf;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
@ -101,7 +103,7 @@ public class BootstrapChecksTests extends ESTestCase {
|
||||
when(boundTransportAddress.boundAddresses()).thenReturn(transportAddresses.toArray(new TransportAddress[0]));
|
||||
when(boundTransportAddress.publishAddress()).thenReturn(publishAddress);
|
||||
|
||||
final String discoveryType = randomFrom("zen", "single-node");
|
||||
final String discoveryType = randomFrom(ZEN_DISCOVERY_TYPE, ZEN2_DISCOVERY_TYPE, "single-node");
|
||||
|
||||
assertEquals(BootstrapChecks.enforceLimits(boundTransportAddress, discoveryType), !"single-node".equals(discoveryType));
|
||||
}
|
||||
@ -119,7 +121,7 @@ public class BootstrapChecksTests extends ESTestCase {
|
||||
when(boundTransportAddress.boundAddresses()).thenReturn(transportAddresses.toArray(new TransportAddress[0]));
|
||||
when(boundTransportAddress.publishAddress()).thenReturn(publishAddress);
|
||||
|
||||
final String discoveryType = randomFrom("zen", "single-node");
|
||||
final String discoveryType = randomFrom(ZEN_DISCOVERY_TYPE, ZEN2_DISCOVERY_TYPE, "single-node");
|
||||
|
||||
assertEquals(BootstrapChecks.enforceLimits(boundTransportAddress, discoveryType), !"single-node".equals(discoveryType));
|
||||
}
|
||||
|
@ -19,10 +19,7 @@
|
||||
|
||||
package org.elasticsearch.test.discovery;
|
||||
|
||||
import org.elasticsearch.cluster.coordination.CoordinationState;
|
||||
import org.elasticsearch.cluster.coordination.Coordinator;
|
||||
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||
@ -81,20 +78,10 @@ public class TestZenDiscovery extends ZenDiscovery {
|
||||
Settings fixedSettings = Settings.builder().put(settings).putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()).build();
|
||||
return Collections.singletonMap("test-zen", () -> {
|
||||
if (USE_ZEN2.get(settings)) {
|
||||
Supplier<CoordinationState.PersistedState> persistedStateSupplier = () -> {
|
||||
gatewayMetaState.applyClusterStateUpdaters();
|
||||
if (DiscoveryNode.isMasterNode(settings) == false) {
|
||||
// use Zen1 way of writing cluster state for non-master-eligible nodes
|
||||
// this avoids concurrent manipulating of IndexMetadata with IndicesStore
|
||||
((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState);
|
||||
return new InMemoryPersistedState(gatewayMetaState.getCurrentTerm(), gatewayMetaState.getLastAcceptedState());
|
||||
}
|
||||
return gatewayMetaState;
|
||||
};
|
||||
|
||||
return new Coordinator("test_node", fixedSettings, clusterSettings, transportService, namedWriteableRegistry,
|
||||
allocationService, masterService, persistedStateSupplier, hostsProvider, clusterApplier,
|
||||
new Random(Randomness.get().nextLong()));
|
||||
allocationService, masterService,
|
||||
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider,
|
||||
clusterApplier, new Random(Randomness.get().nextLong()));
|
||||
} else {
|
||||
return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService,
|
||||
clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState);
|
||||
|
@ -64,6 +64,8 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_FORMAT_SETTING;
|
||||
import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
|
||||
import static org.elasticsearch.discovery.DiscoveryModule.ZEN_DISCOVERY_TYPE;
|
||||
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_INDEX_NAME;
|
||||
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.INTERNAL_INDEX_FORMAT;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
@ -281,7 +283,7 @@ public class SecurityTests extends ESTestCase {
|
||||
int numIters = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < numIters; i++) {
|
||||
boolean tlsOn = randomBoolean();
|
||||
String discoveryType = randomFrom("single-node", "zen", randomAlphaOfLength(4));
|
||||
String discoveryType = randomFrom("single-node", ZEN_DISCOVERY_TYPE, ZEN2_DISCOVERY_TYPE, randomAlphaOfLength(4));
|
||||
Security.ValidateTLSOnJoin validator = new Security.ValidateTLSOnJoin(tlsOn, discoveryType);
|
||||
MetaData.Builder builder = MetaData.builder();
|
||||
License license = TestUtils.generateSignedLicense(TimeValue.timeValueHours(24));
|
||||
|
Loading…
x
Reference in New Issue
Block a user