Cluster state from API should always have a master (#42454)
Today the `TransportClusterStateAction` ignores the state passed by the `TransportMasterNodeAction` and obtains its state from the cluster applier. The applier's state might be inconsistent with the passed-in state, showing a different node as the master or maybe even having no master. This change adjusts the action to use the passed-in state directly, and adds tests showing that the state returned is consistent with our expectations even if there is a concurrent master failover. Fixes #38331. Relates #38432.
This commit is contained in:
parent
528f8cc073
commit
f864f6a740
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.cluster.state;
|
|||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
public class ClusterStateRequestBuilder extends MasterNodeReadOperationRequestBuilder<ClusterStateRequest,
|
||||
ClusterStateResponse, ClusterStateRequestBuilder> {
|
||||
|
@ -100,4 +101,21 @@ public class ClusterStateRequestBuilder extends MasterNodeReadOperationRequestBu
|
|||
request.indicesOptions(indicesOptions);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Causes the request to wait for the metadata version to advance to at least the given version.
|
||||
* @param waitForMetaDataVersion The metadata version for which to wait
* @return this builder, so that calls can be chained
|
||||
*/
|
||||
public ClusterStateRequestBuilder setWaitForMetaDataVersion(long waitForMetaDataVersion) {
|
||||
request.waitForMetaDataVersion(waitForMetaDataVersion);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* If {@link ClusterStateRequest#waitForMetaDataVersion()} is set then this determines how long to wait
* for the metadata version to reach that value.
* @param waitForTimeout how long to wait; passed straight through to the underlying {@link ClusterStateRequest}
* @return this builder, so that calls can be chained
|
||||
*/
|
||||
public ClusterStateRequestBuilder setWaitForTimeOut(TimeValue waitForTimeout) {
|
||||
request.waitForTimeout(waitForTimeout);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionFilters;
|
|||
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
|
@ -86,50 +87,50 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
|
|||
protected void masterOperation(final ClusterStateRequest request, final ClusterState state,
|
||||
final ActionListener<ClusterStateResponse> listener) throws IOException {
|
||||
|
||||
if (request.waitForMetaDataVersion() != null) {
|
||||
final Predicate<ClusterState> metadataVersionPredicate = clusterState -> {
|
||||
return clusterState.metaData().version() >= request.waitForMetaDataVersion();
|
||||
};
|
||||
final ClusterStateObserver observer =
|
||||
new ClusterStateObserver(clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext());
|
||||
final ClusterState clusterState = observer.setAndGetObservedState();
|
||||
if (metadataVersionPredicate.test(clusterState)) {
|
||||
buildResponse(request, clusterState, listener);
|
||||
} else {
|
||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
try {
|
||||
buildResponse(request, state, listener);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
final Predicate<ClusterState> acceptableClusterStatePredicate
|
||||
= request.waitForMetaDataVersion() == null ? clusterState -> true
|
||||
: clusterState -> clusterState.metaData().version() >= request.waitForMetaDataVersion();
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
}
|
||||
final Predicate<ClusterState> acceptableClusterStateOrNotMasterPredicate = request.local()
|
||||
? acceptableClusterStatePredicate
|
||||
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false);
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
try {
|
||||
listener.onResponse(new ClusterStateResponse(clusterState.getClusterName(), null, true));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}, metadataVersionPredicate);
|
||||
}
|
||||
if (acceptableClusterStatePredicate.test(state)) {
|
||||
ActionListener.completeWith(listener, () -> buildResponse(request, state));
|
||||
} else {
|
||||
ClusterState currentState = clusterService.state();
|
||||
buildResponse(request, currentState, listener);
|
||||
assert acceptableClusterStateOrNotMasterPredicate.test(state) == false;
|
||||
new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext())
|
||||
.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState newState) {
|
||||
if (acceptableClusterStatePredicate.test(newState)) {
|
||||
ActionListener.completeWith(listener, () -> buildResponse(request, newState));
|
||||
} else {
|
||||
listener.onFailure(new NotMasterException(
|
||||
"master stepped down waiting for metadata version " + request.waitForMetaDataVersion()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
try {
|
||||
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}, acceptableClusterStateOrNotMasterPredicate);
|
||||
}
|
||||
}
|
||||
|
||||
private void buildResponse(final ClusterStateRequest request,
|
||||
final ClusterState currentState,
|
||||
final ActionListener<ClusterStateResponse> listener) throws IOException {
|
||||
private ClusterStateResponse buildResponse(final ClusterStateRequest request,
|
||||
final ClusterState currentState) {
|
||||
logger.trace("Serving cluster state request using version {}", currentState.version());
|
||||
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
|
||||
builder.version(currentState.version());
|
||||
|
@ -192,8 +193,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
|
|||
}
|
||||
}
|
||||
|
||||
listener.onResponse(new ClusterStateResponse(currentState.getClusterName(), builder.build(), false));
|
||||
return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,182 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.state;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0)
|
||||
public class TransportClusterStateActionDisruptionIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Collections.singletonList(MockTransportService.TestPlugin.class);
|
||||
}
|
||||
|
||||
public void testNonLocalRequestAlwaysFindsMaster() throws Exception {
|
||||
runRepeatedlyWhileChangingMaster(() -> {
|
||||
final ClusterStateRequestBuilder clusterStateRequestBuilder = client().admin().cluster().prepareState()
|
||||
.clear().setNodes(true).setMasterNodeTimeout("100ms");
|
||||
final ClusterStateResponse clusterStateResponse;
|
||||
try {
|
||||
clusterStateResponse = clusterStateRequestBuilder.get();
|
||||
} catch (MasterNotDiscoveredException e) {
|
||||
return; // ok, we hit the disconnected node
|
||||
}
|
||||
assertNotNull("should always contain a master node", clusterStateResponse.getState().nodes().getMasterNodeId());
|
||||
});
|
||||
}
|
||||
|
||||
public void testLocalRequestAlwaysSucceeds() throws Exception {
|
||||
runRepeatedlyWhileChangingMaster(() -> {
|
||||
final String node = randomFrom(internalCluster().getNodeNames());
|
||||
final DiscoveryNodes discoveryNodes = client(node).admin().cluster().prepareState()
|
||||
.clear().setLocal(true).setNodes(true).setMasterNodeTimeout("100ms").get().getState().nodes();
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
if (discoveryNode.getName().equals(node)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
fail("nodes did not contain [" + node + "]: " + discoveryNodes);
|
||||
});
|
||||
}
|
||||
|
||||
public void testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata() throws Exception {
|
||||
runRepeatedlyWhileChangingMaster(() -> {
|
||||
final String node = randomFrom(internalCluster().getNodeNames());
|
||||
final long metadataVersion
|
||||
= internalCluster().getInstance(ClusterService.class, node).getClusterApplierService().state().metaData().version();
|
||||
final long waitForMetaDataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
|
||||
final ClusterStateRequestBuilder clusterStateRequestBuilder = client(node).admin().cluster().prepareState()
|
||||
.clear().setNodes(true).setMetaData(true)
|
||||
.setMasterNodeTimeout(TimeValue.timeValueMillis(100)).setWaitForTimeOut(TimeValue.timeValueMillis(100))
|
||||
.setWaitForMetaDataVersion(waitForMetaDataVersion);
|
||||
final ClusterStateResponse clusterStateResponse;
|
||||
try {
|
||||
clusterStateResponse = clusterStateRequestBuilder.get();
|
||||
} catch (MasterNotDiscoveredException e) {
|
||||
return; // ok, we hit the disconnected node
|
||||
}
|
||||
if (clusterStateResponse.isWaitForTimedOut() == false) {
|
||||
final ClusterState state = clusterStateResponse.getState();
|
||||
assertNotNull("should always contain a master node", state.nodes().getMasterNodeId());
|
||||
assertThat("waited for metadata version", state.metaData().version(), greaterThanOrEqualTo(waitForMetaDataVersion));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void testLocalRequestWaitsForMetadata() throws Exception {
|
||||
runRepeatedlyWhileChangingMaster(() -> {
|
||||
final String node = randomFrom(internalCluster().getNodeNames());
|
||||
final long metadataVersion
|
||||
= internalCluster().getInstance(ClusterService.class, node).getClusterApplierService().state().metaData().version();
|
||||
final long waitForMetaDataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
|
||||
final ClusterStateResponse clusterStateResponse = client(node).admin().cluster()
|
||||
.prepareState().clear().setLocal(true).setMetaData(true).setWaitForMetaDataVersion(waitForMetaDataVersion)
|
||||
.setMasterNodeTimeout(TimeValue.timeValueMillis(100)).setWaitForTimeOut(TimeValue.timeValueMillis(100))
|
||||
.get();
|
||||
if (clusterStateResponse.isWaitForTimedOut() == false) {
|
||||
final MetaData metaData = clusterStateResponse.getState().metaData();
|
||||
assertThat("waited for metadata version " + waitForMetaDataVersion + " with node " + node,
|
||||
metaData.version(), greaterThanOrEqualTo(waitForMetaDataVersion));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void runRepeatedlyWhileChangingMaster(Runnable runnable) throws Exception {
|
||||
internalCluster().startNodes(3);
|
||||
|
||||
assertBusy(() -> assertThat(client().admin().cluster().prepareState().clear().setMetaData(true)
|
||||
.get().getState().getLastCommittedConfiguration().getNodeIds().stream()
|
||||
.filter(n -> ClusterBootstrapService.isBootstrapPlaceholder(n) == false).collect(Collectors.toSet()), hasSize(3)));
|
||||
|
||||
final String masterName = internalCluster().getMasterName();
|
||||
|
||||
final AtomicBoolean shutdown = new AtomicBoolean();
|
||||
final Thread assertingThread = new Thread(() -> {
|
||||
while (shutdown.get() == false) {
|
||||
runnable.run();
|
||||
}
|
||||
}, "asserting thread");
|
||||
|
||||
final Thread updatingThread = new Thread(() -> {
|
||||
String value = "none";
|
||||
while (shutdown.get() == false) {
|
||||
value = "none".equals(value) ? "all" : "none";
|
||||
final String nonMasterNode = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames()));
|
||||
assertAcked(client(nonMasterNode).admin().cluster().prepareUpdateSettings().setPersistentSettings(
|
||||
Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), value)));
|
||||
}
|
||||
}, "updating thread");
|
||||
|
||||
final List<MockTransportService> mockTransportServices
|
||||
= StreamSupport.stream(internalCluster().getInstances(TransportService.class).spliterator(), false)
|
||||
.map(ts -> (MockTransportService) ts).collect(Collectors.toList());
|
||||
|
||||
assertingThread.start();
|
||||
updatingThread.start();
|
||||
|
||||
final MockTransportService masterTransportService
|
||||
= (MockTransportService) internalCluster().getInstance(TransportService.class, masterName);
|
||||
|
||||
for (MockTransportService mockTransportService : mockTransportServices) {
|
||||
if (masterTransportService != mockTransportService) {
|
||||
masterTransportService.addFailToSendNoConnectRule(mockTransportService);
|
||||
mockTransportService.addFailToSendNoConnectRule(masterTransportService);
|
||||
}
|
||||
}
|
||||
|
||||
assertBusy(() -> {
|
||||
final String nonMasterNode = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames()));
|
||||
final String claimedMasterName = internalCluster().getMasterName(nonMasterNode);
|
||||
assertThat(claimedMasterName, not(equalTo(masterName)));
|
||||
});
|
||||
|
||||
shutdown.set(true);
|
||||
assertingThread.join();
|
||||
updatingThread.join();
|
||||
internalCluster().close();
|
||||
}
|
||||
|
||||
}
|
|
@ -167,7 +167,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
@ -1957,9 +1956,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
public String getMasterName(@Nullable String viaNode) {
|
||||
try {
|
||||
Client client = viaNode != null ? client(viaNode) : client();
|
||||
final DiscoveryNode masterNode = client.admin().cluster().prepareState().get().getState().nodes().getMasterNode();
|
||||
assertNotNull(masterNode);
|
||||
return masterNode.getName();
|
||||
return client.admin().cluster().prepareState().get().getState().nodes().getMasterNode().getName();
|
||||
} catch (Exception e) {
|
||||
logger.warn("Can't fetch cluster state", e);
|
||||
throw new RuntimeException("Can't get master node " + e.getMessage(), e);
|
||||
|
|
Loading…
Reference in New Issue