diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 4720bb087dc..64621999cf5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -195,6 +195,7 @@ public class ClusterModule extends AbstractModule { registerClusterDynamicSetting(DestructiveOperations.REQUIRES_NAME, Validator.EMPTY); registerClusterDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE); registerClusterDynamicSetting(DiscoverySettings.PUBLISH_DIFF_ENABLE, Validator.BOOLEAN); + registerClusterDynamicSetting(DiscoverySettings.COMMIT_TIMEOUT, Validator.TIME_NON_NEGATIVE); registerClusterDynamicSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE); registerClusterDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE); registerClusterDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE); diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index 2bae50771ae..5b05f199dfa 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -256,7 +256,7 @@ public class ClusterState implements ToXContent, Diffable { } // Used for testing and logging to determine how this cluster state was send over the wire - boolean wasReadFromDiff() { + public boolean wasReadFromDiff() { return wasReadFromDiff; } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index b992c3612ee..76262a381b4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -482,8 +482,14 @@ public class InternalClusterService extends AbstractLifecycleComponent implemen try { publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); } catch (PublishClusterStateAction.FailedToCommitException t) { - logger.warn("failed to publish [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes()); + // cluster service logs a WARN message + logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes()); clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -856,7 +856,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen * If the second condition fails we ignore the cluster state. */ static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) { - rejectNewClusterStateIfNeeded(logger, currentState.nodes(), newClusterState); + validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState); if (currentState.nodes().masterNodeId() != null && newClusterState.version() < currentState.version()) { // if the new state has a smaller version, and it has the same master node, then no need to process it logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version()); @@ -871,7 +871,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen * This method checks for this and throws an exception if needed */ - public static void rejectNewClusterStateIfNeeded(ESLogger logger, DiscoveryNodes currentNodes, ClusterState newClusterState) { + public static void validateStateIsFromCurrentMaster(ESLogger logger, DiscoveryNodes currentNodes, ClusterState newClusterState) { if (currentNodes.masterNodeId() == null) { return; } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 050805bcf5a..1100eeb7680 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; @@ -96,13 +97,11 @@ public class PublishClusterStateAction extends AbstractComponent { } public void publish(ClusterChangedEvent clusterChangedEvent, int minMasterNodes, final Discovery.AckListener ackListener) { - Set nodesToPublishTo = new HashSet<>(clusterChangedEvent.state().nodes().size()); - DiscoveryNode localNode = nodesProvider.nodes().localNode(); - int totalMasterNodes = 0; - for (final DiscoveryNode node : clusterChangedEvent.state().nodes()) { - if (node.isMasterNode()) { - totalMasterNodes++; - } + final DiscoveryNodes nodes = clusterChangedEvent.state().nodes(); + Set nodesToPublishTo = new HashSet<>(nodes.size()); + DiscoveryNode localNode = nodes.localNode(); + final int totalMasterNodes = nodes.masterNodes().size(); + for (final DiscoveryNode node : nodes) { if (node.equals(localNode) == false) { nodesToPublishTo.add(node); } @@ -118,24 +117,24 @@ public class PublishClusterStateAction extends AbstractComponent { final ClusterState clusterState = clusterChangedEvent.state(); final ClusterState previousState = clusterChangedEvent.previousState(); - final AtomicBoolean timedOutWaitingForNodes = new AtomicBoolean(false); final TimeValue publishTimeout = discoverySettings.getPublishTimeout(); final boolean sendFullVersion = !discoverySettings.getPublishDiff() || previousState == null; final SendingController sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler); - Diff diff = null; + + // we build these early as a best effort not to commit in the case of error. + // sadly this is not water tight as it may that a failed diff based publishing to a node + // will cause a full serialization based on an older version, which may fail after the + // change has been committed. + buildDiffAndSerializeStates(clusterState, previousState, nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs); for (final DiscoveryNode node : nodesToPublishTo) { - // try and serialize the cluster state once (or per version), so we don't serialize it // per node when we send it over the wire, compress it while we are at it... // we don't send full version if node didn't exist in the previous version of cluster state if (sendFullVersion || !previousState.nodes().nodeExists(node.id())) { - sendFullClusterState(clusterState, serializedStates, node, timedOutWaitingForNodes, publishTimeout, sendingController); + sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController); } else { - if (diff == null) { - diff = clusterState.diff(previousState); - } - sendClusterStateDiff(clusterState, diff, serializedDiffs, node, timedOutWaitingForNodes, publishTimeout, sendingController); + sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController); } } @@ -144,8 +143,8 @@ public class PublishClusterStateAction extends AbstractComponent { if (publishTimeout.millis() > 0) { // only wait if the publish timeout is configured... try { - timedOutWaitingForNodes.set(!publishResponseHandler.awaitAllNodes(publishTimeout)); - if (timedOutWaitingForNodes.get()) { + sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(publishTimeout)); + if (sendingController.getPublishingTimedOut()) { DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); // everyone may have just responded if (pendingNodes.length > 0) { @@ -159,13 +158,34 @@ public class PublishClusterStateAction extends AbstractComponent { } } - private void sendFullClusterState(ClusterState clusterState, @Nullable Map serializedStates, - DiscoveryNode node, AtomicBoolean timedOutWaitingForNodes, TimeValue publishTimeout, - SendingController sendingController) { - BytesReference bytes = null; - if (serializedStates != null) { - bytes = serializedStates.get(node.version()); + private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, Set nodesToPublishTo, + boolean sendFullVersion, Map serializedStates, Map serializedDiffs) { + Diff diff = null; + for (final DiscoveryNode node : nodesToPublishTo) { + try { + if (sendFullVersion || !previousState.nodes().nodeExists(node.id())) { + // will send a full reference + if (serializedStates.containsKey(node.version()) == false) { + serializedStates.put(node.version(), serializeFullClusterState(clusterState, node.version())); + } + } else { + // will send a diff + if (diff == null) { + diff = clusterState.diff(previousState); + } + if (serializedDiffs.containsKey(node.version()) == false) { + serializedDiffs.put(node.version(), serializeDiffClusterState(diff, node.version())); + } + } + } catch (IOException e) { + throw new ElasticsearchException("failed to serialize cluster_state for publishing to node {}", e, node); + } } + } + + private void sendFullClusterState(ClusterState clusterState, @Nullable Map serializedStates, + DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) { + BytesReference bytes = serializedStates.get(node.version()); if (bytes == null) { try { bytes = serializeFullClusterState(clusterState, node.version()); @@ -178,31 +198,22 @@ public class PublishClusterStateAction extends AbstractComponent { return; } } - sendClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, sendingController, false); + sendClusterStateToNode(clusterState, bytes, node, publishTimeout, sendingController, false, serializedStates); } - private void sendClusterStateDiff(ClusterState clusterState, Diff diff, Map serializedDiffs, DiscoveryNode node, - AtomicBoolean timedOutWaitingForNodes, TimeValue publishTimeout, - SendingController sendingController) { + private void sendClusterStateDiff(ClusterState clusterState, + Map serializedDiffs, Map serializedStates, + DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) { BytesReference bytes = serializedDiffs.get(node.version()); - if (bytes == null) { - try { - bytes = serializeDiffClusterState(diff, node.version()); - serializedDiffs.put(node.version(), bytes); - } catch (Throwable e) { - logger.warn("failed to serialize diff of cluster_state before publishing it to node {}", e, node); - sendingController.onNodeSendFailed(node, e); - return; - } - } - sendClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, sendingController, true); + assert bytes != null : "failed to find serialized diff for node " + node + " of version [" + node.version() + "]"; + sendClusterStateToNode(clusterState, bytes, node, publishTimeout, sendingController, true, serializedStates); } private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes, - final DiscoveryNode node, final AtomicBoolean timedOutWaitingForNodes, + final DiscoveryNode node, final TimeValue publishTimeout, final SendingController sendingController, - final boolean sendDiffs) { + final boolean sendDiffs, final Map serializedStates) { try { TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); // no need to put a timeout on the options here, because we want the response to eventually be received @@ -215,7 +226,7 @@ public class PublishClusterStateAction extends AbstractComponent { @Override public void handleResponse(TransportResponse.Empty response) { - if (timedOutWaitingForNodes.get()) { + if (sendingController.getPublishingTimedOut()) { logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout); } sendingController.onNodeSendAck(node); @@ -225,7 +236,7 @@ public class PublishClusterStateAction extends AbstractComponent { public void handleException(TransportException exp) { if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) { logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage()); - sendFullClusterState(clusterState, null, node, timedOutWaitingForNodes, publishTimeout, sendingController); + sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController); } else { logger.debug("failed to send cluster state to {}", exp, node); sendingController.onNodeSendFailed(node, exp); @@ -238,7 +249,7 @@ public class PublishClusterStateAction extends AbstractComponent { } } - private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final BlockingClusterStatePublishResponseHandler publishResponseHandler) { + private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) { try { logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node); TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); @@ -252,21 +263,21 @@ public class PublishClusterStateAction extends AbstractComponent { @Override public void handleResponse(TransportResponse.Empty response) { -// if (timedOutWaitingForNodes.get()) { - logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version()); -// } - publishResponseHandler.onResponse(node); + if (sendingController.getPublishingTimedOut()) { + logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version()); + } + sendingController.getPublishResponseHandler().onResponse(node); } @Override public void handleException(TransportException exp) { logger.debug("failed to commit cluster state (uuid [{}], version [{}]) to {}", exp, clusterState.stateUUID(), clusterState.version(), node); - publishResponseHandler.onFailure(node, exp); + sendingController.getPublishResponseHandler().onFailure(node, exp); } }); } catch (Throwable t) { logger.warn("error sending cluster state commit (uuid [{}], version [{}]) to {}", t, clusterState.stateUUID(), clusterState.version(), node); - publishResponseHandler.onFailure(node, t); + sendingController.getPublishResponseHandler().onFailure(node, t); } } @@ -294,99 +305,116 @@ public class PublishClusterStateAction extends AbstractComponent { private Object lastSeenClusterStateMutex = new Object(); private ClusterState lastSeenClusterState; + protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException { + Compressor compressor = CompressorFactory.compressor(request.bytes()); + StreamInput in; + if (compressor != null) { + in = compressor.streamInput(request.bytes().streamInput()); + } else { + in = request.bytes().streamInput(); + } + in.setVersion(request.version()); + synchronized (lastSeenClusterStateMutex) { + final ClusterState incomingState; + // If true we received full cluster state - otherwise diffs + if (in.readBoolean()) { + incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); + } else if (lastSeenClusterState != null) { + Diff diff = lastSeenClusterState.readDiffFrom(in); + incomingState = diff.apply(lastSeenClusterState); + logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), request.bytes().length()); + } else { + logger.debug("received diff for but don't have any local cluster state - requesting full state"); + throw new IncompatibleClusterStateVersionException("have no local cluster state"); + } + // sanity check incoming state + validateIncomingState(incomingState); + + lastSeenClusterState = incomingState; + lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); + } + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + + // package private for testing + + /** + * does simple sanity check of the incoming cluster state. Throws an exception on rejections. + */ + void validateIncomingState(ClusterState state) { + final ClusterName incomingClusterName = state.getClusterName(); + if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) { + logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", state.nodes().masterNode(), incomingClusterName); + throw new IllegalStateException("received state from a node that is not part of the cluster"); + } + final DiscoveryNodes currentNodes = nodesProvider.nodes(); + + if (currentNodes.localNode().equals(state.nodes().localNode()) == false) { + logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", state.nodes().masterNode()); + throw new IllegalStateException("received state from a node that is not part of the cluster"); + } + // state from another master requires more subtle checks, so we let it pass for now (it will be checked in ZenDiscovery) + if (currentNodes.localNodeMaster() == false) { + ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, state); + } + } + + protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { + ClusterState committedClusterState; + synchronized (lastSeenClusterStateMutex) { + committedClusterState = lastSeenClusterState; + } + + // if this message somehow comes without a previous send, we won't have a cluster state + String lastSeenUUID = committedClusterState == null ? null : committedClusterState.stateUUID(); + if (request.stateUUID.equals(lastSeenUUID) == false) { + throw new IllegalStateException("tried to commit cluster state UUID [" + request.stateUUID + "], but last seen UUID is [" + lastSeenUUID + "]"); + } + + try { + listener.onNewClusterState(committedClusterState, new NewClusterStateListener.NewStateProcessed() { + @Override + public void onNewClusterStateProcessed() { + try { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (Throwable e) { + logger.debug("failed to send response on cluster state processed", e); + onNewClusterStateFailed(e); + } + } + + @Override + public void onNewClusterStateFailed(Throwable t) { + try { + channel.sendResponse(t); + } catch (Throwable e) { + logger.debug("failed to send response on cluster state processed", e); + } + } + }); + } catch (Exception e) { + logger.warn("unexpected error while processing cluster state version [{}]", e, lastSeenClusterState.version()); + throw e; + } + } + private class SendClusterStateRequestHandler implements TransportRequestHandler { @Override public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception { - Compressor compressor = CompressorFactory.compressor(request.bytes()); - StreamInput in; - if (compressor != null) { - in = compressor.streamInput(request.bytes().streamInput()); - } else { - in = request.bytes().streamInput(); - } - in.setVersion(request.version()); - synchronized (lastSeenClusterStateMutex) { - final ClusterState incomingState; - // If true we received full cluster state - otherwise diffs - if (in.readBoolean()) { - incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); - logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); - } else if (lastSeenClusterState != null) { - Diff diff = lastSeenClusterState.readDiffFrom(in); - incomingState = diff.apply(lastSeenClusterState); - logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), request.bytes().length()); - } else { - logger.debug("received diff for but don't have any local cluster state - requesting full state"); - throw new IncompatibleClusterStateVersionException("have no local cluster state"); - } - // sanity check incoming state - final ClusterName incomingClusterName = incomingState.getClusterName(); - if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) { - logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().masterNode(), incomingClusterName); - throw new IllegalStateException("received state from a node that is not part of the cluster"); - } - if (incomingState.nodes().localNode() == null) { - logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().masterNode()); - throw new IllegalStateException("received state from a node that is not part of the cluster"); - } - // state from another master requires more subtle checks, so we let it pass for now (it will be checked in ZenDiscovery) - if (nodesProvider.nodes().localNodeMaster() == false) { - ZenDiscovery.rejectNewClusterStateIfNeeded(logger, nodesProvider.nodes(), incomingState); - } - - lastSeenClusterState = incomingState; - lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); - } - channel.sendResponse(TransportResponse.Empty.INSTANCE); + handleIncomingClusterStateRequest(request, channel); } } private class CommitClusterStateRequestHandler implements TransportRequestHandler { @Override public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel) throws Exception { - ClusterState committedClusterState; - synchronized (lastSeenClusterStateMutex) { - committedClusterState = lastSeenClusterState; - } - if (committedClusterState.stateUUID().equals(request.stateUUID) == false) { - // nocommit: we need something better here - channel.sendResponse(TransportResponse.Empty.INSTANCE); - return; - } - - try { - listener.onNewClusterState(committedClusterState, new NewClusterStateListener.NewStateProcessed() { - @Override - public void onNewClusterStateProcessed() { - try { - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } catch (Throwable e) { - logger.debug("failed to send response on cluster state processed", e); - } - } - - @Override - public void onNewClusterStateFailed(Throwable t) { - try { - channel.sendResponse(t); - } catch (Throwable e) { - logger.debug("failed to send response on cluster state processed", e); - } - } - }); - } catch (Exception e) { - logger.warn("unexpected error while processing cluster state version [{}]", e, lastSeenClusterState.version()); - try { - channel.sendResponse(e); - } catch (Throwable e1) { - logger.debug("failed to send response on cluster state processed", e1); - } - } + handleCommitRequest(request, channel); } } - static class CommitClusterStateRequest extends TransportRequest { + protected static class CommitClusterStateRequest extends TransportRequest { String stateUUID; @@ -413,14 +441,19 @@ public class PublishClusterStateAction extends AbstractComponent { public class FailedToCommitException extends ElasticsearchException { - public FailedToCommitException(String msg) { - super(msg); + public FailedToCommitException(String msg, Object... args) { + super(msg, args); } } class SendingController { private final ClusterState clusterState; + + public BlockingClusterStatePublishResponseHandler getPublishResponseHandler() { + return publishResponseHandler; + } + private final BlockingClusterStatePublishResponseHandler publishResponseHandler; volatile int neededMastersToCommit; int pendingMasterNodes; @@ -428,23 +461,31 @@ public class PublishClusterStateAction extends AbstractComponent { final CountDownLatch comittedOrFailed; final AtomicBoolean committed; + // an external marker to note that the publishing process is timed out. This is usefull for proper logging. + final AtomicBoolean publishingTimedOut = new AtomicBoolean(); + private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, BlockingClusterStatePublishResponseHandler publishResponseHandler) { this.clusterState = clusterState; this.publishResponseHandler = publishResponseHandler; this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes this.pendingMasterNodes = totalMasterNodes - 1; + if (this.neededMastersToCommit > this.pendingMasterNodes) { + throw new FailedToCommitException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes); + } this.committed = new AtomicBoolean(neededMastersToCommit == 0); this.comittedOrFailed = new CountDownLatch(committed.get() ? 0 : 1); } public void waitForCommit(TimeValue commitTimeout) { + boolean timedout = false; try { - comittedOrFailed.await(commitTimeout.millis(), TimeUnit.MILLISECONDS); + timedout = comittedOrFailed.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false; } catch (InterruptedException e) { } if (committed.get() == false) { - throw new FailedToCommitException("failed to get enough masters to ack sent cluster state. [" + neededMastersToCommit + "] left"); + throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left", + timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit); } } @@ -456,17 +497,19 @@ public class PublishClusterStateAction extends AbstractComponent { } } else { assert sendAckedBeforeCommit.isEmpty(); - sendCommitToNode(node, clusterState, publishResponseHandler); + sendCommitToNode(node, clusterState, this); } } private void onMasterNodeSendAck(DiscoveryNode node) { + logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])", + node, clusterState.version(), pendingMasterNodes, neededMastersToCommit); neededMastersToCommit--; if (neededMastersToCommit == 0) { logger.trace("committing version [{}]", clusterState.version()); for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) { - sendCommitToNode(nodeToCommit, clusterState, publishResponseHandler); + sendCommitToNode(nodeToCommit, clusterState, this); } sendAckedBeforeCommit.clear(); boolean success = committed.compareAndSet(false, true); @@ -478,17 +521,28 @@ public class PublishClusterStateAction extends AbstractComponent { private void onMasterNodeDone(DiscoveryNode node) { pendingMasterNodes--; - if (pendingMasterNodes == 0) { + if (pendingMasterNodes == 0 && neededMastersToCommit > 0) { + logger.trace("failed to commit version [{}]. All master nodes acked or failed but [{}] acks are still needed", + clusterState.version(), neededMastersToCommit); comittedOrFailed.countDown(); } } synchronized public void onNodeSendFailed(DiscoveryNode node, Throwable t) { if (node.isMasterNode()) { + logger.trace("master node {} failed to ack cluster state version [{}]. processing ... (current pending [{}], needed [{}])", + node, clusterState.version(), pendingMasterNodes, neededMastersToCommit); onMasterNodeDone(node); } publishResponseHandler.onFailure(node, t); } + public boolean getPublishingTimedOut() { + return publishingTimedOut.get(); + } + + public void setPublishingTimedOut(boolean isTimedOut) { + publishingTimedOut.set(isTimedOut); + } } } \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index a00679d05be..93238b70477 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -23,24 +23,37 @@ import com.google.common.base.Predicate; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.fd.FaultDetection; +import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.disruption.NetworkDelaysPartition; +import org.elasticsearch.test.disruption.NetworkUnresponsivePartition; import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.ESIntegTestCase.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; import static org.hamcrest.Matchers.*; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) @@ -332,4 +345,69 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { logger.info("--> verifying no node left and master is up"); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodeCount)).get().isTimedOut()); } + + public void testCanNotPublishWithoutMinMastNodes() throws Exception { + Settings settings = settingsBuilder() + .put("discovery.type", "zen") + .put(FaultDetection.SETTING_PING_TIMEOUT, "1h") // disable it + .put(ZenDiscovery.SETTING_PING_TIMEOUT, "200ms") + .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2) + .put(DiscoverySettings.COMMIT_TIMEOUT, "100ms") // speed things up + .build(); + internalCluster().startNodesAsync(3, settings).get(); + + final String master = internalCluster().getMasterName(); + Set otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames())); + otherNodes.remove(master); + NetworkDelaysPartition partition = new NetworkDelaysPartition(Collections.singleton(master), otherNodes, 60000, random()); + internalCluster().setDisruptionScheme(partition); + partition.startDisrupting(); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference<>(); + logger.debug("--> submitting for cluster state to be rejected"); + final ClusterService masterClusterService = internalCluster().clusterService(master); + masterClusterService.submitStateUpdateTask("test", new ProcessedClusterStateUpdateTask() { + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + MetaData.Builder metaData = MetaData.builder(currentState.metaData()).persistentSettings( + Settings.builder().put(currentState.metaData().persistentSettings()).put("_SHOULD_NOT_BE_THERE_", true).build() + ); + return ClusterState.builder(currentState).metaData(metaData).build(); + } + + @Override + public void onFailure(String source, Throwable t) { + failure.set(t); + latch.countDown(); + } + }); + + logger.debug("--> waiting for cluster state to be processed/rejected"); + latch.await(); + + assertThat(failure.get(), instanceOf(PublishClusterStateAction.FailedToCommitException.class)); + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(masterClusterService.state().nodes().masterNode(), nullValue()); + } + }); + + partition.stopDisrupting(); + + logger.debug("--> waiting for cluster to heal"); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForEvents(Priority.LANGUID)); + + for (String node : internalCluster().getNodeNames()) { + Settings nodeSetting = internalCluster().clusterService(node).state().metaData().settings(); + assertThat(node + " processed the cluster state despite of a min master node violation", nodeSetting.get("_SHOULD_NOT_BE_THERE_"), nullValue()); + } + + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java similarity index 53% rename from core/src/test/java/org/elasticsearch/cluster/PublishClusterStateActionTests.java rename to core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index a395ebdd3d4..80224f055a1 100644 --- a/core/src/test/java/org/elasticsearch/cluster/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -17,16 +17,20 @@ * under the License. */ -package org.elasticsearch.cluster; +package org.elasticsearch.discovery.zen.publish; -import com.google.common.collect.ImmutableMap; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import com.google.common.collect.Maps; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -36,51 +40,51 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; -import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportConnectionListener; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; import org.elasticsearch.transport.local.LocalTransport; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static com.google.common.collect.Maps.newHashMap; import static org.hamcrest.Matchers.*; +@TestLogging("discovery.zen.publish:TRACE") public class PublishClusterStateActionTests extends ESTestCase { protected ThreadPool threadPool; protected Map nodes = newHashMap(); - public static class MockNode implements PublishClusterStateAction.NewClusterStateListener { + public static class MockNode implements PublishClusterStateAction.NewClusterStateListener, DiscoveryNodesProvider { public final DiscoveryNode discoveryNode; public final MockTransportService service; - public PublishClusterStateAction action; - public final MockDiscoveryNodesProvider nodesProvider; + public MockPublishAction action; public final ClusterStateListener listener; public volatile ClusterState clusterState; private final ESLogger logger; - public MockNode(DiscoveryNode discoveryNode, MockTransportService service, MockDiscoveryNodesProvider nodesProvider, @Nullable ClusterStateListener listener, ESLogger logger) { + public MockNode(DiscoveryNode discoveryNode, MockTransportService service, @Nullable ClusterStateListener listener, ESLogger logger) { this.discoveryNode = discoveryNode; this.service = service; - this.nodesProvider = nodesProvider; this.listener = listener; this.logger = logger; this.clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder().put(discoveryNode).localNodeId(discoveryNode.id()).build()).build(); @@ -88,7 +92,6 @@ public class PublishClusterStateActionTests extends ESTestCase { public void connectTo(DiscoveryNode node) { service.connectToNode(node); - nodesProvider.addNode(node); } @Override @@ -101,6 +104,25 @@ public class PublishClusterStateActionTests extends ESTestCase { clusterState = newClusterState; newStateProcessed.onNewClusterStateProcessed(); } + + @Override + public DiscoveryNodes nodes() { + return clusterState.nodes(); + } + + @Override + public NodeService nodeService() { + assert false; + throw new UnsupportedOperationException("Shouldn't be here"); + } + } + + public MockNode createMockNode(final String name) throws Exception { + return createMockNode(name, Settings.EMPTY, Version.CURRENT); + } + + public MockNode createMockNode(String name, Settings settings) throws Exception { + return createMockNode(name, settings, Version.CURRENT); } public MockNode createMockNode(final String name, Settings settings, Version version) throws Exception { @@ -108,15 +130,17 @@ public class PublishClusterStateActionTests extends ESTestCase { } public MockNode createMockNode(String name, Settings settings, Version version, @Nullable ClusterStateListener listener) throws Exception { - MockTransportService service = buildTransportService( - Settings.builder().put(settings).put("name", name, TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING").build(), - version - ); - DiscoveryNode discoveryNode = new DiscoveryNode(name, name, service.boundAddress().publishAddress(), ImmutableMap.of(), version); - MockDiscoveryNodesProvider nodesProvider = new MockDiscoveryNodesProvider(discoveryNode); - MockNode node = new MockNode(discoveryNode, service, nodesProvider, listener, logger); - nodesProvider.addNode(discoveryNode); - node.action = buildPublishClusterStateAction(settings, service, nodesProvider, node); + settings = Settings.builder() + .put("name", name) + .put(TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING") + .put(settings) + .build(); + + MockTransportService service = buildTransportService(settings, version); + DiscoveryNode discoveryNode = new DiscoveryNode(name, name, service.boundAddress().publishAddress(), + Maps.newHashMap(settings.getByPrefix("node.").getAsMap()), version); + MockNode node = new MockNode(discoveryNode, service, listener, logger); + node.action = buildPublishClusterStateAction(settings, service, node, node); final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1); TransportConnectionListener waitForConnection = new TransportConnectionListener() { @Override @@ -187,40 +211,13 @@ public class PublishClusterStateActionTests extends ESTestCase { return transportService; } - protected PublishClusterStateAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, MockDiscoveryNodesProvider nodesProvider, - PublishClusterStateAction.NewClusterStateListener listener) { + protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider, + PublishClusterStateAction.NewClusterStateListener listener) { DiscoverySettings discoverySettings = new DiscoverySettings(settings, new NodeSettingsService(settings)); - return new PublishClusterStateAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT); + return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT); } - - static class MockDiscoveryNodesProvider implements DiscoveryNodesProvider { - - private DiscoveryNodes discoveryNodes = DiscoveryNodes.EMPTY_NODES; - - public MockDiscoveryNodesProvider(DiscoveryNode localNode) { - discoveryNodes = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id()).build(); - } - - public void addNode(DiscoveryNode node) { - discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(node).build(); - } - - @Override - public DiscoveryNodes nodes() { - return discoveryNodes; - } - - @Override - public NodeService nodeService() { - assert false; - throw new UnsupportedOperationException("Shouldn't be here"); - } - } - - @Test - @TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG") public void testSimpleClusterStatePublishing() throws Exception { MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT); MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT); @@ -233,20 +230,20 @@ public class PublishClusterStateActionTests extends ESTestCase { discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build(); ClusterState previousClusterState = clusterState; clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromFull(nodeB.clusterState, clusterState); // cluster state update - add block previousClusterState = clusterState; clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromDiff(nodeB.clusterState, clusterState); assertThat(nodeB.clusterState.blocks().global().size(), equalTo(1)); // cluster state update - remove block previousClusterState = clusterState; clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromDiff(nodeB.clusterState, clusterState); assertTrue(nodeB.clusterState.wasReadFromDiff()); @@ -258,7 +255,7 @@ public class PublishClusterStateActionTests extends ESTestCase { previousClusterState = clusterState; discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeC.discoveryNode).build(); clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromDiff(nodeB.clusterState, clusterState); // First state assertSameStateFromFull(nodeC.clusterState, clusterState); @@ -267,7 +264,7 @@ public class PublishClusterStateActionTests extends ESTestCase { previousClusterState = clusterState; MetaData metaData = MetaData.builder(clusterState.metaData()).transientSettings(Settings.settingsBuilder().put("foo", "bar").build()).build(); clusterState = ClusterState.builder(clusterState).metaData(metaData).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromDiff(nodeB.clusterState, clusterState); assertThat(nodeB.clusterState.blocks().global().size(), equalTo(0)); assertSameStateFromDiff(nodeC.clusterState, clusterState); @@ -276,7 +273,7 @@ public class PublishClusterStateActionTests extends ESTestCase { // cluster state update - skipping one version change - should request full cluster state previousClusterState = ClusterState.builder(clusterState).incrementVersion().build(); clusterState = ClusterState.builder(clusterState).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromFull(nodeB.clusterState, clusterState); assertSameStateFromFull(nodeC.clusterState, clusterState); assertFalse(nodeC.clusterState.wasReadFromDiff()); @@ -286,16 +283,17 @@ public class PublishClusterStateActionTests extends ESTestCase { .put(nodeA.discoveryNode) .put(nodeB.discoveryNode) .put(nodeC.discoveryNode) + .masterNodeId(nodeB.discoveryNode.id()) + .localNodeId(nodeB.discoveryNode.id()) .build(); previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build(); clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build(); - publishStateDiffAndWait(nodeB.action, clusterState, previousClusterState); + publishStateAndWait(nodeB.action, clusterState, previousClusterState); assertSameStateFromFull(nodeA.clusterState, clusterState); assertSameStateFromFull(nodeC.clusterState, clusterState); } @Test - @TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG") public void testUnexpectedDiffPublishing() throws Exception { MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new ClusterStateListener() { @@ -311,18 +309,17 @@ public class PublishClusterStateActionTests extends ESTestCase { DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromFull(nodeB.clusterState, clusterState); // cluster state update - add block previousClusterState = clusterState; clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromDiff(nodeB.clusterState, clusterState); } @Test - @TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG") public void testDisablingDiffPublishing() throws Exception { Settings noDiffPublishingSettings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, false).build(); @@ -348,39 +345,41 @@ public class PublishClusterStateActionTests extends ESTestCase { discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build(); ClusterState previousClusterState = clusterState; clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); // cluster state update - add block previousClusterState = clusterState; clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); } - @Test - @TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG") /** * Test concurrent publishing works correctly (although not strictly required, it's a good testamne */ + @Test public void testSimultaneousClusterStatePublishing() throws Exception { int numberOfNodes = randomIntBetween(2, 10); - int numberOfIterations = randomIntBetween(50, 200); + int numberOfIterations = randomIntBetween(10, 50); Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, randomBoolean()).build(); - MockNode[] nodes = new MockNode[numberOfNodes]; DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(); - for (int i = 0; i < nodes.length; i++) { + MockNode master = null; + for (int i = 0; i < numberOfNodes; i++) { final String name = "node" + i; - nodes[i] = createMockNode(name, settings, Version.CURRENT, new ClusterStateListener() { + final MockNode node = createMockNode(name, settings, Version.CURRENT, new ClusterStateListener() { @Override public void clusterChanged(ClusterChangedEvent event) { assertProperMetaDataForVersion(event.state().metaData(), event.state().version()); } }); - discoveryNodesBuilder.put(nodes[i].discoveryNode); + if (i == 0) { + master = node; + } + discoveryNodesBuilder.put(node.discoveryNode); } AssertingAckListener[] listeners = new AssertingAckListener[numberOfIterations]; - discoveryNodesBuilder.localNodeId(nodes[0].discoveryNode.id()); + discoveryNodesBuilder.localNodeId(master.discoveryNode.id()); DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); MetaData metaData = MetaData.EMPTY_META_DATA; ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build(); @@ -389,17 +388,17 @@ public class PublishClusterStateActionTests extends ESTestCase { previousState = clusterState; metaData = buildMetaDataForVersion(metaData, i + 1); clusterState = ClusterState.builder(clusterState).incrementVersion().metaData(metaData).nodes(discoveryNodes).build(); - listeners[i] = publishStateDiff(nodes[0].action, clusterState, previousState); + listeners[i] = publishState(master.action, clusterState, previousState); } for (int i = 0; i < numberOfIterations; i++) { listeners[i].await(1, TimeUnit.SECONDS); } - // fake node[0] - it is the master - nodes[0].clusterState = clusterState; + // set the master cs + master.clusterState = clusterState; - for (MockNode node : nodes) { + for (MockNode node : nodes.values()) { assertThat(node.discoveryNode + " misses a cluster state", node.clusterState, notNullValue()); assertThat(node.discoveryNode + " unexpected cluster state: " + node.clusterState, node.clusterState.version(), equalTo(clusterState.version())); assertThat(node.clusterState.nodes().localNode(), equalTo(node.discoveryNode)); @@ -407,7 +406,6 @@ public class PublishClusterStateActionTests extends ESTestCase { } @Test - @TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG") public void testSerializationFailureDuringDiffPublishing() throws Exception { MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new ClusterStateListener() { @@ -423,7 +421,7 @@ public class PublishClusterStateActionTests extends ESTestCase { DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); - publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromFull(nodeB.clusterState, clusterState); // cluster state update - add block @@ -447,11 +445,217 @@ public class PublishClusterStateActionTests extends ESTestCase { }; } }; - List> errors = publishStateDiff(nodeA.action, unserializableClusterState, previousClusterState).awaitErrors(1, TimeUnit.SECONDS); - assertThat(errors.size(), equalTo(1)); - assertThat(errors.get(0).v2().getMessage(), containsString("Simulated failure of diff serialization")); + try { + publishStateAndWait(nodeA.action, unserializableClusterState, previousClusterState); + fail("cluster state published despite of diff errors"); + } catch (ElasticsearchException e) { + assertThat(e.getCause(), notNullValue()); + assertThat(e.getCause().getMessage(), containsString("Simulated")); + } } + + public void testFailToPublishWithLessThanMinMasterNodes() throws Exception { + final int masterNodes = randomIntBetween(1, 10); + + MockNode master = createMockNode("master"); + DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder().put(master.discoveryNode); + for (int i = 1; i < masterNodes; i++) { + discoveryNodesBuilder.put(createMockNode("node" + i).discoveryNode); + } + final int dataNodes = randomIntBetween(0, 5); + final Settings dataSettings = Settings.builder().put("node.master", false).build(); + for (int i = 0; i < dataNodes; i++) { + discoveryNodesBuilder.put(createMockNode("data_" + i, dataSettings).discoveryNode); + } + discoveryNodesBuilder.localNodeId(master.discoveryNode.id()).masterNodeId(master.discoveryNode.id()); + DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); + MetaData metaData = MetaData.EMPTY_META_DATA; + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).nodes(discoveryNodes).build(); + ClusterState previousState = master.clusterState; + try { + publishState(master.action, clusterState, previousState, masterNodes + randomIntBetween(1, 5)); + fail("cluster state publishing didn't fail despite of not having enough nodes"); + } catch (PublishClusterStateAction.FailedToCommitException expected) { + logger.debug("failed to publish as expected", expected); + } + } + + public void testPublishingWithSendingErrors() throws Exception { + int goodNodes = randomIntBetween(2, 5); + int errorNodes = randomIntBetween(1, 5); + int timeOutNodes = randomBoolean() ? 0 : randomIntBetween(1, 5); // adding timeout nodes will force timeout errors + final int numberOfMasterNodes = goodNodes + errorNodes + timeOutNodes + 1; // master + final boolean expectingToCommit = randomBoolean(); + Settings.Builder settings = Settings.builder(); + // make sure we have a reasonable timeout if we expect to timeout, o.w. one that will make the test "hang" + settings.put(DiscoverySettings.COMMIT_TIMEOUT, expectingToCommit == false && timeOutNodes > 0 ? "100ms" : "1h") + .put(DiscoverySettings.PUBLISH_TIMEOUT, "5ms"); // test is about comitting + + MockNode master = createMockNode("master", settings.build()); + + // randomize things a bit + int[] nodeTypes = new int[goodNodes + errorNodes + timeOutNodes]; + for (int i = 0; i < goodNodes; i++) { + nodeTypes[i] = 0; + } + for (int i = goodNodes; i < goodNodes + errorNodes; i++) { + nodeTypes[i] = 1; + } + for (int i = goodNodes + errorNodes; i < nodeTypes.length; i++) { + nodeTypes[i] = 2; + } + Collections.shuffle(Arrays.asList(nodeTypes), random()); + + DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder().put(master.discoveryNode); + for (int i = 0; i < nodeTypes.length; i++) { + final MockNode mockNode = createMockNode("node" + i); + discoveryNodesBuilder.put(mockNode.discoveryNode); + switch (nodeTypes[i]) { + case 1: + mockNode.action.errorOnSend.set(true); + break; + case 2: + mockNode.action.timeoutOnSend.set(true); + break; + } + } + final int dataNodes = randomIntBetween(0, 3); // data nodes don't matter + for (int i = 0; i < dataNodes; i++) { + final MockNode mockNode = createMockNode("data_" + i, Settings.builder().put("node.master", false).build()); + discoveryNodesBuilder.put(mockNode.discoveryNode); + if (randomBoolean()) { + // we really don't care - just chaos monkey + mockNode.action.errorOnCommit.set(randomBoolean()); + mockNode.action.errorOnSend.set(randomBoolean()); + mockNode.action.timeoutOnCommit.set(randomBoolean()); + mockNode.action.timeoutOnSend.set(randomBoolean()); + } + } + + final int minMasterNodes; + final String expectedBehavior; + if (expectingToCommit) { + minMasterNodes = randomIntBetween(0, goodNodes + 1); // count master + expectedBehavior = "succeed"; + } else { + minMasterNodes = randomIntBetween(goodNodes + 2, numberOfMasterNodes); // +2 because of master + expectedBehavior = timeOutNodes > 0 ? "timeout" : "fail"; + } + logger.info("--> expecting commit to {}. good nodes [{}], errors [{}], timeouts [{}]. min_master_nodes [{}]", + expectedBehavior, goodNodes + 1, errorNodes, timeOutNodes, minMasterNodes); + + discoveryNodesBuilder.localNodeId(master.discoveryNode.id()).masterNodeId(master.discoveryNode.id()); + DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); + MetaData metaData = MetaData.EMPTY_META_DATA; + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).nodes(discoveryNodes).build(); + ClusterState previousState = master.clusterState; + try { + publishState(master.action, clusterState, previousState, minMasterNodes); + if (expectingToCommit == false) { + fail("cluster state publishing didn't fail despite of not have enough nodes"); + } + } catch (PublishClusterStateAction.FailedToCommitException exception) { + logger.debug("failed to publish as expected", exception); + if (expectingToCommit) { + throw exception; + } + assertThat(exception.getMessage(), containsString(timeOutNodes > 0 ? "timed out" : "failed")); + } + } + + public void testIncomingClusterStateVerification() throws Exception { + MockNode node = createMockNode("node"); + + logger.info("--> testing acceptances of any master when having no master"); + ClusterState state = ClusterState.builder(node.clusterState) + .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId(randomAsciiOfLength(10))).build(); + node.action.validateIncomingState(state); + + // now set a master node + node.clusterState = ClusterState.builder(node.clusterState).nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(); + logger.info("--> testing rejection of another master"); + try { + node.action.validateIncomingState(state); + fail("node accepted state from another master"); + } catch (IllegalStateException OK) { + } + + logger.info("--> test state from the current master is accepted"); + node.action.validateIncomingState(ClusterState.builder(node.clusterState) + .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build()); + + + logger.info("--> testing rejection of another cluster name"); + try { + node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build()); + fail("node accepted state with another cluster name"); + } catch (IllegalStateException OK) { + } + + logger.info("--> testing rejection of a cluster state with wrong local node"); + try { + state = ClusterState.builder(node.clusterState).nodes(DiscoveryNodes.builder(node.nodes()).localNodeId("_non_existing_").build()).build(); + node.action.validateIncomingState(state); + fail("node accepted state with non-existence local node"); + } catch (IllegalStateException OK) { + } + + try { + MockNode otherNode = createMockNode("otherNode"); + state = ClusterState.builder(node.clusterState).nodes( + DiscoveryNodes.builder(node.nodes()).put(otherNode.discoveryNode).localNodeId(otherNode.discoveryNode.id()).build() + ).build(); + node.action.validateIncomingState(state); + fail("node accepted state with existent but wrong local node"); + } catch (IllegalStateException OK) { + } + } + + public void testInterleavedPublishCommit() throws Throwable { + MockNode node = createMockNode("node"); + final ClusterState state1 = ClusterState.builder(node.clusterState).incrementVersion().build(); + final ClusterState state2 = ClusterState.builder(state1).incrementVersion().build(); + final BytesReference state1Bytes = PublishClusterStateAction.serializeFullClusterState(state1, Version.CURRENT); + final BytesReference state2Bytes = PublishClusterStateAction.serializeFullClusterState(state2, Version.CURRENT); + final CapturingTransportChannel channel = new CapturingTransportChannel(); + + node.action.handleIncomingClusterStateRequest(new BytesTransportRequest(state1Bytes, Version.CURRENT), channel); + assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); + assertThat(channel.error.get(), nullValue()); + channel.clear(); + + // another incoming state is OK. Should just override pending state + node.action.handleIncomingClusterStateRequest(new BytesTransportRequest(state2Bytes, Version.CURRENT), channel); + assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); + assertThat(channel.error.get(), nullValue()); + channel.clear(); + + // committing previous state should fail + try { + node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state1.stateUUID()), channel); + // sadly, there are ways to percolate errors + assertThat(channel.response.get(), nullValue()); + assertThat(channel.error.get(), notNullValue()); + if (channel.error.get() instanceof IllegalStateException == false) { + throw channel.error.get(); + } + } catch (IllegalStateException OK) { + + } + channel.clear(); + + // committing second state should succeed + node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state2.stateUUID()), channel); + assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); + assertThat(channel.error.get(), nullValue()); + channel.clear(); + + // now check it was really committed + assertSameState(node.clusterState, state2); + } + + private MetaData buildMetaDataForVersion(MetaData metaData, long version) { ImmutableOpenMap.Builder indices = ImmutableOpenMap.builder(metaData.indices()); indices.put("test" + version, IndexMetaData.builder("test" + version).settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) @@ -471,15 +675,19 @@ public class PublishClusterStateActionTests extends ESTestCase { assertThat(metaData.transientSettings().get("test"), equalTo(Long.toString(version))); } - public void publishStateDiffAndWait(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException { - publishStateDiff(action, state, previousState).await(1, TimeUnit.SECONDS); + public void publishStateAndWait(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException { + publishState(action, state, previousState).await(1, TimeUnit.SECONDS); } - public AssertingAckListener publishStateDiff(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException { + public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException { + final int minimumMasterNodes = randomIntBetween(-1, state.nodes().getMasterNodes().size()); + return publishState(action, state, previousState, minimumMasterNodes); + } + + public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, ClusterState previousState, int minMasterNodes) throws InterruptedException { AssertingAckListener assertingAckListener = new AssertingAckListener(state.nodes().getSize() - 1); ClusterChangedEvent changedEvent = new ClusterChangedEvent("test update", state, previousState); - int requiredNodes = randomIntBetween(-1, state.nodes().getSize() - 1); - action.publish(changedEvent, requiredNodes, assertingAckListener); + action.publish(changedEvent, minMasterNodes, assertingAckListener); return assertingAckListener; } @@ -522,19 +730,11 @@ public class PublishClusterStateActionTests extends ESTestCase { } - public static class DelegatingClusterState extends ClusterState { - - public DelegatingClusterState(ClusterState clusterState) { - super(clusterState.version(), clusterState.stateUUID(), clusterState); - } - - - } - - void assertSameState(ClusterState actual, ClusterState expected) { assertThat(actual, notNullValue()); - assertThat("\n--> actual ClusterState: " + actual.prettyPrint() + "\n--> expected ClusterState:" + expected.prettyPrint(), actual.stateUUID(), equalTo(expected.stateUUID())); + final String reason = "\n--> actual ClusterState: " + actual.prettyPrint() + "\n--> expected ClusterState:" + expected.prettyPrint(); + assertThat("unequal UUIDs" + reason, actual.stateUUID(), equalTo(expected.stateUUID())); + assertThat("unequal versions" + reason, actual.version(), equalTo(expected.version())); } void assertSameStateFromDiff(ClusterState actual, ClusterState expected) { @@ -546,4 +746,77 @@ public class PublishClusterStateActionTests extends ESTestCase { assertSameState(actual, expected); assertFalse(actual.wasReadFromDiff()); } + + static class MockPublishAction extends PublishClusterStateAction { + + AtomicBoolean timeoutOnSend = new AtomicBoolean(); + AtomicBoolean errorOnSend = new AtomicBoolean(); + AtomicBoolean timeoutOnCommit = new AtomicBoolean(); + AtomicBoolean errorOnCommit = new AtomicBoolean(); + + public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { + super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName); + } + + @Override + protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException { + if (errorOnSend.get()) { + throw new ElasticsearchException("forced error on incoming cluster state"); + } + if (timeoutOnSend.get()) { + return; + } + super.handleIncomingClusterStateRequest(request, channel); + } + + @Override + protected void handleCommitRequest(PublishClusterStateAction.CommitClusterStateRequest request, TransportChannel channel) { + if (errorOnCommit.get()) { + throw new ElasticsearchException("forced error on incoming commit"); + } + if (timeoutOnCommit.get()) { + return; + } + super.handleCommitRequest(request, channel); + } + } + + static class CapturingTransportChannel implements TransportChannel { + + AtomicReference response = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + + public void clear() { + response.set(null); + error.set(null); + } + + @Override + public String action() { + return "_noop_"; + } + + @Override + public String getProfileName() { + return "_noop_"; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + this.response.set(response); + assertThat(error.get(), nullValue()); + } + + @Override + public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + this.response.set(response); + assertThat(error.get(), nullValue()); + } + + @Override + public void sendResponse(Throwable error) throws IOException { + this.error.set(error); + assertThat(response.get(), nullValue()); + } + } } diff --git a/core/src/test/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java b/core/src/test/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java index 9eb99302e46..8439f6e8f76 100644 --- a/core/src/test/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java +++ b/core/src/test/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java @@ -60,6 +60,10 @@ public class NetworkDelaysPartition extends NetworkPartition { this(nodesSideOne, nodesSideTwo, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random); } + public NetworkDelaysPartition(Set nodesSideOne, Set nodesSideTwo, long delay, Random random) { + this(nodesSideOne, nodesSideTwo, delay, delay, random); + } + public NetworkDelaysPartition(Set nodesSideOne, Set nodesSideTwo, long delayMin, long delayMax, Random random) { super(nodesSideOne, nodesSideTwo, random); this.delayMin = delayMin; @@ -69,7 +73,7 @@ public class NetworkDelaysPartition extends NetworkPartition { @Override public synchronized void startDisrupting() { - duration = new TimeValue(delayMin + random.nextInt((int) (delayMax - delayMin))); + duration = new TimeValue(delayMin == delayMax ? delayMin : delayMin + random.nextInt((int) (delayMax - delayMin))); super.startDisrupting(); }