From 53cff0f00ff767d028d654ec75c12bc8ff16206b Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Thu, 20 Oct 2016 00:44:48 -0700 Subject: [PATCH] Move all zen discovery classes into o.e.discovery.zen (#21032) * Move all zen discovery classes into o.e.discovery.zen This collapses sub packages of zen into zen. These all had just a couple classes each, and there is really no reason to have the subpackages. * fix checkstyle --- .../resources/checkstyle_suppressions.xml | 7 -- .../elasticsearch/cluster/ClusterState.java | 2 +- .../cluster/NodeConnectionsService.java | 6 +- .../common/settings/ClusterSettings.java | 4 +- .../discovery/DiscoveryModule.java | 9 +- .../discovery/DiscoveryStats.java | 2 +- .../zen/{fd => }/FaultDetection.java | 5 +- .../zen/{fd => }/MasterFaultDetection.java | 2 +- .../{membership => }/MembershipAction.java | 28 ++++--- .../discovery/zen/NodeJoinController.java | 1 - .../zen/{fd => }/NodesFaultDetection.java | 2 +- .../PendingClusterStateStats.java | 2 +- .../PendingClusterStatesQueue.java | 53 ++++++------ .../zen/{ping => }/PingContextProvider.java | 2 +- .../PublishClusterStateAction.java | 74 +++++++++++------ .../unicast => }/UnicastHostsProvider.java | 2 +- .../{ping/unicast => }/UnicastZenPing.java | 7 +- .../discovery/zen/ZenDiscovery.java | 12 +-- .../discovery/zen/{ping => }/ZenPing.java | 7 +- .../zen/{ping => }/ZenPingService.java | 2 +- .../cluster/node/stats/NodeStatsTests.java | 2 +- .../master/IndexingMasterFailoverIT.java | 2 +- .../DiscoveryWithServiceDisruptionsIT.java | 12 +-- .../discovery/ZenFaultDetectionTests.java | 6 +- .../zen/NodeJoinControllerTests.java | 1 - .../PendingClusterStatesQueueTests.java | 6 +- .../PublishClusterStateActionTests.java | 82 ++++++++++++------- .../unicast => }/UnicastZenPingTests.java | 7 +- .../discovery/zen/ZenDiscoveryIT.java | 3 - .../discovery/zen/ZenDiscoveryUnitTests.java | 4 +- .../discovery/zen/ZenPingTests.java | 1 - .../classic/AzureUnicastHostsProvider.java | 2 +- .../ec2/AwsEc2UnicastHostsProvider.java | 2 +- .../file/FileBasedUnicastHostsProvider.java | 4 +- .../gce/GceUnicastHostsProvider.java | 2 +- .../test/discovery/MockZenPing.java | 4 +- 36 files changed, 205 insertions(+), 164 deletions(-) rename core/src/main/java/org/elasticsearch/discovery/zen/{fd => }/FaultDetection.java (95%) rename core/src/main/java/org/elasticsearch/discovery/zen/{fd => }/MasterFaultDetection.java (99%) rename core/src/main/java/org/elasticsearch/discovery/zen/{membership => }/MembershipAction.java (87%) rename core/src/main/java/org/elasticsearch/discovery/zen/{fd => }/NodesFaultDetection.java (99%) rename core/src/main/java/org/elasticsearch/discovery/zen/{publish => }/PendingClusterStateStats.java (98%) rename core/src/main/java/org/elasticsearch/discovery/zen/{publish => }/PendingClusterStatesQueue.java (88%) rename core/src/main/java/org/elasticsearch/discovery/zen/{ping => }/PingContextProvider.java (95%) rename core/src/main/java/org/elasticsearch/discovery/zen/{publish => }/PublishClusterStateAction.java (91%) rename core/src/main/java/org/elasticsearch/discovery/zen/{ping/unicast => }/UnicastHostsProvider.java (95%) rename core/src/main/java/org/elasticsearch/discovery/zen/{ping/unicast => }/UnicastZenPing.java (98%) rename core/src/main/java/org/elasticsearch/discovery/zen/{ping => }/ZenPing.java (96%) rename core/src/main/java/org/elasticsearch/discovery/zen/{ping => }/ZenPingService.java (98%) rename core/src/test/java/org/elasticsearch/discovery/zen/{publish => }/PendingClusterStatesQueueTests.java (98%) rename core/src/test/java/org/elasticsearch/discovery/zen/{publish => }/PublishClusterStateActionTests.java (93%) rename core/src/test/java/org/elasticsearch/discovery/zen/{ping/unicast => }/UnicastZenPingTests.java (98%) diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 8374e14a7f1..6acc73fc91a 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -319,12 +319,6 @@ - - - - - - @@ -721,7 +715,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index 7fc0a4d09b0..3bbd56d9f49 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -52,7 +52,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; +import org.elasticsearch.discovery.zen.PublishClusterStateAction; import java.io.IOException; import java.util.EnumSet; diff --git a/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index 99f161b9da5..6a9d0ae160f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -31,6 +31,8 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.KeyedLock; +import org.elasticsearch.discovery.zen.MasterFaultDetection; +import org.elasticsearch.discovery.zen.NodesFaultDetection; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -45,8 +47,8 @@ import static org.elasticsearch.common.settings.Setting.positiveTimeSetting; * This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are * removed. Also, it periodically checks that all connections are still open and if needed restores them. * Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond - * to pings. This is done by {@link org.elasticsearch.discovery.zen.fd.NodesFaultDetection}. Master fault detection - * is done by {@link org.elasticsearch.discovery.zen.fd.MasterFaultDetection}. + * to pings. This is done by {@link NodesFaultDetection}. Master fault detection + * is done by {@link MasterFaultDetection}. */ public class NodeConnectionsService extends AbstractLifecycleComponent { diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 522ba96e844..bffa25753c3 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -56,8 +56,8 @@ import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.fd.FaultDetection; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; +import org.elasticsearch.discovery.zen.FaultDetection; +import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayService; diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 28a17ef22b7..a293a291407 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -25,12 +25,11 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.ExtensionPoint; -import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.ping.ZenPing; -import org.elasticsearch.discovery.zen.ping.ZenPingService; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; +import org.elasticsearch.discovery.zen.ZenPing; +import org.elasticsearch.discovery.zen.ZenPingService; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastZenPing; import java.util.ArrayList; import java.util.Collections; diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java index fc419ff06a6..9542b14e569 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java @@ -25,7 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats; +import org.elasticsearch.discovery.zen.PendingClusterStateStats; import java.io.IOException; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/FaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java similarity index 95% rename from core/src/main/java/org/elasticsearch/discovery/zen/fd/FaultDetection.java rename to core/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java index 1cfd46634a5..f1f8b28ad09 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/FaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.discovery.zen.fd; + +package org.elasticsearch.discovery.zen; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -32,7 +33,7 @@ import org.elasticsearch.transport.TransportService; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; /** - * A base class for {@link org.elasticsearch.discovery.zen.fd.MasterFaultDetection} & {@link org.elasticsearch.discovery.zen.fd.NodesFaultDetection}, + * A base class for {@link MasterFaultDetection} & {@link NodesFaultDetection}, * making sure both use the same setting. */ public abstract class FaultDetection extends AbstractComponent { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java similarity index 99% rename from core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java rename to core/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java index 04aee9db3d8..b7acfb685de 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.fd; +package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java similarity index 87% rename from core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java rename to core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java index f0140c660cd..7ff8f935927 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.membership; +package org.elasticsearch.discovery.zen; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -62,36 +62,42 @@ public class MembershipAction extends AbstractComponent { private final MembershipListener listener; - public MembershipAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) { + public MembershipAction(Settings settings, TransportService transportService, + DiscoveryNodesProvider nodesProvider, MembershipListener listener) { super(settings); this.transportService = transportService; this.nodesProvider = nodesProvider; this.listener = listener; - transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new, ThreadPool.Names.GENERIC, new JoinRequestRequestHandler()); - transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new, ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler()); - transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler()); + transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new, + ThreadPool.Names.GENERIC, new JoinRequestRequestHandler()); + transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new, + ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler()); + transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, + ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler()); } public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) { - transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode), EmptyTransportResponseHandler.INSTANCE_SAME); + transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode), + EmptyTransportResponseHandler.INSTANCE_SAME); } public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) { - transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS); + transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS); } public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) { - transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME) - .txGet(timeout.millis(), TimeUnit.MILLISECONDS); + transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS); } /** * Validates the join request, throwing a failure if it failed. */ public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState state, TimeValue timeout) { - transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state), EmptyTransportResponseHandler.INSTANCE_SAME) - .txGet(timeout.millis(), TimeUnit.MILLISECONDS); + transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS); } public static class JoinRequest extends TransportRequest { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index a4a51d6ab31..3c8deee7c5f 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -40,7 +40,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.membership.MembershipAction; import java.util.ArrayList; import java.util.Collections; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java similarity index 99% rename from core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java rename to core/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java index 6361d3cde39..5cd02a52504 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.fd; +package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStateStats.java b/core/src/main/java/org/elasticsearch/discovery/zen/PendingClusterStateStats.java similarity index 98% rename from core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStateStats.java rename to core/src/main/java/org/elasticsearch/discovery/zen/PendingClusterStateStats.java index e060f688338..8facf2f282c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStateStats.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PendingClusterStateStats.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.publish; +package org.elasticsearch.discovery.zen; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java b/core/src/main/java/org/elasticsearch/discovery/zen/PendingClusterStatesQueue.java similarity index 88% rename from core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java rename to core/src/main/java/org/elasticsearch/discovery/zen/PendingClusterStatesQueue.java index 01fb96b7133..018258066de 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PendingClusterStatesQueue.java @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.discovery.zen.publish; + +package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; @@ -34,10 +35,12 @@ import java.util.Objects; *

* The queue is bound by {@link #maxQueueSize}. When the queue is at capacity and a new cluster state is inserted * the oldest cluster state will be dropped. This is safe because: - * 1) Under normal operations, master will publish & commit a cluster state before processing another change (i.e., the queue length is 1) + * 1) Under normal operations, master will publish & commit a cluster state before processing + * another change (i.e., the queue length is 1) * 2) If the master fails to commit a change, it will step down, causing a master election, which will flush the queue. * 3) In general it's safe to process the incoming cluster state as a replacement to the cluster state that's dropped. - * a) If the dropped cluster is from the same master as the incoming one is, it is likely to be superseded by the incoming state (or another state in the queue). + * a) If the dropped cluster is from the same master as the incoming one is, it is likely to be superseded by the + * incoming state (or another state in the queue). * This is only not true in very extreme cases of out of order delivery. * b) If the dropping cluster state is not from the same master, it means that: * i) we are no longer following the master of the dropped cluster state but follow the incoming one @@ -70,7 +73,8 @@ public class PendingClusterStatesQueue { ClusterStateContext context = pendingStates.remove(0); logger.warn("dropping pending state [{}]. more than [{}] pending states.", context, maxQueueSize); if (context.committed()) { - context.listener.onNewClusterStateFailed(new ElasticsearchException("too many pending states ([{}] pending)", maxQueueSize)); + context.listener.onNewClusterStateFailed(new ElasticsearchException("too many pending states ([{}] pending)", + maxQueueSize)); } } } @@ -82,11 +86,13 @@ public class PendingClusterStatesQueue { public synchronized ClusterState markAsCommitted(String stateUUID, StateProcessedListener listener) { final ClusterStateContext context = findState(stateUUID); if (context == null) { - listener.onNewClusterStateFailed(new IllegalStateException("can't resolve cluster state with uuid [" + stateUUID + "] to commit")); + listener.onNewClusterStateFailed(new IllegalStateException("can't resolve cluster state with uuid" + + " [" + stateUUID + "] to commit")); return null; } if (context.committed()) { - listener.onNewClusterStateFailed(new IllegalStateException("cluster state with uuid [" + stateUUID + "] is already committed")); + listener.onNewClusterStateFailed(new IllegalStateException("cluster state with uuid" + + " [" + stateUUID + "] is already committed")); return null; } context.markAsCommitted(listener); @@ -94,13 +100,14 @@ public class PendingClusterStatesQueue { } /** - * mark that the processing of the given state has failed. All committed states that are {@link ClusterState#supersedes(ClusterState)}-ed - * by this failed state, will be failed as well + * mark that the processing of the given state has failed. All committed states that are + * {@link ClusterState#supersedes(ClusterState)}-ed by this failed state, will be failed as well */ public synchronized void markAsFailed(ClusterState state, Exception reason) { final ClusterStateContext failedContext = findState(state.stateUUID()); if (failedContext == null) { - throw new IllegalArgumentException("can't resolve failed cluster state with uuid [" + state.stateUUID() + "], version [" + state.version() + "]"); + throw new IllegalArgumentException("can't resolve failed cluster state with uuid [" + state.stateUUID() + + "], version [" + state.version() + "]"); } if (failedContext.committed() == false) { throw new IllegalArgumentException("failed cluster state is not committed " + state); @@ -128,15 +135,16 @@ public class PendingClusterStatesQueue { } /** - * indicates that a cluster state was successfully processed. Any committed state that is {@link ClusterState#supersedes(ClusterState)}-ed - * by the processed state will be marked as processed as well. + * indicates that a cluster state was successfully processed. Any committed state that is + * {@link ClusterState#supersedes(ClusterState)}-ed by the processed state will be marked as processed as well. *

- * NOTE: successfully processing a state indicates we are following the master it came from. Any committed state from another master will - * be failed by this method + * NOTE: successfully processing a state indicates we are following the master it came from. Any committed state + * from another master will be failed by this method */ public synchronized void markAsProcessed(ClusterState state) { if (findState(state.stateUUID()) == null) { - throw new IllegalStateException("can't resolve processed cluster state with uuid [" + state.stateUUID() + "], version [" + state.version() + "]"); + throw new IllegalStateException("can't resolve processed cluster state with uuid [" + state.stateUUID() + + "], version [" + state.version() + "]"); } final DiscoveryNode currentMaster = state.nodes().getMasterNode(); assert currentMaster != null : "processed cluster state mast have a master. " + state; @@ -152,17 +160,16 @@ public class PendingClusterStatesQueue { contextsToRemove.add(pendingContext); if (pendingContext.committed()) { // this is a committed state , warn - logger.warn("received a cluster state (uuid[{}]/v[{}]) from a different master than the current one, rejecting (received {}, current {})", - pendingState.stateUUID(), pendingState.version(), - pendingMasterNode, currentMaster); + logger.warn("received a cluster state (uuid[{}]/v[{}]) from a different master than the current one," + + " rejecting (received {}, current {})", + pendingState.stateUUID(), pendingState.version(), pendingMasterNode, currentMaster); pendingContext.listener.onNewClusterStateFailed( - new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + pendingMasterNode + ", current " + currentMaster + ")") - ); + new IllegalStateException("cluster state from a different master than the current one," + + " rejecting (received " + pendingMasterNode + ", current " + currentMaster + ")")); } else { - logger.trace("removing non-committed state with uuid[{}]/v[{}] from [{}] - a state from [{}] was successfully processed", - pendingState.stateUUID(), pendingState.version(), pendingMasterNode, - currentMaster - ); + logger.trace("removing non-committed state with uuid[{}]/v[{}] from [{}] - a state from" + + " [{}] was successfully processed", + pendingState.stateUUID(), pendingState.version(), pendingMasterNode, currentMaster); } } else if (pendingState.stateUUID().equals(state.stateUUID())) { assert pendingContext.committed() : "processed cluster state is not committed " + state; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java b/core/src/main/java/org/elasticsearch/discovery/zen/PingContextProvider.java similarity index 95% rename from core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java rename to core/src/main/java/org/elasticsearch/discovery/zen/PingContextProvider.java index 43f4618472b..b705c918392 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PingContextProvider.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.ping; +package org.elasticsearch.discovery.zen; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java similarity index 91% rename from core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java rename to core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java index 2a173c755fe..f92c496c088 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.publish; +package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; @@ -42,7 +42,6 @@ import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesTransportRequest; import org.elasticsearch.transport.EmptyTransportResponseHandler; @@ -114,10 +113,12 @@ public class PublishClusterStateAction extends AbstractComponent { * publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will * be processed by the master and the other nodes. *

- * The method is guaranteed to throw a {@link org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException} if the change is not committed and should be rejected. + * The method is guaranteed to throw a {@link org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException} + * if the change is not committed and should be rejected. * Any other exception signals the something wrong happened but the change is committed. */ - public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes, final Discovery.AckListener ackListener) throws Discovery.FailedToCommitClusterStateException { + public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes, + final Discovery.AckListener ackListener) throws Discovery.FailedToCommitClusterStateException { final DiscoveryNodes nodes; final SendingController sendingController; final Set nodesToPublishTo; @@ -145,8 +146,10 @@ public class PublishClusterStateAction extends AbstractComponent { buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(), nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs); - final BlockingClusterStatePublishResponseHandler publishResponseHandler = new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener); - sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler); + final BlockingClusterStatePublishResponseHandler publishResponseHandler = + new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener); + sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, + totalMasterNodes, publishResponseHandler); } catch (Exception e) { throw new Discovery.FailedToCommitClusterStateException("unexpected error while preparing to publish", e); } @@ -197,7 +200,8 @@ public class PublishClusterStateAction extends AbstractComponent { DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); // everyone may have just responded if (pendingNodes.length > 0) { - logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", clusterState.version(), publishTimeout, pendingNodes); + logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", + clusterState.version(), publishTimeout, pendingNodes); } } } catch (InterruptedException e) { @@ -207,7 +211,8 @@ public class PublishClusterStateAction extends AbstractComponent { } private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, Set nodesToPublishTo, - boolean sendFullVersion, Map serializedStates, Map serializedDiffs) { + boolean sendFullVersion, Map serializedStates, + Map serializedDiffs) { Diff diff = null; for (final DiscoveryNode node : nodesToPublishTo) { try { @@ -240,7 +245,8 @@ public class PublishClusterStateAction extends AbstractComponent { serializedStates.put(node.getVersion(), bytes); } catch (Exception e) { logger.warn( - (org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage("failed to serialize cluster_state before publishing it to node {}", node), e); + (org.apache.logging.log4j.util.Supplier) () -> + new ParameterizedMessage("failed to serialize cluster_state before publishing it to node {}", node), e); sendingController.onNodeSendFailed(node, e); return; } @@ -266,7 +272,8 @@ public class PublishClusterStateAction extends AbstractComponent { // -> no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout // -> no need to compress, we already compressed the bytes - TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withCompress(false).build(); + TransportRequestOptions options = TransportRequestOptions.builder() + .withType(TransportRequestOptions.Type.STATE).withCompress(false).build(); transportService.sendRequest(node, SEND_ACTION_NAME, new BytesTransportRequest(bytes, node.getVersion()), options, @@ -275,7 +282,8 @@ public class PublishClusterStateAction extends AbstractComponent { @Override public void handleResponse(TransportResponse.Empty response) { if (sendingController.getPublishingTimedOut()) { - logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout); + logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, + clusterState.version(), publishTimeout); } sendingController.onNodeSendAck(node); } @@ -286,21 +294,24 @@ public class PublishClusterStateAction extends AbstractComponent { logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage()); sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController); } else { - logger.debug((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage("failed to send cluster state to {}", node), exp); + logger.debug((org.apache.logging.log4j.util.Supplier) () -> + new ParameterizedMessage("failed to send cluster state to {}", node), exp); sendingController.onNodeSendFailed(node, exp); } } }); } catch (Exception e) { logger.warn( - (org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage("error sending cluster state to {}", node), e); + (org.apache.logging.log4j.util.Supplier) () -> + new ParameterizedMessage("error sending cluster state to {}", node), e); sendingController.onNodeSendFailed(node, e); } } private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) { try { - logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node); + logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", + clusterState.stateUUID(), clusterState.version(), node); TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build(); // no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout @@ -319,12 +330,16 @@ public class PublishClusterStateAction extends AbstractComponent { @Override public void handleException(TransportException exp) { - logger.debug((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}", clusterState.stateUUID(), clusterState.version(), node), exp); + logger.debug((org.apache.logging.log4j.util.Supplier) () -> + new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}", + clusterState.stateUUID(), clusterState.version(), node), exp); sendingController.getPublishResponseHandler().onFailure(node, exp); } }); } catch (Exception t) { - logger.warn((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}", clusterState.stateUUID(), clusterState.version(), node), t); + logger.warn((org.apache.logging.log4j.util.Supplier) () -> + new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}", + clusterState.stateUUID(), clusterState.version(), node), t); sendingController.getPublishResponseHandler().onFailure(node, t); } } @@ -371,7 +386,8 @@ public class PublishClusterStateAction extends AbstractComponent { } else if (lastSeenClusterState != null) { Diff diff = lastSeenClusterState.readDiffFrom(in); incomingState = diff.apply(lastSeenClusterState); - logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), request.bytes().length()); + logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", + incomingState.version(), incomingState.stateUUID(), request.bytes().length()); } else { logger.debug("received diff for but don't have any local cluster state - requesting full state"); throw new IncompatibleClusterStateVersionException("have no local cluster state"); @@ -394,13 +410,15 @@ public class PublishClusterStateAction extends AbstractComponent { void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) { final ClusterName incomingClusterName = incomingState.getClusterName(); if (!incomingClusterName.equals(this.clusterName)) { - logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName); + logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", + incomingState.nodes().getMasterNode(), incomingClusterName); throw new IllegalStateException("received state from a node that is not part of the cluster"); } final ClusterState clusterState = clusterStateSupplier.get(); if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { - logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode()); + logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", + incomingState.nodes().getMasterNode()); throw new IllegalStateException("received state with a local node that does not match the current local node"); } @@ -419,7 +437,8 @@ public class PublishClusterStateAction extends AbstractComponent { } protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { - final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID, new PendingClusterStatesQueue.StateProcessedListener() { + final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID, + new PendingClusterStatesQueue.StateProcessedListener() { @Override public void onNewClusterStateProcessed() { try { @@ -442,7 +461,8 @@ public class PublishClusterStateAction extends AbstractComponent { } }); if (state != null) { - newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().getMasterNode() + " committed version [" + state.version() + "]"); + newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().getMasterNode() + + " committed version [" + state.version() + "]"); } } @@ -512,13 +532,15 @@ public class PublishClusterStateAction extends AbstractComponent { // an external marker to note that the publishing process is timed out. This is useful for proper logging. final AtomicBoolean publishingTimedOut = new AtomicBoolean(); - private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, BlockingClusterStatePublishResponseHandler publishResponseHandler) { + private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, + BlockingClusterStatePublishResponseHandler publishResponseHandler) { this.clusterState = clusterState; this.publishResponseHandler = publishResponseHandler; this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes this.pendingMasterNodes = totalMasterNodes - 1; if (this.neededMastersToCommit > this.pendingMasterNodes) { - throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes); + throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state." + + "[{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes); } this.committed = neededMastersToCommit == 0; this.committedOrFailedLatch = new CountDownLatch(committed ? 0 : 1); @@ -592,7 +614,8 @@ public class PublishClusterStateAction extends AbstractComponent { public synchronized void onNodeSendFailed(DiscoveryNode node, Exception e) { if (node.isMasterNode()) { - logger.trace("master node {} failed to ack cluster state version [{}]. processing ... (current pending [{}], needed [{}])", + logger.trace("master node {} failed to ack cluster state version [{}]. " + + "processing ... (current pending [{}], needed [{}])", node, clusterState.version(), pendingMasterNodes, neededMastersToCommit); decrementPendingMasterAcksAndChangeForFailure(); } @@ -623,7 +646,8 @@ public class PublishClusterStateAction extends AbstractComponent { if (committedOrFailed()) { return committed == false; } - logger.trace((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage("failed to commit version [{}]. {}", clusterState.version(), details), reason); + logger.trace((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage("failed to commit version [{}]. {}", + clusterState.version(), details), reason); committed = false; committedOrFailedLatch.countDown(); return true; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastHostsProvider.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java similarity index 95% rename from core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastHostsProvider.java rename to core/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java index dbfaed572b1..9ff3215cd64 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastHostsProvider.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.ping.unicast; +package org.elasticsearch.discovery.zen; import org.elasticsearch.cluster.node.DiscoveryNode; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java similarity index 98% rename from core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java rename to core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index bc466adfc7f..60b1b0b75d1 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.ping.unicast; +package org.elasticsearch.discovery.zen; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -44,9 +44,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.discovery.zen.ElectMasterService; -import org.elasticsearch.discovery.zen.ping.PingContextProvider; -import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteTransportException; @@ -84,7 +81,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.readPingResponse; +import static org.elasticsearch.discovery.zen.ZenPing.PingResponse.readPingResponse; public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPing { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index b295f07c80a..ba76c9ab7d2 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -54,14 +54,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; -import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; -import org.elasticsearch.discovery.zen.membership.MembershipAction; -import org.elasticsearch.discovery.zen.ping.PingContextProvider; -import org.elasticsearch.discovery.zen.ping.ZenPing; -import org.elasticsearch.discovery.zen.ping.ZenPingService; -import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats; -import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportChannel; @@ -288,7 +280,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover return clusterName.value() + "/" + clusterService.localNode().getId(); } - /** start of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */ + /** start of {@link PingContextProvider } implementation */ @Override public DiscoveryNodes nodes() { return clusterService.state().nodes(); @@ -299,7 +291,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover return clusterService.state(); } - /** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */ + /** end of {@link PingContextProvider } implementation */ @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java similarity index 96% rename from core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java rename to core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java index b4bb61ad461..be1df88d334 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.ping; +package org.elasticsearch.discovery.zen; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -27,7 +27,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.zen.ElectMasterService; import java.io.IOException; import java.util.ArrayList; @@ -159,8 +158,8 @@ public interface ZenPing extends LifecycleComponent { @Override public String toString() { - return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], cluster_state_version [" + clusterStateVersion - + "], cluster_name[" + clusterName.value() + "]}"; + return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "]," + + "cluster_state_version [" + clusterStateVersion + "], cluster_name[" + clusterName.value() + "]}"; } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPingService.java similarity index 98% rename from core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java rename to core/src/main/java/org/elasticsearch/discovery/zen/ZenPingService.java index 3a2ddc10cfb..3aa3017f549 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPingService.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.ping; +package org.elasticsearch.discovery.zen; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 6e8159589f1..88aaabe301f 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats; +import org.elasticsearch.discovery.zen.PendingClusterStateStats; import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; import org.elasticsearch.indices.breaker.CircuitBreakerStats; diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index 38328a80054..73085276628 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -24,7 +24,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; -import org.elasticsearch.discovery.zen.fd.FaultDetection; +import org.elasticsearch.discovery.zen.FaultDetection; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.disruption.NetworkDisruption; diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index b400b0c7a5f..5e40e529dae 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -51,12 +51,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.fd.FaultDetection; -import org.elasticsearch.discovery.zen.membership.MembershipAction; -import org.elasticsearch.discovery.zen.ping.ZenPing; -import org.elasticsearch.discovery.zen.ping.ZenPingService; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; -import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; +import org.elasticsearch.discovery.zen.FaultDetection; +import org.elasticsearch.discovery.zen.MembershipAction; +import org.elasticsearch.discovery.zen.ZenPing; +import org.elasticsearch.discovery.zen.ZenPingService; +import org.elasticsearch.discovery.zen.UnicastZenPing; +import org.elasticsearch.discovery.zen.PublishClusterStateAction; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.indices.store.IndicesStoreIntegrationIT; import org.elasticsearch.monitor.jvm.HotThreads; diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index f58ff0edfde..f089f76dcff 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -34,9 +34,9 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.discovery.zen.fd.FaultDetection; -import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; -import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; +import org.elasticsearch.discovery.zen.FaultDetection; +import org.elasticsearch.discovery.zen.MasterFaultDetection; +import org.elasticsearch.discovery.zen.NodesFaultDetection; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.test.ESTestCase; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index 60842e624db..eb580716622 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -42,7 +42,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/PendingClusterStatesQueueTests.java similarity index 98% rename from core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java rename to core/src/test/java/org/elasticsearch/discovery/zen/PendingClusterStatesQueueTests.java index 9ff09137bf9..b1faaba576b 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/PendingClusterStatesQueueTests.java @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.discovery.zen.publish; + +package org.elasticsearch.discovery.zen; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -25,7 +26,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.publish.PendingClusterStatesQueue.ClusterStateContext; +import org.elasticsearch.discovery.zen.PendingClusterStatesQueue; +import org.elasticsearch.discovery.zen.PendingClusterStatesQueue.ClusterStateContext; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java similarity index 93% rename from core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java rename to core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index 3ce16c0d093..eb8153c8354 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.publish; +package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; @@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.PublishClusterStateAction; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; @@ -100,21 +101,25 @@ public class PublishClusterStateActionTests extends ESTestCase { private final Logger logger; - public MockNode(DiscoveryNode discoveryNode, MockTransportService service, @Nullable ClusterStateListener listener, Logger logger) { + public MockNode(DiscoveryNode discoveryNode, MockTransportService service, + @Nullable ClusterStateListener listener, Logger logger) { this.discoveryNode = discoveryNode; this.service = service; this.listener = listener; this.logger = logger; - this.clusterState = ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).build()).build(); + this.clusterState = ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder() + .add(discoveryNode).localNodeId(discoveryNode.getId()).build()).build(); } public MockNode setAsMaster() { - this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).masterNodeId(discoveryNode.getId())).build(); + this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) + .masterNodeId(discoveryNode.getId())).build(); return this; } public MockNode resetMasterId() { - this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).masterNodeId(null)).build(); + this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) + .masterNodeId(null)).build(); return this; } @@ -126,7 +131,8 @@ public class PublishClusterStateActionTests extends ESTestCase { @Override public void onNewClusterState(String reason) { ClusterState newClusterState = action.pendingStatesQueue().getNextClusterStateToProcess(); - logger.debug("[{}] received version [{}], uuid [{}]", discoveryNode.getName(), newClusterState.version(), newClusterState.stateUUID()); + logger.debug("[{}] received version [{}], uuid [{}]", + discoveryNode.getName(), newClusterState.version(), newClusterState.stateUUID()); if (listener != null) { ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState); listener.clusterChanged(event); @@ -156,7 +162,8 @@ public class PublishClusterStateActionTests extends ESTestCase { ThreadPool threadPool, Logger logger, Map nodes) throws Exception { final Settings settings = Settings.builder() .put("name", name) - .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", + TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .put(basSettings) .build(); @@ -229,7 +236,7 @@ public class PublishClusterStateActionTests extends ESTestCase { } private static MockTransportService buildTransportService(Settings settings, ThreadPool threadPool) { - MockTransportService transportService = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null ); + MockTransportService transportService = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null); transportService.start(); transportService.acceptIncomingRequests(); return transportService; @@ -268,7 +275,8 @@ public class PublishClusterStateActionTests extends ESTestCase { // cluster state update - add block previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); + clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder() + .addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromDiff(nodeB.clusterState, clusterState); assertThat(nodeB.clusterState.blocks().global().size(), equalTo(1)); @@ -295,7 +303,8 @@ public class PublishClusterStateActionTests extends ESTestCase { // cluster state update 4 - update settings previousClusterState = clusterState; - MetaData metaData = MetaData.builder(clusterState.metaData()).transientSettings(Settings.builder().put("foo", "bar").build()).build(); + MetaData metaData = MetaData.builder(clusterState.metaData()) + .transientSettings(Settings.builder().put("foo", "bar").build()).build(); clusterState = ClusterState.builder(clusterState).metaData(metaData).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromDiff(nodeB.clusterState, clusterState); @@ -338,7 +347,8 @@ public class PublishClusterStateActionTests extends ESTestCase { MockNode nodeB = createMockNode("nodeB"); - // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state + // Initial cluster state with both states - the second node still shouldn't + // get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build(); ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); @@ -347,7 +357,8 @@ public class PublishClusterStateActionTests extends ESTestCase { // cluster state update - add block previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); + clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder() + .addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromDiff(nodeB.clusterState, clusterState); } @@ -370,7 +381,8 @@ public class PublishClusterStateActionTests extends ESTestCase { }); // Initial cluster state - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.getId()).masterNodeId(nodeA.discoveryNode.getId()).build(); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.getId()).masterNodeId(nodeA.discoveryNode.getId()).build(); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build(); // cluster state update - add nodeB @@ -381,7 +393,8 @@ public class PublishClusterStateActionTests extends ESTestCase { // cluster state update - add block previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); + clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder() + .addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); } @@ -446,7 +459,8 @@ public class PublishClusterStateActionTests extends ESTestCase { MockNode nodeB = createMockNode("nodeB"); - // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state + // Initial cluster state with both states - the second node still shouldn't get + // diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build(); ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); @@ -455,7 +469,8 @@ public class PublishClusterStateActionTests extends ESTestCase { // cluster state update - add block previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); + clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder() + .addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); ClusterState unserializableClusterState = new ClusterState(clusterState.version(), clusterState.stateUUID(), clusterState) { @Override @@ -603,7 +618,8 @@ public class PublishClusterStateActionTests extends ESTestCase { node.action.validateIncomingState(state, null); // now set a master node - node.clusterState = ClusterState.builder(node.clusterState).nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(); + node.clusterState = ClusterState.builder(node.clusterState) + .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(); logger.info("--> testing rejection of another master"); try { node.action.validateIncomingState(state, node.clusterState); @@ -619,7 +635,8 @@ public class PublishClusterStateActionTests extends ESTestCase { logger.info("--> testing rejection of another cluster name"); try { - node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build(), node.clusterState); + node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))) + .nodes(node.nodes()).build(), node.clusterState); fail("node accepted state with another cluster name"); } catch (IllegalStateException OK) { assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster")); @@ -687,8 +704,8 @@ public class PublishClusterStateActionTests extends ESTestCase { logger.info("--> publishing states"); for (ClusterState state : states) { node.action.handleIncomingClusterStateRequest( - new BytesTransportRequest(PublishClusterStateAction.serializeFullClusterState(state, Version.CURRENT), Version.CURRENT), - channel); + new BytesTransportRequest(PublishClusterStateAction.serializeFullClusterState(state, Version.CURRENT), Version.CURRENT), + channel); assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); assertThat(channel.error.get(), nullValue()); channel.clear(); @@ -725,12 +742,14 @@ public class PublishClusterStateActionTests extends ESTestCase { */ public void testTimeoutOrCommit() throws Exception { Settings settings = Settings.builder() - .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "1ms").build(); // short but so we will sometime commit sometime timeout + // short but so we will sometime commit sometime timeout + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "1ms").build(); MockNode master = createMockNode("master", settings, null); MockNode node = createMockNode("node", settings, null); ClusterState state = ClusterState.builder(master.clusterState) - .nodes(DiscoveryNodes.builder(master.clusterState.nodes()).add(node.discoveryNode).masterNodeId(master.discoveryNode.getId())).build(); + .nodes(DiscoveryNodes.builder(master.clusterState.nodes()) + .add(node.discoveryNode).masterNodeId(master.discoveryNode.getId())).build(); for (int i = 0; i < 10; i++) { state = ClusterState.builder(state).incrementVersion().build(); @@ -755,7 +774,8 @@ public class PublishClusterStateActionTests extends ESTestCase { private MetaData buildMetaDataForVersion(MetaData metaData, long version) { ImmutableOpenMap.Builder indices = ImmutableOpenMap.builder(metaData.indices()); - indices.put("test" + version, IndexMetaData.builder("test" + version).settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + indices.put("test" + version, IndexMetaData.builder("test" + version) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) .numberOfShards((int) version).numberOfReplicas(0).build()); return MetaData.builder(metaData) .transientSettings(Settings.builder().put("test", version).build()) @@ -772,16 +792,19 @@ public class PublishClusterStateActionTests extends ESTestCase { assertThat(metaData.transientSettings().get("test"), equalTo(Long.toString(version))); } - public void publishStateAndWait(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException { + public void publishStateAndWait(PublishClusterStateAction action, ClusterState state, + ClusterState previousState) throws InterruptedException { publishState(action, state, previousState).await(1, TimeUnit.SECONDS); } - public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException { + public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, + ClusterState previousState) throws InterruptedException { final int minimumMasterNodes = randomIntBetween(-1, state.nodes().getMasterNodes().size()); return publishState(action, state, previousState, minimumMasterNodes); } - public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, ClusterState previousState, int minMasterNodes) throws InterruptedException { + public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, + ClusterState previousState, int minMasterNodes) throws InterruptedException { AssertingAckListener assertingAckListener = new AssertingAckListener(state.nodes().getSize() - 1); ClusterChangedEvent changedEvent = new ClusterChangedEvent("test update", state, previousState); action.publish(changedEvent, minMasterNodes, assertingAckListener); @@ -829,7 +852,8 @@ public class PublishClusterStateActionTests extends ESTestCase { void assertSameState(ClusterState actual, ClusterState expected) { assertThat(actual, notNullValue()); - final String reason = "\n--> actual ClusterState: " + actual.prettyPrint() + "\n--> expected ClusterState:" + expected.prettyPrint(); + final String reason = "\n--> actual ClusterState: " + actual.prettyPrint() + "\n" + + "--> expected ClusterState:" + expected.prettyPrint(); assertThat("unequal UUIDs" + reason, actual.stateUUID(), equalTo(expected.stateUUID())); assertThat("unequal versions" + reason, actual.version(), equalTo(expected.version())); } @@ -851,7 +875,9 @@ public class PublishClusterStateActionTests extends ESTestCase { AtomicBoolean timeoutOnCommit = new AtomicBoolean(); AtomicBoolean errorOnCommit = new AtomicBoolean(); - public MockPublishAction(Settings settings, TransportService transportService, Supplier clusterStateSupplier, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { + public MockPublishAction(Settings settings, TransportService transportService, + Supplier clusterStateSupplier, NewPendingClusterStateListener listener, + DiscoverySettings discoverySettings, ClusterName clusterName) { super(settings, transportService, clusterStateSupplier, listener, discoverySettings, clusterName); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java similarity index 98% rename from core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java rename to core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index e382f93f651..f30a40fbbf9 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.discovery.zen.ping.unicast; +package org.elasticsearch.discovery.zen; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -34,8 +34,9 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.zen.ElectMasterService; -import org.elasticsearch.discovery.zen.ping.PingContextProvider; -import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.discovery.zen.UnicastZenPing; +import org.elasticsearch.discovery.zen.PingContextProvider; +import org.elasticsearch.discovery.zen.ZenPing; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index c99187c7a93..6856d05365a 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -37,9 +37,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.fd.FaultDetection; -import org.elasticsearch.discovery.zen.membership.MembershipAction; -import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.TestCustomMetaData; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index e86afe7664b..8c13b5783d9 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -32,9 +32,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.zen.ping.ZenPing; -import org.elasticsearch.discovery.zen.ping.ZenPingService; -import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener; +import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java index cad640b4dfa..9c70587d0e5 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; diff --git a/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java b/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java index 4172ddc2bb7..cc503f0050d 100644 --- a/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java +++ b/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java @@ -39,7 +39,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.transport.TransportService; import java.io.IOException; diff --git a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java index 0fd6a0bea99..e2342a032ba 100644 --- a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java +++ b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java @@ -39,7 +39,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.SingleObjectCache; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java index 78393d34001..f00a78c3437 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java @@ -25,7 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.env.Environment; import org.elasticsearch.transport.TransportService; @@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing.resolveDiscoveryNodes; +import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveDiscoveryNodes; /** * An implementation of {@link UnicastHostsProvider} that reads hosts/ports diff --git a/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java b/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java index fe0eb005532..09a66a6f844 100644 --- a/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java +++ b/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java @@ -37,7 +37,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.transport.TransportService; import java.io.IOException; diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java index d1e38e061bf..3e02b9de0fb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -26,8 +26,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.discovery.zen.ping.PingContextProvider; -import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.discovery.zen.PingContextProvider; +import org.elasticsearch.discovery.zen.ZenPing; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.plugins.Plugin;