beefed up testing...

This commit is contained in:
Boaz Leskes 2015-08-23 18:13:36 +02:00
parent 81e07e81e0
commit b702843fe9
9 changed files with 664 additions and 241 deletions

View File

@ -195,6 +195,7 @@ public class ClusterModule extends AbstractModule {
registerClusterDynamicSetting(DestructiveOperations.REQUIRES_NAME, Validator.EMPTY);
registerClusterDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
registerClusterDynamicSetting(DiscoverySettings.PUBLISH_DIFF_ENABLE, Validator.BOOLEAN);
registerClusterDynamicSetting(DiscoverySettings.COMMIT_TIMEOUT, Validator.TIME_NON_NEGATIVE);
registerClusterDynamicSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
registerClusterDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
registerClusterDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);

View File

@ -256,7 +256,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
}
// Used for testing and logging to determine how this cluster state was send over the wire
boolean wasReadFromDiff() {
public boolean wasReadFromDiff() {
return wasReadFromDiff;
}

View File

@ -482,8 +482,14 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
// we publish here before we send a notification to all the listeners, since if it fails
// we don't want to notify
if (newClusterState.nodes().localNodeMaster()) {
logger.debug("publishing cluster state version {}", newClusterState.version());
logger.debug("publishing cluster state version [{}]", newClusterState.version());
try {
discoveryService.publish(clusterChangedEvent, ackListener);
} catch (Throwable t) {
logger.warn("failing [{}]: failed to publish cluster state version [{}]", t, source, newClusterState.version());
updateTask.onFailure(source, t);
return;
}
}
// update the current cluster state

View File

@ -60,7 +60,7 @@ public class DiscoverySettings extends AbstractComponent {
nodeSettingsService.addListener(new ApplySettings());
this.noMasterBlock = parseNoMasterBlock(settings.get(NO_MASTER_BLOCK, DEFAULT_NO_MASTER_BLOCK));
this.publishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout);
this.commitTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout);
this.commitTimeout = settings.getAsTime(COMMIT_TIMEOUT, publishTimeout);
this.publishDiff = settings.getAsBoolean(PUBLISH_DIFF_ENABLE, DEFAULT_PUBLISH_DIFF_ENABLE);
}
@ -91,6 +91,13 @@ public class DiscoverySettings extends AbstractComponent {
publishTimeout = newPublishTimeout;
}
}
TimeValue newCommitTimeout = settings.getAsTime(COMMIT_TIMEOUT, null);
if (newCommitTimeout != null) {
if (newCommitTimeout.millis() != commitTimeout.millis()) {
logger.info("updating [{}] from [{}] to [{}]", COMMIT_TIMEOUT, commitTimeout, newCommitTimeout);
commitTimeout = newCommitTimeout;
}
}
String newNoMasterBlockValue = settings.get(NO_MASTER_BLOCK);
if (newNoMasterBlockValue != null) {
ClusterBlock newNoMasterBlock = parseNoMasterBlock(newNoMasterBlockValue);

View File

@ -36,7 +36,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
@ -333,7 +332,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (PublishClusterStateAction.FailedToCommitException t) {
logger.warn("failed to publish [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes());
// cluster service logs a WARN message
logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes());
clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
@ -856,7 +856,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
* If the second condition fails we ignore the cluster state.
*/
static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
rejectNewClusterStateIfNeeded(logger, currentState.nodes(), newClusterState);
validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState);
if (currentState.nodes().masterNodeId() != null && newClusterState.version() < currentState.version()) {
// if the new state has a smaller version, and it has the same master node, then no need to process it
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
@ -871,7 +871,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
* This method checks for this and throws an exception if needed
*/
public static void rejectNewClusterStateIfNeeded(ESLogger logger, DiscoveryNodes currentNodes, ClusterState newClusterState) {
public static void validateStateIsFromCurrentMaster(ESLogger logger, DiscoveryNodes currentNodes, ClusterState newClusterState) {
if (currentNodes.masterNodeId() == null) {
return;
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
@ -96,13 +97,11 @@ public class PublishClusterStateAction extends AbstractComponent {
}
public void publish(ClusterChangedEvent clusterChangedEvent, int minMasterNodes, final Discovery.AckListener ackListener) {
Set<DiscoveryNode> nodesToPublishTo = new HashSet<>(clusterChangedEvent.state().nodes().size());
DiscoveryNode localNode = nodesProvider.nodes().localNode();
int totalMasterNodes = 0;
for (final DiscoveryNode node : clusterChangedEvent.state().nodes()) {
if (node.isMasterNode()) {
totalMasterNodes++;
}
final DiscoveryNodes nodes = clusterChangedEvent.state().nodes();
Set<DiscoveryNode> nodesToPublishTo = new HashSet<>(nodes.size());
DiscoveryNode localNode = nodes.localNode();
final int totalMasterNodes = nodes.masterNodes().size();
for (final DiscoveryNode node : nodes) {
if (node.equals(localNode) == false) {
nodesToPublishTo.add(node);
}
@ -118,24 +117,24 @@ public class PublishClusterStateAction extends AbstractComponent {
final ClusterState clusterState = clusterChangedEvent.state();
final ClusterState previousState = clusterChangedEvent.previousState();
final AtomicBoolean timedOutWaitingForNodes = new AtomicBoolean(false);
final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
final boolean sendFullVersion = !discoverySettings.getPublishDiff() || previousState == null;
final SendingController sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler);
Diff<ClusterState> diff = null;
// we build these early as a best effort not to commit in the case of error.
// sadly this is not water tight as it may that a failed diff based publishing to a node
// will cause a full serialization based on an older version, which may fail after the
// change has been committed.
buildDiffAndSerializeStates(clusterState, previousState, nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);
for (final DiscoveryNode node : nodesToPublishTo) {
// try and serialize the cluster state once (or per version), so we don't serialize it
// per node when we send it over the wire, compress it while we are at it...
// we don't send full version if node didn't exist in the previous version of cluster state
if (sendFullVersion || !previousState.nodes().nodeExists(node.id())) {
sendFullClusterState(clusterState, serializedStates, node, timedOutWaitingForNodes, publishTimeout, sendingController);
sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
} else {
if (diff == null) {
diff = clusterState.diff(previousState);
}
sendClusterStateDiff(clusterState, diff, serializedDiffs, node, timedOutWaitingForNodes, publishTimeout, sendingController);
sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController);
}
}
@ -144,8 +143,8 @@ public class PublishClusterStateAction extends AbstractComponent {
if (publishTimeout.millis() > 0) {
// only wait if the publish timeout is configured...
try {
timedOutWaitingForNodes.set(!publishResponseHandler.awaitAllNodes(publishTimeout));
if (timedOutWaitingForNodes.get()) {
sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(publishTimeout));
if (sendingController.getPublishingTimedOut()) {
DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
// everyone may have just responded
if (pendingNodes.length > 0) {
@ -159,13 +158,34 @@ public class PublishClusterStateAction extends AbstractComponent {
}
}
private void sendFullClusterState(ClusterState clusterState, @Nullable Map<Version, BytesReference> serializedStates,
DiscoveryNode node, AtomicBoolean timedOutWaitingForNodes, TimeValue publishTimeout,
SendingController sendingController) {
BytesReference bytes = null;
if (serializedStates != null) {
bytes = serializedStates.get(node.version());
private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, Set<DiscoveryNode> nodesToPublishTo,
boolean sendFullVersion, Map<Version, BytesReference> serializedStates, Map<Version, BytesReference> serializedDiffs) {
Diff<ClusterState> diff = null;
for (final DiscoveryNode node : nodesToPublishTo) {
try {
if (sendFullVersion || !previousState.nodes().nodeExists(node.id())) {
// will send a full reference
if (serializedStates.containsKey(node.version()) == false) {
serializedStates.put(node.version(), serializeFullClusterState(clusterState, node.version()));
}
} else {
// will send a diff
if (diff == null) {
diff = clusterState.diff(previousState);
}
if (serializedDiffs.containsKey(node.version()) == false) {
serializedDiffs.put(node.version(), serializeDiffClusterState(diff, node.version()));
}
}
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster_state for publishing to node {}", e, node);
}
}
}
private void sendFullClusterState(ClusterState clusterState, @Nullable Map<Version, BytesReference> serializedStates,
DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) {
BytesReference bytes = serializedStates.get(node.version());
if (bytes == null) {
try {
bytes = serializeFullClusterState(clusterState, node.version());
@ -178,31 +198,22 @@ public class PublishClusterStateAction extends AbstractComponent {
return;
}
}
sendClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, sendingController, false);
sendClusterStateToNode(clusterState, bytes, node, publishTimeout, sendingController, false, serializedStates);
}
private void sendClusterStateDiff(ClusterState clusterState, Diff diff, Map<Version, BytesReference> serializedDiffs, DiscoveryNode node,
AtomicBoolean timedOutWaitingForNodes, TimeValue publishTimeout,
SendingController sendingController) {
private void sendClusterStateDiff(ClusterState clusterState,
Map<Version, BytesReference> serializedDiffs, Map<Version, BytesReference> serializedStates,
DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) {
BytesReference bytes = serializedDiffs.get(node.version());
if (bytes == null) {
try {
bytes = serializeDiffClusterState(diff, node.version());
serializedDiffs.put(node.version(), bytes);
} catch (Throwable e) {
logger.warn("failed to serialize diff of cluster_state before publishing it to node {}", e, node);
sendingController.onNodeSendFailed(node, e);
return;
}
}
sendClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, sendingController, true);
assert bytes != null : "failed to find serialized diff for node " + node + " of version [" + node.version() + "]";
sendClusterStateToNode(clusterState, bytes, node, publishTimeout, sendingController, true, serializedStates);
}
private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
final DiscoveryNode node, final AtomicBoolean timedOutWaitingForNodes,
final DiscoveryNode node,
final TimeValue publishTimeout,
final SendingController sendingController,
final boolean sendDiffs) {
final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
try {
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
// no need to put a timeout on the options here, because we want the response to eventually be received
@ -215,7 +226,7 @@ public class PublishClusterStateAction extends AbstractComponent {
@Override
public void handleResponse(TransportResponse.Empty response) {
if (timedOutWaitingForNodes.get()) {
if (sendingController.getPublishingTimedOut()) {
logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout);
}
sendingController.onNodeSendAck(node);
@ -225,7 +236,7 @@ public class PublishClusterStateAction extends AbstractComponent {
public void handleException(TransportException exp) {
if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
sendFullClusterState(clusterState, null, node, timedOutWaitingForNodes, publishTimeout, sendingController);
sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
} else {
logger.debug("failed to send cluster state to {}", exp, node);
sendingController.onNodeSendFailed(node, exp);
@ -238,7 +249,7 @@ public class PublishClusterStateAction extends AbstractComponent {
}
}
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final BlockingClusterStatePublishResponseHandler publishResponseHandler) {
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
try {
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node);
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
@ -252,21 +263,21 @@ public class PublishClusterStateAction extends AbstractComponent {
@Override
public void handleResponse(TransportResponse.Empty response) {
// if (timedOutWaitingForNodes.get()) {
if (sendingController.getPublishingTimedOut()) {
logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version());
// }
publishResponseHandler.onResponse(node);
}
sendingController.getPublishResponseHandler().onResponse(node);
}
@Override
public void handleException(TransportException exp) {
logger.debug("failed to commit cluster state (uuid [{}], version [{}]) to {}", exp, clusterState.stateUUID(), clusterState.version(), node);
publishResponseHandler.onFailure(node, exp);
sendingController.getPublishResponseHandler().onFailure(node, exp);
}
});
} catch (Throwable t) {
logger.warn("error sending cluster state commit (uuid [{}], version [{}]) to {}", t, clusterState.stateUUID(), clusterState.version(), node);
publishResponseHandler.onFailure(node, t);
sendingController.getPublishResponseHandler().onFailure(node, t);
}
}
@ -294,10 +305,7 @@ public class PublishClusterStateAction extends AbstractComponent {
private Object lastSeenClusterStateMutex = new Object();
private ClusterState lastSeenClusterState;
private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {
@Override
public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception {
protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in;
if (compressor != null) {
@ -321,38 +329,47 @@ public class PublishClusterStateAction extends AbstractComponent {
throw new IncompatibleClusterStateVersionException("have no local cluster state");
}
// sanity check incoming state
final ClusterName incomingClusterName = incomingState.getClusterName();
if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().masterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
if (incomingState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().masterNode());
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
// state from another master requires more subtle checks, so we let it pass for now (it will be checked in ZenDiscovery)
if (nodesProvider.nodes().localNodeMaster() == false) {
ZenDiscovery.rejectNewClusterStateIfNeeded(logger, nodesProvider.nodes(), incomingState);
}
validateIncomingState(incomingState);
lastSeenClusterState = incomingState;
lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
// package private for testing
/**
* does simple sanity check of the incoming cluster state. Throws an exception on rejections.
*/
void validateIncomingState(ClusterState state) {
final ClusterName incomingClusterName = state.getClusterName();
if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", state.nodes().masterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
final DiscoveryNodes currentNodes = nodesProvider.nodes();
if (currentNodes.localNode().equals(state.nodes().localNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", state.nodes().masterNode());
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
// state from another master requires more subtle checks, so we let it pass for now (it will be checked in ZenDiscovery)
if (currentNodes.localNodeMaster() == false) {
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, state);
}
}
private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
@Override
public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel) throws Exception {
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
ClusterState committedClusterState;
synchronized (lastSeenClusterStateMutex) {
committedClusterState = lastSeenClusterState;
}
if (committedClusterState.stateUUID().equals(request.stateUUID) == false) {
// nocommit: we need something better here
channel.sendResponse(TransportResponse.Empty.INSTANCE);
return;
// if this message somehow comes without a previous send, we won't have a cluster state
String lastSeenUUID = committedClusterState == null ? null : committedClusterState.stateUUID();
if (request.stateUUID.equals(lastSeenUUID) == false) {
throw new IllegalStateException("tried to commit cluster state UUID [" + request.stateUUID + "], but last seen UUID is [" + lastSeenUUID + "]");
}
try {
@ -363,6 +380,7 @@ public class PublishClusterStateAction extends AbstractComponent {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
onNewClusterStateFailed(e);
}
}
@ -377,16 +395,26 @@ public class PublishClusterStateAction extends AbstractComponent {
});
} catch (Exception e) {
logger.warn("unexpected error while processing cluster state version [{}]", e, lastSeenClusterState.version());
try {
channel.sendResponse(e);
} catch (Throwable e1) {
logger.debug("failed to send response on cluster state processed", e1);
}
}
throw e;
}
}
static class CommitClusterStateRequest extends TransportRequest {
private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {
@Override
public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception {
handleIncomingClusterStateRequest(request, channel);
}
}
private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
@Override
public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel) throws Exception {
handleCommitRequest(request, channel);
}
}
protected static class CommitClusterStateRequest extends TransportRequest {
String stateUUID;
@ -413,14 +441,19 @@ public class PublishClusterStateAction extends AbstractComponent {
public class FailedToCommitException extends ElasticsearchException {
public FailedToCommitException(String msg) {
super(msg);
public FailedToCommitException(String msg, Object... args) {
super(msg, args);
}
}
class SendingController {
private final ClusterState clusterState;
public BlockingClusterStatePublishResponseHandler getPublishResponseHandler() {
return publishResponseHandler;
}
private final BlockingClusterStatePublishResponseHandler publishResponseHandler;
volatile int neededMastersToCommit;
int pendingMasterNodes;
@ -428,23 +461,31 @@ public class PublishClusterStateAction extends AbstractComponent {
final CountDownLatch comittedOrFailed;
final AtomicBoolean committed;
// an external marker to note that the publishing process is timed out. This is usefull for proper logging.
final AtomicBoolean publishingTimedOut = new AtomicBoolean();
private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, BlockingClusterStatePublishResponseHandler publishResponseHandler) {
this.clusterState = clusterState;
this.publishResponseHandler = publishResponseHandler;
this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes
this.pendingMasterNodes = totalMasterNodes - 1;
if (this.neededMastersToCommit > this.pendingMasterNodes) {
throw new FailedToCommitException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes);
}
this.committed = new AtomicBoolean(neededMastersToCommit == 0);
this.comittedOrFailed = new CountDownLatch(committed.get() ? 0 : 1);
}
public void waitForCommit(TimeValue commitTimeout) {
boolean timedout = false;
try {
comittedOrFailed.await(commitTimeout.millis(), TimeUnit.MILLISECONDS);
timedout = comittedOrFailed.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false;
} catch (InterruptedException e) {
}
if (committed.get() == false) {
throw new FailedToCommitException("failed to get enough masters to ack sent cluster state. [" + neededMastersToCommit + "] left");
throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left",
timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit);
}
}
@ -456,17 +497,19 @@ public class PublishClusterStateAction extends AbstractComponent {
}
} else {
assert sendAckedBeforeCommit.isEmpty();
sendCommitToNode(node, clusterState, publishResponseHandler);
sendCommitToNode(node, clusterState, this);
}
}
private void onMasterNodeSendAck(DiscoveryNode node) {
logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
node, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
neededMastersToCommit--;
if (neededMastersToCommit == 0) {
logger.trace("committing version [{}]", clusterState.version());
for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) {
sendCommitToNode(nodeToCommit, clusterState, publishResponseHandler);
sendCommitToNode(nodeToCommit, clusterState, this);
}
sendAckedBeforeCommit.clear();
boolean success = committed.compareAndSet(false, true);
@ -478,17 +521,28 @@ public class PublishClusterStateAction extends AbstractComponent {
private void onMasterNodeDone(DiscoveryNode node) {
pendingMasterNodes--;
if (pendingMasterNodes == 0) {
if (pendingMasterNodes == 0 && neededMastersToCommit > 0) {
logger.trace("failed to commit version [{}]. All master nodes acked or failed but [{}] acks are still needed",
clusterState.version(), neededMastersToCommit);
comittedOrFailed.countDown();
}
}
synchronized public void onNodeSendFailed(DiscoveryNode node, Throwable t) {
if (node.isMasterNode()) {
logger.trace("master node {} failed to ack cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
node, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
onMasterNodeDone(node);
}
publishResponseHandler.onFailure(node, t);
}
public boolean getPublishingTimedOut() {
return publishingTimedOut.get();
}
public void setPublishingTimedOut(boolean isTimedOut) {
publishingTimedOut.set(isTimedOut);
}
}
}

View File

@ -23,24 +23,37 @@ import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.disruption.NetworkDelaysPartition;
import org.elasticsearch.test.disruption.NetworkUnresponsivePartition;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.ESIntegTestCase.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@ -332,4 +345,69 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
logger.info("--> verifying no node left and master is up");
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodeCount)).get().isTimedOut());
}
public void testCanNotPublishWithoutMinMastNodes() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
.put(FaultDetection.SETTING_PING_TIMEOUT, "1h") // disable it
.put(ZenDiscovery.SETTING_PING_TIMEOUT, "200ms")
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)
.put(DiscoverySettings.COMMIT_TIMEOUT, "100ms") // speed things up
.build();
internalCluster().startNodesAsync(3, settings).get();
final String master = internalCluster().getMasterName();
Set<String> otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
otherNodes.remove(master);
NetworkDelaysPartition partition = new NetworkDelaysPartition(Collections.singleton(master), otherNodes, 60000, random());
internalCluster().setDisruptionScheme(partition);
partition.startDisrupting();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
logger.debug("--> submitting for cluster state to be rejected");
final ClusterService masterClusterService = internalCluster().clusterService(master);
masterClusterService.submitStateUpdateTask("test", new ProcessedClusterStateUpdateTask() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MetaData.Builder metaData = MetaData.builder(currentState.metaData()).persistentSettings(
Settings.builder().put(currentState.metaData().persistentSettings()).put("_SHOULD_NOT_BE_THERE_", true).build()
);
return ClusterState.builder(currentState).metaData(metaData).build();
}
@Override
public void onFailure(String source, Throwable t) {
failure.set(t);
latch.countDown();
}
});
logger.debug("--> waiting for cluster state to be processed/rejected");
latch.await();
assertThat(failure.get(), instanceOf(PublishClusterStateAction.FailedToCommitException.class));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(masterClusterService.state().nodes().masterNode(), nullValue());
}
});
partition.stopDisrupting();
logger.debug("--> waiting for cluster to heal");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForEvents(Priority.LANGUID));
for (String node : internalCluster().getNodeNames()) {
Settings nodeSetting = internalCluster().clusterService(node).state().metaData().settings();
assertThat(node + " processed the cluster state despite of a min master node violation", nodeSetting.get("_SHOULD_NOT_BE_THERE_"), nullValue());
}
}
}

View File

@ -17,16 +17,20 @@
* under the License.
*/
package org.elasticsearch.cluster;
package org.elasticsearch.discovery.zen.publish;
import com.google.common.collect.ImmutableMap;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -36,51 +40,51 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.collect.Maps.newHashMap;
import static org.hamcrest.Matchers.*;
@TestLogging("discovery.zen.publish:TRACE")
public class PublishClusterStateActionTests extends ESTestCase {
protected ThreadPool threadPool;
protected Map<String, MockNode> nodes = newHashMap();
public static class MockNode implements PublishClusterStateAction.NewClusterStateListener {
public static class MockNode implements PublishClusterStateAction.NewClusterStateListener, DiscoveryNodesProvider {
public final DiscoveryNode discoveryNode;
public final MockTransportService service;
public PublishClusterStateAction action;
public final MockDiscoveryNodesProvider nodesProvider;
public MockPublishAction action;
public final ClusterStateListener listener;
public volatile ClusterState clusterState;
private final ESLogger logger;
public MockNode(DiscoveryNode discoveryNode, MockTransportService service, MockDiscoveryNodesProvider nodesProvider, @Nullable ClusterStateListener listener, ESLogger logger) {
public MockNode(DiscoveryNode discoveryNode, MockTransportService service, @Nullable ClusterStateListener listener, ESLogger logger) {
this.discoveryNode = discoveryNode;
this.service = service;
this.nodesProvider = nodesProvider;
this.listener = listener;
this.logger = logger;
this.clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder().put(discoveryNode).localNodeId(discoveryNode.id()).build()).build();
@ -88,7 +92,6 @@ public class PublishClusterStateActionTests extends ESTestCase {
public void connectTo(DiscoveryNode node) {
service.connectToNode(node);
nodesProvider.addNode(node);
}
@Override
@ -101,6 +104,25 @@ public class PublishClusterStateActionTests extends ESTestCase {
clusterState = newClusterState;
newStateProcessed.onNewClusterStateProcessed();
}
@Override
public DiscoveryNodes nodes() {
return clusterState.nodes();
}
@Override
public NodeService nodeService() {
assert false;
throw new UnsupportedOperationException("Shouldn't be here");
}
}
public MockNode createMockNode(final String name) throws Exception {
return createMockNode(name, Settings.EMPTY, Version.CURRENT);
}
public MockNode createMockNode(String name, Settings settings) throws Exception {
return createMockNode(name, settings, Version.CURRENT);
}
public MockNode createMockNode(final String name, Settings settings, Version version) throws Exception {
@ -108,15 +130,17 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
public MockNode createMockNode(String name, Settings settings, Version version, @Nullable ClusterStateListener listener) throws Exception {
MockTransportService service = buildTransportService(
Settings.builder().put(settings).put("name", name, TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING").build(),
version
);
DiscoveryNode discoveryNode = new DiscoveryNode(name, name, service.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version);
MockDiscoveryNodesProvider nodesProvider = new MockDiscoveryNodesProvider(discoveryNode);
MockNode node = new MockNode(discoveryNode, service, nodesProvider, listener, logger);
nodesProvider.addNode(discoveryNode);
node.action = buildPublishClusterStateAction(settings, service, nodesProvider, node);
settings = Settings.builder()
.put("name", name)
.put(TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING")
.put(settings)
.build();
MockTransportService service = buildTransportService(settings, version);
DiscoveryNode discoveryNode = new DiscoveryNode(name, name, service.boundAddress().publishAddress(),
Maps.newHashMap(settings.getByPrefix("node.").getAsMap()), version);
MockNode node = new MockNode(discoveryNode, service, listener, logger);
node.action = buildPublishClusterStateAction(settings, service, node, node);
final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
@ -187,40 +211,13 @@ public class PublishClusterStateActionTests extends ESTestCase {
return transportService;
}
protected PublishClusterStateAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, MockDiscoveryNodesProvider nodesProvider,
protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider,
PublishClusterStateAction.NewClusterStateListener listener) {
DiscoverySettings discoverySettings = new DiscoverySettings(settings, new NodeSettingsService(settings));
return new PublishClusterStateAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT);
return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT);
}
static class MockDiscoveryNodesProvider implements DiscoveryNodesProvider {
private DiscoveryNodes discoveryNodes = DiscoveryNodes.EMPTY_NODES;
public MockDiscoveryNodesProvider(DiscoveryNode localNode) {
discoveryNodes = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id()).build();
}
public void addNode(DiscoveryNode node) {
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(node).build();
}
@Override
public DiscoveryNodes nodes() {
return discoveryNodes;
}
@Override
public NodeService nodeService() {
assert false;
throw new UnsupportedOperationException("Shouldn't be here");
}
}
@Test
@TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG")
public void testSimpleClusterStatePublishing() throws Exception {
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT);
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
@ -233,20 +230,20 @@ public class PublishClusterStateActionTests extends ESTestCase {
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build();
ClusterState previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeB.clusterState, clusterState);
// cluster state update - add block
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
assertThat(nodeB.clusterState.blocks().global().size(), equalTo(1));
// cluster state update - remove block
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
assertTrue(nodeB.clusterState.wasReadFromDiff());
@ -258,7 +255,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
previousClusterState = clusterState;
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeC.discoveryNode).build();
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
// First state
assertSameStateFromFull(nodeC.clusterState, clusterState);
@ -267,7 +264,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
previousClusterState = clusterState;
MetaData metaData = MetaData.builder(clusterState.metaData()).transientSettings(Settings.settingsBuilder().put("foo", "bar").build()).build();
clusterState = ClusterState.builder(clusterState).metaData(metaData).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
assertThat(nodeB.clusterState.blocks().global().size(), equalTo(0));
assertSameStateFromDiff(nodeC.clusterState, clusterState);
@ -276,7 +273,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
// cluster state update - skipping one version change - should request full cluster state
previousClusterState = ClusterState.builder(clusterState).incrementVersion().build();
clusterState = ClusterState.builder(clusterState).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeB.clusterState, clusterState);
assertSameStateFromFull(nodeC.clusterState, clusterState);
assertFalse(nodeC.clusterState.wasReadFromDiff());
@ -286,16 +283,17 @@ public class PublishClusterStateActionTests extends ESTestCase {
.put(nodeA.discoveryNode)
.put(nodeB.discoveryNode)
.put(nodeC.discoveryNode)
.masterNodeId(nodeB.discoveryNode.id())
.localNodeId(nodeB.discoveryNode.id())
.build();
previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateDiffAndWait(nodeB.action, clusterState, previousClusterState);
publishStateAndWait(nodeB.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeA.clusterState, clusterState);
assertSameStateFromFull(nodeC.clusterState, clusterState);
}
@Test
@TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG")
public void testUnexpectedDiffPublishing() throws Exception {
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new ClusterStateListener() {
@ -311,18 +309,17 @@ public class PublishClusterStateActionTests extends ESTestCase {
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeB.clusterState, clusterState);
// cluster state update - add block
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
}
@Test
@TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG")
public void testDisablingDiffPublishing() throws Exception {
Settings noDiffPublishingSettings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, false).build();
@ -348,39 +345,41 @@ public class PublishClusterStateActionTests extends ESTestCase {
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build();
ClusterState previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
// cluster state update - add block
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
}
@Test
@TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG")
/**
* Test concurrent publishing works correctly (although not strictly required, it's a good testamne
*/
@Test
public void testSimultaneousClusterStatePublishing() throws Exception {
int numberOfNodes = randomIntBetween(2, 10);
int numberOfIterations = randomIntBetween(50, 200);
int numberOfIterations = randomIntBetween(10, 50);
Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, randomBoolean()).build();
MockNode[] nodes = new MockNode[numberOfNodes];
DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder();
for (int i = 0; i < nodes.length; i++) {
MockNode master = null;
for (int i = 0; i < numberOfNodes; i++) {
final String name = "node" + i;
nodes[i] = createMockNode(name, settings, Version.CURRENT, new ClusterStateListener() {
final MockNode node = createMockNode(name, settings, Version.CURRENT, new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
assertProperMetaDataForVersion(event.state().metaData(), event.state().version());
}
});
discoveryNodesBuilder.put(nodes[i].discoveryNode);
if (i == 0) {
master = node;
}
discoveryNodesBuilder.put(node.discoveryNode);
}
AssertingAckListener[] listeners = new AssertingAckListener[numberOfIterations];
discoveryNodesBuilder.localNodeId(nodes[0].discoveryNode.id());
discoveryNodesBuilder.localNodeId(master.discoveryNode.id());
DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build();
MetaData metaData = MetaData.EMPTY_META_DATA;
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
@ -389,17 +388,17 @@ public class PublishClusterStateActionTests extends ESTestCase {
previousState = clusterState;
metaData = buildMetaDataForVersion(metaData, i + 1);
clusterState = ClusterState.builder(clusterState).incrementVersion().metaData(metaData).nodes(discoveryNodes).build();
listeners[i] = publishStateDiff(nodes[0].action, clusterState, previousState);
listeners[i] = publishState(master.action, clusterState, previousState);
}
for (int i = 0; i < numberOfIterations; i++) {
listeners[i].await(1, TimeUnit.SECONDS);
}
// fake node[0] - it is the master
nodes[0].clusterState = clusterState;
// set the master cs
master.clusterState = clusterState;
for (MockNode node : nodes) {
for (MockNode node : nodes.values()) {
assertThat(node.discoveryNode + " misses a cluster state", node.clusterState, notNullValue());
assertThat(node.discoveryNode + " unexpected cluster state: " + node.clusterState, node.clusterState.version(), equalTo(clusterState.version()));
assertThat(node.clusterState.nodes().localNode(), equalTo(node.discoveryNode));
@ -407,7 +406,6 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
@Test
@TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG")
public void testSerializationFailureDuringDiffPublishing() throws Exception {
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new ClusterStateListener() {
@ -423,7 +421,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeB.clusterState, clusterState);
// cluster state update - add block
@ -447,10 +445,216 @@ public class PublishClusterStateActionTests extends ESTestCase {
};
}
};
List<Tuple<DiscoveryNode, Throwable>> errors = publishStateDiff(nodeA.action, unserializableClusterState, previousClusterState).awaitErrors(1, TimeUnit.SECONDS);
assertThat(errors.size(), equalTo(1));
assertThat(errors.get(0).v2().getMessage(), containsString("Simulated failure of diff serialization"));
try {
publishStateAndWait(nodeA.action, unserializableClusterState, previousClusterState);
fail("cluster state published despite of diff errors");
} catch (ElasticsearchException e) {
assertThat(e.getCause(), notNullValue());
assertThat(e.getCause().getMessage(), containsString("Simulated"));
}
}
public void testFailToPublishWithLessThanMinMasterNodes() throws Exception {
final int masterNodes = randomIntBetween(1, 10);
MockNode master = createMockNode("master");
DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder().put(master.discoveryNode);
for (int i = 1; i < masterNodes; i++) {
discoveryNodesBuilder.put(createMockNode("node" + i).discoveryNode);
}
final int dataNodes = randomIntBetween(0, 5);
final Settings dataSettings = Settings.builder().put("node.master", false).build();
for (int i = 0; i < dataNodes; i++) {
discoveryNodesBuilder.put(createMockNode("data_" + i, dataSettings).discoveryNode);
}
discoveryNodesBuilder.localNodeId(master.discoveryNode.id()).masterNodeId(master.discoveryNode.id());
DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build();
MetaData metaData = MetaData.EMPTY_META_DATA;
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).nodes(discoveryNodes).build();
ClusterState previousState = master.clusterState;
try {
publishState(master.action, clusterState, previousState, masterNodes + randomIntBetween(1, 5));
fail("cluster state publishing didn't fail despite of not having enough nodes");
} catch (PublishClusterStateAction.FailedToCommitException expected) {
logger.debug("failed to publish as expected", expected);
}
}
public void testPublishingWithSendingErrors() throws Exception {
int goodNodes = randomIntBetween(2, 5);
int errorNodes = randomIntBetween(1, 5);
int timeOutNodes = randomBoolean() ? 0 : randomIntBetween(1, 5); // adding timeout nodes will force timeout errors
final int numberOfMasterNodes = goodNodes + errorNodes + timeOutNodes + 1; // master
final boolean expectingToCommit = randomBoolean();
Settings.Builder settings = Settings.builder();
// make sure we have a reasonable timeout if we expect to timeout, o.w. one that will make the test "hang"
settings.put(DiscoverySettings.COMMIT_TIMEOUT, expectingToCommit == false && timeOutNodes > 0 ? "100ms" : "1h")
.put(DiscoverySettings.PUBLISH_TIMEOUT, "5ms"); // test is about comitting
MockNode master = createMockNode("master", settings.build());
// randomize things a bit
int[] nodeTypes = new int[goodNodes + errorNodes + timeOutNodes];
for (int i = 0; i < goodNodes; i++) {
nodeTypes[i] = 0;
}
for (int i = goodNodes; i < goodNodes + errorNodes; i++) {
nodeTypes[i] = 1;
}
for (int i = goodNodes + errorNodes; i < nodeTypes.length; i++) {
nodeTypes[i] = 2;
}
Collections.shuffle(Arrays.asList(nodeTypes), random());
DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder().put(master.discoveryNode);
for (int i = 0; i < nodeTypes.length; i++) {
final MockNode mockNode = createMockNode("node" + i);
discoveryNodesBuilder.put(mockNode.discoveryNode);
switch (nodeTypes[i]) {
case 1:
mockNode.action.errorOnSend.set(true);
break;
case 2:
mockNode.action.timeoutOnSend.set(true);
break;
}
}
final int dataNodes = randomIntBetween(0, 3); // data nodes don't matter
for (int i = 0; i < dataNodes; i++) {
final MockNode mockNode = createMockNode("data_" + i, Settings.builder().put("node.master", false).build());
discoveryNodesBuilder.put(mockNode.discoveryNode);
if (randomBoolean()) {
// we really don't care - just chaos monkey
mockNode.action.errorOnCommit.set(randomBoolean());
mockNode.action.errorOnSend.set(randomBoolean());
mockNode.action.timeoutOnCommit.set(randomBoolean());
mockNode.action.timeoutOnSend.set(randomBoolean());
}
}
final int minMasterNodes;
final String expectedBehavior;
if (expectingToCommit) {
minMasterNodes = randomIntBetween(0, goodNodes + 1); // count master
expectedBehavior = "succeed";
} else {
minMasterNodes = randomIntBetween(goodNodes + 2, numberOfMasterNodes); // +2 because of master
expectedBehavior = timeOutNodes > 0 ? "timeout" : "fail";
}
logger.info("--> expecting commit to {}. good nodes [{}], errors [{}], timeouts [{}]. min_master_nodes [{}]",
expectedBehavior, goodNodes + 1, errorNodes, timeOutNodes, minMasterNodes);
discoveryNodesBuilder.localNodeId(master.discoveryNode.id()).masterNodeId(master.discoveryNode.id());
DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build();
MetaData metaData = MetaData.EMPTY_META_DATA;
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).nodes(discoveryNodes).build();
ClusterState previousState = master.clusterState;
try {
publishState(master.action, clusterState, previousState, minMasterNodes);
if (expectingToCommit == false) {
fail("cluster state publishing didn't fail despite of not have enough nodes");
}
} catch (PublishClusterStateAction.FailedToCommitException exception) {
logger.debug("failed to publish as expected", exception);
if (expectingToCommit) {
throw exception;
}
assertThat(exception.getMessage(), containsString(timeOutNodes > 0 ? "timed out" : "failed"));
}
}
public void testIncomingClusterStateVerification() throws Exception {
MockNode node = createMockNode("node");
logger.info("--> testing acceptances of any master when having no master");
ClusterState state = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId(randomAsciiOfLength(10))).build();
node.action.validateIncomingState(state);
// now set a master node
node.clusterState = ClusterState.builder(node.clusterState).nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build();
logger.info("--> testing rejection of another master");
try {
node.action.validateIncomingState(state);
fail("node accepted state from another master");
} catch (IllegalStateException OK) {
}
logger.info("--> test state from the current master is accepted");
node.action.validateIncomingState(ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build());
logger.info("--> testing rejection of another cluster name");
try {
node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build());
fail("node accepted state with another cluster name");
} catch (IllegalStateException OK) {
}
logger.info("--> testing rejection of a cluster state with wrong local node");
try {
state = ClusterState.builder(node.clusterState).nodes(DiscoveryNodes.builder(node.nodes()).localNodeId("_non_existing_").build()).build();
node.action.validateIncomingState(state);
fail("node accepted state with non-existence local node");
} catch (IllegalStateException OK) {
}
try {
MockNode otherNode = createMockNode("otherNode");
state = ClusterState.builder(node.clusterState).nodes(
DiscoveryNodes.builder(node.nodes()).put(otherNode.discoveryNode).localNodeId(otherNode.discoveryNode.id()).build()
).build();
node.action.validateIncomingState(state);
fail("node accepted state with existent but wrong local node");
} catch (IllegalStateException OK) {
}
}
public void testInterleavedPublishCommit() throws Throwable {
MockNode node = createMockNode("node");
final ClusterState state1 = ClusterState.builder(node.clusterState).incrementVersion().build();
final ClusterState state2 = ClusterState.builder(state1).incrementVersion().build();
final BytesReference state1Bytes = PublishClusterStateAction.serializeFullClusterState(state1, Version.CURRENT);
final BytesReference state2Bytes = PublishClusterStateAction.serializeFullClusterState(state2, Version.CURRENT);
final CapturingTransportChannel channel = new CapturingTransportChannel();
node.action.handleIncomingClusterStateRequest(new BytesTransportRequest(state1Bytes, Version.CURRENT), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
assertThat(channel.error.get(), nullValue());
channel.clear();
// another incoming state is OK. Should just override pending state
node.action.handleIncomingClusterStateRequest(new BytesTransportRequest(state2Bytes, Version.CURRENT), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
assertThat(channel.error.get(), nullValue());
channel.clear();
// committing previous state should fail
try {
node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state1.stateUUID()), channel);
// sadly, there are ways to percolate errors
assertThat(channel.response.get(), nullValue());
assertThat(channel.error.get(), notNullValue());
if (channel.error.get() instanceof IllegalStateException == false) {
throw channel.error.get();
}
} catch (IllegalStateException OK) {
}
channel.clear();
// committing second state should succeed
node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state2.stateUUID()), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
assertThat(channel.error.get(), nullValue());
channel.clear();
// now check it was really committed
assertSameState(node.clusterState, state2);
}
private MetaData buildMetaDataForVersion(MetaData metaData, long version) {
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.builder(metaData.indices());
@ -471,15 +675,19 @@ public class PublishClusterStateActionTests extends ESTestCase {
assertThat(metaData.transientSettings().get("test"), equalTo(Long.toString(version)));
}
public void publishStateDiffAndWait(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException {
publishStateDiff(action, state, previousState).await(1, TimeUnit.SECONDS);
public void publishStateAndWait(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException {
publishState(action, state, previousState).await(1, TimeUnit.SECONDS);
}
public AssertingAckListener publishStateDiff(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException {
public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException {
final int minimumMasterNodes = randomIntBetween(-1, state.nodes().getMasterNodes().size());
return publishState(action, state, previousState, minimumMasterNodes);
}
public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, ClusterState previousState, int minMasterNodes) throws InterruptedException {
AssertingAckListener assertingAckListener = new AssertingAckListener(state.nodes().getSize() - 1);
ClusterChangedEvent changedEvent = new ClusterChangedEvent("test update", state, previousState);
int requiredNodes = randomIntBetween(-1, state.nodes().getSize() - 1);
action.publish(changedEvent, requiredNodes, assertingAckListener);
action.publish(changedEvent, minMasterNodes, assertingAckListener);
return assertingAckListener;
}
@ -522,19 +730,11 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
public static class DelegatingClusterState extends ClusterState {
public DelegatingClusterState(ClusterState clusterState) {
super(clusterState.version(), clusterState.stateUUID(), clusterState);
}
}
void assertSameState(ClusterState actual, ClusterState expected) {
assertThat(actual, notNullValue());
assertThat("\n--> actual ClusterState: " + actual.prettyPrint() + "\n--> expected ClusterState:" + expected.prettyPrint(), actual.stateUUID(), equalTo(expected.stateUUID()));
final String reason = "\n--> actual ClusterState: " + actual.prettyPrint() + "\n--> expected ClusterState:" + expected.prettyPrint();
assertThat("unequal UUIDs" + reason, actual.stateUUID(), equalTo(expected.stateUUID()));
assertThat("unequal versions" + reason, actual.version(), equalTo(expected.version()));
}
void assertSameStateFromDiff(ClusterState actual, ClusterState expected) {
@ -546,4 +746,77 @@ public class PublishClusterStateActionTests extends ESTestCase {
assertSameState(actual, expected);
assertFalse(actual.wasReadFromDiff());
}
static class MockPublishAction extends PublishClusterStateAction {
AtomicBoolean timeoutOnSend = new AtomicBoolean();
AtomicBoolean errorOnSend = new AtomicBoolean();
AtomicBoolean timeoutOnCommit = new AtomicBoolean();
AtomicBoolean errorOnCommit = new AtomicBoolean();
public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName);
}
@Override
protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
if (errorOnSend.get()) {
throw new ElasticsearchException("forced error on incoming cluster state");
}
if (timeoutOnSend.get()) {
return;
}
super.handleIncomingClusterStateRequest(request, channel);
}
@Override
protected void handleCommitRequest(PublishClusterStateAction.CommitClusterStateRequest request, TransportChannel channel) {
if (errorOnCommit.get()) {
throw new ElasticsearchException("forced error on incoming commit");
}
if (timeoutOnCommit.get()) {
return;
}
super.handleCommitRequest(request, channel);
}
}
static class CapturingTransportChannel implements TransportChannel {
AtomicReference<TransportResponse> response = new AtomicReference<>();
AtomicReference<Throwable> error = new AtomicReference<>();
public void clear() {
response.set(null);
error.set(null);
}
@Override
public String action() {
return "_noop_";
}
@Override
public String getProfileName() {
return "_noop_";
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
this.response.set(response);
assertThat(error.get(), nullValue());
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
this.response.set(response);
assertThat(error.get(), nullValue());
}
@Override
public void sendResponse(Throwable error) throws IOException {
this.error.set(error);
assertThat(response.get(), nullValue());
}
}
}

View File

@ -60,6 +60,10 @@ public class NetworkDelaysPartition extends NetworkPartition {
this(nodesSideOne, nodesSideTwo, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random);
}
public NetworkDelaysPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, long delay, Random random) {
this(nodesSideOne, nodesSideTwo, delay, delay, random);
}
public NetworkDelaysPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, long delayMin, long delayMax, Random random) {
super(nodesSideOne, nodesSideTwo, random);
this.delayMin = delayMin;
@ -69,7 +73,7 @@ public class NetworkDelaysPartition extends NetworkPartition {
@Override
public synchronized void startDisrupting() {
duration = new TimeValue(delayMin + random.nextInt((int) (delayMax - delayMin)));
duration = new TimeValue(delayMin == delayMax ? delayMin : delayMin + random.nextInt((int) (delayMax - delayMin)));
super.startDisrupting();
}