From db13eead54a707bbac549225ff4c0332e74369bd Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 15 Sep 2014 20:19:07 +0200 Subject: [PATCH] Internal: ClusterHealthAPI does not respect waitForEvents when local flag is set It uses a cluster state update task and it gets rejected if not run on a master node. We should enable running on non-masters if the local flag is set. Also, report any unexpected error that may happen during this cluster state update task Closes #7731 --- .../health/TransportClusterHealthAction.java | 12 ++++++++++++ .../elasticsearch/cluster/ClusterHealthTests.java | 15 +++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 8358ea39832..95dcf636431 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.transport.TransportService; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * @@ -74,6 +75,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati if (request.waitForEvents() != null) { final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference<>(); clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -88,6 +90,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati @Override public void onFailure(String source, Throwable t) { logger.error("unexpected failure during [{}]", t, source); + failure.set(new ElasticsearchException("Error while waiting for events", t)); + latch.countDown(); + } + + @Override + public boolean runOnlyOnMaster() { + return !request.local(); } }); @@ -96,6 +105,9 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati } catch (InterruptedException e) { // ignore } + if (failure.get() != null) { + throw failure.get(); + } } diff --git a/src/test/java/org/elasticsearch/cluster/ClusterHealthTests.java b/src/test/java/org/elasticsearch/cluster/ClusterHealthTests.java index d262c7dd3ed..658da8bde36 100644 --- a/src/test/java/org/elasticsearch/cluster/ClusterHealthTests.java +++ b/src/test/java/org/elasticsearch/cluster/ClusterHealthTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.Priority; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; @@ -28,6 +29,20 @@ import static org.hamcrest.Matchers.equalTo; public class ClusterHealthTests extends ElasticsearchIntegrationTest { + + @Test + public void simpleLocalHealthTest() { + createIndex("test"); + ensureGreen(); // master should thing it's green now. + + for (String node : internalCluster().getNodeNames()) { + // a very high time out, which should never fire due to the local flag + ClusterHealthResponse health = client(node).admin().cluster().prepareHealth().setLocal(true).setWaitForEvents(Priority.LANGUID).setTimeout("30s").get("10s"); + assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(health.isTimedOut(), equalTo(false)); + } + } + @Test public void testHealth() { logger.info("--> running cluster health on an index that does not exists");