From 6c2139c18672b50c39c3cc4861b0dc7706d58177 Mon Sep 17 00:00:00 2001 From: javanna Date: Fri, 16 Jan 2015 15:31:45 +0100 Subject: [PATCH] Tribe node: remove closed indices from cluster state At startup the tribe node ignores closed indices, but if you closed an index that was part of the tribe node cluster state, its state change was not currently handled. A NullPointerException could be seen in the logs instead as the routing table for the closed index was null. As a result, the index stayed in the tribe node cluster state in open state, although that didn't reflect reality. Also, subsequent cluster state updates happening in the tribe node kept failing, affecting updates related to any other index. The only way to recover from this was to restart the tribe node every time an index is closed on any tribe. This commit properly handles index state changes, making sure that when an index gets closed it gets removed from the tribe node cluster state. Note that it makes little sense to keep the closed index around in the tribe node, as from the tribe node you can't do anything with it. The tribe node simply doesn't see any closed index, it's the same as if they didn't exist. Closes #6411 Closes #9334 --- .../org/elasticsearch/tribe/TribeService.java | 2 +- .../org/elasticsearch/tribe/TribeTests.java | 106 ++++++++++++++---- 2 files changed, 83 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/elasticsearch/tribe/TribeService.java b/src/main/java/org/elasticsearch/tribe/TribeService.java index 641923f1c30..910d88110fa 100644 --- a/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -255,7 +255,7 @@ public class TribeService extends AbstractLifecycleComponent { String markedTribeName = index.settings().get(TRIBE_NAME); if (markedTribeName != null && markedTribeName.equals(tribeName)) { IndexMetaData tribeIndex = tribeState.metaData().index(index.index()); - if (tribeIndex == null) { + if (tribeIndex == null || tribeIndex.state() == IndexMetaData.State.CLOSE) { logger.info("[{}] removing index [{}]", tribeName, index.index()); removeIndex(blocks, metaData, routingTable, index); } else { diff --git a/src/test/java/org/elasticsearch/tribe/TribeTests.java b/src/test/java/org/elasticsearch/tribe/TribeTests.java index c30dd92b6b9..a9a8bf2faab 100644 --- a/src/test/java/org/elasticsearch/tribe/TribeTests.java +++ b/src/test/java/org/elasticsearch/tribe/TribeTests.java @@ -20,12 +20,15 @@ package org.elasticsearch.tribe; import com.google.common.collect.ImmutableMap; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; @@ -34,6 +37,7 @@ import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.TestCluster; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -44,6 +48,7 @@ import java.util.Map; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -125,7 +130,7 @@ public class TribeTests extends ElasticsearchIntegrationTest { public void testGlobalReadWriteBlocks() throws Exception { logger.info("create 2 indices, test1 on t1, and test2 on t2"); internalCluster().client().admin().indices().prepareCreate("test1").get(); - cluster2.client().admin().indices().prepareCreate("test2").get(); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); setupTribeNode(ImmutableSettings.builder() @@ -162,10 +167,10 @@ public class TribeTests extends ElasticsearchIntegrationTest { @Test public void testIndexWriteBlocks() throws Exception { logger.info("create 2 indices, test1 on t1, and test2 on t2"); - internalCluster().client().admin().indices().prepareCreate("test1").get(); - internalCluster().client().admin().indices().prepareCreate("block_test1").get(); - cluster2.client().admin().indices().prepareCreate("test2").get(); - cluster2.client().admin().indices().prepareCreate("block_test2").get(); + assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); + assertAcked(internalCluster().client().admin().indices().prepareCreate("block_test1")); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2")); setupTribeNode(ImmutableSettings.builder() .put("tribe.blocks.write.indices", "block_*") @@ -226,10 +231,10 @@ public class TribeTests extends ElasticsearchIntegrationTest { logger.info("testing preference for tribe {}", tribe); logger.info("create 2 indices, test1 on t1, and test2 on t2"); - internalCluster().client().admin().indices().prepareCreate("conflict").get(); - cluster2.client().admin().indices().prepareCreate("conflict").get(); - internalCluster().client().admin().indices().prepareCreate("test1").get(); - cluster2.client().admin().indices().prepareCreate("test2").get(); + assertAcked(internalCluster().client().admin().indices().prepareCreate("conflict")); + assertAcked(cluster2.client().admin().indices().prepareCreate("conflict")); + assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); setupTribeNode(ImmutableSettings.builder() .put("tribe.on_conflict", "prefer_" + tribe) @@ -249,8 +254,8 @@ public class TribeTests extends ElasticsearchIntegrationTest { public void testTribeOnOneCluster() throws Exception { setupTribeNode(ImmutableSettings.EMPTY); logger.info("create 2 indices, test1 on t1, and test2 on t2"); - internalCluster().client().admin().indices().prepareCreate("test1").get(); - cluster2.client().admin().indices().prepareCreate("test2").get(); + assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state @@ -283,7 +288,7 @@ public class TribeTests extends ElasticsearchIntegrationTest { logger.info("write to another type"); tribeClient.prepareIndex("test1", "type2", "1").setSource("field1", "value1").get(); tribeClient.prepareIndex("test2", "type2", "1").setSource("field1", "value1").get(); - tribeClient.admin().indices().prepareRefresh().get(); + assertNoFailures(tribeClient.admin().indices().prepareRefresh().get()); logger.info("verify they are there"); @@ -310,20 +315,50 @@ public class TribeTests extends ElasticsearchIntegrationTest { logger.info("delete an index, and make sure its reflected"); cluster2.client().admin().indices().prepareDelete("test2").get(); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); - assertTrue(tribeState.getMetaData().hasIndex("test1")); - assertFalse(tribeState.getMetaData().hasIndex("test2")); - assertTrue(tribeState.getRoutingTable().hasIndex("test1")); - assertFalse(tribeState.getRoutingTable().hasIndex("test2")); - } - }); + awaitIndicesNotInClusterState("test2"); - logger.info("stop a node, make sure its reflected"); - cluster2.stopRandomDataNode(); + try { + logger.info("stop a node, make sure its reflected"); + cluster2.stopRandomDataNode(); + awaitSameNodeCounts(); + } finally { + cluster2.startNode(); + awaitSameNodeCounts(); + } + } + + @Test + public void testCloseAndOpenIndex() throws Exception { + //create an index and close it even before starting the tribe node + assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); + ensureGreen(internalCluster()); + assertAcked(internalCluster().client().admin().indices().prepareClose("test1")); + + setupTribeNode(ImmutableSettings.EMPTY); awaitSameNodeCounts(); + + //the closed index is not part of the tribe node cluster state + ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); + assertThat(tribeState.getMetaData().hasIndex("test1"), equalTo(false)); + + //open the index, it becomes part of the tribe node cluster state + assertAcked(internalCluster().client().admin().indices().prepareOpen("test1")); + awaitIndicesInClusterState("test1"); + ensureGreen(internalCluster()); + + //create a second index, wait till it is seen from within the tribe node + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + awaitIndicesInClusterState("test1", "test2"); + ensureGreen(cluster2); + + //close the second index, wait till it gets removed from the tribe node cluster state + assertAcked(cluster2.client().admin().indices().prepareClose("test2")); + awaitIndicesNotInClusterState("test2"); + + //open the second index, wait till it gets added back to the tribe node cluster state + assertAcked(cluster2.client().admin().indices().prepareOpen("test2")); + awaitIndicesInClusterState("test1", "test2"); + ensureGreen(cluster2); } private void awaitIndicesInClusterState(final String... indices) throws Exception { @@ -339,6 +374,29 @@ public class TribeTests extends ElasticsearchIntegrationTest { }); } + private void awaitIndicesNotInClusterState(final String... indices) throws Exception { + assertBusy(new Runnable() { + @Override + public void run() { + ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); + for (String index : indices) { + assertFalse(tribeState.getMetaData().hasIndex(index)); + assertFalse(tribeState.getRoutingTable().hasIndex(index)); + } + } + }); + } + + private void ensureGreen(TestCluster testCluster) { + ClusterHealthResponse actionGet = testCluster.client().admin().cluster() + .health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); + if (actionGet.isTimedOut()) { + logger.info("ensureGreen timed out, cluster state:\n{}\n{}", testCluster.client().admin().cluster().prepareState().get().getState().prettyPrint(), testCluster.client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); + assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false)); + } + assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + } + private void awaitSameNodeCounts() throws Exception { assertBusy(new Runnable() { @Override