diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index e592b5092b7..7fc0a4d09b0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -52,7 +52,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.local.LocalDiscovery; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import java.io.IOException; @@ -72,8 +71,7 @@ import java.util.Set; * single thread and controlled by the {@link ClusterService}. After every update the * {@link Discovery#publish} method publishes new version of the cluster state to all other nodes in the * cluster. The actual publishing mechanism is delegated to the {@link Discovery#publish} method and depends on - * the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish} - * method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The + * the type of discovery. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The * publishing mechanism can be overridden by other discovery. *

* The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java index 395d3472329..0e7159c857b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java @@ -54,7 +54,8 @@ public class MaxRetryAllocationDecider extends AllocationDecider { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { - UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); + final UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); + final Decision decision; if (unassignedInfo != null && unassignedInfo.getNumFailedAllocations() > 0) { final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); final int maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(indexMetaData.getSettings()); @@ -62,16 +63,21 @@ public class MaxRetryAllocationDecider extends AllocationDecider { // if we are called via the _reroute API we ignore the failure counter and try to allocate // this improves the usability since people don't need to raise the limits to issue retries since a simple _reroute call is // enough to manually retry. - return allocation.decision(Decision.YES, NAME, "shard has already failed allocating [" + decision = allocation.decision(Decision.YES, NAME, "shard has already failed allocating [" + unassignedInfo.getNumFailedAllocations() + "] times vs. [" + maxRetry + "] retries allowed " + unassignedInfo.toString() + " - retrying once on manual allocation"); } else if (unassignedInfo.getNumFailedAllocations() >= maxRetry) { - return allocation.decision(Decision.NO, NAME, "shard has already failed allocating [" + decision = allocation.decision(Decision.NO, NAME, "shard has already failed allocating [" + unassignedInfo.getNumFailedAllocations() + "] times vs. [" + maxRetry + "] retries allowed " + unassignedInfo.toString() + " - manually call [/_cluster/reroute?retry_failed=true] to retry"); + } else { + decision = allocation.decision(Decision.YES, NAME, "shard has already failed allocating [" + + unassignedInfo.getNumFailedAllocations() + "] times but [" + maxRetry + "] retries are allowed"); } + } else { + decision = allocation.decision(Decision.YES, NAME, "shard has no previous failures"); } - return allocation.decision(Decision.YES, NAME, "shard has no previous failures"); + return decision; } @Override diff --git a/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java b/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java index fcdfaafb1d5..a5dac12fab7 100644 --- a/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java +++ b/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java @@ -200,6 +200,10 @@ public abstract class ExtensionPoint { allocationMultibinder.addBinding().to(clazz); } } + + public boolean isEmpty() { + return extensions.isEmpty(); + } } /** diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index b41316b6534..1e41204d64a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.ExtensionPoint; -import org.elasticsearch.discovery.local.LocalDiscovery; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ping.ZenPing; @@ -59,11 +58,9 @@ public class DiscoveryModule extends AbstractModule { public DiscoveryModule(Settings settings) { this.settings = settings; - addDiscoveryType("local", LocalDiscovery.class); + addDiscoveryType("none", NoneDiscovery.class); addDiscoveryType("zen", ZenDiscovery.class); addElectMasterService("zen", ElectMasterService.class); - // always add the unicast hosts, or things get angry! - addZenPing(UnicastZenPing.class); } /** @@ -113,7 +110,7 @@ public class DiscoveryModule extends AbstractModule { throw new IllegalArgumentException("Unknown Discovery type [" + discoveryType + "]"); } - if (discoveryType.equals("local") == false) { + if (discoveryType.equals("none") == false) { String masterServiceTypeKey = ZEN_MASTER_SERVICE_TYPE_SETTING.get(settings); final Class masterService = masterServiceType.get(masterServiceTypeKey); if (masterService == null) { @@ -130,6 +127,9 @@ public class DiscoveryModule extends AbstractModule { unicastHostProviders.getOrDefault(discoveryType, Collections.emptyList())) { unicastHostsProviderMultibinder.addBinding().to(unicastHostProvider); } + if (zenPings.isEmpty()) { + zenPings.registerExtension(UnicastZenPing.class); + } zenPings.bind(binder()); } bind(Discovery.class).to(discoveryClass).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java new file mode 100644 index 00000000000..91b04ce396b --- /dev/null +++ b/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen.ElectMasterService; + +/** + * A {@link Discovery} implementation that is used by {@link org.elasticsearch.tribe.TribeService}. This implementation + * doesn't support any clustering features. Most notably {@link #startInitialJoin()} does nothing and + * {@link #publish(ClusterChangedEvent, AckListener)} is not supported. + */ +public class NoneDiscovery extends AbstractLifecycleComponent implements Discovery { + + private final ClusterService clusterService; + private final DiscoverySettings discoverySettings; + + @Inject + public NoneDiscovery(Settings settings, ClusterService clusterService, ClusterSettings clusterSettings) { + super(settings); + this.clusterService = clusterService; + this.discoverySettings = new DiscoverySettings(settings, clusterSettings); + } + + @Override + public DiscoveryNode localNode() { + return clusterService.localNode(); + } + + @Override + public String nodeDescription() { + return clusterService.getClusterName().value() + "/" + clusterService.localNode().getId(); + } + + @Override + public void setAllocationService(AllocationService allocationService) { + + } + + @Override + public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) { + throw new UnsupportedOperationException(); + } + + @Override + public DiscoveryStats stats() { + return null; + } + + @Override + public DiscoverySettings getDiscoverySettings() { + return discoverySettings; + } + + @Override + public void startInitialJoin() { + + } + + @Override + public int getMinimumMasterNodes() { + return ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings); + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() { + + } +} diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java deleted file mode 100644 index 389f5ee03bb..00000000000 --- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ /dev/null @@ -1,422 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery.local; - -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.Diff; -import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; -import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats; - -import java.util.HashSet; -import java.util.Optional; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; - -import static org.elasticsearch.cluster.ClusterState.Builder; - -public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery { - - private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; - - private final ClusterService clusterService; - private AllocationService allocationService; - private final ClusterName clusterName; - - private final DiscoverySettings discoverySettings; - - private volatile boolean master = false; - - private static final ConcurrentMap clusterGroups = ConcurrentCollections.newConcurrentMap(); - - private volatile ClusterState lastProcessedClusterState; - - @Inject - public LocalDiscovery(Settings settings, ClusterService clusterService, ClusterSettings clusterSettings) { - super(settings); - this.clusterName = clusterService.getClusterName(); - this.clusterService = clusterService; - this.discoverySettings = new DiscoverySettings(settings, clusterSettings); - } - - @Override - public void setAllocationService(AllocationService allocationService) { - this.allocationService = allocationService; - } - - @Override - protected void doStart() { - - } - - @Override - public void startInitialJoin() { - synchronized (clusterGroups) { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - clusterGroup = new ClusterGroup(); - clusterGroups.put(clusterName, clusterGroup); - } - logger.debug("Connected to cluster [{}]", clusterName); - - Optional current = clusterGroup.members().stream().filter(other -> ( - other.localNode().equals(this.localNode()) || other.localNode().getId().equals(this.localNode().getId()) - )).findFirst(); - if (current.isPresent()) { - throw new IllegalStateException("current cluster group already contains a node with the same id. current " - + current.get().localNode() + ", this node " + localNode()); - } - - clusterGroup.members().add(this); - - LocalDiscovery firstMaster = null; - for (LocalDiscovery localDiscovery : clusterGroup.members()) { - if (localDiscovery.localNode().isMasterNode()) { - firstMaster = localDiscovery; - break; - } - } - - if (firstMaster != null && firstMaster.equals(this)) { - // we are the first master (and the master) - master = true; - final LocalDiscovery master = firstMaster; - clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ClusterStateUpdateTask() { - - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.add(discovery.localNode()); - } - nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); - // remove the NO_MASTER block in this case - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()); - return ClusterState.builder(currentState).nodes(nodesBuilder).blocks(blocks).build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - } - }); - } else if (firstMaster != null) { - // tell the master to send the fact that we are here - final LocalDiscovery master = firstMaster; - firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode() + "])", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.add(discovery.localNode()); - } - nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); - currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build(); - return master.allocationService.reroute(currentState, "node_add"); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - } - - }); - } - } // else, no master node, the next node that will start will fill things in... - } - - @Override - protected void doStop() { - synchronized (clusterGroups) { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - logger.warn("Illegal state, should not have an empty cluster group when stopping, I should be there at teh very least..."); - return; - } - clusterGroup.members().remove(this); - if (clusterGroup.members().isEmpty()) { - // no more members, remove and return - clusterGroups.remove(clusterName); - return; - } - - LocalDiscovery firstMaster = null; - for (LocalDiscovery localDiscovery : clusterGroup.members()) { - if (localDiscovery.localNode().isMasterNode()) { - firstMaster = localDiscovery; - break; - } - } - - if (firstMaster != null) { - // if the removed node is the master, make the next one as the master - if (master) { - firstMaster.master = true; - } - - final Set newMembers = new HashSet<>(); - for (LocalDiscovery discovery : clusterGroup.members()) { - newMembers.add(discovery.localNode().getId()); - } - - final LocalDiscovery master = firstMaster; - master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode().getId()); - DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes()); - if (delta.added()) { - logger.warn("No new nodes should be created when a new discovery view is accepted"); - } - // reroute here, so we eagerly remove dead nodes from the routing - ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build(); - return master.allocationService.deassociateDeadNodes(updatedState, true, "node stopped"); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - } - }); - } - } - } - - @Override - protected void doClose() { - } - - @Override - public DiscoveryNode localNode() { - return clusterService.localNode(); - } - - @Override - public String nodeDescription() { - return clusterName.value() + "/" + localNode().getId(); - } - - @Override - public void publish(ClusterChangedEvent clusterChangedEvent, final Discovery.AckListener ackListener) { - if (!master) { - throw new IllegalStateException("Shouldn't publish state when not master"); - } - LocalDiscovery[] members = members(); - if (members.length > 0) { - Set nodesToPublishTo = new HashSet<>(members.length); - for (LocalDiscovery localDiscovery : members) { - if (localDiscovery.master) { - continue; - } - nodesToPublishTo.add(localDiscovery.localNode()); - } - publish(members, clusterChangedEvent, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener)); - } - } - - @Override - public DiscoveryStats stats() { - return new DiscoveryStats((PendingClusterStateStats)null); - } - - @Override - public DiscoverySettings getDiscoverySettings() { - return discoverySettings; - } - - @Override - public int getMinimumMasterNodes() { - return -1; - } - - private LocalDiscovery[] members() { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - return NO_MEMBERS; - } - Queue members = clusterGroup.members(); - return members.toArray(new LocalDiscovery[members.size()]); - } - - private void publish(LocalDiscovery[] members, ClusterChangedEvent clusterChangedEvent, final BlockingClusterStatePublishResponseHandler publishResponseHandler) { - - try { - // we do the marshaling intentionally, to check it works well... - byte[] clusterStateBytes = null; - byte[] clusterStateDiffBytes = null; - - ClusterState clusterState = clusterChangedEvent.state(); - for (final LocalDiscovery discovery : members) { - if (discovery.master) { - continue; - } - ClusterState newNodeSpecificClusterState = null; - synchronized (this) { - // we do the marshaling intentionally, to check it works well... - // check if we published cluster state at least once and node was in the cluster when we published cluster state the last time - if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode())) { - // both conditions are true - which means we can try sending cluster state as diffs - if (clusterStateDiffBytes == null) { - Diff diff = clusterState.diff(clusterChangedEvent.previousState()); - BytesStreamOutput os = new BytesStreamOutput(); - diff.writeTo(os); - clusterStateDiffBytes = BytesReference.toBytes(os.bytes()); - } - try { - newNodeSpecificClusterState = discovery.lastProcessedClusterState.readDiffFrom(StreamInput.wrap(clusterStateDiffBytes)).apply(discovery.lastProcessedClusterState); - logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode().getName()); - } catch (IncompatibleClusterStateVersionException ex) { - logger.warn((Supplier) () -> new ParameterizedMessage("incompatible cluster state version [{}] - resending complete cluster state", clusterState.version()), ex); - } - } - if (newNodeSpecificClusterState == null) { - if (clusterStateBytes == null) { - clusterStateBytes = Builder.toBytes(clusterState); - } - newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode()); - } - discovery.lastProcessedClusterState = newNodeSpecificClusterState; - } - final ClusterState nodeSpecificClusterState = newNodeSpecificClusterState; - - nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); - // ignore cluster state messages that do not include "me", not in the game yet... - if (nodeSpecificClusterState.nodes().getLocalNode() != null) { - assert nodeSpecificClusterState.nodes().getMasterNode() != null : "received a cluster state without a master"; - assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block"; - - discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - if (currentState.supersedes(nodeSpecificClusterState)) { - return currentState; - } - - if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { - // its a fresh update from the master as we transition from a start of not having a master to having one - logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().getMasterNodeId()); - return nodeSpecificClusterState; - } - - ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState); - // if the routing table did not change, use the original one - if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { - builder.routingTable(currentState.routingTable()); - } - if (nodeSpecificClusterState.metaData().version() == currentState.metaData().version()) { - builder.metaData(currentState.metaData()); - } - - return builder.build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - publishResponseHandler.onFailure(discovery.localNode(), e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - publishResponseHandler.onResponse(discovery.localNode()); - } - }); - } else { - publishResponseHandler.onResponse(discovery.localNode()); - } - } - - TimeValue publishTimeout = discoverySettings.getPublishTimeout(); - if (publishTimeout.millis() > 0) { - try { - boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); - if (!awaited) { - DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); - // everyone may have just responded - if (pendingNodes.length > 0) { - logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", clusterState.version(), publishTimeout, pendingNodes); - } - } - } catch (InterruptedException e) { - // ignore & restore interrupt - Thread.currentThread().interrupt(); - } - } - - - } catch (Exception e) { - // failure to marshal or un-marshal - throw new IllegalStateException("Cluster state failed to serialize", e); - } - } - - private class ClusterGroup { - - private Queue members = ConcurrentCollections.newQueue(); - - Queue members() { - return members; - } - } -} diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index fd697340cd7..7871a0a6f39 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -124,7 +124,7 @@ public class TribeService extends AbstractLifecycleComponent { if (!NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.exists(settings)) { sb.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), nodesSettings.size()); } - sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); // a tribe node should not use zen discovery + sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "none"); // a tribe node should not use zen discovery // nothing is going to be discovered, since no master will be elected sb.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); if (sb.get("cluster.name") == null) { diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index 29dd2a150a3..38328a80054 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -22,7 +22,6 @@ package org.elasticsearch.action.support.master; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; @@ -46,6 +45,11 @@ import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class IndexingMasterFailoverIT extends ESIntegTestCase { + @Override + protected boolean addMockZenPings() { + return false; + } + @Override protected Collection> nodePlugins() { final HashSet> classes = new HashSet<>(super.nodePlugins()); @@ -62,7 +66,6 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase { logger.info("--> start 4 nodes, 3 master, 1 data"); final Settings sharedSettings = Settings.builder() - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen") .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java index 93c5e29208c..dbb066dcb1b 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java @@ -32,12 +32,13 @@ import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; +import java.util.Arrays; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -65,7 +66,7 @@ public class TransportClientIT extends ESIntegTestCase { .put(NetworkModule.HTTP_ENABLED.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), false) .put("cluster.name", "foobar") - .build(), Collections.singleton(MockTcpTransportPlugin.class)).start()) { + .build(), Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class)).start()) { TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress(); client.addTransportAddress(transportAddress); // since we force transport clients there has to be one node started that we connect to. diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index b8e95343932..83472a1edc4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; @@ -75,15 +74,13 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { } @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + protected boolean addMockZenPings() { + return false; } public void testSimpleMinimumMasterNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 2) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") .put("discovery.initial_state_timeout", "500ms") @@ -195,7 +192,6 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { public void testMultipleNodesShutdownNonMasterNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 3) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "1s") .put("discovery.initial_state_timeout", "500ms") @@ -271,7 +267,6 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { public void testDynamicUpdateMinimumMasterNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms") .put("discovery.initial_state_timeout", "500ms") .build(); @@ -329,7 +324,6 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { public void testCanNotBringClusterDown() throws ExecutionException, InterruptedException { int nodeCount = scaledRandomIntBetween(1, 5); Settings.Builder settings = Settings.builder() - .put("discovery.type", "zen") .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") .put("discovery.initial_state_timeout", "500ms"); @@ -368,7 +362,6 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { public void testCanNotPublishWithoutMinMastNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2) .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "100ms") // speed things up diff --git a/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java index 4d62f9b664f..c033ad7ff27 100644 --- a/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java @@ -22,7 +22,6 @@ package org.elasticsearch.cluster; import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.node.Node; @@ -41,18 +40,9 @@ import static org.hamcrest.Matchers.nullValue; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class SpecificMasterNodesIT extends ESIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - protected final Settings.Builder settingsBuilder() { - return Settings.builder().put("discovery.type", "zen"); - } - public void testSimpleOnlyMasterNodeElection() throws IOException { logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); try { assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().getMasterNodeId(), nullValue()); fail("should not be able to find master"); @@ -60,7 +50,7 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { // all is well, no master elected } logger.info("--> start master node"); - final String masterNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String masterNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); @@ -75,14 +65,14 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { } logger.info("--> start master node"); - final String nextMasterEligibleNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String nextMasterEligibleNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName)); } public void testElectOnlyBetweenMasterNodes() throws IOException { logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); try { assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().getMasterNodeId(), nullValue()); fail("should not be able to find master"); @@ -90,12 +80,12 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { // all is well, no master elected } logger.info("--> start master node (1)"); - final String masterNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String masterNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); logger.info("--> start master node (2)"); - final String nextMasterEligableNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String nextMasterEligableNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); @@ -112,10 +102,10 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { */ public void testCustomDefaultMapping() throws Exception { logger.info("--> start master node / non data"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); createIndex("test"); assertAcked(client().admin().indices().preparePutMapping("test").setType("_default_").setSource("timestamp", "type=date")); @@ -134,10 +124,10 @@ public class SpecificMasterNodesIT extends ESIntegTestCase { public void testAliasFilterValidation() throws Exception { logger.info("--> start master node / non data"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); assertAcked(prepareCreate("test").addMapping("type1", "{\"type1\" : {\"properties\" : {\"table_a\" : { \"type\" : \"nested\", \"properties\" : {\"field_a\" : { \"type\" : \"keyword\" },\"field_b\" :{ \"type\" : \"keyword\" }}}}}}")); client().admin().indices().prepareAliases().addAlias("test", "a_test", QueryBuilders.nestedQuery("table_a", QueryBuilders.termQuery("table_a.field_b", "y"), ScoreMode.Avg)).get(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 9f21d133264..6243f138380 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimary import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -63,9 +62,8 @@ public class PrimaryAllocationIT extends ESIntegTestCase { } @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + protected boolean addMockZenPings() { + return false; } private void createStaleReplicaScenario() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index a25e3abe65d..111b84f981a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -68,17 +67,8 @@ public class ClusterServiceIT extends ESIntegTestCase { return Arrays.asList(TestPlugin.class); } - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - public void testAckedUpdateTask() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -151,10 +141,7 @@ public class ClusterServiceIT extends ESIntegTestCase { } public void testAckedUpdateTaskSameClusterState() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -222,10 +209,7 @@ public class ClusterServiceIT extends ESIntegTestCase { } public void testAckedUpdateTaskNoAckExpected() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -294,10 +278,7 @@ public class ClusterServiceIT extends ESIntegTestCase { } public void testAckedUpdateTaskTimeoutZero() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -371,11 +352,8 @@ public class ClusterServiceIT extends ESIntegTestCase { @TestLogging("_root:debug,org.elasticsearch.action.admin.cluster.tasks:trace") public void testPendingUpdateTask() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - String node_0 = internalCluster().startNode(settings); - internalCluster().startCoordinatingOnlyNode(settings); + String node_0 = internalCluster().startNode(); + internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node_0); final CountDownLatch block1 = new CountDownLatch(1); @@ -507,7 +485,6 @@ public class ClusterServiceIT extends ESIntegTestCase { public void testLocalNodeMasterListenerCallbacks() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 1) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms") .put("discovery.initial_state_timeout", "500ms") diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 58a67769730..b400b0c7a5f 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -128,6 +128,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { private ClusterDiscoveryConfiguration discoveryConfig; + @Override + protected boolean addMockZenPings() { + return false; + } @Override protected Settings nodeSettings(int nodeOrdinal) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index d336534a158..c99187c7a93 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -24,9 +24,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -38,7 +36,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; @@ -46,7 +43,6 @@ import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.TestCustomMetaData; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesTransportRequest; @@ -55,7 +51,6 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matchers; -import org.junit.Before; import java.io.IOException; import java.net.UnknownHostException; @@ -78,32 +73,10 @@ import static org.hamcrest.Matchers.notNullValue; @TestLogging("_root:DEBUG") public class ZenDiscoveryIT extends ESIntegTestCase { - private Version previousMajorVersion; - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - - @Before - public void computePrevMajorVersion() { - Version previousMajor; - // find a GA build whose major version is statesFound = new ArrayList<>(); final CountDownLatch nodesStopped = new CountDownLatch(1); - clusterService.add(new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - statesFound.add(event.state()); - try { - // block until both nodes have stopped to accumulate node failures - nodesStopped.await(); - } catch (InterruptedException e) { - //meh - } + clusterService.add(event -> { + statesFound.add(event.state()); + try { + // block until both nodes have stopped to accumulate node failures + nodesStopped.await(); + } catch (InterruptedException e) { + //meh } }); @@ -189,10 +158,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { } public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "zen") - .build(); - List nodeNames = internalCluster().startNodesAsync(2, settings).get(); + List nodeNames = internalCluster().startNodesAsync(2).get(); client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); List nonMasterNodes = new ArrayList<>(nodeNames); @@ -303,10 +269,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { " }\n" + "}"; - Settings nodeSettings = Settings.builder() - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .build(); - internalCluster().startNode(nodeSettings); + internalCluster().startNode(); logger.info("--> request node discovery stats"); NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setDiscovery(true).get(); diff --git a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java b/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java deleted file mode 100644 index 91a18f9c053..00000000000 --- a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index; - -import org.elasticsearch.action.index.IndexAction; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.fd.FaultDetection; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportService; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import static java.util.Collections.singleton; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; -import static org.hamcrest.Matchers.equalTo; - -/** - * Test failure when index replication actions fail mid-flight - */ -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) -public class TransportIndexFailuresIT extends ESIntegTestCase { - - private static final Settings nodeSettings = Settings.builder() - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly - .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // <-- for hitting simulated network failures quickly - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly - .put("discovery.zen.minimum_master_nodes", 1) - .build(); - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); - } - - @Override - protected int numberOfShards() { - return 1; - } - - @Override - protected int numberOfReplicas() { - return 1; - } - - public void testNetworkPartitionDuringReplicaIndexOp() throws Exception { - final String INDEX = "testidx"; - - List nodes = internalCluster().startNodesAsync(2, nodeSettings).get(); - - // Create index test with 1 shard, 1 replica and ensure it is green - createIndex(INDEX); - ensureGreen(INDEX); - - // Disable allocation so the replica cannot be reallocated when it fails - Settings s = Settings.builder().put("cluster.routing.allocation.enable", "none").build(); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(s).get(); - - // Determine which node holds the primary shard - ClusterState state = getNodeClusterState(nodes.get(0)); - IndexShardRoutingTable shard = state.getRoutingTable().index(INDEX).shard(0); - String primaryNode; - String replicaNode; - if (shard.getShards().get(0).primary()) { - primaryNode = nodes.get(0); - replicaNode = nodes.get(1); - } else { - primaryNode = nodes.get(1); - replicaNode = nodes.get(0); - } - logger.info("--> primary shard is on {}", primaryNode); - - // Index a document to make sure everything works well - IndexResponse resp = internalCluster().client(primaryNode).prepareIndex(INDEX, "doc").setSource("foo", "bar").get(); - assertThat("document exists on primary node", - internalCluster().client(primaryNode).prepareGet(INDEX, "doc", resp.getId()).setPreference("_only_local").get().isExists(), - equalTo(true)); - assertThat("document exists on replica node", - internalCluster().client(replicaNode).prepareGet(INDEX, "doc", resp.getId()).setPreference("_only_local").get().isExists(), - equalTo(true)); - - // Disrupt the network so indexing requests fail to replicate - logger.info("--> preventing index/replica operations"); - TransportService mockTransportService = internalCluster().getInstance(TransportService.class, primaryNode); - ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( - internalCluster().getInstance(TransportService.class, replicaNode), - singleton(IndexAction.NAME + "[r]") - ); - mockTransportService = internalCluster().getInstance(TransportService.class, replicaNode); - ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( - internalCluster().getInstance(TransportService.class, primaryNode), - singleton(IndexAction.NAME + "[r]") - ); - - logger.info("--> indexing into primary"); - // the replica shard should now be marked as failed because the replication operation will fail - resp = internalCluster().client(primaryNode).prepareIndex(INDEX, "doc").setSource("foo", "baz").get(); - // wait until the cluster reaches an exact yellow state, meaning replica has failed - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - } - }); - assertThat("document should still be indexed and available", - client().prepareGet(INDEX, "doc", resp.getId()).get().isExists(), equalTo(true)); - - state = getNodeClusterState(randomFrom(nodes.toArray(Strings.EMPTY_ARRAY))); - RoutingNodes rn = state.getRoutingNodes(); - logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}", - rn.shards(input -> true).size(), - rn.shardsWithState(UNASSIGNED).size(), - rn.shardsWithState(INITIALIZING).size(), - rn.shardsWithState(RELOCATING).size(), - rn.shardsWithState(STARTED).size()); - logger.info("--> unassigned: {}, initializing: {}, relocating: {}, started: {}", - rn.shardsWithState(UNASSIGNED), - rn.shardsWithState(INITIALIZING), - rn.shardsWithState(RELOCATING), - rn.shardsWithState(STARTED)); - - assertThat("only a single shard is now active (replica should be failed and not reallocated)", - rn.shardsWithState(STARTED).size(), equalTo(1)); - } - - private ClusterState getNodeClusterState(String node) { - return internalCluster().client(node).admin().cluster().prepareState().setLocal(true).get().getState(); - } -} diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index 5a257cc64fd..8582ca0e02f 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -42,7 +42,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.Index; @@ -77,9 +76,8 @@ import static org.hamcrest.Matchers.instanceOf; public class RareClusterStateIT extends ESIntegTestCase { @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + protected boolean addMockZenPings() { + return false; } @Override @@ -173,9 +171,7 @@ public class RareClusterStateIT extends ESIntegTestCase { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/14932") public void testDeleteCreateInOneBulk() throws Exception { - internalCluster().startNodesAsync(2, Settings.builder() - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen") - .build()).get(); + internalCluster().startNodesAsync(2).get(); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); prepareCreate("test").setSettings(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, true).addMapping("type").get(); ensureGreen("test"); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index fd57a1198e0..0e777cbd97a 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.index.store.IndexStore; @@ -59,7 +58,6 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.snapshots.mockstore.MockRepository; -import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; @@ -93,13 +91,6 @@ import static org.hamcrest.Matchers.nullValue; @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0) public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - // TODO only restorePersistentSettingsTest needs this maybe factor out? - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - @Override protected Collection> nodePlugins() { return Arrays.asList(MockRepository.Plugin.class); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 200ec6ac4b1..bf69f6016de 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -57,9 +57,9 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStore; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; @@ -693,7 +693,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertAcked(client.admin().cluster().preparePutRepository("test-repo") .setType("fs").setSettings(Settings.builder().put("location", repositoryLocation))); - prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get(); + createIndex("test-idx"); ensureGreen(); logger.info("--> indexing some data"); diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 89a32c11828..0bc4974f285 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -33,19 +33,15 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.MasterNotDiscoveredException; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.transport.MockTcpTransportPlugin; -import org.elasticsearch.transport.Transport; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -54,7 +50,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.function.Consumer; @@ -66,6 +61,7 @@ import java.util.stream.StreamSupport; import static java.util.stream.Collectors.toSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -195,22 +191,12 @@ public class TribeIT extends ESIntegTestCase { settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(NetworkModule.HTTP_ENABLED.getKey(), false); settings.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME); - settings.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); doWithAllClusters(filter, c -> { String tribeSetting = "tribe." + c.getClusterName() + "."; settings.put(tribeSetting + ClusterName.CLUSTER_NAME_SETTING.getKey(), c.getClusterName()); settings.put(tribeSetting + DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "100ms"); settings.put(tribeSetting + NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME); - settings.put(tribeSetting + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); - - Set hosts = new HashSet<>(); - for (Transport transport : c.getInstances(Transport.class)) { - TransportAddress address = transport.boundAddress().publishAddress(); - hosts.add(address.getAddress() + ":" + address.getPort()); - } - settings.putArray(tribeSetting + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), - hosts.toArray(new String[hosts.size()])); }); return settings; @@ -497,7 +483,7 @@ public class TribeIT extends ESIntegTestCase { assertBusy(() -> { ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState(); Set nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet()); - assertThat(nodes.containsAll(expectedNodes), is(true)); + assertThat(nodes, containsInAnyOrder(expectedNodes.toArray())); }); } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java index 014014b432c..d1050e80adc 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.azure.AzureRepository.Repository; @@ -75,14 +74,6 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegT return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName; } - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - // In snapshot tests, we explicitly disable cloud discovery - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") - .build(); - } - @Override public Settings indexSettings() { // During restore we frequently restore index to exactly the same state it was before, that might cause the same diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java index 7ca0aa57d7a..fd54c5fadbe 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java @@ -26,22 +26,23 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.junit.AfterClass; import org.junit.BeforeClass; import java.io.IOException; import java.nio.file.Path; -import java.util.Collections; +import java.util.Arrays; +import java.util.List; import static org.hamcrest.CoreMatchers.either; import static org.hamcrest.CoreMatchers.equalTo; @@ -63,25 +64,25 @@ public class TribeUnitTests extends ESTestCase { Settings baseSettings = Settings.builder() .put(NetworkModule.HTTP_ENABLED.getKey(), false) .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2) .build(); + final List> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class); tribe1 = new TribeClientNode( Settings.builder() .put(baseSettings) .put("cluster.name", "tribe1") .put("node.name", "tribe1_node") .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) - .build(), Collections.singleton(MockTcpTransportPlugin.class)).start(); + .build(), mockPlugins).start(); tribe2 = new TribeClientNode( Settings.builder() .put(baseSettings) .put("cluster.name", "tribe2") .put("node.name", "tribe2_node") .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) - .build(), Collections.singleton(MockTcpTransportPlugin.class)).start(); + .build(), mockPlugins).start(); } @AfterClass @@ -106,11 +107,10 @@ public class TribeUnitTests extends ESTestCase { .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).put("discovery.type", "local") .put("tribe.t1.transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) .put("tribe.t2.transport.type",MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put("tribe.t1.discovery.type", "local").put("tribe.t2.discovery.type", "local") .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .put(extraSettings).build(); - try (Node node = new MockNode(settings, Collections.singleton(MockTcpTransportPlugin.class)).start()) { + try (Node node = new MockNode(settings, Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class)).start()) { try (Client client = node.client()) { assertBusy(() -> { ClusterState state = client.admin().cluster().prepareState().clear().setNodes(true).get().getState(); diff --git a/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java b/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java index cf565499a8d..ea924734765 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java +++ b/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.transport.MockTcpTransportPlugin; import java.io.IOException; import java.nio.file.Path; @@ -46,13 +47,12 @@ public class NodeTests extends ESTestCase { .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", randomLong())) .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put("discovery.type", "local") - .put("transport.type", "local") + .put("transport.type", "mock-socket-network") .put(Node.NODE_DATA_SETTING.getKey(), true); if (name != null) { settings.put(Node.NODE_NAME_SETTING.getKey(), name); } - try (Node node = new MockNode(settings.build(), Collections.emptyList())) { + try (Node node = new MockNode(settings.build(), Collections.singleton(MockTcpTransportPlugin.class))) { final Settings nodeSettings = randomBoolean() ? node.settings() : node.getEnvironment().settings(); if (name == null) { assertThat(Node.NODE_NAME_SETTING.get(nodeSettings), equalTo(node.getNodeEnvironment().nodeId().substring(0, 7))); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java index 16647b04a47..0ece6fad393 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java @@ -29,8 +29,6 @@ import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.listeners.LoggingListener; @@ -206,6 +204,11 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase { return finalSettings.build(); } + @Override + protected boolean addMockZenPings() { + return false; + } + protected int minExternalNodes() { return 1; } protected int maxExternalNodes() { @@ -243,7 +246,6 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase { protected Settings commonNodeSettings(int nodeOrdinal) { Settings.Builder builder = Settings.builder().put(requiredSettings()); builder.put(NetworkModule.TRANSPORT_TYPE_KEY, randomBoolean() ? "netty3" : "netty4"); // run same transport / disco as external - builder.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen"); return builder.build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 26059ba60ca..1f2f01cc352 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -28,15 +28,8 @@ import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.transport.AssertingTransportInterceptor; -import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -65,6 +58,7 @@ import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -74,6 +68,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -99,9 +94,10 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ElectMasterService; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; @@ -119,12 +115,16 @@ import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.node.NodeMocksPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.client.RandomizingClient; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.AssertingTransportInterceptor; +import org.elasticsearch.transport.MockTcpTransportPlugin; import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; @@ -1750,7 +1750,6 @@ public abstract class ESIntegTestCase extends ESTestCase { public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(networkSettings.build()). put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build(); } @@ -1787,6 +1786,10 @@ public abstract class ESIntegTestCase extends ESTestCase { return true; } + protected boolean addMockZenPings() { + return true; + } + /** * Returns a function that allows to wrap / filter all clients that are exposed by the test cluster. This is useful * for debugging or request / response pre and post processing. It also allows to intercept all calls done by the test @@ -1823,6 +1826,10 @@ public abstract class ESIntegTestCase extends ESTestCase { if (addMockTransportService()) { mocks.add(MockTcpTransportPlugin.class); } + + if (addMockZenPings()) { + mocks.add(MockZenPing.TestPlugin.class); + } mocks.add(TestSeedPlugin.class); return Collections.unmodifiableList(mocks); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 2752e0f2eca..f096e662f4a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -45,6 +45,7 @@ import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.junit.After; @@ -181,7 +182,6 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000) .put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put("discovery.type", "local") .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) .put(Node.NODE_DATA_SETTING.getKey(), true) .put(nodeSettings()) // allow test cases to provide their own settings or override these @@ -191,6 +191,10 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { plugins = new ArrayList<>(plugins); plugins.add(MockTcpTransportPlugin.class); } + if (plugins.contains(MockZenPing.TestPlugin.class) == false) { + plugins = new ArrayList<>(plugins); + plugins.add(MockZenPing.TestPlugin.class); + } Node build = new MockNode(settings, plugins); try { build.start(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java index b393498ec89..3fd2b024a1d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; @@ -39,7 +38,7 @@ import java.util.Set; public class ClusterDiscoveryConfiguration extends NodeConfigurationSource { - static Settings DEFAULT_NODE_SETTINGS = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + static Settings DEFAULT_NODE_SETTINGS = Settings.EMPTY; private static final String IP_ADDR = "127.0.0.1"; final int numOfNodes; diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java new file mode 100644 index 00000000000..d1e38e061bf --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.discovery; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.discovery.zen.ping.PingContextProvider; +import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.plugins.DiscoveryPlugin; +import org.elasticsearch.plugins.Plugin; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging + * to be immediate and can be used to speed up tests. + */ +public final class MockZenPing extends AbstractLifecycleComponent implements ZenPing { + + static final Map> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap(); + + private volatile PingContextProvider contextProvider; + + @Inject + public MockZenPing(Settings settings) { + super(settings); + } + + @Override + public void setPingContextProvider(PingContextProvider contextProvider) { + this.contextProvider = contextProvider; + } + + @Override + public void ping(PingListener listener, TimeValue timeout) { + logger.info("pinging using mock zen ping"); + List responseList = getActiveNodesForCurrentCluster().stream() + .filter(p -> p != this) // remove this as pings are not expected to return the local node + .map(MockZenPing::getPingResponse) + .collect(Collectors.toList()); + listener.onPing(responseList); + } + + private ClusterName getClusterName() { + return contextProvider.clusterState().getClusterName(); + } + + private PingResponse getPingResponse() { + final ClusterState clusterState = contextProvider.clusterState(); + return new PingResponse(clusterState.nodes().getLocalNode(), clusterState.nodes().getMasterNode(), clusterState); + } + + @Override + protected void doStart() { + assert contextProvider != null; + boolean added = getActiveNodesForCurrentCluster().add(this); + assert added; + } + + private Set getActiveNodesForCurrentCluster() { + return activeNodesPerCluster.computeIfAbsent(getClusterName(), + clusterName -> ConcurrentCollections.newConcurrentSet()); + } + + @Override + protected void doStop() { + boolean found = getActiveNodesForCurrentCluster().remove(this); + assert found; + } + + @Override + protected void doClose() { + + } + + public static class TestPlugin extends Plugin implements DiscoveryPlugin { + + public void onModule(DiscoveryModule discoveryModule) { + discoveryModule.addZenPing(MockZenPing.class); + } + } +} diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index e5b704b2f2c..7c001f910d7 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -27,13 +27,13 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; -import org.elasticsearch.transport.MockTcpTransport; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.TransportSettings; @@ -141,7 +141,6 @@ public class InternalTestClusterTests extends ESTestCase { NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2 * ((masterNodes ? InternalTestCluster.DEFAULT_HIGH_NUM_MASTER_NODES : 0) + maxNumDataNodes + numClientNodes)) .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); } @@ -156,12 +155,13 @@ public class InternalTestClusterTests extends ESTestCase { String nodePrefix = "foobar"; Path baseDir = createTempDir(); + final List> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class); InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, - enableHttpPipelining, nodePrefix, Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + enableHttpPipelining, nodePrefix, mockPlugins, Function.identity()); InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, - enableHttpPipelining, nodePrefix, Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + enableHttpPipelining, nodePrefix, mockPlugins, Function.identity()); assertClusters(cluster0, cluster1, false); long seed = randomLong(); @@ -205,7 +205,6 @@ public class InternalTestClusterTests extends ESTestCase { NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2 + (masterNodes ? InternalTestCluster.DEFAULT_HIGH_NUM_MASTER_NODES : 0) + maxNumDataNodes + numClientNodes) .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .build(); } @Override @@ -219,7 +218,8 @@ public class InternalTestClusterTests extends ESTestCase { Path baseDir = createTempDir(); InternalTestCluster cluster = new InternalTestCluster(clusterSeed, baseDir, masterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, - enableHttpPipelining, nodePrefix, Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + enableHttpPipelining, nodePrefix, Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class), + Function.identity()); try { cluster.beforeTest(random(), 0.0); final Map shardNodePaths = new HashMap<>(); @@ -288,7 +288,6 @@ public class InternalTestClusterTests extends ESTestCase { .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numNodes) .put(NetworkModule.HTTP_ENABLED.getKey(), false) .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0).build(); } @@ -297,7 +296,7 @@ public class InternalTestClusterTests extends ESTestCase { return Settings.builder() .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); } - }, 0, randomBoolean(), "", Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + }, 0, randomBoolean(), "", Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class), Function.identity()); cluster.beforeTest(random(), 0.0); try { Map> pathsPerRole = new HashMap<>();