Cluster Health should run on applied states, even if waitFor=0 #17440

We already protect against making decisions based on an inflight cluster state if someone asks for a waitFor rule (like wait for green). We should do the same for normal health checks as well (unless timeout is set to 0) as it be trappy to debug failures when health says the cluster is in a certain state, but that state wasn't applied yet.

Closes #17440
This commit is contained in:
Boaz Leskes 2016-03-31 09:11:08 +02:00
parent 9ebc0c8f47
commit 0b12cab07a
6 changed files with 118 additions and 10 deletions

View File

@ -143,7 +143,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
assert waitFor >= 0;
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
final ClusterState state = observer.observedState();
if (waitFor == 0 || request.timeout().millis() == 0) {
if (request.timeout().millis() == 0) {
listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0));
return;
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
@ -116,10 +115,6 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
if (task != null) {
request.setParentTask(clusterService.localNode().getId(), task.getId());
}
// TODO do we really need to wrap it in a listener? the handlers should be cheap
if ((listener instanceof ThreadedActionListener) == false) {
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
}
this.listener = listener;
}

View File

@ -41,7 +41,9 @@ import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
@ -53,17 +55,26 @@ import static org.elasticsearch.cluster.service.ClusterServiceUtils.createCluste
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.mock;
public class TransportBulkActionTookTests extends ESTestCase {
private ThreadPool threadPool;
static private ThreadPool threadPool;
private ClusterService clusterService;
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool("TransportBulkActionTookTests");
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = mock(ThreadPool.class);
clusterService = createClusterService(threadPool);
}

View File

@ -19,22 +19,42 @@
package org.elasticsearch.cluster.health;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.cluster.service.ClusterServiceUtils.createClusterService;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.empty;
@ -45,6 +65,83 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class ClusterStateHealthTests extends ESTestCase {
private final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(Settings.EMPTY);
private static ThreadPool threadPool;
private ClusterService clusterService;
private TransportService transportService;
private CapturingTransport transport;
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool("ClusterStateHealthTests");
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
}
@After
public void tearDown() throws Exception {
super.tearDown();
clusterService.close();
transportService.close();
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
public void testClusterHealthWaitsForClusterStateApplication() throws InterruptedException, ExecutionException {
final CountDownLatch applyLatch = new CountDownLatch(1);
final CountDownLatch listenerCalled = new CountDownLatch(1);
clusterService.add(event -> {
listenerCalled.countDown();
try {
applyLatch.await();
} catch (InterruptedException e) {
logger.debug("interrupted", e);
}
});
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return ClusterState.builder(currentState).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.warn("unexpected failure", t);
}
});
logger.info("--> waiting for listener to be called and cluster state being blocked");
listenerCalled.await();
TransportClusterHealthAction action = new TransportClusterHealthAction(Settings.EMPTY, transportService,
clusterService, threadPool, clusterService.state().getClusterName(), new ActionFilters(new HashSet<>()),
indexNameExpressionResolver, NoopGatewayAllocator.INSTANCE);
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
action.execute(new ClusterHealthRequest(), listener);
assertFalse(listener.isDone());
applyLatch.countDown();
listener.get();
}
public void testClusterHealth() throws IOException {
RoutingTableGenerator routingTableGenerator = new RoutingTableGenerator();
RoutingTableGenerator.ShardCounter counter = new RoutingTableGenerator.ShardCounter();

View File

@ -48,7 +48,8 @@ public class ShardStateIT extends ESIntegTestCase {
indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null);
logger.info("--> waiting for a yellow index");
assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW)));
assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(),
equalTo(ClusterHealthStatus.YELLOW)));
final long term0 = shard == 0 ? 2 : 1;
final long term1 = shard == 1 ? 2 : 1;

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
@ -60,6 +61,9 @@ public class ClusterServiceUtils {
clusterService.setClusterStatePublisher((event, ackListener) -> {
});
clusterService.start();
final DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterService.state().nodes());
nodes.masterNodeId(clusterService.localNode().getId());
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodes));
return clusterService;
}