From c3987156ab0f3505df7f84de9cc6bc9e55db5c44 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 18 Oct 2016 21:12:15 +0200 Subject: [PATCH] Remove local discovery in favor of a simpler `MockZenPings` (#20960) `LocalDiscovery` is a discovery implementation that uses static in memory maps to keep track of current live nodes. This is used extensively in our tests in order to speed up cluster formation (i.e., shortcut the 3 second ping period used by `ZenDiscovery` by default). This is sad as that mean that most of the test run using a different discovery semantics than what is used in production. Instead of replacing the entire discovery logic, we can use a similar approach to only shortcut the pinging components. --- .../elasticsearch/cluster/ClusterState.java | 4 +- .../decider/MaxRetryAllocationDecider.java | 14 +- .../common/util/ExtensionPoint.java | 4 + .../discovery/DiscoveryModule.java | 10 +- .../discovery/NoneDiscovery.java | 102 +++++ .../discovery/local/LocalDiscovery.java | 422 ------------------ .../org/elasticsearch/tribe/TribeService.java | 2 +- .../master/IndexingMasterFailoverIT.java | 7 +- .../client/transport/TransportClientIT.java | 5 +- .../cluster/MinimumMasterNodesIT.java | 11 +- .../cluster/SpecificMasterNodesIT.java | 30 +- .../cluster/routing/PrimaryAllocationIT.java | 6 +- .../cluster/service/ClusterServiceIT.java | 35 +- .../DiscoveryWithServiceDisruptionsIT.java | 4 + .../discovery/zen/ZenDiscoveryIT.java | 55 +-- .../index/TransportIndexFailuresIT.java | 160 ------- .../indices/state/RareClusterStateIT.java | 10 +- .../DedicatedClusterSnapshotRestoreIT.java | 9 - .../SharedClusterSnapshotRestoreIT.java | 4 +- .../java/org/elasticsearch/tribe/TribeIT.java | 18 +- .../azure/AzureSnapshotRestoreTests.java | 9 - .../elasticsearch/tribe/TribeUnitTests.java | 16 +- .../org/elasticsearch/node/NodeTests.java | 6 +- .../test/ESBackcompatTestCase.java | 8 +- .../elasticsearch/test/ESIntegTestCase.java | 27 +- .../test/ESSingleNodeTestCase.java | 6 +- .../ClusterDiscoveryConfiguration.java | 3 +- .../test/discovery/MockZenPing.java | 107 +++++ .../test/test/InternalTestClusterTests.java | 17 +- 29 files changed, 325 insertions(+), 786 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java delete mode 100644 core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java delete mode 100644 core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java create mode 100644 test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index e592b5092b7..7fc0a4d09b0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -52,7 +52,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.local.LocalDiscovery; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import java.io.IOException; @@ -72,8 +71,7 @@ import java.util.Set; * single thread and controlled by the {@link ClusterService}. After every update the * {@link Discovery#publish} method publishes new version of the cluster state to all other nodes in the * cluster. The actual publishing mechanism is delegated to the {@link Discovery#publish} method and depends on - * the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish} - * method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The + * the type of discovery. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The * publishing mechanism can be overridden by other discovery. *

* The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java index 395d3472329..0e7159c857b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java @@ -54,7 +54,8 @@ public class MaxRetryAllocationDecider extends AllocationDecider { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { - UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); + final UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); + final Decision decision; if (unassignedInfo != null && unassignedInfo.getNumFailedAllocations() > 0) { final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); final int maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(indexMetaData.getSettings()); @@ -62,16 +63,21 @@ public class MaxRetryAllocationDecider extends AllocationDecider { // if we are called via the _reroute API we ignore the failure counter and try to allocate // this improves the usability since people don't need to raise the limits to issue retries since a simple _reroute call is // enough to manually retry. - return allocation.decision(Decision.YES, NAME, "shard has already failed allocating [" + decision = allocation.decision(Decision.YES, NAME, "shard has already failed allocating [" + unassignedInfo.getNumFailedAllocations() + "] times vs. [" + maxRetry + "] retries allowed " + unassignedInfo.toString() + " - retrying once on manual allocation"); } else if (unassignedInfo.getNumFailedAllocations() >= maxRetry) { - return allocation.decision(Decision.NO, NAME, "shard has already failed allocating [" + decision = allocation.decision(Decision.NO, NAME, "shard has already failed allocating [" + unassignedInfo.getNumFailedAllocations() + "] times vs. [" + maxRetry + "] retries allowed " + unassignedInfo.toString() + " - manually call [/_cluster/reroute?retry_failed=true] to retry"); + } else { + decision = allocation.decision(Decision.YES, NAME, "shard has already failed allocating [" + + unassignedInfo.getNumFailedAllocations() + "] times but [" + maxRetry + "] retries are allowed"); } + } else { + decision = allocation.decision(Decision.YES, NAME, "shard has no previous failures"); } - return allocation.decision(Decision.YES, NAME, "shard has no previous failures"); + return decision; } @Override diff --git a/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java b/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java index fcdfaafb1d5..a5dac12fab7 100644 --- a/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java +++ b/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java @@ -200,6 +200,10 @@ public abstract class ExtensionPoint { allocationMultibinder.addBinding().to(clazz); } } + + public boolean isEmpty() { + return extensions.isEmpty(); + } } /** diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index b41316b6534..1e41204d64a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.ExtensionPoint; -import org.elasticsearch.discovery.local.LocalDiscovery; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ping.ZenPing; @@ -59,11 +58,9 @@ public class DiscoveryModule extends AbstractModule { public DiscoveryModule(Settings settings) { this.settings = settings; - addDiscoveryType("local", LocalDiscovery.class); + addDiscoveryType("none", NoneDiscovery.class); addDiscoveryType("zen", ZenDiscovery.class); addElectMasterService("zen", ElectMasterService.class); - // always add the unicast hosts, or things get angry! - addZenPing(UnicastZenPing.class); } /** @@ -113,7 +110,7 @@ public class DiscoveryModule extends AbstractModule { throw new IllegalArgumentException("Unknown Discovery type [" + discoveryType + "]"); } - if (discoveryType.equals("local") == false) { + if (discoveryType.equals("none") == false) { String masterServiceTypeKey = ZEN_MASTER_SERVICE_TYPE_SETTING.get(settings); final Class masterService = masterServiceType.get(masterServiceTypeKey); if (masterService == null) { @@ -130,6 +127,9 @@ public class DiscoveryModule extends AbstractModule { unicastHostProviders.getOrDefault(discoveryType, Collections.emptyList())) { unicastHostsProviderMultibinder.addBinding().to(unicastHostProvider); } + if (zenPings.isEmpty()) { + zenPings.registerExtension(UnicastZenPing.class); + } zenPings.bind(binder()); } bind(Discovery.class).to(discoveryClass).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java new file mode 100644 index 00000000000..91b04ce396b --- /dev/null +++ b/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen.ElectMasterService; + +/** + * A {@link Discovery} implementation that is used by {@link org.elasticsearch.tribe.TribeService}. This implementation + * doesn't support any clustering features. Most notably {@link #startInitialJoin()} does nothing and + * {@link #publish(ClusterChangedEvent, AckListener)} is not supported. + */ +public class NoneDiscovery extends AbstractLifecycleComponent implements Discovery { + + private final ClusterService clusterService; + private final DiscoverySettings discoverySettings; + + @Inject + public NoneDiscovery(Settings settings, ClusterService clusterService, ClusterSettings clusterSettings) { + super(settings); + this.clusterService = clusterService; + this.discoverySettings = new DiscoverySettings(settings, clusterSettings); + } + + @Override + public DiscoveryNode localNode() { + return clusterService.localNode(); + } + + @Override + public String nodeDescription() { + return clusterService.getClusterName().value() + "/" + clusterService.localNode().getId(); + } + + @Override + public void setAllocationService(AllocationService allocationService) { + + } + + @Override + public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) { + throw new UnsupportedOperationException(); + } + + @Override + public DiscoveryStats stats() { + return null; + } + + @Override + public DiscoverySettings getDiscoverySettings() { + return discoverySettings; + } + + @Override + public void startInitialJoin() { + + } + + @Override + public int getMinimumMasterNodes() { + return ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings); + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() { + + } +} diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java deleted file mode 100644 index 389f5ee03bb..00000000000 --- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ /dev/null @@ -1,422 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery.local; - -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.Diff; -import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; -import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats; - -import java.util.HashSet; -import java.util.Optional; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; - -import static org.elasticsearch.cluster.ClusterState.Builder; - -public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery { - - private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; - - private final ClusterService clusterService; - private AllocationService allocationService; - private final ClusterName clusterName; - - private final DiscoverySettings discoverySettings; - - private volatile boolean master = false; - - private static final ConcurrentMap clusterGroups = ConcurrentCollections.newConcurrentMap(); - - private volatile ClusterState lastProcessedClusterState; - - @Inject - public LocalDiscovery(Settings settings, ClusterService clusterService, ClusterSettings clusterSettings) { - super(settings); - this.clusterName = clusterService.getClusterName(); - this.clusterService = clusterService; - this.discoverySettings = new DiscoverySettings(settings, clusterSettings); - } - - @Override - public void setAllocationService(AllocationService allocationService) { - this.allocationService = allocationService; - } - - @Override - protected void doStart() { - - } - - @Override - public void startInitialJoin() { - synchronized (clusterGroups) { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - clusterGroup = new ClusterGroup(); - clusterGroups.put(clusterName, clusterGroup); - } - logger.debug("Connected to cluster [{}]", clusterName); - - Optional current = clusterGroup.members().stream().filter(other -> ( - other.localNode().equals(this.localNode()) || other.localNode().getId().equals(this.localNode().getId()) - )).findFirst(); - if (current.isPresent()) { - throw new IllegalStateException("current cluster group already contains a node with the same id. current " - + current.get().localNode() + ", this node " + localNode()); - } - - clusterGroup.members().add(this); - - LocalDiscovery firstMaster = null; - for (LocalDiscovery localDiscovery : clusterGroup.members()) { - if (localDiscovery.localNode().isMasterNode()) { - firstMaster = localDiscovery; - break; - } - } - - if (firstMaster != null && firstMaster.equals(this)) { - // we are the first master (and the master) - master = true; - final LocalDiscovery master = firstMaster; - clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ClusterStateUpdateTask() { - - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.add(discovery.localNode()); - } - nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); - // remove the NO_MASTER block in this case - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()); - return ClusterState.builder(currentState).nodes(nodesBuilder).blocks(blocks).build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - } - }); - } else if (firstMaster != null) { - // tell the master to send the fact that we are here - final LocalDiscovery master = firstMaster; - firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode() + "])", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.add(discovery.localNode()); - } - nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); - currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build(); - return master.allocationService.reroute(currentState, "node_add"); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - } - - }); - } - } // else, no master node, the next node that will start will fill things in... - } - - @Override - protected void doStop() { - synchronized (clusterGroups) { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - logger.warn("Illegal state, should not have an empty cluster group when stopping, I should be there at teh very least..."); - return; - } - clusterGroup.members().remove(this); - if (clusterGroup.members().isEmpty()) { - // no more members, remove and return - clusterGroups.remove(clusterName); - return; - } - - LocalDiscovery firstMaster = null; - for (LocalDiscovery localDiscovery : clusterGroup.members()) { - if (localDiscovery.localNode().isMasterNode()) { - firstMaster = localDiscovery; - break; - } - } - - if (firstMaster != null) { - // if the removed node is the master, make the next one as the master - if (master) { - firstMaster.master = true; - } - - final Set newMembers = new HashSet<>(); - for (LocalDiscovery discovery : clusterGroup.members()) { - newMembers.add(discovery.localNode().getId()); - } - - final LocalDiscovery master = firstMaster; - master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode().getId()); - DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes()); - if (delta.added()) { - logger.warn("No new nodes should be created when a new discovery view is accepted"); - } - // reroute here, so we eagerly remove dead nodes from the routing - ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build(); - return master.allocationService.deassociateDeadNodes(updatedState, true, "node stopped"); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - } - }); - } - } - } - - @Override - protected void doClose() { - } - - @Override - public DiscoveryNode localNode() { - return clusterService.localNode(); - } - - @Override - public String nodeDescription() { - return clusterName.value() + "/" + localNode().getId(); - } - - @Override - public void publish(ClusterChangedEvent clusterChangedEvent, final Discovery.AckListener ackListener) { - if (!master) { - throw new IllegalStateException("Shouldn't publish state when not master"); - } - LocalDiscovery[] members = members(); - if (members.length > 0) { - Set nodesToPublishTo = new HashSet<>(members.length); - for (LocalDiscovery localDiscovery : members) { - if (localDiscovery.master) { - continue; - } - nodesToPublishTo.add(localDiscovery.localNode()); - } - publish(members, clusterChangedEvent, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener)); - } - } - - @Override - public DiscoveryStats stats() { - return new DiscoveryStats((PendingClusterStateStats)null); - } - - @Override - public DiscoverySettings getDiscoverySettings() { - return discoverySettings; - } - - @Override - public int getMinimumMasterNodes() { - return -1; - } - - private LocalDiscovery[] members() { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - return NO_MEMBERS; - } - Queue members = clusterGroup.members(); - return members.toArray(new LocalDiscovery[members.size()]); - } - - private void publish(LocalDiscovery[] members, ClusterChangedEvent clusterChangedEvent, final BlockingClusterStatePublishResponseHandler publishResponseHandler) { - - try { - // we do the marshaling intentionally, to check it works well... - byte[] clusterStateBytes = null; - byte[] clusterStateDiffBytes = null; - - ClusterState clusterState = clusterChangedEvent.state(); - for (final LocalDiscovery discovery : members) { - if (discovery.master) { - continue; - } - ClusterState newNodeSpecificClusterState = null; - synchronized (this) { - // we do the marshaling intentionally, to check it works well... - // check if we published cluster state at least once and node was in the cluster when we published cluster state the last time - if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode())) { - // both conditions are true - which means we can try sending cluster state as diffs - if (clusterStateDiffBytes == null) { - Diff diff = clusterState.diff(clusterChangedEvent.previousState()); - BytesStreamOutput os = new BytesStreamOutput(); - diff.writeTo(os); - clusterStateDiffBytes = BytesReference.toBytes(os.bytes()); - } - try { - newNodeSpecificClusterState = discovery.lastProcessedClusterState.readDiffFrom(StreamInput.wrap(clusterStateDiffBytes)).apply(discovery.lastProcessedClusterState); - logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode().getName()); - } catch (IncompatibleClusterStateVersionException ex) { - logger.warn((Supplier) () -> new ParameterizedMessage("incompatible cluster state version [{}] - resending complete cluster state", clusterState.version()), ex); - } - } - if (newNodeSpecificClusterState == null) { - if (clusterStateBytes == null) { - clusterStateBytes = Builder.toBytes(clusterState); - } - newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode()); - } - discovery.lastProcessedClusterState = newNodeSpecificClusterState; - } - final ClusterState nodeSpecificClusterState = newNodeSpecificClusterState; - - nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); - // ignore cluster state messages that do not include "me", not in the game yet... - if (nodeSpecificClusterState.nodes().getLocalNode() != null) { - assert nodeSpecificClusterState.nodes().getMasterNode() != null : "received a cluster state without a master"; - assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block"; - - discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - if (currentState.supersedes(nodeSpecificClusterState)) { - return currentState; - } - - if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { - // its a fresh update from the master as we transition from a start of not having a master to having one - logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().getMasterNodeId()); - return nodeSpecificClusterState; - } - - ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState); - // if the routing table did not change, use the original one - if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { - builder.routingTable(currentState.routingTable()); - } - if (nodeSpecificClusterState.metaData().version() == currentState.metaData().version()) { - builder.metaData(currentState.metaData()); - } - - return builder.build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - publishResponseHandler.onFailure(discovery.localNode(), e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - publishResponseHandler.onResponse(discovery.localNode()); - } - }); - } else { - publishResponseHandler.onResponse(discovery.localNode()); - } - } - - TimeValue publishTimeout = discoverySettings.getPublishTimeout(); - if (publishTimeout.millis() > 0) { - try { - boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); - if (!awaited) { - DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); - // everyone may have just responded - if (pendingNodes.length > 0) { - logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", clusterState.version(), publishTimeout, pendingNodes); - } - } - } catch (InterruptedException e) { - // ignore & restore interrupt - Thread.currentThread().interrupt(); - } - } - - - } catch (Exception e) { - // failure to marshal or un-marshal - throw new IllegalStateException("Cluster state failed to serialize", e); - } - } - - private class ClusterGroup { - - private Queue members = ConcurrentCollections.newQueue(); - - Queue members() { - return members; - } - } -} diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index fd697340cd7..7871a0a6f39 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -124,7 +124,7 @@ public class TribeService extends AbstractLifecycleComponent { if (!NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.exists(settings)) { sb.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), nodesSettings.size()); } - sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); // a tribe node should not use zen discovery + sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "none"); // a tribe node should not use zen discovery // nothing is going to be discovered, since no master will be elected sb.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); if (sb.get("cluster.name") == null) { diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index 29dd2a150a3..38328a80054 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -22,7 +22,6 @@ package org.elasticsearch.action.support.master; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; @@ -46,6 +45,11 @@ import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class IndexingMasterFailoverIT extends ESIntegTestCase { + @Override + protected boolean addMockZenPings() { + return false; + } + @Override protected Collection> nodePlugins() { final HashSet> classes = new HashSet<>(super.nodePlugins()); @@ -62,7 +66,6 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase { logger.info("--> start 4 nodes, 3 master, 1 data"); final Settings sharedSettings = Settings.builder() - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen") .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java index 93c5e29208c..dbb066dcb1b 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java @@ -32,12 +32,13 @@ import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; +import java.util.Arrays; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -65,7 +66,7 @@ public class TransportClientIT extends ESIntegTestCase { .put(NetworkModule.HTTP_ENABLED.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), false) .put("cluster.name", "foobar") - .build(), Collections.singleton(MockTcpTransportPlugin.class)).start()) { + .build(), Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class)).start()) { TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress(); client.addTransportAddress(transportAddress); // since we force transport clients there has to be one node started that we connect to. diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index b8e95343932..83472a1edc4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; @@ -75,15 +74,13 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { } @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + protected boolean addMockZenPings() { + return false; } public void testSimpleMinimumMasterNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 2) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") .put("discovery.initial_state_timeout", "500ms") @@ -195,7 +192,6 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { public void testMultipleNodesShutdownNonMasterNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 3) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "1s") .put("discovery.initial_state_timeout", "500ms") @@ -271,7 +267,6 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { public void testDynamicUpdateMinimumMasterNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms") .put("discovery.initial_state_timeout", "500ms") .build(); @@ -329,7 +324,6 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { public void testCanNotBringClusterDown() throws ExecutionException, InterruptedException { int nodeCount = scaledRandomIntBetween(1, 5); Settings.Builder settings = Settings.builder() - .put("discovery.type", "zen") .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") .put("discovery.initial_state_timeout", "500ms"); @@ -368,7 +362,6 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { public void testCanNotPublishWithoutMinMastNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2) .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "100ms") // speed things up diff --git a/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java index 4d62f9b664f..c033ad7ff27 100644 --- a/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java @@ -22,7 +22,6 @@ package org.elasticsearch.cluster; import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.node.Node; @@ -41,18 +40,9 @@ import static org.hamcrest.Matchers.nullValue; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class SpecificMasterNodesIT extends ESIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - protected final Settings.Builder settingsBuilder() { - return Settings.builder().put("discovery.type", "zen"); - } - public void testSimpleOnlyMasterNodeElection() throws IOException { logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); try { assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().getMasterNodeId(), nullValue()); fail("should not be able to find master"); @@ -60,7 +50,7 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { // all is well, no master elected } logger.info("--> start master node"); - final String masterNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String masterNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); @@ -75,14 +65,14 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { } logger.info("--> start master node"); - final String nextMasterEligibleNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String nextMasterEligibleNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName)); } public void testElectOnlyBetweenMasterNodes() throws IOException { logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); try { assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().getMasterNodeId(), nullValue()); fail("should not be able to find master"); @@ -90,12 +80,12 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { // all is well, no master elected } logger.info("--> start master node (1)"); - final String masterNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String masterNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); logger.info("--> start master node (2)"); - final String nextMasterEligableNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String nextMasterEligableNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); @@ -112,10 +102,10 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { */ public void testCustomDefaultMapping() throws Exception { logger.info("--> start master node / non data"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); createIndex("test"); assertAcked(client().admin().indices().preparePutMapping("test").setType("_default_").setSource("timestamp", "type=date")); @@ -134,10 +124,10 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { public void testAliasFilterValidation() throws Exception { logger.info("--> start master node / non data"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); assertAcked(prepareCreate("test").addMapping("type1", "{\"type1\" : {\"properties\" : {\"table_a\" : { \"type\" : \"nested\", \"properties\" : {\"field_a\" : { \"type\" : \"keyword\" },\"field_b\" :{ \"type\" : \"keyword\" }}}}}}")); client().admin().indices().prepareAliases().addAlias("test", "a_test", QueryBuilders.nestedQuery("table_a", QueryBuilders.termQuery("table_a.field_b", "y"), ScoreMode.Avg)).get(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 9f21d133264..6243f138380 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimary import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -63,9 +62,8 @@ public class PrimaryAllocationIT extends ESIntegTestCase { } @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + protected boolean addMockZenPings() { + return false; } private void createStaleReplicaScenario() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index a25e3abe65d..111b84f981a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -68,17 +67,8 @@ public class ClusterServiceIT extends ESIntegTestCase { return Arrays.asList(TestPlugin.class); } - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - public void testAckedUpdateTask() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -151,10 +141,7 @@ public class ClusterServiceIT extends ESIntegTestCase { } public void testAckedUpdateTaskSameClusterState() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -222,10 +209,7 @@ public class ClusterServiceIT extends ESIntegTestCase { } public void testAckedUpdateTaskNoAckExpected() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -294,10 +278,7 @@ public class ClusterServiceIT extends ESIntegTestCase { } public void testAckedUpdateTaskTimeoutZero() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -371,11 +352,8 @@ public class ClusterServiceIT extends ESIntegTestCase { @TestLogging("_root:debug,org.elasticsearch.action.admin.cluster.tasks:trace") public void testPendingUpdateTask() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - String node_0 = internalCluster().startNode(settings); - internalCluster().startCoordinatingOnlyNode(settings); + String node_0 = internalCluster().startNode(); + internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node_0); final CountDownLatch block1 = new CountDownLatch(1); @@ -507,7 +485,6 @@ public class ClusterServiceIT extends ESIntegTestCase { public void testLocalNodeMasterListenerCallbacks() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 1) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms") .put("discovery.initial_state_timeout", "500ms") diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 58a67769730..b400b0c7a5f 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -128,6 +128,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { private ClusterDiscoveryConfiguration discoveryConfig; + @Override + protected boolean addMockZenPings() { + return false; + } @Override protected Settings nodeSettings(int nodeOrdinal) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index d336534a158..c99187c7a93 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -24,9 +24,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -38,7 +36,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; @@ -46,7 +43,6 @@ import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.TestCustomMetaData; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesTransportRequest; @@ -55,7 +51,6 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matchers; -import org.junit.Before; import java.io.IOException; import java.net.UnknownHostException; @@ -78,32 +73,10 @@ import static org.hamcrest.Matchers.notNullValue; @TestLogging("_root:DEBUG") public class ZenDiscoveryIT extends ESIntegTestCase { - private Version previousMajorVersion; - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - - @Before - public void computePrevMajorVersion() { - Version previousMajor; - // find a GA build whose major version is statesFound = new ArrayList<>(); final CountDownLatch nodesStopped = new CountDownLatch(1); - clusterService.add(new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - statesFound.add(event.state()); - try { - // block until both nodes have stopped to accumulate node failures - nodesStopped.await(); - } catch (InterruptedException e) { - //meh - } + clusterService.add(event -> { + statesFound.add(event.state()); + try { + // block until both nodes have stopped to accumulate node failures + nodesStopped.await(); + } catch (InterruptedException e) { + //meh } }); @@ -189,10 +158,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { } public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "zen") - .build(); - List nodeNames = internalCluster().startNodesAsync(2, settings).get(); + List nodeNames = internalCluster().startNodesAsync(2).get(); client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); List nonMasterNodes = new ArrayList<>(nodeNames); @@ -303,10 +269,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { " }\n" + "}"; - Settings nodeSettings = Settings.builder() - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .build(); - internalCluster().startNode(nodeSettings); + internalCluster().startNode(); logger.info("--> request node discovery stats"); NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setDiscovery(true).get(); diff --git a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java b/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java deleted file mode 100644 index 91a18f9c053..00000000000 --- a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index; - -import org.elasticsearch.action.index.IndexAction; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.fd.FaultDetection; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportService; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import static java.util.Collections.singleton; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; -import static org.hamcrest.Matchers.equalTo; - -/** - * Test failure when index replication actions fail mid-flight - */ -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) -public class TransportIndexFailuresIT extends ESIntegTestCase { - - private static final Settings nodeSettings = Settings.builder() - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly - .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // <-- for hitting simulated network failures quickly - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly - .put("discovery.zen.minimum_master_nodes", 1) - .build(); - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); - } - - @Override - protected int numberOfShards() { - return 1; - } - - @Override - protected int numberOfReplicas() { - return 1; - } - - public void testNetworkPartitionDuringReplicaIndexOp() throws Exception { - final String INDEX = "testidx"; - - List nodes = internalCluster().startNodesAsync(2, nodeSettings).get(); - - // Create index test with 1 shard, 1 replica and ensure it is green - createIndex(INDEX); - ensureGreen(INDEX); - - // Disable allocation so the replica cannot be reallocated when it fails - Settings s = Settings.builder().put("cluster.routing.allocation.enable", "none").build(); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(s).get(); - - // Determine which node holds the primary shard - ClusterState state = getNodeClusterState(nodes.get(0)); - IndexShardRoutingTable shard = state.getRoutingTable().index(INDEX).shard(0); - String primaryNode; - String replicaNode; - if (shard.getShards().get(0).primary()) { - primaryNode = nodes.get(0); - replicaNode = nodes.get(1); - } else { - primaryNode = nodes.get(1); - replicaNode = nodes.get(0); - } - logger.info("--> primary shard is on {}", primaryNode); - - // Index a document to make sure everything works well - IndexResponse resp = internalCluster().client(primaryNode).prepareIndex(INDEX, "doc").setSource("foo", "bar").get(); - assertThat("document exists on primary node", - internalCluster().client(primaryNode).prepareGet(INDEX, "doc", resp.getId()).setPreference("_only_local").get().isExists(), - equalTo(true)); - assertThat("document exists on replica node", - internalCluster().client(replicaNode).prepareGet(INDEX, "doc", resp.getId()).setPreference("_only_local").get().isExists(), - equalTo(true)); - - // Disrupt the network so indexing requests fail to replicate - logger.info("--> preventing index/replica operations"); - TransportService mockTransportService = internalCluster().getInstance(TransportService.class, primaryNode); - ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( - internalCluster().getInstance(TransportService.class, replicaNode), - singleton(IndexAction.NAME + "[r]") - ); - mockTransportService = internalCluster().getInstance(TransportService.class, replicaNode); - ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( - internalCluster().getInstance(TransportService.class, primaryNode), - singleton(IndexAction.NAME + "[r]") - ); - - logger.info("--> indexing into primary"); - // the replica shard should now be marked as failed because the replication operation will fail - resp = internalCluster().client(primaryNode).prepareIndex(INDEX, "doc").setSource("foo", "baz").get(); - // wait until the cluster reaches an exact yellow state, meaning replica has failed - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - } - }); - assertThat("document should still be indexed and available", - client().prepareGet(INDEX, "doc", resp.getId()).get().isExists(), equalTo(true)); - - state = getNodeClusterState(randomFrom(nodes.toArray(Strings.EMPTY_ARRAY))); - RoutingNodes rn = state.getRoutingNodes(); - logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}", - rn.shards(input -> true).size(), - rn.shardsWithState(UNASSIGNED).size(), - rn.shardsWithState(INITIALIZING).size(), - rn.shardsWithState(RELOCATING).size(), - rn.shardsWithState(STARTED).size()); - logger.info("--> unassigned: {}, initializing: {}, relocating: {}, started: {}", - rn.shardsWithState(UNASSIGNED), - rn.shardsWithState(INITIALIZING), - rn.shardsWithState(RELOCATING), - rn.shardsWithState(STARTED)); - - assertThat("only a single shard is now active (replica should be failed and not reallocated)", - rn.shardsWithState(STARTED).size(), equalTo(1)); - } - - private ClusterState getNodeClusterState(String node) { - return internalCluster().client(node).admin().cluster().prepareState().setLocal(true).get().getState(); - } -} diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index 5a257cc64fd..8582ca0e02f 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -42,7 +42,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.Index; @@ -77,9 +76,8 @@ import static org.hamcrest.Matchers.instanceOf; public class RareClusterStateIT extends ESIntegTestCase { @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + protected boolean addMockZenPings() { + return false; } @Override @@ -173,9 +171,7 @@ public class RareClusterStateIT extends ESIntegTestCase { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/14932") public void testDeleteCreateInOneBulk() throws Exception { - internalCluster().startNodesAsync(2, Settings.builder() - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen") - .build()).get(); + internalCluster().startNodesAsync(2).get(); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); prepareCreate("test").setSettings(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, true).addMapping("type").get(); ensureGreen("test"); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index fd57a1198e0..0e777cbd97a 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.index.store.IndexStore; @@ -59,7 +58,6 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.snapshots.mockstore.MockRepository; -import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; @@ -93,13 +91,6 @@ import static org.hamcrest.Matchers.nullValue; @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0) public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - // TODO only restorePersistentSettingsTest needs this maybe factor out? - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - @Override protected Collection> nodePlugins() { return Arrays.asList(MockRepository.Plugin.class); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 200ec6ac4b1..bf69f6016de 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -57,9 +57,9 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStore; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; @@ -693,7 +693,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertAcked(client.admin().cluster().preparePutRepository("test-repo") .setType("fs").setSettings(Settings.builder().put("location", repositoryLocation))); - prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get(); + createIndex("test-idx"); ensureGreen(); logger.info("--> indexing some data"); diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 89a32c11828..0bc4974f285 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -33,19 +33,15 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.MasterNotDiscoveredException; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.transport.MockTcpTransportPlugin; -import org.elasticsearch.transport.Transport; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -54,7 +50,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.function.Consumer; @@ -66,6 +61,7 @@ import java.util.stream.StreamSupport; import static java.util.stream.Collectors.toSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -195,22 +191,12 @@ public class TribeIT extends ESIntegTestCase { settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(NetworkModule.HTTP_ENABLED.getKey(), false); settings.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME); - settings.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); doWithAllClusters(filter, c -> { String tribeSetting = "tribe." + c.getClusterName() + "."; settings.put(tribeSetting + ClusterName.CLUSTER_NAME_SETTING.getKey(), c.getClusterName()); settings.put(tribeSetting + DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "100ms"); settings.put(tribeSetting + NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME); - settings.put(tribeSetting + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); - - Set hosts = new HashSet<>(); - for (Transport transport : c.getInstances(Transport.class)) { - TransportAddress address = transport.boundAddress().publishAddress(); - hosts.add(address.getAddress() + ":" + address.getPort()); - } - settings.putArray(tribeSetting + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), - hosts.toArray(new String[hosts.size()])); }); return settings; @@ -497,7 +483,7 @@ public class TribeIT extends ESIntegTestCase { assertBusy(() -> { ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState(); Set nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet()); - assertThat(nodes.containsAll(expectedNodes), is(true)); + assertThat(nodes, containsInAnyOrder(expectedNodes.toArray())); }); } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java index 014014b432c..d1050e80adc 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.azure.AzureRepository.Repository; @@ -75,14 +74,6 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegT return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName; } - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - // In snapshot tests, we explicitly disable cloud discovery - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") - .build(); - } - @Override public Settings indexSettings() { // During restore we frequently restore index to exactly the same state it was before, that might cause the same diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java index 7ca0aa57d7a..fd54c5fadbe 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java @@ -26,22 +26,23 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.junit.AfterClass; import org.junit.BeforeClass; import java.io.IOException; import java.nio.file.Path; -import java.util.Collections; +import java.util.Arrays; +import java.util.List; import static org.hamcrest.CoreMatchers.either; import static org.hamcrest.CoreMatchers.equalTo; @@ -63,25 +64,25 @@ public class TribeUnitTests extends ESTestCase { Settings baseSettings = Settings.builder() .put(NetworkModule.HTTP_ENABLED.getKey(), false) .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2) .build(); + final List> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class); tribe1 = new TribeClientNode( Settings.builder() .put(baseSettings) .put("cluster.name", "tribe1") .put("node.name", "tribe1_node") .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) - .build(), Collections.singleton(MockTcpTransportPlugin.class)).start(); + .build(), mockPlugins).start(); tribe2 = new TribeClientNode( Settings.builder() .put(baseSettings) .put("cluster.name", "tribe2") .put("node.name", "tribe2_node") .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) - .build(), Collections.singleton(MockTcpTransportPlugin.class)).start(); + .build(), mockPlugins).start(); } @AfterClass @@ -106,11 +107,10 @@ public class TribeUnitTests extends ESTestCase { .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).put("discovery.type", "local") .put("tribe.t1.transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) .put("tribe.t2.transport.type",MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put("tribe.t1.discovery.type", "local").put("tribe.t2.discovery.type", "local") .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .put(extraSettings).build(); - try (Node node = new MockNode(settings, Collections.singleton(MockTcpTransportPlugin.class)).start()) { + try (Node node = new MockNode(settings, Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class)).start()) { try (Client client = node.client()) { assertBusy(() -> { ClusterState state = client.admin().cluster().prepareState().clear().setNodes(true).get().getState(); diff --git a/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java b/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java index cf565499a8d..ea924734765 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java +++ b/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.transport.MockTcpTransportPlugin; import java.io.IOException; import java.nio.file.Path; @@ -46,13 +47,12 @@ public class NodeTests extends ESTestCase { .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", randomLong())) .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put("discovery.type", "local") - .put("transport.type", "local") + .put("transport.type", "mock-socket-network") .put(Node.NODE_DATA_SETTING.getKey(), true); if (name != null) { settings.put(Node.NODE_NAME_SETTING.getKey(), name); } - try (Node node = new MockNode(settings.build(), Collections.emptyList())) { + try (Node node = new MockNode(settings.build(), Collections.singleton(MockTcpTransportPlugin.class))) { final Settings nodeSettings = randomBoolean() ? node.settings() : node.getEnvironment().settings(); if (name == null) { assertThat(Node.NODE_NAME_SETTING.get(nodeSettings), equalTo(node.getNodeEnvironment().nodeId().substring(0, 7))); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java index 16647b04a47..0ece6fad393 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java @@ -29,8 +29,6 @@ import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.listeners.LoggingListener; @@ -206,6 +204,11 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase { return finalSettings.build(); } + @Override + protected boolean addMockZenPings() { + return false; + } + protected int minExternalNodes() { return 1; } protected int maxExternalNodes() { @@ -243,7 +246,6 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase { protected Settings commonNodeSettings(int nodeOrdinal) { Settings.Builder builder = Settings.builder().put(requiredSettings()); builder.put(NetworkModule.TRANSPORT_TYPE_KEY, randomBoolean() ? "netty3" : "netty4"); // run same transport / disco as external - builder.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen"); return builder.build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 26059ba60ca..1f2f01cc352 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -28,15 +28,8 @@ import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.transport.AssertingTransportInterceptor; -import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -65,6 +58,7 @@ import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -74,6 +68,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -99,9 +94,10 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ElectMasterService; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; @@ -119,12 +115,16 @@ import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.node.NodeMocksPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.client.RandomizingClient; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.AssertingTransportInterceptor; +import org.elasticsearch.transport.MockTcpTransportPlugin; import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; @@ -1750,7 +1750,6 @@ public abstract class ESIntegTestCase extends ESTestCase { public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(networkSettings.build()). put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build(); } @@ -1787,6 +1786,10 @@ public abstract class ESIntegTestCase extends ESTestCase { return true; } + protected boolean addMockZenPings() { + return true; + } + /** * Returns a function that allows to wrap / filter all clients that are exposed by the test cluster. This is useful * for debugging or request / response pre and post processing. It also allows to intercept all calls done by the test @@ -1823,6 +1826,10 @@ public abstract class ESIntegTestCase extends ESTestCase { if (addMockTransportService()) { mocks.add(MockTcpTransportPlugin.class); } + + if (addMockZenPings()) { + mocks.add(MockZenPing.TestPlugin.class); + } mocks.add(TestSeedPlugin.class); return Collections.unmodifiableList(mocks); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 2752e0f2eca..f096e662f4a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -45,6 +45,7 @@ import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.junit.After; @@ -181,7 +182,6 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000) .put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put("discovery.type", "local") .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) .put(Node.NODE_DATA_SETTING.getKey(), true) .put(nodeSettings()) // allow test cases to provide their own settings or override these @@ -191,6 +191,10 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { plugins = new ArrayList<>(plugins); plugins.add(MockTcpTransportPlugin.class); } + if (plugins.contains(MockZenPing.TestPlugin.class) == false) { + plugins = new ArrayList<>(plugins); + plugins.add(MockZenPing.TestPlugin.class); + } Node build = new MockNode(settings, plugins); try { build.start(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java index b393498ec89..3fd2b024a1d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; @@ -39,7 +38,7 @@ import java.util.Set; public class ClusterDiscoveryConfiguration extends NodeConfigurationSource { - static Settings DEFAULT_NODE_SETTINGS = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + static Settings DEFAULT_NODE_SETTINGS = Settings.EMPTY; private static final String IP_ADDR = "127.0.0.1"; final int numOfNodes; diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java new file mode 100644 index 00000000000..d1e38e061bf --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.discovery; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.discovery.zen.ping.PingContextProvider; +import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.plugins.DiscoveryPlugin; +import org.elasticsearch.plugins.Plugin; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging + * to be immediate and can be used to speed up tests. + */ +public final class MockZenPing extends AbstractLifecycleComponent implements ZenPing { + + static final Map> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap(); + + private volatile PingContextProvider contextProvider; + + @Inject + public MockZenPing(Settings settings) { + super(settings); + } + + @Override + public void setPingContextProvider(PingContextProvider contextProvider) { + this.contextProvider = contextProvider; + } + + @Override + public void ping(PingListener listener, TimeValue timeout) { + logger.info("pinging using mock zen ping"); + List responseList = getActiveNodesForCurrentCluster().stream() + .filter(p -> p != this) // remove this as pings are not expected to return the local node + .map(MockZenPing::getPingResponse) + .collect(Collectors.toList()); + listener.onPing(responseList); + } + + private ClusterName getClusterName() { + return contextProvider.clusterState().getClusterName(); + } + + private PingResponse getPingResponse() { + final ClusterState clusterState = contextProvider.clusterState(); + return new PingResponse(clusterState.nodes().getLocalNode(), clusterState.nodes().getMasterNode(), clusterState); + } + + @Override + protected void doStart() { + assert contextProvider != null; + boolean added = getActiveNodesForCurrentCluster().add(this); + assert added; + } + + private Set getActiveNodesForCurrentCluster() { + return activeNodesPerCluster.computeIfAbsent(getClusterName(), + clusterName -> ConcurrentCollections.newConcurrentSet()); + } + + @Override + protected void doStop() { + boolean found = getActiveNodesForCurrentCluster().remove(this); + assert found; + } + + @Override + protected void doClose() { + + } + + public static class TestPlugin extends Plugin implements DiscoveryPlugin { + + public void onModule(DiscoveryModule discoveryModule) { + discoveryModule.addZenPing(MockZenPing.class); + } + } +} diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index e5b704b2f2c..7c001f910d7 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -27,13 +27,13 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; -import org.elasticsearch.transport.MockTcpTransport; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.TransportSettings; @@ -141,7 +141,6 @@ public class InternalTestClusterTests extends ESTestCase { NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2 * ((masterNodes ? InternalTestCluster.DEFAULT_HIGH_NUM_MASTER_NODES : 0) + maxNumDataNodes + numClientNodes)) .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); } @@ -156,12 +155,13 @@ public class InternalTestClusterTests extends ESTestCase { String nodePrefix = "foobar"; Path baseDir = createTempDir(); + final List> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class); InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, - enableHttpPipelining, nodePrefix, Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + enableHttpPipelining, nodePrefix, mockPlugins, Function.identity()); InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, - enableHttpPipelining, nodePrefix, Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + enableHttpPipelining, nodePrefix, mockPlugins, Function.identity()); assertClusters(cluster0, cluster1, false); long seed = randomLong(); @@ -205,7 +205,6 @@ public class InternalTestClusterTests extends ESTestCase { NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2 + (masterNodes ? InternalTestCluster.DEFAULT_HIGH_NUM_MASTER_NODES : 0) + maxNumDataNodes + numClientNodes) .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .build(); } @Override @@ -219,7 +218,8 @@ public class InternalTestClusterTests extends ESTestCase { Path baseDir = createTempDir(); InternalTestCluster cluster = new InternalTestCluster(clusterSeed, baseDir, masterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, - enableHttpPipelining, nodePrefix, Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + enableHttpPipelining, nodePrefix, Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class), + Function.identity()); try { cluster.beforeTest(random(), 0.0); final Map shardNodePaths = new HashMap<>(); @@ -288,7 +288,6 @@ public class InternalTestClusterTests extends ESTestCase { .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numNodes) .put(NetworkModule.HTTP_ENABLED.getKey(), false) .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0).build(); } @@ -297,7 +296,7 @@ public class InternalTestClusterTests extends ESTestCase { return Settings.builder() .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); } - }, 0, randomBoolean(), "", Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + }, 0, randomBoolean(), "", Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class), Function.identity()); cluster.beforeTest(random(), 0.0); try { Map> pathsPerRole = new HashMap<>();