diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java b/core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java index acce73ccd9f..e699489281b 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java @@ -36,10 +36,12 @@ import java.util.EnumSet; public class DiscoverySettings extends AbstractComponent { public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout"; + public static final String COMMIT_TIMEOUT = "discovery.zen.commit_timeout"; public static final String NO_MASTER_BLOCK = "discovery.zen.no_master_block"; public static final String PUBLISH_DIFF_ENABLE = "discovery.zen.publish_diff.enable"; public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30); + public static final TimeValue DEFAULT_COMMIT_TIMEOUT = TimeValue.timeValueSeconds(1); public static final String DEFAULT_NO_MASTER_BLOCK = "write"; public final static int NO_MASTER_BLOCK_ID = 2; public final static boolean DEFAULT_PUBLISH_DIFF_ENABLE = true; @@ -49,6 +51,7 @@ public class DiscoverySettings extends AbstractComponent { private volatile ClusterBlock noMasterBlock; private volatile TimeValue publishTimeout = DEFAULT_PUBLISH_TIMEOUT; + private volatile TimeValue commitTimeout = DEFAULT_COMMIT_TIMEOUT; private volatile boolean publishDiff = DEFAULT_PUBLISH_DIFF_ENABLE; @Inject @@ -57,6 +60,7 @@ public class DiscoverySettings extends AbstractComponent { nodeSettingsService.addListener(new ApplySettings()); this.noMasterBlock = parseNoMasterBlock(settings.get(NO_MASTER_BLOCK, DEFAULT_NO_MASTER_BLOCK)); this.publishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout); + this.commitTimeout = settings.getAsTime(COMMIT_TIMEOUT, commitTimeout); this.publishDiff = settings.getAsBoolean(PUBLISH_DIFF_ENABLE, DEFAULT_PUBLISH_DIFF_ENABLE); } @@ -67,6 +71,10 @@ public class DiscoverySettings extends 
AbstractComponent { return publishTimeout; } + public TimeValue getCommitTimeout() { + return commitTimeout; + } + public ClusterBlock getNoMasterBlock() { return noMasterBlock; } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index b62db46aca9..f522468c3c1 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.InternalClusterService; @@ -199,7 +200,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName); this.nodesFD.addListener(new NodeFaultDetectionListener()); - this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings); + this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName); this.pingService.setPingContextProvider(this); this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener()); @@ -329,7 +330,24 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen throw new IllegalStateException("Shouldn't publish state when not master"); } nodesFD.updateNodesAndPing(clusterChangedEvent.state()); - publishClusterState.publish(clusterChangedEvent, 
ackListener); + try { + publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); + } catch (PublishClusterStateAction.FailedToCommitException t) { + logger.warn("failed to publish [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes()); + clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return rejoin(currentState, "failed to publish to min_master_nodes"); + } + + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure during [{}]", t, source); + } + + }); + throw t; + } } /** @@ -677,12 +695,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen void handleNewClusterStateFromMaster(ClusterState newClusterState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) { final ClusterName incomingClusterName = newClusterState.getClusterName(); - /* The cluster name can still be null if the state comes from a node that is prev 1.1.1*/ - if (incomingClusterName != null && !incomingClusterName.equals(this.clusterName)) { - logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName); - newStateProcessed.onNewClusterStateFailed(new IllegalStateException("received state from a node that is not part of the cluster")); - return; - } if (localNodeMaster()) { logger.debug("received cluster state from [{}] which is also master with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName); final ClusterState newState = newClusterState; @@ -705,101 +717,97 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen }); } else { - if (newClusterState.nodes().localNode() == null) { - 
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newClusterState.nodes().masterNode()); - newStateProcessed.onNewClusterStateFailed(new IllegalStateException("received state from a node that is not part of the cluster")); - } else { - - final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState); - processNewClusterStates.add(processClusterState); - - assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master"; - assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block"; - - clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - // we already processed it in a previous event - if (processClusterState.processed) { - return currentState; - } - - // TODO: once improvement that we can do is change the message structure to include version and masterNodeId - // at the start, this will allow us to keep the "compressed bytes" around, and only parse the first page - // to figure out if we need to use it or not, and only once we picked the latest one, parse the whole state - ClusterState updatedState = selectNextStateToProcess(processNewClusterStates); - if (updatedState == null) { - updatedState = currentState; - } - if (shouldIgnoreOrRejectNewClusterState(logger, currentState, updatedState)) { - return currentState; - } + final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState); + processNewClusterStates.add(processClusterState); - // we don't need to do this, since we ping the master, and get notified when it has moved from being a master - // because it doesn't have enough master nodes... 
- //if (!electMaster.hasEnoughMasterNodes(newState.nodes())) { - // return disconnectFromCluster(newState, "not enough master nodes on new cluster state wreceived from [" + newState.nodes().masterNode() + "]"); - //} + assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master"; + assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block"; - // check to see that we monitor the correct master of the cluster - if (masterFD.masterNode() == null || !masterFD.masterNode().equals(updatedState.nodes().masterNode())) { - masterFD.restart(updatedState.nodes().masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); - } + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + // we already processed it in a previous event + if (processClusterState.processed) { + return currentState; + } - if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { - // its a fresh update from the master as we transition from a start of not having a master to having one - logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId()); - long count = clusterJoinsCounter.incrementAndGet(); - logger.trace("updated cluster join cluster to [{}]", count); - - return updatedState; - } + // TODO: once improvement that we can do is change the message structure to include version and masterNodeId + // at the start, this will allow us to keep the "compressed bytes" around, and only parse the first page + // to figure out if we need to use it or not, and only once we picked the latest one, parse the whole state - // some optimizations to make sure we keep old objects where possible - 
ClusterState.Builder builder = ClusterState.builder(updatedState); + ClusterState updatedState = selectNextStateToProcess(processNewClusterStates); + if (updatedState == null) { + updatedState = currentState; + } + if (shouldIgnoreOrRejectNewClusterState(logger, currentState, updatedState)) { + return currentState; + } - // if the routing table did not change, use the original one - if (updatedState.routingTable().version() == currentState.routingTable().version()) { - builder.routingTable(currentState.routingTable()); - } - // same for metadata - if (updatedState.metaData().version() == currentState.metaData().version()) { - builder.metaData(currentState.metaData()); - } else { - // if its not the same version, only copy over new indices or ones that changed the version - MetaData.Builder metaDataBuilder = MetaData.builder(updatedState.metaData()).removeAllIndices(); - for (IndexMetaData indexMetaData : updatedState.metaData()) { - IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index()); - if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.indexUUID()) && - currentIndexMetaData.version() == indexMetaData.version()) { - // safe to reuse - metaDataBuilder.put(currentIndexMetaData, false); - } else { - metaDataBuilder.put(indexMetaData, false); - } + // we don't need to do this, since we ping the master, and get notified when it has moved from being a master + // because it doesn't have enough master nodes... 
+ //if (!electMaster.hasEnoughMasterNodes(newState.nodes())) { + // return disconnectFromCluster(newState, "not enough master nodes on new cluster state wreceived from [" + newState.nodes().masterNode() + "]"); + //} + + // check to see that we monitor the correct master of the cluster + if (masterFD.masterNode() == null || !masterFD.masterNode().equals(updatedState.nodes().masterNode())) { + masterFD.restart(updatedState.nodes().masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); + } + + if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { + // its a fresh update from the master as we transition from a start of not having a master to having one + logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId()); + long count = clusterJoinsCounter.incrementAndGet(); + logger.trace("updated cluster join cluster to [{}]", count); + + return updatedState; + } + + + // some optimizations to make sure we keep old objects where possible + ClusterState.Builder builder = ClusterState.builder(updatedState); + + // if the routing table did not change, use the original one + if (updatedState.routingTable().version() == currentState.routingTable().version()) { + builder.routingTable(currentState.routingTable()); + } + // same for metadata + if (updatedState.metaData().version() == currentState.metaData().version()) { + builder.metaData(currentState.metaData()); + } else { + // if its not the same version, only copy over new indices or ones that changed the version + MetaData.Builder metaDataBuilder = MetaData.builder(updatedState.metaData()).removeAllIndices(); + for (IndexMetaData indexMetaData : updatedState.metaData()) { + IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index()); + if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.indexUUID()) && + currentIndexMetaData.version() == 
indexMetaData.version()) { + // safe to reuse + metaDataBuilder.put(currentIndexMetaData, false); + } else { + metaDataBuilder.put(indexMetaData, false); } - builder.metaData(metaDataBuilder); } - - return builder.build(); + builder.metaData(metaDataBuilder); } - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - newStateProcessed.onNewClusterStateFailed(t); - } + return builder.build(); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - sendInitialStateEventIfNeeded(); - newStateProcessed.onNewClusterStateProcessed(); - } - }); - } + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure during [{}]", t, source); + newStateProcessed.onNewClusterStateFailed(t); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + sendInitialStateEventIfNeeded(); + newStateProcessed.onNewClusterStateProcessed(); + } + }); } } @@ -848,13 +856,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen * If the second condition fails we ignore the cluster state. 
*/ static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) { - if (currentState.nodes().masterNodeId() == null) { - return false; - } - if (!currentState.nodes().masterNodeId().equals(newClusterState.nodes().masterNodeId())) { - logger.warn("received a cluster state from a different master then the current one, rejecting (received {}, current {})", newClusterState.nodes().masterNode(), currentState.nodes().masterNode()); - throw new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + newClusterState.nodes().masterNode() + ", current " + currentState.nodes().masterNode() + ")"); - } else if (newClusterState.version() < currentState.version()) { + rejectNewClusterStateIfNeeded(logger, currentState.nodes(), newClusterState); + if (currentState.nodes().masterNodeId() != null && newClusterState.version() < currentState.version()) { // if the new state has a smaller version, and it has the same master node, then no need to process it logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version()); return true; @@ -863,6 +866,21 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } + /** + * In the case we follow an elected master the new cluster state needs to have the same elected master + * This method checks for this and throws an exception if needed + */ + + public static void rejectNewClusterStateIfNeeded(ESLogger logger, DiscoveryNodes currentNodes, ClusterState newClusterState) { + if (currentNodes.masterNodeId() == null) { + return; + } + if (!currentNodes.masterNodeId().equals(newClusterState.nodes().masterNodeId())) { + logger.warn("received a cluster state from a different master then the current one, rejecting (received {}, current {})", newClusterState.nodes().masterNode(), currentNodes.masterNode()); + 
throw new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + newClusterState.nodes().masterNode() + ", current " + currentNodes.masterNode() + ")"); + } + } + void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) { if (!transportService.addressSupported(node.address().getClass())) { @@ -1300,4 +1318,4 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 78c13f8ce53..050805bcf5a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -20,11 +20,9 @@ package org.elasticsearch.discovery.zen.publish; import com.google.common.collect.Maps; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.Diff; -import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; @@ -41,13 +39,17 @@ import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.ArrayList; import 
java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -55,7 +57,8 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class PublishClusterStateAction extends AbstractComponent { - public static final String ACTION_NAME = "internal:discovery/zen/publish"; + public static final String SEND_ACTION_NAME = "internal:discovery/zen/publish/send"; + public static final String COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit"; public interface NewClusterStateListener { @@ -73,34 +76,41 @@ public class PublishClusterStateAction extends AbstractComponent { private final DiscoveryNodesProvider nodesProvider; private final NewClusterStateListener listener; private final DiscoverySettings discoverySettings; + private final ClusterName clusterName; public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, - NewClusterStateListener listener, DiscoverySettings discoverySettings) { + NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { super(settings); this.transportService = transportService; this.nodesProvider = nodesProvider; this.listener = listener; this.discoverySettings = discoverySettings; - transportService.registerRequestHandler(ACTION_NAME, BytesTransportRequest.class, ThreadPool.Names.SAME, new PublishClusterStateRequestHandler()); + this.clusterName = clusterName; + transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest.class, ThreadPool.Names.SAME, new SendClusterStateRequestHandler()); + transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest.class, ThreadPool.Names.SAME, new CommitClusterStateRequestHandler()); } public void close() { - transportService.removeHandler(ACTION_NAME); + transportService.removeHandler(SEND_ACTION_NAME); + 
transportService.removeHandler(COMMIT_ACTION_NAME); } - public void publish(ClusterChangedEvent clusterChangedEvent, final Discovery.AckListener ackListener) { + public void publish(ClusterChangedEvent clusterChangedEvent, int minMasterNodes, final Discovery.AckListener ackListener) { Set nodesToPublishTo = new HashSet<>(clusterChangedEvent.state().nodes().size()); DiscoveryNode localNode = nodesProvider.nodes().localNode(); + int totalMasterNodes = 0; for (final DiscoveryNode node : clusterChangedEvent.state().nodes()) { - if (node.equals(localNode)) { - continue; + if (node.isMasterNode()) { + totalMasterNodes++; + } + if (node.equals(localNode) == false) { + nodesToPublishTo.add(node); } - nodesToPublishTo.add(node); } - publish(clusterChangedEvent, nodesToPublishTo, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener)); + publish(clusterChangedEvent, minMasterNodes, totalMasterNodes, nodesToPublishTo, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener)); } - private void publish(final ClusterChangedEvent clusterChangedEvent, final Set nodesToPublishTo, + private void publish(final ClusterChangedEvent clusterChangedEvent, int minMasterNodes, int totalMasterNodes, final Set nodesToPublishTo, final BlockingClusterStatePublishResponseHandler publishResponseHandler) { Map serializedStates = Maps.newHashMap(); @@ -111,6 +121,7 @@ public class PublishClusterStateAction extends AbstractComponent { final AtomicBoolean timedOutWaitingForNodes = new AtomicBoolean(false); final TimeValue publishTimeout = discoverySettings.getPublishTimeout(); final boolean sendFullVersion = !discoverySettings.getPublishDiff() || previousState == null; + final SendingController sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler); Diff diff = null; for (final DiscoveryNode node : nodesToPublishTo) { @@ -119,15 +130,17 @@ public class PublishClusterStateAction extends 
AbstractComponent { // per node when we send it over the wire, compress it while we are at it... // we don't send full version if node didn't exist in the previous version of cluster state if (sendFullVersion || !previousState.nodes().nodeExists(node.id())) { - sendFullClusterState(clusterState, serializedStates, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler); + sendFullClusterState(clusterState, serializedStates, node, timedOutWaitingForNodes, publishTimeout, sendingController); } else { if (diff == null) { diff = clusterState.diff(previousState); } - sendClusterStateDiff(clusterState, diff, serializedDiffs, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler); + sendClusterStateDiff(clusterState, diff, serializedDiffs, node, timedOutWaitingForNodes, publishTimeout, sendingController); } } + sendingController.waitForCommit(discoverySettings.getCommitTimeout()); + if (publishTimeout.millis() > 0) { // only wait if the publish timeout is configured... 
try { @@ -148,7 +161,7 @@ public class PublishClusterStateAction extends AbstractComponent { private void sendFullClusterState(ClusterState clusterState, @Nullable Map serializedStates, DiscoveryNode node, AtomicBoolean timedOutWaitingForNodes, TimeValue publishTimeout, - BlockingClusterStatePublishResponseHandler publishResponseHandler) { + SendingController sendingController) { BytesReference bytes = null; if (serializedStates != null) { bytes = serializedStates.get(node.version()); @@ -161,16 +174,16 @@ public class PublishClusterStateAction extends AbstractComponent { } } catch (Throwable e) { logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node); - publishResponseHandler.onFailure(node, e); + sendingController.onNodeSendFailed(node, e); return; } } - publishClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler, false); + sendClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, sendingController, false); } private void sendClusterStateDiff(ClusterState clusterState, Diff diff, Map serializedDiffs, DiscoveryNode node, AtomicBoolean timedOutWaitingForNodes, TimeValue publishTimeout, - BlockingClusterStatePublishResponseHandler publishResponseHandler) { + SendingController sendingController) { BytesReference bytes = serializedDiffs.get(node.version()); if (bytes == null) { try { @@ -178,23 +191,23 @@ public class PublishClusterStateAction extends AbstractComponent { serializedDiffs.put(node.version(), bytes); } catch (Throwable e) { logger.warn("failed to serialize diff of cluster_state before publishing it to node {}", e, node); - publishResponseHandler.onFailure(node, e); + sendingController.onNodeSendFailed(node, e); return; } } - publishClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler, true); + sendClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, 
publishTimeout, sendingController, true); } - private void publishClusterStateToNode(final ClusterState clusterState, BytesReference bytes, - final DiscoveryNode node, final AtomicBoolean timedOutWaitingForNodes, - final TimeValue publishTimeout, - final BlockingClusterStatePublishResponseHandler publishResponseHandler, - final boolean sendDiffs) { + private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes, + final DiscoveryNode node, final AtomicBoolean timedOutWaitingForNodes, + final TimeValue publishTimeout, + final SendingController sendingController, + final boolean sendDiffs) { try { TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); // no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout - transportService.sendRequest(node, ACTION_NAME, + transportService.sendRequest(node, SEND_ACTION_NAME, new BytesTransportRequest(bytes, node.version()), options, // no need to compress, we already compressed the bytes @@ -205,26 +218,59 @@ public class PublishClusterStateAction extends AbstractComponent { if (timedOutWaitingForNodes.get()) { logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout); } - publishResponseHandler.onResponse(node); + sendingController.onNodeSendAck(node); } @Override public void handleException(TransportException exp) { if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) { logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage()); - sendFullClusterState(clusterState, null, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler); + sendFullClusterState(clusterState, null, node, timedOutWaitingForNodes, publishTimeout, sendingController); } else { logger.debug("failed to 
send cluster state to {}", exp, node); - publishResponseHandler.onFailure(node, exp); + sendingController.onNodeSendFailed(node, exp); } } }); } catch (Throwable t) { logger.warn("error sending cluster state to {}", t, node); + sendingController.onNodeSendFailed(node, t); + } + } + + private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final BlockingClusterStatePublishResponseHandler publishResponseHandler) { + try { + logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node); + TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); + // no need to put a timeout on the options here, because we want the response to eventually be received + // and not log an error if it arrives after the timeout + transportService.sendRequest(node, COMMIT_ACTION_NAME, + new CommitClusterStateRequest(clusterState.stateUUID()), + options, // no need to compress, we already compressed the bytes + + new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + + @Override + public void handleResponse(TransportResponse.Empty response) { +// if (timedOutWaitingForNodes.get()) { + logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version()); +// } + publishResponseHandler.onResponse(node); + } + + @Override + public void handleException(TransportException exp) { + logger.debug("failed to commit cluster state (uuid [{}], version [{}]) to {}", exp, clusterState.stateUUID(), clusterState.version(), node); + publishResponseHandler.onFailure(node, exp); + } + }); + } catch (Throwable t) { + logger.warn("error sending cluster state commit (uuid [{}], version [{}]) to {}", t, clusterState.stateUUID(), clusterState.version(), node); publishResponseHandler.onFailure(node, t); } } + public static BytesReference serializeFullClusterState(ClusterState clusterState, Version 
nodeVersion) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); try (StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream)) { @@ -245,8 +291,10 @@ public class PublishClusterStateAction extends AbstractComponent { return bStream.bytes(); } - private class PublishClusterStateRequestHandler implements TransportRequestHandler { - private ClusterState lastSeenClusterState; + private Object lastSeenClusterStateMutex = new Object(); + private ClusterState lastSeenClusterState; + + private class SendClusterStateRequestHandler implements TransportRequestHandler { @Override public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception { @@ -258,24 +306,57 @@ public class PublishClusterStateAction extends AbstractComponent { in = request.bytes().streamInput(); } in.setVersion(request.version()); - synchronized (this) { + synchronized (lastSeenClusterStateMutex) { + final ClusterState incomingState; // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { - lastSeenClusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); - logger.debug("received full cluster state version {} with size {}", lastSeenClusterState.version(), request.bytes().length()); + incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); } else if (lastSeenClusterState != null) { Diff diff = lastSeenClusterState.readDiffFrom(in); - lastSeenClusterState = diff.apply(lastSeenClusterState); - logger.debug("received diff cluster state version {} with uuid {}, diff size {}", lastSeenClusterState.version(), lastSeenClusterState.stateUUID(), request.bytes().length()); + incomingState = diff.apply(lastSeenClusterState); + logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", 
incomingState.version(), incomingState.stateUUID(), request.bytes().length()); } else { logger.debug("received diff for but don't have any local cluster state - requesting full state"); throw new IncompatibleClusterStateVersionException("have no local cluster state"); } + // sanity check incoming state + final ClusterName incomingClusterName = incomingState.getClusterName(); + if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) { + logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().masterNode(), incomingClusterName); + throw new IllegalStateException("received state from a node that is not part of the cluster"); + } + if (incomingState.nodes().localNode() == null) { + logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().masterNode()); + throw new IllegalStateException("received state from a node that is not part of the cluster"); + } + // state from another master requires more subtle checks, so we let it pass for now (it will be checked in ZenDiscovery) + if (nodesProvider.nodes().localNodeMaster() == false) { + ZenDiscovery.rejectNewClusterStateIfNeeded(logger, nodesProvider.nodes(), incomingState); + } + + lastSeenClusterState = incomingState; lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); } + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + } + + private class CommitClusterStateRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel) throws Exception { + ClusterState committedClusterState; + synchronized (lastSeenClusterStateMutex) { + committedClusterState = lastSeenClusterState; + } + if (committedClusterState.stateUUID().equals(request.stateUUID) == false) { + // nocommit: we need something better here + 
channel.sendResponse(TransportResponse.Empty.INSTANCE); + return; + } try { - listener.onNewClusterState(lastSeenClusterState, new NewClusterStateListener.NewStateProcessed() { + listener.onNewClusterState(committedClusterState, new NewClusterStateListener.NewStateProcessed() { @Override public void onNewClusterStateProcessed() { try { @@ -304,4 +385,110 @@ public class PublishClusterStateAction extends AbstractComponent { } } } -} + + static class CommitClusterStateRequest extends TransportRequest { /* transport request naming, by UUID, the cluster state the receiver should commit */ + + String stateUUID; /* the commit handler compares this with the stateUUID() of the last cluster state it received */ + + public CommitClusterStateRequest() { /* leaves stateUUID unset until readFrom fills it in */ + } + + public CommitClusterStateRequest(String stateUUID) { + this.stateUUID = stateUUID; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + stateUUID = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(stateUUID); + } + } + + + public class FailedToCommitException extends ElasticsearchException { /* thrown by SendingController.waitForCommit when the state is still uncommitted after the wait */ + + public FailedToCommitException(String msg) { + super(msg); + } + } + + class SendingController { /* tracks send acks for one cluster state and sends commit messages once enough master nodes have acked */ + + private final ClusterState clusterState; + private final BlockingClusterStatePublishResponseHandler publishResponseHandler; + volatile int neededMastersToCommit; /* master-node send acks still required before committing; decremented in onMasterNodeSendAck */ + int pendingMasterNodes; /* decremented in onMasterNodeDone; reaching 0 releases the latch even without a commit */ + final ArrayList sendAckedBeforeCommit = new ArrayList<>(); /* nodes that acked the send before the commit point; each is sent the commit once that point is reached */ + final CountDownLatch comittedOrFailed; /* awaited by waitForCommit; starts at 0 when no further master acks are needed, otherwise 1 */ + final AtomicBoolean committed; /* true from construction when neededMastersToCommit is 0, otherwise set in onMasterNodeSendAck */ + + private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, BlockingClusterStatePublishResponseHandler publishResponseHandler) { + this.clusterState = clusterState; + this.publishResponseHandler = publishResponseHandler; + this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes + this.pendingMasterNodes = totalMasterNodes - 1; + this.committed = new AtomicBoolean(neededMastersToCommit == 0); + this.comittedOrFailed = new CountDownLatch(committed.get() ? 
0 : 1); + } + + public void waitForCommit(TimeValue commitTimeout) { + try { + comittedOrFailed.await(commitTimeout.millis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); /* restore the interrupt status instead of swallowing it; the committed check below still decides the outcome */ + } + if (committed.get() == false) { + throw new FailedToCommitException("failed to get enough masters to ack sent cluster state. [" + neededMastersToCommit + "] left"); + } + } + + synchronized public void onNodeSendAck(DiscoveryNode node) { + if (committed.get() == false) { + sendAckedBeforeCommit.add(node); + if (node.isMasterNode()) { + onMasterNodeSendAck(node); + } + } else { + assert sendAckedBeforeCommit.isEmpty(); + sendCommitToNode(node, clusterState, publishResponseHandler); + } + + } + + private void onMasterNodeSendAck(DiscoveryNode node) { + neededMastersToCommit--; + if (neededMastersToCommit == 0) { + logger.trace("committing version [{}]", clusterState.version()); + for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) { + sendCommitToNode(nodeToCommit, clusterState, publishResponseHandler); + } + sendAckedBeforeCommit.clear(); + boolean success = committed.compareAndSet(false, true); + assert success; + comittedOrFailed.countDown(); + } + onMasterNodeDone(node); + } + + private void onMasterNodeDone(DiscoveryNode node) { + pendingMasterNodes--; + if (pendingMasterNodes == 0) { + comittedOrFailed.countDown(); + } + } + + synchronized public void onNodeSendFailed(DiscoveryNode node, Throwable t) { + if (node.isMasterNode()) { + onMasterNodeDone(node); + } + publishResponseHandler.onFailure(node, t); + } + + } +} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffPublishingTests.java b/core/src/test/java/org/elasticsearch/cluster/PublishClusterStateActionTests.java similarity index 97% rename from core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffPublishingTests.java rename to core/src/test/java/org/elasticsearch/cluster/PublishClusterStateActionTests.java index 5575ed93317..5bd51f8ec7a 100644 --- 
a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffPublishingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/PublishClusterStateActionTests.java @@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.collect.Maps.newHashMap; import static org.hamcrest.Matchers.*; -public class ClusterStateDiffPublishingTests extends ESTestCase { +public class PublishClusterStateActionTests extends ESTestCase { protected ThreadPool threadPool; protected Map nodes = newHashMap(); @@ -177,7 +177,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase { protected PublishClusterStateAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, MockDiscoveryNodesProvider nodesProvider, PublishClusterStateAction.NewClusterStateListener listener) { DiscoverySettings discoverySettings = new DiscoverySettings(settings, new NodeSettingsService(settings)); - return new PublishClusterStateAction(settings, transportService, nodesProvider, listener, discoverySettings); + return new PublishClusterStateAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT); } @@ -217,7 +217,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase { // Initial cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); - ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); // cluster state update - add nodeB discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build(); @@ -356,7 +356,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase { // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous 
cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); - ClusterState previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build(); + ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); mockListenerB.add(new NewClusterStateExpectation() { @Override @@ -401,7 +401,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase { // Initial cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); - ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); // cluster state update - add nodeB discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build(); @@ -447,7 +447,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase { AssertingAckListener[] listeners = new AssertingAckListener[numberOfIterations]; DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); MetaData metaData = MetaData.EMPTY_META_DATA; - ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metaData(metaData).build(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build(); ClusterState previousState; for (int i = 0; i < numberOfIterations; i++) { previousState = clusterState; @@ -477,7 +477,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase { // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = 
DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); - ClusterState previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build(); + ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); mockListenerB.add(new NewClusterStateExpectation() { @Override @@ -545,7 +545,8 @@ public class ClusterStateDiffPublishingTests extends ESTestCase { public AssertingAckListener publishStateDiff(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException { AssertingAckListener assertingAckListener = new AssertingAckListener(state.nodes().getSize() - 1); ClusterChangedEvent changedEvent = new ClusterChangedEvent("test update", state, previousState); - action.publish(changedEvent, assertingAckListener); + int requiredNodes = randomIntBetween(-1, state.nodes().getSize() - 1); + action.publish(changedEvent, requiredNodes, assertingAckListener); return assertingAckListener; } diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index c18abdc5bef..6d4b0a9ee45 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -828,7 +828,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode); MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); - 
masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.ACTION_NAME); + if (randomBoolean()) { + masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.SEND_ACTION_NAME); + } else { + masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.COMMIT_ACTION_NAME); + } logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode); final CountDownLatch countDownLatch = new CountDownLatch(2); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index cc293375a2c..9b79ff9f7a7 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -205,7 +205,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); - internalCluster().getInstance(TransportService.class, noneMasterNode).sendRequest(node, PublishClusterStateAction.ACTION_NAME, new BytesTransportRequest(bytes, Version.CURRENT), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + internalCluster().getInstance(TransportService.class, noneMasterNode).sendRequest(node, PublishClusterStateAction.SEND_ACTION_NAME, new BytesTransportRequest(bytes, Version.CURRENT), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty response) {