Zen2: Add infrastructure for integration tests (#34365)

Adds the infrastructure to run integration tests against Zen2.
This commit is contained in:
Yannick Welsch 2018-10-14 20:55:04 +01:00 committed by GitHub
parent 8b9fa55c93
commit 5fbead00a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 230 additions and 69 deletions

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@ -429,8 +430,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
// package-visible for testing
DiscoveryNode getLocalNode() {
// visible for testing
public DiscoveryNode getLocalNode() {
return transportService.getLocalNode();
}
@ -578,6 +579,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
final Builder builder = masterService.incrementVersion(currentState);
builder.lastAcceptedConfiguration(votingConfiguration);
builder.lastCommittedConfiguration(votingConfiguration);
MetaData.Builder metaDataBuilder = MetaData.builder();
// automatically generate a UID for the metadata if we need to
metaDataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool?
builder.metaData(metaDataBuilder);
coordinationState.get().setInitialState(builder.build());
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
startElectionScheduler();

View File

@ -21,8 +21,10 @@ package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import java.util.Arrays;
@ -33,6 +35,14 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY
@ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class IndicesStatsBlocksIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_ZEN2.getKey(), true)
.build();
}
public void testIndicesStatsWithBlocks() {
createIndex("ro");
ensureGreen("ro");

View File

@ -809,44 +809,7 @@ public class CoordinationStateTests extends ESTestCase {
public static long value(ClusterState clusterState) {
return clusterState.metaData().persistentSettings().getAsLong("value", 0L);
}
public static class InMemoryPersistedState implements PersistedState {
// TODO add support and tests for behaviour with persistence-layer failures
private long currentTerm;
private ClusterState acceptedState;
public InMemoryPersistedState(long term, ClusterState acceptedState) {
this.currentTerm = term;
this.acceptedState = acceptedState;
assert currentTerm >= 0;
assert getLastAcceptedState().term() <= currentTerm :
"last accepted term " + getLastAcceptedState().term() + " cannot be above current term " + currentTerm;
}
@Override
public void setCurrentTerm(long currentTerm) {
assert this.currentTerm <= currentTerm;
this.currentTerm = currentTerm;
}
@Override
public void setLastAcceptedState(ClusterState clusterState) {
this.acceptedState = clusterState;
}
@Override
public long getCurrentTerm() {
return currentTerm;
}
@Override
public ClusterState getLastAcceptedState() {
return acceptedState;
}
}
static class ClusterNode {
final DiscoveryNode localNode;

View File

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState;
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterApplier;

View File

@ -164,7 +164,7 @@ public class NodeJoinTests extends ESTestCase {
transportService,
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService,
() -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList(),
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
random);
transportService.start();

View File

@ -22,7 +22,6 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;

View File

@ -62,7 +62,7 @@ public class PublicationTests extends ESTestCase {
this.localNode = localNode;
ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode,
VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L);
coordinationState = new CoordinationState(settings, localNode, new CoordinationStateTests.InMemoryPersistedState(0L,
coordinationState = new CoordinationState(settings, localNode, new InMemoryPersistedState(0L,
initialState));
}

View File

@ -56,6 +56,7 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
@ -99,6 +100,13 @@ public class RelocationIT extends ESIntegTestCase {
return Arrays.asList(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class);
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_ZEN2.getKey(), true)
.build();
}
@Override
protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.ClusterState;
public class InMemoryPersistedState implements CoordinationState.PersistedState {
// TODO add support and tests for behaviour with persistence-layer failures
private long currentTerm;
private ClusterState acceptedState;
public InMemoryPersistedState(long term, ClusterState acceptedState) {
this.currentTerm = term;
this.acceptedState = acceptedState;
assert currentTerm >= 0;
assert getLastAcceptedState().term() <= currentTerm :
"last accepted term " + getLastAcceptedState().term() + " cannot be above current term " + currentTerm;
}
@Override
public void setCurrentTerm(long currentTerm) {
assert this.currentTerm <= currentTerm;
this.currentTerm = currentTerm;
}
@Override
public void setLastAcceptedState(ClusterState clusterState) {
this.acceptedState = clusterState;
}
@Override
public long getCurrentTerm() {
return currentTerm;
}
@Override
public ClusterState getLastAcceptedState() {
return acceptedState;
}
}

View File

@ -1944,16 +1944,17 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
protected NodeConfigurationSource getNodeConfigSource() {
Settings.Builder networkSettings = Settings.builder();
Settings.Builder initialNodeSettings = Settings.builder();
Settings.Builder initialTransportClientSettings = Settings.builder();
if (addMockTransportService()) {
networkSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
initialNodeSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
initialTransportClientSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
}
return new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(networkSettings.build())
.put(initialNodeSettings.build())
.put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build();
}
@ -1969,7 +1970,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
@Override
public Settings transportClientSettings() {
return Settings.builder().put(networkSettings.build())
return Settings.builder().put(initialTransportClientSettings.build())
.put(ESIntegTestCase.this.transportClientSettings()).build();
}

View File

@ -190,6 +190,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
.put("transport.type", getTestTransportType())
.put(Node.NODE_DATA_SETTING.getKey(), true)
.put(TestZenDiscovery.USE_ZEN2.getKey(), true)
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
// default the watermarks low values to prevent tests from failing on nodes without enough disk space
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b")
@ -213,13 +214,16 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
if (addMockHttpTransport()) {
plugins.add(MockHttpTransport.TestPlugin.class);
}
Node build = new MockNode(settings, plugins, forbidPrivateIndexSettings());
try {
build.start();
} catch (NodeValidationException e) {
throw new RuntimeException(e);
}
return build;
Node node = new MockNode(settings, plugins, forbidPrivateIndexSettings());
bootstrapNodes(true,
() -> {
try {
node.start();
} catch (NodeValidationException e) {
throw new RuntimeException(e);
}
}, Collections.singletonList(node), logger);
return node;
}
/**

View File

@ -39,6 +39,7 @@ import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.status.StatusConsoleListener;
import org.apache.logging.log4j.status.StatusData;
import org.apache.logging.log4j.status.StatusLogger;
@ -51,11 +52,17 @@ import org.elasticsearch.Version;
import org.elasticsearch.bootstrap.BootstrapForTesting;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.PathUtilsForTesting;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -85,6 +92,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
@ -100,6 +108,7 @@ import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
@ -148,6 +157,7 @@ import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
@ -290,6 +300,76 @@ public abstract class ESTestCase extends LuceneTestCase {
return new TransportAddress(TransportAddress.META_ADDRESS, portGenerator.incrementAndGet());
}
public static void bootstrapNodes(boolean condition, Runnable startAction, List<Node> nodes, Logger logger) {
final AtomicBoolean stopBootstrapThread = new AtomicBoolean();
Thread bootstrapThread = null;
if (condition) {
final Set<String> zen2MasterNodeIds = new HashSet<>();
final Set<Node> zen2MasterNodes = new HashSet<>();
for (Node node : nodes) {
if (DiscoveryNode.isMasterNode(node.settings())) {
Discovery discovery = node.injector().getInstance(Discovery.class);
if (discovery instanceof Coordinator) {
zen2MasterNodeIds.add(node.getNodeEnvironment().nodeId());
zen2MasterNodes.add(node);
}
}
}
if (zen2MasterNodes.isEmpty() == false) {
Set<String> configNodeIds = new HashSet<>(randomSubsetOf(zen2MasterNodeIds));
if (configNodeIds.isEmpty()) {
configNodeIds = zen2MasterNodeIds;
}
final ClusterState.VotingConfiguration initalConfiguration = new ClusterState.VotingConfiguration(configNodeIds);
logger.info("Bootstrapping cluster using initial configuration {}", initalConfiguration);
final Random bootstrapRandom = new Random(randomLong());
bootstrapThread = new Thread(() -> {
while (stopBootstrapThread.get() == false) {
final Discovery discovery = randomFrom(bootstrapRandom, zen2MasterNodes).injector().getInstance(Discovery.class);
assert discovery instanceof Coordinator;
final Coordinator coordinator = (Coordinator) discovery;
try {
if (coordinator.lifecycleState() == Lifecycle.State.STARTED) {
coordinator.setInitialConfiguration(initalConfiguration);
if (usually(bootstrapRandom)) {
return;
}
}
} catch (CoordinationStateRejectedException e) {
logger.trace(
() -> new ParameterizedMessage("node [{}] rejected initial configuration", coordinator.getLocalNode()), e);
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
logger.trace("interrupted while sleeping", e);
return;
}
}
}, "Bootstrap-Thread for " + ClusterName.CLUSTER_NAME_SETTING.get(zen2MasterNodes.iterator().next().settings()));
bootstrapThread.start();
}
}
try {
startAction.run();
} finally {
if (bootstrapThread != null) {
stopBootstrapThread.set(true);
try {
bootstrapThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* Called when a test fails, supplying the errors it generated. Not called when the test fails because assumptions are violated.
*/

View File

@ -142,6 +142,7 @@ import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_M
import static org.elasticsearch.discovery.zen.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.bootstrapNodes;
import static org.elasticsearch.test.ESTestCase.getTestTransportType;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -509,6 +510,8 @@ public final class InternalTestCluster extends TestCluster {
final int ord = nextNodeId.getAndIncrement();
final Runnable onTransportServiceStarted = () -> {}; // do not create unicast host file for this one node.
final NodeAndClient buildNode = buildNode(ord, random.nextLong(), null, false, 1, onTransportServiceStarted);
assert nodes.isEmpty();
bootstrapNodes(true, buildNode::startNode, Collections.singletonList(buildNode.node()), logger);
buildNode.startNode();
publishNode(buildNode);
return buildNode;
@ -1086,6 +1089,8 @@ public final class InternalTestCluster extends TestCluster {
wipePendingDataDirectories();
}
final int prevNodeCount = nodes.size();
// start any missing node
assert newSize == numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes;
final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes;
@ -1120,6 +1125,9 @@ public final class InternalTestCluster extends TestCluster {
toStartAndPublish.add(nodeAndClient);
}
bootstrapNodes(prevNodeCount == 0, () -> startAndPublishNodesAndClients(toStartAndPublish),
toStartAndPublish.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger);
startAndPublishNodesAndClients(toStartAndPublish);
nextNodeId.set(newSize);
@ -1808,14 +1816,18 @@ public final class InternalTestCluster extends TestCluster {
defaultMinMasterNodes = -1;
}
final List<NodeAndClient> nodes = new ArrayList<>();
final int prevMasterCount = getMasterNodesCount();
for (Settings nodeSettings : settings) {
nodes.add(buildNode(nodeSettings, defaultMinMasterNodes, () -> rebuildUnicastHostFiles(nodes)));
}
startAndPublishNodesAndClients(nodes);
if (autoManageMinMasterNodes) {
validateClusterFormed();
}
bootstrapNodes(prevMasterCount == 0,
() -> {
startAndPublishNodesAndClients(nodes);
if (autoManageMinMasterNodes) {
validateClusterFormed();
}
},
nodes.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger);
return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
}

View File

@ -19,9 +19,16 @@
package org.elasticsearch.test.discovery;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -36,9 +43,11 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Supplier;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
@ -52,6 +61,9 @@ public class TestZenDiscovery extends ZenDiscovery {
public static final Setting<Boolean> USE_MOCK_PINGS =
Setting.boolSetting("discovery.zen.use_mock_pings", true, Setting.Property.NodeScope);
public static final Setting<Boolean> USE_ZEN2 =
Setting.boolSetting("discovery.zen.use_zen2", false, Setting.Property.NodeScope);
/** A plugin which installs mock discovery and configures it to be used. */
public static class TestPlugin extends Plugin implements DiscoveryPlugin {
protected final Settings settings;
@ -65,17 +77,27 @@ public class TestZenDiscovery extends ZenDiscovery {
MasterService masterService, ClusterApplier clusterApplier,
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
AllocationService allocationService) {
return Collections.singletonMap("test-zen",
() -> new TestZenDiscovery(
// we don't get the latest setting which were updated by the extra settings for the plugin. TODO: fix.
Settings.builder().put(settings).putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()).build(),
threadPool, transportService, namedWriteableRegistry, masterService,
clusterApplier, clusterSettings, hostsProvider, allocationService));
// we don't get the latest setting which were updated by the extra settings for the plugin. TODO: fix.
Settings fixedSettings = Settings.builder().put(settings).putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()).build();
return Collections.singletonMap("test-zen", () -> {
if (USE_ZEN2.get(settings)) {
// TODO: needs a proper storage layer
Supplier<CoordinationState.PersistedState> persistedStateSupplier =
() -> new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.nodes(DiscoveryNodes.builder().add(transportService.getLocalNode())
.localNodeId(transportService.getLocalNode().getId()).build()).build());
return new Coordinator(fixedSettings, transportService, allocationService, masterService, persistedStateSupplier,
hostsProvider, clusterApplier, new Random(Randomness.get().nextLong()));
} else {
return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService,
clusterApplier, clusterSettings, hostsProvider, allocationService);
}
});
}
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(USE_MOCK_PINGS);
return Arrays.asList(USE_MOCK_PINGS, USE_ZEN2);
}
@Override
@ -98,7 +120,7 @@ public class TestZenDiscovery extends ZenDiscovery {
@Override
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {
if (USE_MOCK_PINGS.get(settings)) {
if (USE_MOCK_PINGS.get(settings) && USE_ZEN2.get(settings) == false) {
return new MockZenPing(settings, this);
} else {
return super.newZenPing(settings, threadPool, transportService, hostsProvider);