Zen2: Add basic Zen1 transport-level BWC (#35443)

Implements serialization compatibility between Zen1 and Zen2 transport action, allowing a Zen1 node to join a fully formed Zen2 cluster and vice-versa.
This commit is contained in:
Yannick Welsch 2018-11-12 19:31:10 +01:00 committed by GitHub
parent fe29b18c26
commit d2ff01af13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 483 additions and 188 deletions

View File

@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -1110,7 +1111,18 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit,
ActionListener<Empty> responseActionListener) {
publicationHandler.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener));
publicationContext.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener));
}
}
// TODO: only here temporarily for BWC development, remove once complete
public static Settings.Builder addZen1Attribute(Settings.Builder builder) {
return builder.put("node.attr.zen1", true);
}
// TODO: only here temporarily for BWC development, remove once complete
public static boolean isZen1Node(DiscoveryNode discoveryNode) {
return discoveryNode.getVersion().before(Version.V_7_0_0) ||
discoveryNode.getAttributes().containsKey("zen1");
}
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -30,6 +32,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.zen.NodesFaultDetection;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
@ -77,6 +80,8 @@ public class FollowersChecker extends AbstractComponent {
public static final Setting<Integer> FOLLOWER_CHECK_RETRY_COUNT_SETTING =
Setting.intSetting("cluster.fault_detection.follower_check.retry_count", 3, 1, Setting.Property.NodeScope);
private final Settings settings;
private final TimeValue followerCheckInterval;
private final TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
@ -94,6 +99,7 @@ public class FollowersChecker extends AbstractComponent {
public FollowersChecker(Settings settings, TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure) {
this.settings = settings;
this.transportService = transportService;
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
this.onNodeFailure = onNodeFailure;
@ -103,8 +109,12 @@ public class FollowersChecker extends AbstractComponent {
followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, FollowerCheckRequest::new,
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, false, false, FollowerCheckRequest::new,
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
transportService.registerRequestHandler(
NodesFaultDetection.PING_ACTION_NAME, NodesFaultDetection.PingRequest::new, Names.SAME, false, false,
(request, channel, task) -> // TODO: check that we're a follower of the requesting node?
channel.sendResponse(new NodesFaultDetection.PingResponse()));
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
@ -290,7 +300,18 @@ public class FollowersChecker extends AbstractComponent {
final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term, transportService.getLocalNode());
logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request);
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(discoveryNode)) {
actionName = NodesFaultDetection.PING_ACTION_NAME;
transportRequest = new NodesFaultDetection.PingRequest(discoveryNode, ClusterName.CLUSTER_NAME_SETTING.get(settings),
transportService.getLocalNode(), ClusterState.UNKNOWN_VERSION);
} else {
actionName = FOLLOWER_CHECK_ACTION_NAME;
transportRequest = request;
}
transportService.sendRequest(discoveryNode, actionName, transportRequest,
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler<Empty>() {
@Override

View File

@ -34,11 +34,14 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.MembershipAction;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
@ -98,32 +101,12 @@ public class JoinHelper extends AbstractComponent {
};
transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new,
(request, channel, task) -> joinHandler.accept(request, new JoinCallback() {
(request, channel, task) -> joinHandler.accept(request, transportJoinCallback(request, channel)));
@Override
public void onSuccess() {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (IOException e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send back failure on join request", inner);
}
}
@Override
public String toString() {
return "JoinCallback{request=" + request + "}";
}
}));
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_ACTION_NAME, MembershipAction.JoinRequest::new,
ThreadPool.Names.GENERIC, false, false,
(request, channel, task) -> joinHandler.accept(new JoinRequest(request.node, Optional.empty()), // treat as non-voting join
transportJoinCallback(request, channel)));
transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
StartJoinRequest::new,
@ -132,6 +115,47 @@ public class JoinHelper extends AbstractComponent {
sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request)));
channel.sendResponse(Empty.INSTANCE);
});
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new MembershipAction.ValidateJoinRequest(), ThreadPool.Names.GENERIC,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: implement join validation
transportService.registerRequestHandler(
ZenDiscovery.DISCOVERY_REJOIN_ACTION_NAME, ZenDiscovery.RejoinClusterRequest::new, ThreadPool.Names.SAME,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?
transportService.registerRequestHandler(
MembershipAction.DISCOVERY_LEAVE_ACTION_NAME, MembershipAction.LeaveRequest::new, ThreadPool.Names.SAME,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?
}
private JoinCallback transportJoinCallback(TransportRequest request, TransportChannel channel) {
return new JoinCallback() {
@Override
public void onSuccess() {
try {
channel.sendResponse(Empty.INSTANCE);
} catch (IOException e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send back failure on join request", inner);
}
}
@Override
public String toString() {
return "JoinCallback{request=" + request + "}";
}
};
}
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
@ -139,7 +163,16 @@ public class JoinHelper extends AbstractComponent {
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
logger.debug("attempting to join {} with {}", destination, joinRequest);
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(destination)) {
actionName = MembershipAction.DISCOVERY_JOIN_ACTION_NAME;
transportRequest = new MembershipAction.JoinRequest(transportService.getLocalNode());
} else {
actionName = JOIN_ACTION_NAME;
transportRequest = joinRequest;
}
transportService.sendRequest(destination, actionName, transportRequest,
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
new TransportResponseHandler<Empty>() {
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
@ -30,10 +31,9 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.discovery.zen.MasterFaultDetection;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -74,6 +74,8 @@ public class LeaderChecker extends AbstractComponent {
public static final Setting<Integer> LEADER_CHECK_RETRY_COUNT_SETTING =
Setting.intSetting("cluster.fault_detection.leader_check.retry_count", 3, 1, Setting.Property.NodeScope);
private final Settings settings;
private final TimeValue leaderCheckInterval;
private final TimeValue leaderCheckTimeout;
private final int leaderCheckRetryCount;
@ -85,13 +87,29 @@ public class LeaderChecker extends AbstractComponent {
private volatile DiscoveryNodes discoveryNodes;
public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck);
transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, false, false, LeaderCheckRequest::new,
(request, channel, task) -> {
handleLeaderCheck(request);
channel.sendResponse(Empty.INSTANCE);
});
transportService.registerRequestHandler(MasterFaultDetection.MASTER_PING_ACTION_NAME, MasterFaultDetection.MasterPingRequest::new,
Names.SAME, false, false, (request, channel, task) -> {
try {
handleLeaderCheck(new LeaderCheckRequest(request.sourceNode));
} catch (CoordinationStateRejectedException e) {
throw new MasterFaultDetection.ThisIsNotTheMasterYouAreLookingForException(e.getMessage());
}
channel.sendResponse(new MasterFaultDetection.MasterPingResponseResponse());
});
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
@ -145,19 +163,18 @@ public class LeaderChecker extends AbstractComponent {
return discoveryNodes.isLocalNodeElectedMaster();
}
private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException {
private void handleLeaderCheck(LeaderCheckRequest request) {
final DiscoveryNodes discoveryNodes = this.discoveryNodes;
assert discoveryNodes != null;
if (discoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("non-master handling {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check"));
throw new CoordinationStateRejectedException("non-leader rejecting leader check");
} else if (discoveryNodes.nodeExists(request.getSender()) == false) {
logger.debug("leader check from unknown node: {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node"));
throw new CoordinationStateRejectedException("leader check from unknown node");
} else {
logger.trace("handling {}", request);
transportChannel.sendResponse(Empty.INSTANCE);
}
}
@ -197,11 +214,21 @@ public class LeaderChecker extends AbstractComponent {
logger.trace("checking {} with [{}] = {}", leader, LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeout);
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(leader)) {
actionName = MasterFaultDetection.MASTER_PING_ACTION_NAME;
transportRequest = new MasterFaultDetection.MasterPingRequest(
transportService.getLocalNode(), leader, ClusterName.CLUSTER_NAME_SETTING.get(settings));
} else {
actionName = LEADER_CHECK_ACTION_NAME;
transportRequest = new LeaderCheckRequest(transportService.getLocalNode());
}
// TODO lag detection:
// In the PoC, the leader sent its current version to the follower in the response to a LeaderCheck, so the follower
// could detect if it was lagging. We'd prefer this to be implemented on the leader, so the response is just
// TransportResponse.Empty here.
transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()),
transportService.sendRequest(leader, actionName, transportRequest,
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler<TransportResponse.Empty>() {

View File

@ -39,11 +39,13 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
@ -52,9 +54,11 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
public class PublicationTransportHandler extends AbstractComponent {
@ -80,31 +84,57 @@ public class PublicationTransportHandler extends AbstractComponent {
this.handlePublishRequest = handlePublishRequest;
transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.GENERIC,
false, false, (request, channel, task) -> handleIncomingPublishRequest(request, channel));
false, false, (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request)));
transportService.registerRequestHandler(PublishClusterStateAction.SEND_ACTION_NAME, BytesTransportRequest::new,
ThreadPool.Names.GENERIC,
false, false, (request, channel, task) -> {
handleIncomingPublishRequest(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
ApplyCommitRequest::new,
(request, channel, task) -> handleApplyCommit.accept(request, new ActionListener<Void>() {
(request, channel, task) -> handleApplyCommit.accept(request, transportCommitCallback(channel)));
@Override
public void onResponse(Void aVoid) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (IOException e) {
logger.debug("failed to send response on commit", e);
}
transportService.registerRequestHandler(PublishClusterStateAction.COMMIT_ACTION_NAME,
PublishClusterStateAction.CommitClusterStateRequest::new,
ThreadPool.Names.GENERIC, false, false,
(request, channel, task) -> {
final Optional<ClusterState> matchingClusterState = Optional.ofNullable(lastSeenClusterState.get()).filter(
cs -> cs.stateUUID().equals(request.stateUUID));
if (matchingClusterState.isPresent() == false) {
throw new IllegalStateException("can't resolve cluster state with uuid" +
" [" + request.stateUUID + "] to commit");
}
final ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(matchingClusterState.get().getNodes().getMasterNode(),
matchingClusterState.get().term(), matchingClusterState.get().version());
handleApplyCommit.accept(applyCommitRequest, transportCommitCallback(channel));
});
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException ie) {
e.addSuppressed(ie);
logger.debug("failed to send response on commit", e);
}
private ActionListener<Void> transportCommitCallback(TransportChannel channel) {
return new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (IOException e) {
logger.debug("failed to send response on commit", e);
}
}));
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException ie) {
e.addSuppressed(ie);
logger.debug("failed to send response on commit", e);
}
}
};
}
public PublishClusterStateStats stats() {
@ -119,6 +149,9 @@ public class PublicationTransportHandler extends AbstractComponent {
void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener);
void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener);
}
public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) {
@ -136,34 +169,78 @@ public class PublicationTransportHandler extends AbstractComponent {
buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
nodes, sendFullVersion, serializedStates, serializedDiffs);
return (destination, publishRequest, responseActionListener) -> {
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
if (destination.equals(nodes.getLocalNode())) {
// the master needs the original non-serialized state as the cluster state contains some volatile information that we don't
// want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or because it's
// mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in snapshot code).
// TODO: look into these and check how to get rid of them
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
// wrap into fake TransportException, as that's what we expect in Publication
responseActionListener.onFailure(new TransportException(e));
}
return new PublicationContext() {
@Override
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener) {
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
if (destination.equals(nodes.getLocalNode())) {
// the master needs the original non-serialized state as the cluster state contains some volatile information that we
// don't want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or
// because it's mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in
// snapshot code).
// TODO: look into these and check how to get rid of them
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
// wrap into fake TransportException, as that's what we expect in Publication
responseActionListener.onFailure(new TransportException(e));
}
@Override
protected void doRun() {
responseActionListener.onResponse(handlePublishRequest.apply(publishRequest));
}
@Override
protected void doRun() {
responseActionListener.onResponse(handlePublishRequest.apply(publishRequest));
}
@Override
public String toString() {
return "publish to self of " + publishRequest;
}
});
} else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
sendFullClusterState(newState, serializedStates, destination, responseActionListener);
} else {
sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination, responseActionListener);
@Override
public String toString() {
return "publish to self of " + publishRequest;
}
});
} else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener);
} else {
PublicationTransportHandler.this.sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination,
responseActionListener);
}
}
@Override
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener) {
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(destination)) {
actionName = PublishClusterStateAction.COMMIT_ACTION_NAME;
transportRequest = new PublishClusterStateAction.CommitClusterStateRequest(newState.stateUUID());
} else {
actionName = COMMIT_STATE_ACTION_NAME;
transportRequest = applyCommitRequest;
}
transportService.sendRequest(destination, actionName, transportRequest, options,
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty read(StreamInput in) {
return TransportResponse.Empty.INSTANCE;
}
@Override
public void handleResponse(TransportResponse.Empty response) {
responseActionListener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
responseActionListener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
});
}
};
}
@ -175,11 +252,19 @@ public class PublicationTransportHandler extends AbstractComponent {
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
// -> no need to compress, we already compressed the bytes
TransportRequestOptions options = TransportRequestOptions.builder()
final TransportRequestOptions options = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
transportService.sendRequest(node, PUBLISH_STATE_ACTION_NAME,
new BytesTransportRequest(bytes, node.getVersion()),
options,
final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
final Consumer<TransportException> transportExceptionHandler = exp -> {
if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
sendFullClusterState(clusterState, serializedStates, node, responseActionListener);
} else {
logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
responseActionListener.onFailure(exp);
}
};
final TransportResponseHandler<PublishWithJoinResponse> publishWithJoinResponseHandler =
new TransportResponseHandler<PublishWithJoinResponse>() {
@Override
@ -194,20 +279,27 @@ public class PublicationTransportHandler extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
sendFullClusterState(clusterState, serializedStates, node, responseActionListener);
} else {
logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
responseActionListener.onFailure(exp);
}
transportExceptionHandler.accept(exp);
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
});
};
final String actionName;
final TransportResponseHandler<?> transportResponseHandler;
if (Coordinator.isZen1Node(node)) {
actionName = PublishClusterStateAction.SEND_ACTION_NAME;
transportResponseHandler = publishWithJoinResponseHandler.wrap(empty -> new PublishWithJoinResponse(
new PublishResponse(clusterState.term(), clusterState.version()),
Optional.of(new Join(node, transportService.getLocalNode(), clusterState.term(), clusterState.term(),
clusterState.version()))), in -> TransportResponse.Empty.INSTANCE);
} else {
actionName = PUBLISH_STATE_ACTION_NAME;
transportResponseHandler = publishWithJoinResponseHandler;
}
transportService.sendRequest(node, actionName, request, options, transportResponseHandler);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
responseActionListener.onFailure(e);
@ -283,7 +375,7 @@ public class PublicationTransportHandler extends AbstractComponent {
return bStream.bytes();
}
private void handleIncomingPublishRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
final ClusterState incomingState;
@ -324,34 +416,6 @@ public class PublicationTransportHandler extends AbstractComponent {
IOUtils.close(in);
}
channel.sendResponse(handlePublishRequest.apply(new PublishRequest(incomingState)));
}
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener) {
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommitRequest, options,
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty read(StreamInput in) {
return TransportResponse.Empty.INSTANCE;
}
@Override
public void handleResponse(TransportResponse.Empty response) {
responseActionListener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
responseActionListener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
});
return handlePublishRequest.apply(new PublishRequest(incomingState));
}
}

View File

@ -23,6 +23,9 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -34,13 +37,22 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -64,6 +76,8 @@ public abstract class PeerFinder extends AbstractComponent {
Setting.timeSetting("discovery.request_peers_timeout",
TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
private final Settings settings;
private final TimeValue findPeersInterval;
private final TimeValue requestPeersTimeout;
@ -80,6 +94,7 @@ public abstract class PeerFinder extends AbstractComponent {
public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver) {
this.settings = settings;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
@ -89,6 +104,9 @@ public abstract class PeerFinder extends AbstractComponent {
transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
PeersRequest::new,
(request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));
transportService.registerRequestHandler(UnicastZenPing.ACTION_NAME, Names.GENERIC, false, false,
UnicastZenPing.UnicastPingRequest::new, new Zen1UnicastPingRequestHandler());
}
public void activate(final DiscoveryNodes lastAcceptedNodes) {
@ -384,50 +402,77 @@ public abstract class PeerFinder extends AbstractComponent {
logger.trace("{} requesting peers", this);
peersRequestInFlight = true;
List<DiscoveryNode> knownNodes = getFoundPeersUnderLock();
final List<DiscoveryNode> knownNodes = getFoundPeersUnderLock();
transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME,
new PeersRequest(getLocalNode(), knownNodes),
TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(),
new TransportResponseHandler<PeersResponse>() {
final TransportResponseHandler<PeersResponse> peersResponseHandler = new TransportResponseHandler<PeersResponse>() {
@Override
public PeersResponse read(StreamInput in) throws IOException {
return new PeersResponse(in);
}
@Override
public PeersResponse read(StreamInput in) throws IOException {
return new PeersResponse(in);
}
@Override
public void handleResponse(PeersResponse response) {
logger.trace("{} received {}", Peer.this, response);
synchronized (mutex) {
if (active == false) {
return;
}
peersRequestInFlight = false;
response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe);
response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe);
@Override
public void handleResponse(PeersResponse response) {
logger.trace("{} received {}", Peer.this, response);
synchronized (mutex) {
if (active == false) {
return;
}
if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
// Must not hold lock here to avoid deadlock
assert holdsLock() == false : "PeerFinder mutex is held in error";
onActiveMasterFound(discoveryNode, response.getTerm());
}
}
@Override
public void handleException(TransportException exp) {
peersRequestInFlight = false;
logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe);
response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe);
}
@Override
public String executor() {
return Names.GENERIC;
if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
// Must not hold lock here to avoid deadlock
assert holdsLock() == false : "PeerFinder mutex is held in error";
onActiveMasterFound(discoveryNode, response.getTerm());
}
});
}
@Override
public void handleException(TransportException exp) {
peersRequestInFlight = false;
logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
}
@Override
public String executor() {
return Names.GENERIC;
}
};
final String actionName;
final TransportRequest transportRequest;
final TransportResponseHandler<?> transportResponseHandler;
if (Coordinator.isZen1Node(discoveryNode)) {
actionName = UnicastZenPing.ACTION_NAME;
transportRequest = new UnicastZenPing.UnicastPingRequest(1, ZenDiscovery.PING_TIMEOUT_SETTING.get(settings),
new ZenPing.PingResponse(getLocalNode(), null, ClusterName.CLUSTER_NAME_SETTING.get(settings),
ClusterState.UNKNOWN_VERSION));
transportResponseHandler = peersResponseHandler.wrap(ucResponse -> {
Optional<DiscoveryNode> optionalMasterNode = Arrays.stream(ucResponse.pingResponses)
.filter(pr -> discoveryNode.equals(pr.node()) && discoveryNode.equals(pr.master()))
.map(ZenPing.PingResponse::node)
.findFirst();
List<DiscoveryNode> discoveredNodes = new ArrayList<>();
if (optionalMasterNode.isPresent() == false) {
Arrays.stream(ucResponse.pingResponses).map(pr -> pr.master()).filter(Objects::nonNull)
.forEach(discoveredNodes::add);
Arrays.stream(ucResponse.pingResponses).map(pr -> pr.node()).forEach(discoveredNodes::add);
}
return new PeersResponse(optionalMasterNode, discoveredNodes, 0L);
}, UnicastZenPing.UnicastPingResponse::new);
} else {
actionName = REQUEST_PEERS_ACTION_NAME;
transportRequest = new PeersRequest(getLocalNode(), knownNodes);
transportResponseHandler = peersResponseHandler;
}
transportService.sendRequest(discoveryNode, actionName,
transportRequest,
TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(),
transportResponseHandler);
}
@Override
@ -439,4 +484,20 @@ public abstract class PeerFinder extends AbstractComponent {
'}';
}
}
private class Zen1UnicastPingRequestHandler implements TransportRequestHandler<UnicastZenPing.UnicastPingRequest> {
@Override
public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(),
Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(Collections.emptyList()));
final PeersResponse peersResponse = handlePeersRequest(peersRequest);
final List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
pingResponses.add(new ZenPing.PingResponse(transportService.getLocalNode(), peersResponse.getMasterNode().orElse(null),
clusterName, 0L));
peersResponse.getKnownPeers().forEach(dn -> pingResponses.add(
new ZenPing.PingResponse(dn, null, clusterName, ClusterState.UNKNOWN_VERSION)));
channel.sendResponse(new UnicastZenPing.UnicastPingResponse(request.id, pingResponses.toArray(new ZenPing.PingResponse[0])));
}
}
}

View File

@ -297,13 +297,13 @@ public class MasterFaultDetection extends FaultDetection {
}
/** Thrown when a ping reaches the wrong node */
static class ThisIsNotTheMasterYouAreLookingForException extends IllegalStateException {
public static class ThisIsNotTheMasterYouAreLookingForException extends IllegalStateException {
ThisIsNotTheMasterYouAreLookingForException(String msg) {
public ThisIsNotTheMasterYouAreLookingForException(String msg) {
super(msg);
}
ThisIsNotTheMasterYouAreLookingForException() {
public ThisIsNotTheMasterYouAreLookingForException() {
}
@Override
@ -397,7 +397,7 @@ public class MasterFaultDetection extends FaultDetection {
public static class MasterPingRequest extends TransportRequest {
private DiscoveryNode sourceNode;
public DiscoveryNode sourceNode;
private DiscoveryNode masterNode;
private ClusterName clusterName;
@ -405,7 +405,7 @@ public class MasterFaultDetection extends FaultDetection {
public MasterPingRequest() {
}
private MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) {
public MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) {
this.sourceNode = sourceNode;
this.masterNode = masterNode;
this.clusterName = clusterName;
@ -428,12 +428,12 @@ public class MasterFaultDetection extends FaultDetection {
}
}
private static class MasterPingResponseResponse extends TransportResponse {
public static class MasterPingResponseResponse extends TransportResponse {
private MasterPingResponseResponse() {
public MasterPingResponseResponse() {
}
private MasterPingResponseResponse(StreamInput in) throws IOException {
public MasterPingResponseResponse(StreamInput in) throws IOException {
super(in);
}
}

View File

@ -102,12 +102,12 @@ public class MembershipAction extends AbstractComponent {
public static class JoinRequest extends TransportRequest {
DiscoveryNode node;
public DiscoveryNode node;
public JoinRequest() {
}
private JoinRequest(DiscoveryNode node) {
public JoinRequest(DiscoveryNode node) {
this.node = node;
}
@ -152,10 +152,10 @@ public class MembershipAction extends AbstractComponent {
}
}
static class ValidateJoinRequest extends TransportRequest {
public static class ValidateJoinRequest extends TransportRequest {
private ClusterState state;
ValidateJoinRequest() {}
public ValidateJoinRequest() {}
ValidateJoinRequest(ClusterState state) {
this.state = state;

View File

@ -312,7 +312,7 @@ public class NodesFaultDetection extends FaultDetection {
public PingRequest() {
}
PingRequest(DiscoveryNode targetNode, ClusterName clusterName, DiscoveryNode masterNode, long clusterStateVersion) {
public PingRequest(DiscoveryNode targetNode, ClusterName clusterName, DiscoveryNode masterNode, long clusterStateVersion) {
this.targetNode = targetNode;
this.clusterName = clusterName;
this.masterNode = masterNode;
@ -354,12 +354,12 @@ public class NodesFaultDetection extends FaultDetection {
}
}
private static class PingResponse extends TransportResponse {
public static class PingResponse extends TransportResponse {
private PingResponse() {
public PingResponse() {
}
private PingResponse(StreamInput in) throws IOException {
public PingResponse(StreamInput in) throws IOException {
super(in);
}
}

View File

@ -458,9 +458,9 @@ public class PublishClusterStateAction extends AbstractComponent {
}
}
protected static class CommitClusterStateRequest extends TransportRequest {
public static class CommitClusterStateRequest extends TransportRequest {
String stateUUID;
public String stateUUID;
public CommitClusterStateRequest() {
}

View File

@ -585,19 +585,19 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
static class UnicastPingRequest extends TransportRequest {
public static class UnicastPingRequest extends TransportRequest {
final int id;
final TimeValue timeout;
final PingResponse pingResponse;
public final int id;
public final TimeValue timeout;
public final PingResponse pingResponse;
UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) {
public UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) {
this.id = id;
this.timeout = timeout;
this.pingResponse = pingResponse;
}
UnicastPingRequest(StreamInput in) throws IOException {
public UnicastPingRequest(StreamInput in) throws IOException {
super(in);
id = in.readInt();
timeout = in.readTimeValue();
@ -623,18 +623,18 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), clusterState);
}
static class UnicastPingResponse extends TransportResponse {
public static class UnicastPingResponse extends TransportResponse {
final int id;
final PingResponse[] pingResponses;
public final PingResponse[] pingResponses;
UnicastPingResponse(int id, PingResponse[] pingResponses) {
public UnicastPingResponse(int id, PingResponse[] pingResponses) {
this.id = id;
this.pingResponses = pingResponses;
}
UnicastPingResponse(StreamInput in) throws IOException {
public UnicastPingResponse(StreamInput in) throws IOException {
id = in.readInt();
pingResponses = new PingResponse[in.readVInt()];
for (int i = 0; i < pingResponses.length; i++) {

View File

@ -67,7 +67,7 @@ public interface ZenPing extends Releasable {
* @param clusterStateVersion the current cluster state version of that node
* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
*/
PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
this.id = idGenerator.incrementAndGet();
this.node = node;
this.master = master;

View File

@ -19,8 +19,12 @@
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.function.Function;
public interface TransportResponseHandler<T extends TransportResponse> extends Writeable.Reader<T> {
void handleResponse(T response);
@ -28,4 +32,29 @@ public interface TransportResponseHandler<T extends TransportResponse> extends W
void handleException(TransportException exp);
String executor();
default <Q extends TransportResponse> TransportResponseHandler<Q> wrap(Function<Q, T> converter, Writeable.Reader<Q> reader) {
final TransportResponseHandler<T> self = this;
return new TransportResponseHandler<Q>() {
@Override
public void handleResponse(Q response) {
self.handleResponse(converter.apply(response));
}
@Override
public void handleException(TransportException exp) {
self.handleException(exp);
}
@Override
public String executor() {
return self.executor();
}
@Override
public Q read(StreamInput in) throws IOException {
return reader.read(in);
}
};
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class Zen1IT extends ESIntegTestCase {
private static Settings ZEN1_SETTINGS = Coordinator.addZen1Attribute(Settings.builder()
.put(TestZenDiscovery.USE_ZEN2.getKey(), false)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)) // Zen2 does not know about mock pings
.build();
private static Settings ZEN2_SETTINGS = Settings.builder()
.put(TestZenDiscovery.USE_ZEN2.getKey(), true)
.build();
public void testZen2NodesJoiningZen1Cluster() {
internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS);
internalCluster().startNodes(randomIntBetween(1, 3), ZEN2_SETTINGS);
createIndex("test");
}
public void testZen1NodesJoiningZen2Cluster() {
internalCluster().startNodes(randomIntBetween(1, 3), ZEN2_SETTINGS);
internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS);
createIndex("test");
}
}