diff --git a/src/main/java/org/elasticsearch/tribe/TribeService.java b/src/main/java/org/elasticsearch/tribe/TribeService.java index 641923f1c30..910d88110fa 100644 --- a/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -255,7 +255,7 @@ public class TribeService extends AbstractLifecycleComponent { String markedTribeName = index.settings().get(TRIBE_NAME); if (markedTribeName != null && markedTribeName.equals(tribeName)) { IndexMetaData tribeIndex = tribeState.metaData().index(index.index()); - if (tribeIndex == null) { + if (tribeIndex == null || tribeIndex.state() == IndexMetaData.State.CLOSE) { logger.info("[{}] removing index [{}]", tribeName, index.index()); removeIndex(blocks, metaData, routingTable, index); } else { diff --git a/src/test/java/org/elasticsearch/tribe/TribeTests.java b/src/test/java/org/elasticsearch/tribe/TribeTests.java index c30dd92b6b9..a9a8bf2faab 100644 --- a/src/test/java/org/elasticsearch/tribe/TribeTests.java +++ b/src/test/java/org/elasticsearch/tribe/TribeTests.java @@ -20,12 +20,15 @@ package org.elasticsearch.tribe; import com.google.common.collect.ImmutableMap; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; @@ -34,6 +37,7 @@ import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.TestCluster; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -44,6 +48,7 @@ import java.util.Map; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -125,7 +130,7 @@ public class TribeTests extends ElasticsearchIntegrationTest { public void testGlobalReadWriteBlocks() throws Exception { logger.info("create 2 indices, test1 on t1, and test2 on t2"); internalCluster().client().admin().indices().prepareCreate("test1").get(); - cluster2.client().admin().indices().prepareCreate("test2").get(); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); setupTribeNode(ImmutableSettings.builder() @@ -162,10 +167,10 @@ public class TribeTests extends ElasticsearchIntegrationTest { @Test public void testIndexWriteBlocks() throws Exception { logger.info("create 2 indices, test1 on t1, and test2 on t2"); - internalCluster().client().admin().indices().prepareCreate("test1").get(); - internalCluster().client().admin().indices().prepareCreate("block_test1").get(); - cluster2.client().admin().indices().prepareCreate("test2").get(); - cluster2.client().admin().indices().prepareCreate("block_test2").get(); + assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); + assertAcked(internalCluster().client().admin().indices().prepareCreate("block_test1")); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2")); setupTribeNode(ImmutableSettings.builder() .put("tribe.blocks.write.indices", "block_*") @@ -226,10 +231,10 @@ public class TribeTests extends ElasticsearchIntegrationTest { logger.info("testing preference for tribe {}", tribe); logger.info("create 2 indices, test1 on t1, and test2 on t2"); - internalCluster().client().admin().indices().prepareCreate("conflict").get(); - cluster2.client().admin().indices().prepareCreate("conflict").get(); - internalCluster().client().admin().indices().prepareCreate("test1").get(); - cluster2.client().admin().indices().prepareCreate("test2").get(); + assertAcked(internalCluster().client().admin().indices().prepareCreate("conflict")); + assertAcked(cluster2.client().admin().indices().prepareCreate("conflict")); + assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); setupTribeNode(ImmutableSettings.builder() .put("tribe.on_conflict", "prefer_" + tribe) @@ -249,8 +254,8 @@ public class TribeTests extends ElasticsearchIntegrationTest { public void testTribeOnOneCluster() throws Exception { setupTribeNode(ImmutableSettings.EMPTY); logger.info("create 2 indices, test1 on t1, and test2 on t2"); - internalCluster().client().admin().indices().prepareCreate("test1").get(); - cluster2.client().admin().indices().prepareCreate("test2").get(); + assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state @@ -283,7 +288,7 @@ public class TribeTests extends ElasticsearchIntegrationTest { logger.info("write to another type"); tribeClient.prepareIndex("test1", "type2", "1").setSource("field1", "value1").get(); tribeClient.prepareIndex("test2", "type2", "1").setSource("field1", "value1").get(); - tribeClient.admin().indices().prepareRefresh().get(); + assertNoFailures(tribeClient.admin().indices().prepareRefresh().get()); logger.info("verify they are there"); @@ -310,20 +315,50 @@ public class TribeTests extends ElasticsearchIntegrationTest { logger.info("delete an index, and make sure its reflected"); cluster2.client().admin().indices().prepareDelete("test2").get(); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); - assertTrue(tribeState.getMetaData().hasIndex("test1")); - assertFalse(tribeState.getMetaData().hasIndex("test2")); - assertTrue(tribeState.getRoutingTable().hasIndex("test1")); - assertFalse(tribeState.getRoutingTable().hasIndex("test2")); - } - }); + awaitIndicesNotInClusterState("test2"); - logger.info("stop a node, make sure its reflected"); - cluster2.stopRandomDataNode(); + try { + logger.info("stop a node, make sure its reflected"); + cluster2.stopRandomDataNode(); + awaitSameNodeCounts(); + } finally { + cluster2.startNode(); + awaitSameNodeCounts(); + } + } + + @Test + public void testCloseAndOpenIndex() throws Exception { + //create an index and close it even before starting the tribe node + assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); + ensureGreen(internalCluster()); + assertAcked(internalCluster().client().admin().indices().prepareClose("test1")); + + setupTribeNode(ImmutableSettings.EMPTY); awaitSameNodeCounts(); + + //the closed index is not part of the tribe node cluster state + ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); + assertThat(tribeState.getMetaData().hasIndex("test1"), equalTo(false)); + + //open the index, it becomes part of the tribe node cluster state + assertAcked(internalCluster().client().admin().indices().prepareOpen("test1")); + awaitIndicesInClusterState("test1"); + ensureGreen(internalCluster()); + + //create a second index, wait till it is seen from within the tribe node + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + awaitIndicesInClusterState("test1", "test2"); + ensureGreen(cluster2); + + //close the second index, wait till it gets removed from the tribe node cluster state + assertAcked(cluster2.client().admin().indices().prepareClose("test2")); + awaitIndicesNotInClusterState("test2"); + + //open the second index, wait till it gets added back to the tribe node cluster state + assertAcked(cluster2.client().admin().indices().prepareOpen("test2")); + awaitIndicesInClusterState("test1", "test2"); + ensureGreen(cluster2); } private void awaitIndicesInClusterState(final String... indices) throws Exception { @@ -339,6 +374,29 @@ public class TribeTests extends ElasticsearchIntegrationTest { }); } + private void awaitIndicesNotInClusterState(final String... indices) throws Exception { + assertBusy(new Runnable() { + @Override + public void run() { + ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); + for (String index : indices) { + assertFalse(tribeState.getMetaData().hasIndex(index)); + assertFalse(tribeState.getRoutingTable().hasIndex(index)); + } + } + }); + } + + private void ensureGreen(TestCluster testCluster) { + ClusterHealthResponse actionGet = testCluster.client().admin().cluster() + .health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); + if (actionGet.isTimedOut()) { + logger.info("ensureGreen timed out, cluster state:\n{}\n{}", testCluster.client().admin().cluster().prepareState().get().getState().prettyPrint(), testCluster.client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); + assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false)); + } + assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + } + private void awaitSameNodeCounts() throws Exception { assertBusy(new Runnable() { @Override