initial copy over from POC

This commit is contained in:
Boaz Leskes 2015-08-19 17:46:44 +02:00
parent 35f9ee7a62
commit 3815a41626
6 changed files with 366 additions and 148 deletions

View File

@ -36,10 +36,12 @@ import java.util.EnumSet;
public class DiscoverySettings extends AbstractComponent {
public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout";
public static final String COMMIT_TIMEOUT = "discovery.zen.commit_timeout";
public static final String NO_MASTER_BLOCK = "discovery.zen.no_master_block";
public static final String PUBLISH_DIFF_ENABLE = "discovery.zen.publish_diff.enable";
public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);
public static final TimeValue DEFAULT_COMMIT_TIMEOUT = TimeValue.timeValueSeconds(1);
public static final String DEFAULT_NO_MASTER_BLOCK = "write";
public final static int NO_MASTER_BLOCK_ID = 2;
public final static boolean DEFAULT_PUBLISH_DIFF_ENABLE = true;
@ -49,6 +51,7 @@ public class DiscoverySettings extends AbstractComponent {
private volatile ClusterBlock noMasterBlock;
private volatile TimeValue publishTimeout = DEFAULT_PUBLISH_TIMEOUT;
private volatile TimeValue commitTimeout = DEFAULT_COMMIT_TIMEOUT;
private volatile boolean publishDiff = DEFAULT_PUBLISH_DIFF_ENABLE;
@Inject
@ -57,6 +60,7 @@ public class DiscoverySettings extends AbstractComponent {
nodeSettingsService.addListener(new ApplySettings());
this.noMasterBlock = parseNoMasterBlock(settings.get(NO_MASTER_BLOCK, DEFAULT_NO_MASTER_BLOCK));
this.publishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout);
this.commitTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout);
this.publishDiff = settings.getAsBoolean(PUBLISH_DIFF_ENABLE, DEFAULT_PUBLISH_DIFF_ENABLE);
}
@ -67,6 +71,10 @@ public class DiscoverySettings extends AbstractComponent {
return publishTimeout;
}
public TimeValue getCommitTimeout() {
return commitTimeout;
}
public ClusterBlock getNoMasterBlock() {
return noMasterBlock;
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
@ -199,7 +200,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
this.nodesFD.addListener(new NodeFaultDetectionListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings);
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName);
this.pingService.setPingContextProvider(this);
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());
@ -329,7 +330,24 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
throw new IllegalStateException("Shouldn't publish state when not master");
}
nodesFD.updateNodesAndPing(clusterChangedEvent.state());
publishClusterState.publish(clusterChangedEvent, ackListener);
try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (PublishClusterStateAction.FailedToCommitException t) {
logger.warn("failed to publish [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes());
clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return rejoin(currentState, "failed to publish to min_master_nodes");
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
throw t;
}
}
/**
@ -677,12 +695,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
void handleNewClusterStateFromMaster(ClusterState newClusterState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
final ClusterName incomingClusterName = newClusterState.getClusterName();
/* The cluster name can still be null if the state comes from a node that is prev 1.1.1*/
if (incomingClusterName != null && !incomingClusterName.equals(this.clusterName)) {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
newStateProcessed.onNewClusterStateFailed(new IllegalStateException("received state from a node that is not part of the cluster"));
return;
}
if (localNodeMaster()) {
logger.debug("received cluster state from [{}] which is also master with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
final ClusterState newState = newClusterState;
@ -705,10 +717,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
});
} else {
if (newClusterState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newClusterState.nodes().masterNode());
newStateProcessed.onNewClusterStateFailed(new IllegalStateException("received state from a node that is not part of the cluster"));
} else {
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState);
processNewClusterStates.add(processClusterState);
@ -801,7 +810,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
});
}
}
}
/**
* Picks the cluster state with highest version with the same master from the queue. All cluster states with
@ -848,13 +856,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
* If the second condition fails we ignore the cluster state.
*/
static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
if (currentState.nodes().masterNodeId() == null) {
return false;
}
if (!currentState.nodes().masterNodeId().equals(newClusterState.nodes().masterNodeId())) {
logger.warn("received a cluster state from a different master then the current one, rejecting (received {}, current {})", newClusterState.nodes().masterNode(), currentState.nodes().masterNode());
throw new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + newClusterState.nodes().masterNode() + ", current " + currentState.nodes().masterNode() + ")");
} else if (newClusterState.version() < currentState.version()) {
rejectNewClusterStateIfNeeded(logger, currentState.nodes(), newClusterState);
if (currentState.nodes().masterNodeId() != null && newClusterState.version() < currentState.version()) {
// if the new state has a smaller version, and it has the same master node, then no need to process it
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
return true;
@ -863,6 +866,21 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
/**
* In the case we follow an elected master the new cluster state needs to have the same elected master
* This method checks for this and throws an exception if needed
*/
public static void rejectNewClusterStateIfNeeded(ESLogger logger, DiscoveryNodes currentNodes, ClusterState newClusterState) {
if (currentNodes.masterNodeId() == null) {
return;
}
if (!currentNodes.masterNodeId().equals(newClusterState.nodes().masterNodeId())) {
logger.warn("received a cluster state from a different master then the current one, rejecting (received {}, current {})", newClusterState.nodes().masterNode(), currentNodes.masterNode());
throw new IllegalStateException("cluster state from a different master then the current one, rejecting (received " + newClusterState.nodes().masterNode() + ", current " + currentNodes.masterNode() + ")");
}
}
void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
if (!transportService.addressSupported(node.address().getClass())) {

View File

@ -20,11 +20,9 @@
package org.elasticsearch.discovery.zen.publish;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
@ -41,13 +39,17 @@ import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -55,7 +57,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class PublishClusterStateAction extends AbstractComponent {
public static final String ACTION_NAME = "internal:discovery/zen/publish";
public static final String SEND_ACTION_NAME = "internal:discovery/zen/publish/send";
public static final String COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit";
public interface NewClusterStateListener {
@ -73,34 +76,41 @@ public class PublishClusterStateAction extends AbstractComponent {
private final DiscoveryNodesProvider nodesProvider;
private final NewClusterStateListener listener;
private final DiscoverySettings discoverySettings;
private final ClusterName clusterName;
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewClusterStateListener listener, DiscoverySettings discoverySettings) {
NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;
this.discoverySettings = discoverySettings;
transportService.registerRequestHandler(ACTION_NAME, BytesTransportRequest.class, ThreadPool.Names.SAME, new PublishClusterStateRequestHandler());
this.clusterName = clusterName;
transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest.class, ThreadPool.Names.SAME, new SendClusterStateRequestHandler());
transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest.class, ThreadPool.Names.SAME, new CommitClusterStateRequestHandler());
}
public void close() {
transportService.removeHandler(ACTION_NAME);
transportService.removeHandler(SEND_ACTION_NAME);
transportService.removeHandler(COMMIT_ACTION_NAME);
}
public void publish(ClusterChangedEvent clusterChangedEvent, final Discovery.AckListener ackListener) {
public void publish(ClusterChangedEvent clusterChangedEvent, int minMasterNodes, final Discovery.AckListener ackListener) {
Set<DiscoveryNode> nodesToPublishTo = new HashSet<>(clusterChangedEvent.state().nodes().size());
DiscoveryNode localNode = nodesProvider.nodes().localNode();
int totalMasterNodes = 0;
for (final DiscoveryNode node : clusterChangedEvent.state().nodes()) {
if (node.equals(localNode)) {
continue;
if (node.isMasterNode()) {
totalMasterNodes++;
}
if (node.equals(localNode) == false) {
nodesToPublishTo.add(node);
}
publish(clusterChangedEvent, nodesToPublishTo, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener));
}
publish(clusterChangedEvent, minMasterNodes, totalMasterNodes, nodesToPublishTo, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener));
}
private void publish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
private void publish(final ClusterChangedEvent clusterChangedEvent, int minMasterNodes, int totalMasterNodes, final Set<DiscoveryNode> nodesToPublishTo,
final BlockingClusterStatePublishResponseHandler publishResponseHandler) {
Map<Version, BytesReference> serializedStates = Maps.newHashMap();
@ -111,6 +121,7 @@ public class PublishClusterStateAction extends AbstractComponent {
final AtomicBoolean timedOutWaitingForNodes = new AtomicBoolean(false);
final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
final boolean sendFullVersion = !discoverySettings.getPublishDiff() || previousState == null;
final SendingController sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler);
Diff<ClusterState> diff = null;
for (final DiscoveryNode node : nodesToPublishTo) {
@ -119,15 +130,17 @@ public class PublishClusterStateAction extends AbstractComponent {
// per node when we send it over the wire, compress it while we are at it...
// we don't send full version if node didn't exist in the previous version of cluster state
if (sendFullVersion || !previousState.nodes().nodeExists(node.id())) {
sendFullClusterState(clusterState, serializedStates, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler);
sendFullClusterState(clusterState, serializedStates, node, timedOutWaitingForNodes, publishTimeout, sendingController);
} else {
if (diff == null) {
diff = clusterState.diff(previousState);
}
sendClusterStateDiff(clusterState, diff, serializedDiffs, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler);
sendClusterStateDiff(clusterState, diff, serializedDiffs, node, timedOutWaitingForNodes, publishTimeout, sendingController);
}
}
sendingController.waitForCommit(discoverySettings.getCommitTimeout());
if (publishTimeout.millis() > 0) {
// only wait if the publish timeout is configured...
try {
@ -148,7 +161,7 @@ public class PublishClusterStateAction extends AbstractComponent {
private void sendFullClusterState(ClusterState clusterState, @Nullable Map<Version, BytesReference> serializedStates,
DiscoveryNode node, AtomicBoolean timedOutWaitingForNodes, TimeValue publishTimeout,
BlockingClusterStatePublishResponseHandler publishResponseHandler) {
SendingController sendingController) {
BytesReference bytes = null;
if (serializedStates != null) {
bytes = serializedStates.get(node.version());
@ -161,16 +174,16 @@ public class PublishClusterStateAction extends AbstractComponent {
}
} catch (Throwable e) {
logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node);
publishResponseHandler.onFailure(node, e);
sendingController.onNodeSendFailed(node, e);
return;
}
}
publishClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler, false);
sendClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, sendingController, false);
}
private void sendClusterStateDiff(ClusterState clusterState, Diff diff, Map<Version, BytesReference> serializedDiffs, DiscoveryNode node,
AtomicBoolean timedOutWaitingForNodes, TimeValue publishTimeout,
BlockingClusterStatePublishResponseHandler publishResponseHandler) {
SendingController sendingController) {
BytesReference bytes = serializedDiffs.get(node.version());
if (bytes == null) {
try {
@ -178,23 +191,23 @@ public class PublishClusterStateAction extends AbstractComponent {
serializedDiffs.put(node.version(), bytes);
} catch (Throwable e) {
logger.warn("failed to serialize diff of cluster_state before publishing it to node {}", e, node);
publishResponseHandler.onFailure(node, e);
sendingController.onNodeSendFailed(node, e);
return;
}
}
publishClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler, true);
sendClusterStateToNode(clusterState, bytes, node, timedOutWaitingForNodes, publishTimeout, sendingController, true);
}
private void publishClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
final DiscoveryNode node, final AtomicBoolean timedOutWaitingForNodes,
final TimeValue publishTimeout,
final BlockingClusterStatePublishResponseHandler publishResponseHandler,
final SendingController sendingController,
final boolean sendDiffs) {
try {
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
// no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
transportService.sendRequest(node, ACTION_NAME,
transportService.sendRequest(node, SEND_ACTION_NAME,
new BytesTransportRequest(bytes, node.version()),
options, // no need to compress, we already compressed the bytes
@ -205,26 +218,59 @@ public class PublishClusterStateAction extends AbstractComponent {
if (timedOutWaitingForNodes.get()) {
logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout);
}
publishResponseHandler.onResponse(node);
sendingController.onNodeSendAck(node);
}
@Override
public void handleException(TransportException exp) {
if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
sendFullClusterState(clusterState, null, node, timedOutWaitingForNodes, publishTimeout, publishResponseHandler);
sendFullClusterState(clusterState, null, node, timedOutWaitingForNodes, publishTimeout, sendingController);
} else {
logger.debug("failed to send cluster state to {}", exp, node);
publishResponseHandler.onFailure(node, exp);
sendingController.onNodeSendFailed(node, exp);
}
}
});
} catch (Throwable t) {
logger.warn("error sending cluster state to {}", t, node);
sendingController.onNodeSendFailed(node, t);
}
}
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final BlockingClusterStatePublishResponseHandler publishResponseHandler) {
try {
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node);
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
// no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
transportService.sendRequest(node, COMMIT_ACTION_NAME,
new CommitClusterStateRequest(clusterState.stateUUID()),
options, // no need to compress, we already compressed the bytes
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
// if (timedOutWaitingForNodes.get()) {
logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version());
// }
publishResponseHandler.onResponse(node);
}
@Override
public void handleException(TransportException exp) {
logger.debug("failed to commit cluster state (uuid [{}], version [{}]) to {}", exp, clusterState.stateUUID(), clusterState.version(), node);
publishResponseHandler.onFailure(node, exp);
}
});
} catch (Throwable t) {
logger.warn("error sending cluster state commit (uuid [{}], version [{}]) to {}", t, clusterState.stateUUID(), clusterState.version(), node);
publishResponseHandler.onFailure(node, t);
}
}
public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput();
try (StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream)) {
@ -245,9 +291,11 @@ public class PublishClusterStateAction extends AbstractComponent {
return bStream.bytes();
}
private class PublishClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {
private Object lastSeenClusterStateMutex = new Object();
private ClusterState lastSeenClusterState;
private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {
@Override
public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception {
Compressor compressor = CompressorFactory.compressor(request.bytes());
@ -258,24 +306,57 @@ public class PublishClusterStateAction extends AbstractComponent {
in = request.bytes().streamInput();
}
in.setVersion(request.version());
synchronized (this) {
synchronized (lastSeenClusterStateMutex) {
final ClusterState incomingState;
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
lastSeenClusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
logger.debug("received full cluster state version {} with size {}", lastSeenClusterState.version(), request.bytes().length());
incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
} else if (lastSeenClusterState != null) {
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in);
lastSeenClusterState = diff.apply(lastSeenClusterState);
logger.debug("received diff cluster state version {} with uuid {}, diff size {}", lastSeenClusterState.version(), lastSeenClusterState.stateUUID(), request.bytes().length());
incomingState = diff.apply(lastSeenClusterState);
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), request.bytes().length());
} else {
logger.debug("received diff for but don't have any local cluster state - requesting full state");
throw new IncompatibleClusterStateVersionException("have no local cluster state");
}
// sanity check incoming state
final ClusterName incomingClusterName = incomingState.getClusterName();
if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().masterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
if (incomingState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().masterNode());
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
// state from another master requires more subtle checks, so we let it pass for now (it will be checked in ZenDiscovery)
if (nodesProvider.nodes().localNodeMaster() == false) {
ZenDiscovery.rejectNewClusterStateIfNeeded(logger, nodesProvider.nodes(), incomingState);
}
lastSeenClusterState = incomingState;
lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
@Override
public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel) throws Exception {
ClusterState committedClusterState;
synchronized (lastSeenClusterStateMutex) {
committedClusterState = lastSeenClusterState;
}
if (committedClusterState.stateUUID().equals(request.stateUUID) == false) {
// nocommit: we need something better here
channel.sendResponse(TransportResponse.Empty.INSTANCE);
return;
}
try {
listener.onNewClusterState(lastSeenClusterState, new NewClusterStateListener.NewStateProcessed() {
listener.onNewClusterState(committedClusterState, new NewClusterStateListener.NewStateProcessed() {
@Override
public void onNewClusterStateProcessed() {
try {
@ -304,4 +385,110 @@ public class PublishClusterStateAction extends AbstractComponent {
}
}
}
static class CommitClusterStateRequest extends TransportRequest {
String stateUUID;
public CommitClusterStateRequest() {
}
public CommitClusterStateRequest(String stateUUID) {
this.stateUUID = stateUUID;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
stateUUID = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(stateUUID);
}
}
public class FailedToCommitException extends ElasticsearchException {
public FailedToCommitException(String msg) {
super(msg);
}
}
class SendingController {
private final ClusterState clusterState;
private final BlockingClusterStatePublishResponseHandler publishResponseHandler;
volatile int neededMastersToCommit;
int pendingMasterNodes;
final ArrayList<DiscoveryNode> sendAckedBeforeCommit = new ArrayList<>();
final CountDownLatch comittedOrFailed;
final AtomicBoolean committed;
private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, BlockingClusterStatePublishResponseHandler publishResponseHandler) {
this.clusterState = clusterState;
this.publishResponseHandler = publishResponseHandler;
this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes
this.pendingMasterNodes = totalMasterNodes - 1;
this.committed = new AtomicBoolean(neededMastersToCommit == 0);
this.comittedOrFailed = new CountDownLatch(committed.get() ? 0 : 1);
}
public void waitForCommit(TimeValue commitTimeout) {
try {
comittedOrFailed.await(commitTimeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (committed.get() == false) {
throw new FailedToCommitException("failed to get enough masters to ack sent cluster state. [" + neededMastersToCommit + "] left");
}
}
synchronized public void onNodeSendAck(DiscoveryNode node) {
if (committed.get() == false) {
sendAckedBeforeCommit.add(node);
if (node.isMasterNode()) {
onMasterNodeSendAck(node);
}
} else {
assert sendAckedBeforeCommit.isEmpty();
sendCommitToNode(node, clusterState, publishResponseHandler);
}
}
private void onMasterNodeSendAck(DiscoveryNode node) {
neededMastersToCommit--;
if (neededMastersToCommit == 0) {
logger.trace("committing version [{}]", clusterState.version());
for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) {
sendCommitToNode(nodeToCommit, clusterState, publishResponseHandler);
}
sendAckedBeforeCommit.clear();
boolean success = committed.compareAndSet(false, true);
assert success;
comittedOrFailed.countDown();
}
onMasterNodeDone(node);
}
private void onMasterNodeDone(DiscoveryNode node) {
pendingMasterNodes--;
if (pendingMasterNodes == 0) {
comittedOrFailed.countDown();
}
}
synchronized public void onNodeSendFailed(DiscoveryNode node, Throwable t) {
if (node.isMasterNode()) {
onMasterNodeDone(node);
}
publishResponseHandler.onFailure(node, t);
}
}
}

View File

@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.collect.Maps.newHashMap;
import static org.hamcrest.Matchers.*;
public class ClusterStateDiffPublishingTests extends ESTestCase {
public class PublishClusterStateActionTests extends ESTestCase {
protected ThreadPool threadPool;
protected Map<String, MockNode> nodes = newHashMap();
@ -177,7 +177,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase {
protected PublishClusterStateAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, MockDiscoveryNodesProvider nodesProvider,
PublishClusterStateAction.NewClusterStateListener listener) {
DiscoverySettings discoverySettings = new DiscoverySettings(settings, new NodeSettingsService(settings));
return new PublishClusterStateAction(settings, transportService, nodesProvider, listener, discoverySettings);
return new PublishClusterStateAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT);
}
@ -217,7 +217,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase {
// Initial cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
// cluster state update - add nodeB
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build();
@ -356,7 +356,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase {
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
ClusterState previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
@ -401,7 +401,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase {
// Initial cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
// cluster state update - add nodeB
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build();
@ -447,7 +447,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase {
AssertingAckListener[] listeners = new AssertingAckListener[numberOfIterations];
DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build();
MetaData metaData = MetaData.EMPTY_META_DATA;
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metaData(metaData).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
ClusterState previousState;
for (int i = 0; i < numberOfIterations; i++) {
previousState = clusterState;
@ -477,7 +477,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase {
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
ClusterState previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
@ -545,7 +545,8 @@ public class ClusterStateDiffPublishingTests extends ESTestCase {
public AssertingAckListener publishStateDiff(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException {
AssertingAckListener assertingAckListener = new AssertingAckListener(state.nodes().getSize() - 1);
ClusterChangedEvent changedEvent = new ClusterChangedEvent("test update", state, previousState);
action.publish(changedEvent, assertingAckListener);
int requiredNodes = randomIntBetween(-1, state.nodes().getSize() - 1);
action.publish(changedEvent, requiredNodes, assertingAckListener);
return assertingAckListener;
}

View File

@ -828,7 +828,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.ACTION_NAME);
if (randomBoolean()) {
masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.SEND_ACTION_NAME);
} else {
masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.COMMIT_ACTION_NAME);
}
logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
final CountDownLatch countDownLatch = new CountDownLatch(2);

View File

@ -205,7 +205,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
internalCluster().getInstance(TransportService.class, noneMasterNode).sendRequest(node, PublishClusterStateAction.ACTION_NAME, new BytesTransportRequest(bytes, Version.CURRENT), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
internalCluster().getInstance(TransportService.class, noneMasterNode).sendRequest(node, PublishClusterStateAction.SEND_ACTION_NAME, new BytesTransportRequest(bytes, Version.CURRENT), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {