From d2ff01af13df1aab31d43e27571496a31c384723 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 12 Nov 2018 19:31:10 +0100 Subject: [PATCH] Zen2: Add basic Zen1 transport-level BWC (#35443) Implements serialization compatibility between Zen1 and Zen2 transport action, allowing a Zen1 node to join a fully formed Zen2 cluster and vice-versa. --- .../cluster/coordination/Coordinator.java | 14 +- .../coordination/FollowersChecker.java | 25 +- .../cluster/coordination/JoinHelper.java | 87 +++++-- .../cluster/coordination/LeaderChecker.java | 43 +++- .../PublicationTransportHandler.java | 236 +++++++++++------- .../elasticsearch/discovery/PeerFinder.java | 131 +++++++--- .../discovery/zen/MasterFaultDetection.java | 16 +- .../discovery/zen/MembershipAction.java | 8 +- .../discovery/zen/NodesFaultDetection.java | 8 +- .../zen/PublishClusterStateAction.java | 4 +- .../discovery/zen/UnicastZenPing.java | 20 +- .../elasticsearch/discovery/zen/ZenPing.java | 2 +- .../transport/TransportResponseHandler.java | 29 +++ .../cluster/coordination/Zen1IT.java | 48 ++++ 14 files changed, 483 insertions(+), 188 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 31b481d93c3..6b15d8a51ca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -1110,7 +1111,18 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery @Override protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, ActionListener responseActionListener) { - publicationHandler.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener)); + publicationContext.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener)); } } + + // TODO: only here temporarily for BWC development, remove once complete + public static Settings.Builder addZen1Attribute(Settings.Builder builder) { + return builder.put("node.attr.zen1", true); + } + + // TODO: only here temporarily for BWC development, remove once complete + public static boolean isZen1Node(DiscoveryNode discoveryNode) { + return discoveryNode.getVersion().before(Version.V_7_0_0) || + discoveryNode.getAttributes().containsKey("zen1"); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index 50b4911d0c7..467f7f5c804 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -20,6 +20,8 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -30,6 +32,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.discovery.zen.NodesFaultDetection; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; @@ -77,6 +80,8 @@ public class FollowersChecker extends AbstractComponent { public static final Setting FOLLOWER_CHECK_RETRY_COUNT_SETTING = Setting.intSetting("cluster.fault_detection.follower_check.retry_count", 3, 1, Setting.Property.NodeScope); + private final Settings settings; + private final TimeValue followerCheckInterval; private final TimeValue followerCheckTimeout; private final int followerCheckRetryCount; @@ -94,6 +99,7 @@ public class FollowersChecker extends AbstractComponent { public FollowersChecker(Settings settings, TransportService transportService, Consumer handleRequestAndUpdateState, BiConsumer onNodeFailure) { + this.settings = settings; this.transportService = transportService; this.handleRequestAndUpdateState = handleRequestAndUpdateState; this.onNodeFailure = onNodeFailure; @@ -103,8 +109,12 @@ public class FollowersChecker extends AbstractComponent { followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings); updateFastResponseState(0, Mode.CANDIDATE); - transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, FollowerCheckRequest::new, + transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, false, false, FollowerCheckRequest::new, (request, transportChannel, task) -> handleFollowerCheck(request, transportChannel)); + transportService.registerRequestHandler( + NodesFaultDetection.PING_ACTION_NAME, NodesFaultDetection.PingRequest::new, Names.SAME, false, false, + (request, channel, task) -> // TODO: check that we're a follower of the requesting node? + channel.sendResponse(new NodesFaultDetection.PingResponse())); transportService.addConnectionListener(new TransportConnectionListener() { @Override public void onNodeDisconnected(DiscoveryNode node) { @@ -290,7 +300,18 @@ public class FollowersChecker extends AbstractComponent { final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term, transportService.getLocalNode()); logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request); - transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request, + + final String actionName; + final TransportRequest transportRequest; + if (Coordinator.isZen1Node(discoveryNode)) { + actionName = NodesFaultDetection.PING_ACTION_NAME; + transportRequest = new NodesFaultDetection.PingRequest(discoveryNode, ClusterName.CLUSTER_NAME_SETTING.get(settings), + transportService.getLocalNode(), ClusterState.UNKNOWN_VERSION); + } else { + actionName = FOLLOWER_CHECK_ACTION_NAME; + transportRequest = request; + } + transportService.sendRequest(discoveryNode, actionName, transportRequest, TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(), new TransportResponseHandler() { @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 12174ea1ddc..fcd98f2a5a2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -34,11 +34,14 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.discovery.zen.MembershipAction; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -98,32 +101,12 @@ public class JoinHelper extends AbstractComponent { }; transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new, - (request, channel, task) -> joinHandler.accept(request, new JoinCallback() { + (request, channel, task) -> joinHandler.accept(request, transportJoinCallback(request, channel))); - @Override - public void onSuccess() { - try { - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn("failed to send back failure on join request", inner); - } - } - - @Override - public String toString() { - return "JoinCallback{request=" + request + "}"; - } - })); + transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_ACTION_NAME, MembershipAction.JoinRequest::new, + ThreadPool.Names.GENERIC, false, false, + (request, channel, task) -> joinHandler.accept(new JoinRequest(request.node, Optional.empty()), // treat as non-voting join + transportJoinCallback(request, channel))); transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false, StartJoinRequest::new, @@ -132,6 +115,47 @@ public class JoinHelper extends AbstractComponent { sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request))); channel.sendResponse(Empty.INSTANCE); }); + + transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME, + () -> new MembershipAction.ValidateJoinRequest(), ThreadPool.Names.GENERIC, + (request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: implement join validation + + transportService.registerRequestHandler( + ZenDiscovery.DISCOVERY_REJOIN_ACTION_NAME, ZenDiscovery.RejoinClusterRequest::new, ThreadPool.Names.SAME, + (request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here? + + transportService.registerRequestHandler( + MembershipAction.DISCOVERY_LEAVE_ACTION_NAME, MembershipAction.LeaveRequest::new, ThreadPool.Names.SAME, + (request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here? + } + + private JoinCallback transportJoinCallback(TransportRequest request, TransportChannel channel) { + return new JoinCallback() { + + @Override + public void onSuccess() { + try { + channel.sendResponse(Empty.INSTANCE); + } catch (IOException e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn("failed to send back failure on join request", inner); + } + } + + @Override + public String toString() { + return "JoinCallback{request=" + request + "}"; + } + }; } public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { @@ -139,7 +163,16 @@ public class JoinHelper extends AbstractComponent { final Tuple dedupKey = Tuple.tuple(destination, joinRequest); if (pendingOutgoingJoins.add(dedupKey)) { logger.debug("attempting to join {} with {}", destination, joinRequest); - transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, + final String actionName; + final TransportRequest transportRequest; + if (Coordinator.isZen1Node(destination)) { + actionName = MembershipAction.DISCOVERY_JOIN_ACTION_NAME; + transportRequest = new MembershipAction.JoinRequest(transportService.getLocalNode()); + } else { + actionName = JOIN_ACTION_NAME; + transportRequest = joinRequest; + } + transportService.sendRequest(destination, actionName, transportRequest, TransportRequestOptions.builder().withTimeout(joinTimeout).build(), new TransportResponseHandler() { @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index aeb21ef880b..7a165ceb643 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; @@ -30,10 +31,9 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.tasks.Task; +import org.elasticsearch.discovery.zen.MasterFaultDetection; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -74,6 +74,8 @@ public class LeaderChecker extends AbstractComponent { public static final Setting LEADER_CHECK_RETRY_COUNT_SETTING = Setting.intSetting("cluster.fault_detection.leader_check.retry_count", 3, 1, Setting.Property.NodeScope); + private final Settings settings; + private final TimeValue leaderCheckInterval; private final TimeValue leaderCheckTimeout; private final int leaderCheckRetryCount; @@ -85,13 +87,29 @@ public class LeaderChecker extends AbstractComponent { private volatile DiscoveryNodes discoveryNodes; public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) { + this.settings = settings; leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings); leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings); leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings); this.transportService = transportService; this.onLeaderFailure = onLeaderFailure; - transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck); + transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, false, false, LeaderCheckRequest::new, + (request, channel, task) -> { + handleLeaderCheck(request); + channel.sendResponse(Empty.INSTANCE); + }); + + transportService.registerRequestHandler(MasterFaultDetection.MASTER_PING_ACTION_NAME, MasterFaultDetection.MasterPingRequest::new, + Names.SAME, false, false, (request, channel, task) -> { + try { + handleLeaderCheck(new LeaderCheckRequest(request.sourceNode)); + } catch (CoordinationStateRejectedException e) { + throw new MasterFaultDetection.ThisIsNotTheMasterYouAreLookingForException(e.getMessage()); + } + channel.sendResponse(new MasterFaultDetection.MasterPingResponseResponse()); + }); + transportService.addConnectionListener(new TransportConnectionListener() { @Override public void onNodeDisconnected(DiscoveryNode node) { @@ -145,19 +163,18 @@ public class LeaderChecker extends AbstractComponent { return discoveryNodes.isLocalNodeElectedMaster(); } - private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException { + private void handleLeaderCheck(LeaderCheckRequest request) { final DiscoveryNodes discoveryNodes = this.discoveryNodes; assert discoveryNodes != null; if (discoveryNodes.isLocalNodeElectedMaster() == false) { logger.debug("non-master handling {}", request); - transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check")); + throw new CoordinationStateRejectedException("non-leader rejecting leader check"); } else if (discoveryNodes.nodeExists(request.getSender()) == false) { logger.debug("leader check from unknown node: {}", request); - transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node")); + throw new CoordinationStateRejectedException("leader check from unknown node"); } else { logger.trace("handling {}", request); - transportChannel.sendResponse(Empty.INSTANCE); } } @@ -197,11 +214,21 @@ public class LeaderChecker extends AbstractComponent { logger.trace("checking {} with [{}] = {}", leader, LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeout); + final String actionName; + final TransportRequest transportRequest; + if (Coordinator.isZen1Node(leader)) { + actionName = MasterFaultDetection.MASTER_PING_ACTION_NAME; + transportRequest = new MasterFaultDetection.MasterPingRequest( + transportService.getLocalNode(), leader, ClusterName.CLUSTER_NAME_SETTING.get(settings)); + } else { + actionName = LEADER_CHECK_ACTION_NAME; + transportRequest = new LeaderCheckRequest(transportService.getLocalNode()); + } // TODO lag detection: // In the PoC, the leader sent its current version to the follower in the response to a LeaderCheck, so the follower // could detect if it was lagging. We'd prefer this to be implemented on the leader, so the response is just // TransportResponse.Empty here. - transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()), + transportService.sendRequest(leader, actionName, transportRequest, TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(), new TransportResponseHandler() { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index d91c8e71504..7c1ca0d8705 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -39,11 +39,13 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.discovery.zen.PublishClusterStateAction; import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesTransportRequest; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; @@ -52,9 +54,11 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; public class PublicationTransportHandler extends AbstractComponent { @@ -80,31 +84,57 @@ public class PublicationTransportHandler extends AbstractComponent { this.handlePublishRequest = handlePublishRequest; transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.GENERIC, - false, false, (request, channel, task) -> handleIncomingPublishRequest(request, channel)); + false, false, (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))); + + transportService.registerRequestHandler(PublishClusterStateAction.SEND_ACTION_NAME, BytesTransportRequest::new, + ThreadPool.Names.GENERIC, + false, false, (request, channel, task) -> { + handleIncomingPublishRequest(request); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + }); transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, ApplyCommitRequest::new, - (request, channel, task) -> handleApplyCommit.accept(request, new ActionListener() { + (request, channel, task) -> handleApplyCommit.accept(request, transportCommitCallback(channel))); - @Override - public void onResponse(Void aVoid) { - try { - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } catch (IOException e) { - logger.debug("failed to send response on commit", e); - } + transportService.registerRequestHandler(PublishClusterStateAction.COMMIT_ACTION_NAME, + PublishClusterStateAction.CommitClusterStateRequest::new, + ThreadPool.Names.GENERIC, false, false, + (request, channel, task) -> { + final Optional matchingClusterState = Optional.ofNullable(lastSeenClusterState.get()).filter( + cs -> cs.stateUUID().equals(request.stateUUID)); + if (matchingClusterState.isPresent() == false) { + throw new IllegalStateException("can't resolve cluster state with uuid" + + " [" + request.stateUUID + "] to commit"); } + final ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(matchingClusterState.get().getNodes().getMasterNode(), + matchingClusterState.get().term(), matchingClusterState.get().version()); + handleApplyCommit.accept(applyCommitRequest, transportCommitCallback(channel)); + }); + } - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (IOException ie) { - e.addSuppressed(ie); - logger.debug("failed to send response on commit", e); - } + private ActionListener transportCommitCallback(TransportChannel channel) { + return new ActionListener() { + + @Override + public void onResponse(Void aVoid) { + try { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (IOException e) { + logger.debug("failed to send response on commit", e); } - })); + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException ie) { + e.addSuppressed(ie); + logger.debug("failed to send response on commit", e); + } + } + }; } public PublishClusterStateStats stats() { @@ -119,6 +149,9 @@ public class PublicationTransportHandler extends AbstractComponent { void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, ActionListener responseActionListener); + void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest, + ActionListener responseActionListener); + } public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) { @@ -136,34 +169,78 @@ public class PublicationTransportHandler extends AbstractComponent { buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(), nodes, sendFullVersion, serializedStates, serializedDiffs); - return (destination, publishRequest, responseActionListener) -> { - assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us"; - if (destination.equals(nodes.getLocalNode())) { - // the master needs the original non-serialized state as the cluster state contains some volatile information that we don't - // want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or because it's - // mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in snapshot code). - // TODO: look into these and check how to get rid of them - transportService.getThreadPool().generic().execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - // wrap into fake TransportException, as that's what we expect in Publication - responseActionListener.onFailure(new TransportException(e)); - } + return new PublicationContext() { + @Override + public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, + ActionListener responseActionListener) { + assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us"; + if (destination.equals(nodes.getLocalNode())) { + // the master needs the original non-serialized state as the cluster state contains some volatile information that we + // don't want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or + // because it's mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in + // snapshot code). + // TODO: look into these and check how to get rid of them + transportService.getThreadPool().generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + // wrap into fake TransportException, as that's what we expect in Publication + responseActionListener.onFailure(new TransportException(e)); + } - @Override - protected void doRun() { - responseActionListener.onResponse(handlePublishRequest.apply(publishRequest)); - } + @Override + protected void doRun() { + responseActionListener.onResponse(handlePublishRequest.apply(publishRequest)); + } - @Override - public String toString() { - return "publish to self of " + publishRequest; - } - }); - } else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) { - sendFullClusterState(newState, serializedStates, destination, responseActionListener); - } else { - sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination, responseActionListener); + @Override + public String toString() { + return "publish to self of " + publishRequest; + } + }); + } else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) { + PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener); + } else { + PublicationTransportHandler.this.sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination, + responseActionListener); + } + } + + @Override + public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest, + ActionListener responseActionListener) { + TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build(); + final String actionName; + final TransportRequest transportRequest; + if (Coordinator.isZen1Node(destination)) { + actionName = PublishClusterStateAction.COMMIT_ACTION_NAME; + transportRequest = new PublishClusterStateAction.CommitClusterStateRequest(newState.stateUUID()); + } else { + actionName = COMMIT_STATE_ACTION_NAME; + transportRequest = applyCommitRequest; + } + transportService.sendRequest(destination, actionName, transportRequest, options, + new TransportResponseHandler() { + + @Override + public TransportResponse.Empty read(StreamInput in) { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse.Empty response) { + responseActionListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + responseActionListener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + }); } }; } @@ -175,11 +252,19 @@ public class PublicationTransportHandler extends AbstractComponent { // -> no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout // -> no need to compress, we already compressed the bytes - TransportRequestOptions options = TransportRequestOptions.builder() + final TransportRequestOptions options = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.STATE).withCompress(false).build(); - transportService.sendRequest(node, PUBLISH_STATE_ACTION_NAME, - new BytesTransportRequest(bytes, node.getVersion()), - options, + final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion()); + final Consumer transportExceptionHandler = exp -> { + if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) { + logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage()); + sendFullClusterState(clusterState, serializedStates, node, responseActionListener); + } else { + logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp); + responseActionListener.onFailure(exp); + } + }; + final TransportResponseHandler publishWithJoinResponseHandler = new TransportResponseHandler() { @Override @@ -194,20 +279,27 @@ public class PublicationTransportHandler extends AbstractComponent { @Override public void handleException(TransportException exp) { - if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) { - logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage()); - sendFullClusterState(clusterState, serializedStates, node, responseActionListener); - } else { - logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp); - responseActionListener.onFailure(exp); - } + transportExceptionHandler.accept(exp); } @Override public String executor() { return ThreadPool.Names.GENERIC; } - }); + }; + final String actionName; + final TransportResponseHandler transportResponseHandler; + if (Coordinator.isZen1Node(node)) { + actionName = PublishClusterStateAction.SEND_ACTION_NAME; + transportResponseHandler = publishWithJoinResponseHandler.wrap(empty -> new PublishWithJoinResponse( + new PublishResponse(clusterState.term(), clusterState.version()), + Optional.of(new Join(node, transportService.getLocalNode(), clusterState.term(), clusterState.term(), + clusterState.version()))), in -> TransportResponse.Empty.INSTANCE); + } else { + actionName = PUBLISH_STATE_ACTION_NAME; + transportResponseHandler = publishWithJoinResponseHandler; + } + transportService.sendRequest(node, actionName, request, options, transportResponseHandler); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e); responseActionListener.onFailure(e); @@ -283,7 +375,7 @@ public class PublicationTransportHandler extends AbstractComponent { return bStream.bytes(); } - private void handleIncomingPublishRequest(BytesTransportRequest request, TransportChannel channel) throws IOException { + private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { final Compressor compressor = CompressorFactory.compressor(request.bytes()); StreamInput in = request.bytes().streamInput(); final ClusterState incomingState; @@ -324,34 +416,6 @@ public class PublicationTransportHandler extends AbstractComponent { IOUtils.close(in); } - channel.sendResponse(handlePublishRequest.apply(new PublishRequest(incomingState))); - } - - public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest, - ActionListener responseActionListener) { - TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build(); - transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommitRequest, options, - new TransportResponseHandler() { - - @Override - public TransportResponse.Empty read(StreamInput in) { - return TransportResponse.Empty.INSTANCE; - } - - @Override - public void handleResponse(TransportResponse.Empty response) { - responseActionListener.onResponse(response); - } - - @Override - public void handleException(TransportException exp) { - responseActionListener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } - }); + return handlePublishRequest.apply(new PublishRequest(incomingState)); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index b012928d73c..b244ef5766f 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -23,6 +23,9 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.PeersResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -34,13 +37,22 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.discovery.zen.UnicastZenPing; +import org.elasticsearch.discovery.zen.ZenDiscovery; +import org.elasticsearch.discovery.zen.ZenPing; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -64,6 +76,8 @@ public abstract class PeerFinder extends AbstractComponent { Setting.timeSetting("discovery.request_peers_timeout", TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + private final Settings settings; + private final TimeValue findPeersInterval; private final TimeValue requestPeersTimeout; @@ -80,6 +94,7 @@ public abstract class PeerFinder extends AbstractComponent { public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { + this.settings = settings; findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings); this.transportService = transportService; @@ -89,6 +104,9 @@ public abstract class PeerFinder extends AbstractComponent { transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false, PeersRequest::new, (request, channel, task) -> channel.sendResponse(handlePeersRequest(request))); + + transportService.registerRequestHandler(UnicastZenPing.ACTION_NAME, Names.GENERIC, false, false, + UnicastZenPing.UnicastPingRequest::new, new Zen1UnicastPingRequestHandler()); } public void activate(final DiscoveryNodes lastAcceptedNodes) { @@ -384,50 +402,77 @@ public abstract class PeerFinder extends AbstractComponent { logger.trace("{} requesting peers", this); peersRequestInFlight = true; - List knownNodes = getFoundPeersUnderLock(); + final List knownNodes = getFoundPeersUnderLock(); - transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME, - new PeersRequest(getLocalNode(), knownNodes), - TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(), - new TransportResponseHandler() { + final TransportResponseHandler peersResponseHandler = new TransportResponseHandler() { - @Override - public PeersResponse read(StreamInput in) throws IOException { - return new PeersResponse(in); - } + @Override + public PeersResponse read(StreamInput in) throws IOException { + return new PeersResponse(in); + } - @Override - public void handleResponse(PeersResponse response) { - logger.trace("{} received {}", Peer.this, response); - synchronized (mutex) { - if (active == false) { - return; - } - - peersRequestInFlight = false; - - response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe); - response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe); + @Override + public void handleResponse(PeersResponse response) { + logger.trace("{} received {}", Peer.this, response); + synchronized (mutex) { + if (active == false) { + return; } - if (response.getMasterNode().equals(Optional.of(discoveryNode))) { - // Must not hold lock here to avoid deadlock - assert holdsLock() == false : "PeerFinder mutex is held in error"; - onActiveMasterFound(discoveryNode, response.getTerm()); - } - } - - @Override - public void handleException(TransportException exp) { peersRequestInFlight = false; - logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp); + + response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe); + response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe); } - @Override - public String executor() { - return Names.GENERIC; + if (response.getMasterNode().equals(Optional.of(discoveryNode))) { + // Must not hold lock here to avoid deadlock + assert holdsLock() == false : "PeerFinder mutex is held in error"; + onActiveMasterFound(discoveryNode, response.getTerm()); } - }); + } + + @Override + public void handleException(TransportException exp) { + peersRequestInFlight = false; + logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp); + } + + @Override + public String executor() { + return Names.GENERIC; + } + }; + final String actionName; + final TransportRequest transportRequest; + final TransportResponseHandler transportResponseHandler; + if (Coordinator.isZen1Node(discoveryNode)) { + actionName = UnicastZenPing.ACTION_NAME; + transportRequest = new UnicastZenPing.UnicastPingRequest(1, ZenDiscovery.PING_TIMEOUT_SETTING.get(settings), + new ZenPing.PingResponse(getLocalNode(), null, ClusterName.CLUSTER_NAME_SETTING.get(settings), + ClusterState.UNKNOWN_VERSION)); + transportResponseHandler = peersResponseHandler.wrap(ucResponse -> { + Optional optionalMasterNode = Arrays.stream(ucResponse.pingResponses) + .filter(pr -> discoveryNode.equals(pr.node()) && discoveryNode.equals(pr.master())) + .map(ZenPing.PingResponse::node) + .findFirst(); + List discoveredNodes = new ArrayList<>(); + if (optionalMasterNode.isPresent() == false) { + Arrays.stream(ucResponse.pingResponses).map(pr -> pr.master()).filter(Objects::nonNull) + .forEach(discoveredNodes::add); + Arrays.stream(ucResponse.pingResponses).map(pr -> pr.node()).forEach(discoveredNodes::add); + } + return new PeersResponse(optionalMasterNode, discoveredNodes, 0L); + }, UnicastZenPing.UnicastPingResponse::new); + } else { + actionName = REQUEST_PEERS_ACTION_NAME; + transportRequest = new PeersRequest(getLocalNode(), knownNodes); + transportResponseHandler = peersResponseHandler; + } + transportService.sendRequest(discoveryNode, actionName, + transportRequest, + TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(), + transportResponseHandler); } @Override @@ -439,4 +484,20 @@ public abstract class PeerFinder extends AbstractComponent { '}'; } } + + private class Zen1UnicastPingRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception { + final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(), + Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(Collections.emptyList())); + final PeersResponse peersResponse = handlePeersRequest(peersRequest); + final List pingResponses = new ArrayList<>(); + final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); + pingResponses.add(new ZenPing.PingResponse(transportService.getLocalNode(), peersResponse.getMasterNode().orElse(null), + clusterName, 0L)); + peersResponse.getKnownPeers().forEach(dn -> pingResponses.add( + new ZenPing.PingResponse(dn, null, clusterName, ClusterState.UNKNOWN_VERSION))); + channel.sendResponse(new UnicastZenPing.UnicastPingResponse(request.id, pingResponses.toArray(new ZenPing.PingResponse[0]))); + } + } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java index b48ea77e64c..8608571b4fa 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java @@ -297,13 +297,13 @@ public class MasterFaultDetection extends FaultDetection { } /** Thrown when a ping reaches the wrong node */ - static class ThisIsNotTheMasterYouAreLookingForException extends IllegalStateException { + public static class ThisIsNotTheMasterYouAreLookingForException extends IllegalStateException { - ThisIsNotTheMasterYouAreLookingForException(String msg) { + public ThisIsNotTheMasterYouAreLookingForException(String msg) { super(msg); } - ThisIsNotTheMasterYouAreLookingForException() { + public ThisIsNotTheMasterYouAreLookingForException() { } @Override @@ -397,7 +397,7 @@ public class MasterFaultDetection extends FaultDetection { public static class MasterPingRequest extends TransportRequest { - private DiscoveryNode sourceNode; + public DiscoveryNode sourceNode; private DiscoveryNode masterNode; private ClusterName clusterName; @@ -405,7 +405,7 @@ public class MasterFaultDetection extends FaultDetection { public MasterPingRequest() { } - private MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) { + public MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) { this.sourceNode = sourceNode; this.masterNode = masterNode; this.clusterName = clusterName; @@ -428,12 +428,12 @@ public class MasterFaultDetection extends FaultDetection { } } - private static class MasterPingResponseResponse extends TransportResponse { + public static class MasterPingResponseResponse extends TransportResponse { - private MasterPingResponseResponse() { + public MasterPingResponseResponse() { } - private MasterPingResponseResponse(StreamInput in) throws IOException { + public MasterPingResponseResponse(StreamInput in) throws IOException { super(in); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java b/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java index ac66c9e999c..2ae053da378 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java @@ -102,12 +102,12 @@ public class MembershipAction extends AbstractComponent { public static class JoinRequest extends TransportRequest { - DiscoveryNode node; + public DiscoveryNode node; public JoinRequest() { } - private JoinRequest(DiscoveryNode node) { + public JoinRequest(DiscoveryNode node) { this.node = node; } @@ -152,10 +152,10 @@ public class MembershipAction extends AbstractComponent { } } - static class ValidateJoinRequest extends TransportRequest { + public static class ValidateJoinRequest extends TransportRequest { private ClusterState state; - ValidateJoinRequest() {} + public ValidateJoinRequest() {} ValidateJoinRequest(ClusterState state) { this.state = state; diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java index 40bde9ee81d..bd64094a306 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java @@ -312,7 +312,7 @@ public class NodesFaultDetection extends FaultDetection { public PingRequest() { } - PingRequest(DiscoveryNode targetNode, ClusterName clusterName, DiscoveryNode masterNode, long clusterStateVersion) { + public PingRequest(DiscoveryNode targetNode, ClusterName clusterName, DiscoveryNode masterNode, long clusterStateVersion) { this.targetNode = targetNode; this.clusterName = clusterName; this.masterNode = masterNode; @@ -354,12 +354,12 @@ public class NodesFaultDetection extends FaultDetection { } } - private static class PingResponse extends TransportResponse { + public static class PingResponse extends TransportResponse { - private PingResponse() { + public PingResponse() { } - private PingResponse(StreamInput in) throws IOException { + public PingResponse(StreamInput in) throws IOException { super(in); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java b/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java index 07490b99005..467ebe714f6 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java @@ -458,9 +458,9 @@ public class PublishClusterStateAction extends AbstractComponent { } } - protected static class CommitClusterStateRequest extends TransportRequest { + public static class CommitClusterStateRequest extends TransportRequest { - String stateUUID; + public String stateUUID; public CommitClusterStateRequest() { } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 8fb9cfce0bf..abeec3a67d2 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -585,19 +585,19 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { } - static class UnicastPingRequest extends TransportRequest { + public static class UnicastPingRequest extends TransportRequest { - final int id; - final TimeValue timeout; - final PingResponse pingResponse; + public final int id; + public final TimeValue timeout; + public final PingResponse pingResponse; - UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) { + public UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) { this.id = id; this.timeout = timeout; this.pingResponse = pingResponse; } - UnicastPingRequest(StreamInput in) throws IOException { + public UnicastPingRequest(StreamInput in) throws IOException { super(in); id = in.readInt(); timeout = in.readTimeValue(); @@ -623,18 +623,18 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), clusterState); } - static class UnicastPingResponse extends TransportResponse { + public static class UnicastPingResponse extends TransportResponse { final int id; - final PingResponse[] pingResponses; + public final PingResponse[] pingResponses; - UnicastPingResponse(int id, PingResponse[] pingResponses) { + public UnicastPingResponse(int id, PingResponse[] pingResponses) { this.id = id; this.pingResponses = pingResponses; } - UnicastPingResponse(StreamInput in) throws IOException { + public UnicastPingResponse(StreamInput in) throws IOException { id = in.readInt(); pingResponses = new PingResponse[in.readVInt()]; for (int i = 0; i < pingResponses.length; i++) { diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java index a16830c25b0..af5a3d285a3 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java @@ -67,7 +67,7 @@ public interface ZenPing extends Releasable { * @param clusterStateVersion the current cluster state version of that node * ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered) */ - PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) { + public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) { this.id = idGenerator.incrementAndGet(); this.node = node; this.master = master; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java index 29720216cf4..c43f57754d6 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java @@ -19,8 +19,12 @@ package org.elasticsearch.transport; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; +import java.io.IOException; +import java.util.function.Function; + public interface TransportResponseHandler extends Writeable.Reader { void handleResponse(T response); @@ -28,4 +32,29 @@ public interface TransportResponseHandler extends W void handleException(TransportException exp); String executor(); + + default TransportResponseHandler wrap(Function converter, Writeable.Reader reader) { + final TransportResponseHandler self = this; + return new TransportResponseHandler() { + @Override + public void handleResponse(Q response) { + self.handleResponse(converter.apply(response)); + } + + @Override + public void handleException(TransportException exp) { + self.handleException(exp); + } + + @Override + public String executor() { + return self.executor(); + } + + @Override + public Q read(StreamInput in) throws IOException { + return reader.read(in); + } + }; + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java new file mode 100644 index 00000000000..a4ede1df6de --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.discovery.TestZenDiscovery; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class Zen1IT extends ESIntegTestCase { + + private static Settings ZEN1_SETTINGS = Coordinator.addZen1Attribute(Settings.builder() + .put(TestZenDiscovery.USE_ZEN2.getKey(), false) + .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)) // Zen2 does not know about mock pings + .build(); + + private static Settings ZEN2_SETTINGS = Settings.builder() + .put(TestZenDiscovery.USE_ZEN2.getKey(), true) + .build(); + + public void testZen2NodesJoiningZen1Cluster() { + internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS); + internalCluster().startNodes(randomIntBetween(1, 3), ZEN2_SETTINGS); + createIndex("test"); + } + + public void testZen1NodesJoiningZen2Cluster() { + internalCluster().startNodes(randomIntBetween(1, 3), ZEN2_SETTINGS); + internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS); + createIndex("test"); + } +}