Publish to self through transport (#43994)
This commit ensures that cluster state publications to self also go through the transport layer. This allows voting-only nodes to intercept the publication to self. Fixes an issue discovered by a test failure where a voting-only node, which was the only bootstrapped node, would not step down as master after state transfer because publishing to self would succeed. Closes #43631
This commit is contained in:
parent
5cdf3ff3fa
commit
1220ff5b6d
|
@ -38,7 +38,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
|||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
||||
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
|
||||
|
@ -75,6 +74,13 @@ public class PublicationTransportHandler {
|
|||
|
||||
private AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
|
||||
|
||||
// the master needs the original non-serialized state as the cluster state contains some volatile information that we
|
||||
// don't want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or
|
||||
// because it's mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in
|
||||
// snapshot code).
|
||||
// TODO: look into these and check how to get rid of them
|
||||
private AtomicReference<PublishRequest> currentPublishRequestToSelf = new AtomicReference<>();
|
||||
|
||||
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
|
||||
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
|
||||
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
|
||||
|
@ -179,32 +185,32 @@ public class PublicationTransportHandler {
|
|||
return new PublicationContext() {
|
||||
@Override
|
||||
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
|
||||
ActionListener<PublishWithJoinResponse> responseActionListener) {
|
||||
ActionListener<PublishWithJoinResponse> originalListener) {
|
||||
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
|
||||
final ActionListener<PublishWithJoinResponse> responseActionListener;
|
||||
if (destination.equals(nodes.getLocalNode())) {
|
||||
// the master needs the original non-serialized state as the cluster state contains some volatile information that we
|
||||
// don't want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or
|
||||
// because it's mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in
|
||||
// snapshot code).
|
||||
// TODO: look into these and check how to get rid of them
|
||||
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
|
||||
// if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
|
||||
final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(publishRequest);
|
||||
assert previousRequest == null;
|
||||
responseActionListener = new ActionListener<PublishWithJoinResponse>() {
|
||||
@Override
|
||||
public void onResponse(PublishWithJoinResponse publishWithJoinResponse) {
|
||||
final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(null);
|
||||
assert previousRequest == publishRequest;
|
||||
originalListener.onResponse(publishWithJoinResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// wrap into fake TransportException, as that's what we expect in Publication
|
||||
responseActionListener.onFailure(new TransportException(e));
|
||||
final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(null);
|
||||
assert previousRequest == publishRequest;
|
||||
originalListener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
responseActionListener.onResponse(handlePublishRequest.apply(publishRequest));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "publish to self of " + publishRequest;
|
||||
}
|
||||
});
|
||||
} else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
|
||||
};
|
||||
} else {
|
||||
responseActionListener = originalListener;
|
||||
}
|
||||
if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
|
||||
logger.trace("sending full cluster state version {} to {}", newState.version(), destination);
|
||||
PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener);
|
||||
} else {
|
||||
|
@ -314,10 +320,6 @@ public class PublicationTransportHandler {
|
|||
Map<Version, BytesReference> serializedDiffs) {
|
||||
Diff<ClusterState> diff = null;
|
||||
for (DiscoveryNode node : discoveryNodes) {
|
||||
if (node.equals(discoveryNodes.getLocalNode())) {
|
||||
// ignore, see newPublicationContext
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
|
||||
if (serializedStates.containsKey(node.getVersion()) == false) {
|
||||
|
@ -403,7 +405,7 @@ public class PublicationTransportHandler {
|
|||
fullClusterStateReceivedCount.incrementAndGet();
|
||||
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
|
||||
request.bytes().length());
|
||||
final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
|
||||
final PublishWithJoinResponse response = acceptState(incomingState);
|
||||
lastSeenClusterState.set(incomingState);
|
||||
return response;
|
||||
} else {
|
||||
|
@ -413,7 +415,7 @@ public class PublicationTransportHandler {
|
|||
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
|
||||
throw new IncompatibleClusterStateVersionException("have no local cluster state");
|
||||
} else {
|
||||
final ClusterState incomingState;
|
||||
ClusterState incomingState;
|
||||
try {
|
||||
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
|
||||
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
|
||||
|
@ -427,7 +429,7 @@ public class PublicationTransportHandler {
|
|||
compatibleClusterStateDiffReceivedCount.incrementAndGet();
|
||||
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
|
||||
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
|
||||
final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
|
||||
final PublishWithJoinResponse response = acceptState(incomingState);
|
||||
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
|
||||
return response;
|
||||
}
|
||||
|
@ -436,4 +438,17 @@ public class PublicationTransportHandler {
|
|||
IOUtils.close(in);
|
||||
}
|
||||
}
|
||||
|
||||
private PublishWithJoinResponse acceptState(ClusterState incomingState) {
|
||||
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
|
||||
if (transportService.getLocalNode().equals(incomingState.nodes().getMasterNode())) {
|
||||
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
|
||||
if (publishRequest == null || publishRequest.getAcceptedState().stateUUID().equals(incomingState.stateUUID()) == false) {
|
||||
throw new IllegalStateException("publication to self failed for " + publishRequest);
|
||||
} else {
|
||||
return handlePublishRequest.apply(publishRequest);
|
||||
}
|
||||
}
|
||||
return handlePublishRequest.apply(new PublishRequest(incomingState));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -771,23 +771,12 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
|
|||
|
||||
for (ClusterNode cn : cluster.clusterNodes) {
|
||||
assertThat(value(cn.getLastAppliedClusterState()), is(finalValue));
|
||||
if (cn == leader) {
|
||||
// leader does not update publish stats as it's not using the serialized state
|
||||
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
|
||||
postPublishStats.get(cn).getFullClusterStateReceivedCount());
|
||||
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(),
|
||||
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
|
||||
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
|
||||
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
|
||||
} else {
|
||||
// followers receive a diff
|
||||
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
|
||||
postPublishStats.get(cn).getFullClusterStateReceivedCount());
|
||||
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
|
||||
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
|
||||
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
|
||||
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
|
||||
}
|
||||
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
|
||||
postPublishStats.get(cn).getFullClusterStateReceivedCount());
|
||||
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
|
||||
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
|
||||
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
|
||||
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -50,6 +49,7 @@ import java.util.concurrent.TimeoutException;
|
|||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -158,21 +158,6 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testDiscoveryStats() throws Exception {
|
||||
String expectedStatsJsonResponse = "{\n" +
|
||||
" \"discovery\" : {\n" +
|
||||
" \"cluster_state_queue\" : {\n" +
|
||||
" \"total\" : 0,\n" +
|
||||
" \"pending\" : 0,\n" +
|
||||
" \"committed\" : 0\n" +
|
||||
" },\n" +
|
||||
" \"published_cluster_states\" : {\n" +
|
||||
" \"full_states\" : 0,\n" +
|
||||
" \"incompatible_diffs\" : 0,\n" +
|
||||
" \"compatible_diffs\" : 0\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
"}";
|
||||
|
||||
internalCluster().startNode();
|
||||
ensureGreen(); // ensures that all events are processed (in particular state recovery fully completed)
|
||||
assertBusy(() ->
|
||||
|
@ -190,15 +175,13 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
|
|||
assertThat(stats.getQueueStats().getPending(), equalTo(0));
|
||||
|
||||
assertThat(stats.getPublishStats(), notNullValue());
|
||||
assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), equalTo(0L));
|
||||
assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), greaterThanOrEqualTo(0L));
|
||||
assertThat(stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount(), equalTo(0L));
|
||||
assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), equalTo(0L));
|
||||
assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), greaterThanOrEqualTo(0L));
|
||||
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
|
||||
builder.startObject();
|
||||
stats.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
builder.endObject();
|
||||
|
||||
assertThat(Strings.toString(builder), equalTo(expectedStatsJsonResponse));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.cluster.coordination;
|
|||
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -80,6 +81,17 @@ public class VotingOnlyNodePluginTests extends ESIntegTestCase {
|
|||
equalTo(false));
|
||||
}
|
||||
|
||||
public void testBootstrapOnlySingleVotingOnlyNode() throws Exception {
|
||||
internalCluster().setBootstrapMasterNodeIndex(0);
|
||||
internalCluster().startNode(Settings.builder().put(VotingOnlyNodePlugin.VOTING_ONLY_NODE_SETTING.getKey(), true)
|
||||
.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build());
|
||||
internalCluster().startNode();
|
||||
assertBusy(() -> assertThat(client().admin().cluster().prepareState().get().getState().getNodes().getSize(), equalTo(2)));
|
||||
assertThat(
|
||||
VotingOnlyNodePlugin.isVotingOnlyNode(client().admin().cluster().prepareState().get().getState().nodes().getMasterNode()),
|
||||
equalTo(false));
|
||||
}
|
||||
|
||||
public void testVotingOnlyNodesCannotBeMasterWithoutFullMasterNodes() throws Exception {
|
||||
internalCluster().setBootstrapMasterNodeIndex(0);
|
||||
internalCluster().startNode();
|
||||
|
|
Loading…
Reference in New Issue