Internal: ClusterHealthAPI does not respect waitForEvents when local flag is set
It uses a cluster state update task and it gets rejected if not run on a master node. We should enable running on non-masters if the local flag is set. Also, report any unexpected error that may happen during this cluster state update task Closes #7731
This commit is contained in:
parent
3397908454
commit
db13eead54
|
@ -37,6 +37,7 @@ import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -74,6 +75,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
|
||||||
|
|
||||||
if (request.waitForEvents() != null) {
|
if (request.waitForEvents() != null) {
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
final AtomicReference<ElasticsearchException> failure = new AtomicReference<>();
|
||||||
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ProcessedClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
|
@ -88,6 +90,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onFailure(String source, Throwable t) {
|
||||||
logger.error("unexpected failure during [{}]", t, source);
|
logger.error("unexpected failure during [{}]", t, source);
|
||||||
|
failure.set(new ElasticsearchException("Error while waiting for events", t));
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean runOnlyOnMaster() {
|
||||||
|
return !request.local();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -96,6 +105,9 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
if (failure.get() != null) {
|
||||||
|
throw failure.get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.cluster;
|
||||||
|
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||||
|
import org.elasticsearch.common.Priority;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -28,6 +29,20 @@ import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
public class ClusterHealthTests extends ElasticsearchIntegrationTest {
|
public class ClusterHealthTests extends ElasticsearchIntegrationTest {
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void simpleLocalHealthTest() {
|
||||||
|
createIndex("test");
|
||||||
|
ensureGreen(); // master should thing it's green now.
|
||||||
|
|
||||||
|
for (String node : internalCluster().getNodeNames()) {
|
||||||
|
// a very high time out, which should never fire due to the local flag
|
||||||
|
ClusterHealthResponse health = client(node).admin().cluster().prepareHealth().setLocal(true).setWaitForEvents(Priority.LANGUID).setTimeout("30s").get("10s");
|
||||||
|
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
|
||||||
|
assertThat(health.isTimedOut(), equalTo(false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHealth() {
|
public void testHealth() {
|
||||||
logger.info("--> running cluster health on an index that does not exists");
|
logger.info("--> running cluster health on an index that does not exists");
|
||||||
|
|
Loading…
Reference in New Issue