Tribe node: remove closed indices from cluster state

At startup the tribe node ignores closed indices, but if you closed an index that was part of the tribe node cluster state, its state change was not currently handled. A NullPointerException could be seen in the logs instead as the routing table for the closed index was null. As a result, the index stayed in the tribe node cluster state in open state, although that didn't reflect reality. Also, subsequent cluster state updates happening in the tribe node kept failing, affecting updates related to any other index. The only way to recover from this was to restart the tribe node every time an index is closed on any tribe.

This commit properly handles index state changes, making sure that when an index gets closed it gets removed from the tribe node cluster state. Note that it makes little sense to keep the closed index around in the tribe node, as from the tribe node you can't do anything with it. The tribe node simply doesn't see any closed index, it's the same as if they didn't exist.

Closes #6411
Closes #9334
This commit is contained in:
javanna 2015-01-16 15:31:45 +01:00 committed by Luca Cavanna
parent ff48c649b1
commit 6c2139c186
2 changed files with 83 additions and 25 deletions

View File

@ -255,7 +255,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
String markedTribeName = index.settings().get(TRIBE_NAME); String markedTribeName = index.settings().get(TRIBE_NAME);
if (markedTribeName != null && markedTribeName.equals(tribeName)) { if (markedTribeName != null && markedTribeName.equals(tribeName)) {
IndexMetaData tribeIndex = tribeState.metaData().index(index.index()); IndexMetaData tribeIndex = tribeState.metaData().index(index.index());
if (tribeIndex == null) { if (tribeIndex == null || tribeIndex.state() == IndexMetaData.State.CLOSE) {
logger.info("[{}] removing index [{}]", tribeName, index.index()); logger.info("[{}] removing index [{}]", tribeName, index.index());
removeIndex(blocks, metaData, routingTable, index); removeIndex(blocks, metaData, routingTable, index);
} else { } else {

View File

@ -20,12 +20,15 @@
package org.elasticsearch.tribe; package org.elasticsearch.tribe;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -34,6 +37,7 @@ import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -44,6 +48,7 @@ import java.util.Map;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
@ -125,7 +130,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
public void testGlobalReadWriteBlocks() throws Exception { public void testGlobalReadWriteBlocks() throws Exception {
logger.info("create 2 indices, test1 on t1, and test2 on t2"); logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("test1").get(); internalCluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get(); assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
setupTribeNode(ImmutableSettings.builder() setupTribeNode(ImmutableSettings.builder()
@ -162,10 +167,10 @@ public class TribeTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testIndexWriteBlocks() throws Exception { public void testIndexWriteBlocks() throws Exception {
logger.info("create 2 indices, test1 on t1, and test2 on t2"); logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("test1").get(); assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
internalCluster().client().admin().indices().prepareCreate("block_test1").get(); assertAcked(internalCluster().client().admin().indices().prepareCreate("block_test1"));
cluster2.client().admin().indices().prepareCreate("test2").get(); assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
cluster2.client().admin().indices().prepareCreate("block_test2").get(); assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2"));
setupTribeNode(ImmutableSettings.builder() setupTribeNode(ImmutableSettings.builder()
.put("tribe.blocks.write.indices", "block_*") .put("tribe.blocks.write.indices", "block_*")
@ -226,10 +231,10 @@ public class TribeTests extends ElasticsearchIntegrationTest {
logger.info("testing preference for tribe {}", tribe); logger.info("testing preference for tribe {}", tribe);
logger.info("create 2 indices, test1 on t1, and test2 on t2"); logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("conflict").get(); assertAcked(internalCluster().client().admin().indices().prepareCreate("conflict"));
cluster2.client().admin().indices().prepareCreate("conflict").get(); assertAcked(cluster2.client().admin().indices().prepareCreate("conflict"));
internalCluster().client().admin().indices().prepareCreate("test1").get(); assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
cluster2.client().admin().indices().prepareCreate("test2").get(); assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
setupTribeNode(ImmutableSettings.builder() setupTribeNode(ImmutableSettings.builder()
.put("tribe.on_conflict", "prefer_" + tribe) .put("tribe.on_conflict", "prefer_" + tribe)
@ -249,8 +254,8 @@ public class TribeTests extends ElasticsearchIntegrationTest {
public void testTribeOnOneCluster() throws Exception { public void testTribeOnOneCluster() throws Exception {
setupTribeNode(ImmutableSettings.EMPTY); setupTribeNode(ImmutableSettings.EMPTY);
logger.info("create 2 indices, test1 on t1, and test2 on t2"); logger.info("create 2 indices, test1 on t1, and test2 on t2");
internalCluster().client().admin().indices().prepareCreate("test1").get(); assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
cluster2.client().admin().indices().prepareCreate("test2").get(); assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
// wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state
@ -283,7 +288,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
logger.info("write to another type"); logger.info("write to another type");
tribeClient.prepareIndex("test1", "type2", "1").setSource("field1", "value1").get(); tribeClient.prepareIndex("test1", "type2", "1").setSource("field1", "value1").get();
tribeClient.prepareIndex("test2", "type2", "1").setSource("field1", "value1").get(); tribeClient.prepareIndex("test2", "type2", "1").setSource("field1", "value1").get();
tribeClient.admin().indices().prepareRefresh().get(); assertNoFailures(tribeClient.admin().indices().prepareRefresh().get());
logger.info("verify they are there"); logger.info("verify they are there");
@ -310,20 +315,50 @@ public class TribeTests extends ElasticsearchIntegrationTest {
logger.info("delete an index, and make sure its reflected"); logger.info("delete an index, and make sure its reflected");
cluster2.client().admin().indices().prepareDelete("test2").get(); cluster2.client().admin().indices().prepareDelete("test2").get();
assertBusy(new Runnable() { awaitIndicesNotInClusterState("test2");
@Override
public void run() {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
assertTrue(tribeState.getMetaData().hasIndex("test1"));
assertFalse(tribeState.getMetaData().hasIndex("test2"));
assertTrue(tribeState.getRoutingTable().hasIndex("test1"));
assertFalse(tribeState.getRoutingTable().hasIndex("test2"));
}
});
logger.info("stop a node, make sure its reflected"); try {
cluster2.stopRandomDataNode(); logger.info("stop a node, make sure its reflected");
cluster2.stopRandomDataNode();
awaitSameNodeCounts();
} finally {
cluster2.startNode();
awaitSameNodeCounts();
}
}
@Test
public void testCloseAndOpenIndex() throws Exception {
//create an index and close it even before starting the tribe node
assertAcked(internalCluster().client().admin().indices().prepareCreate("test1"));
ensureGreen(internalCluster());
assertAcked(internalCluster().client().admin().indices().prepareClose("test1"));
setupTribeNode(ImmutableSettings.EMPTY);
awaitSameNodeCounts(); awaitSameNodeCounts();
//the closed index is not part of the tribe node cluster state
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
assertThat(tribeState.getMetaData().hasIndex("test1"), equalTo(false));
//open the index, it becomes part of the tribe node cluster state
assertAcked(internalCluster().client().admin().indices().prepareOpen("test1"));
awaitIndicesInClusterState("test1");
ensureGreen(internalCluster());
//create a second index, wait till it is seen from within the tribe node
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
awaitIndicesInClusterState("test1", "test2");
ensureGreen(cluster2);
//close the second index, wait till it gets removed from the tribe node cluster state
assertAcked(cluster2.client().admin().indices().prepareClose("test2"));
awaitIndicesNotInClusterState("test2");
//open the second index, wait till it gets added back to the tribe node cluster state
assertAcked(cluster2.client().admin().indices().prepareOpen("test2"));
awaitIndicesInClusterState("test1", "test2");
ensureGreen(cluster2);
} }
private void awaitIndicesInClusterState(final String... indices) throws Exception { private void awaitIndicesInClusterState(final String... indices) throws Exception {
@ -339,6 +374,29 @@ public class TribeTests extends ElasticsearchIntegrationTest {
}); });
} }
private void awaitIndicesNotInClusterState(final String... indices) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
assertFalse(tribeState.getMetaData().hasIndex(index));
assertFalse(tribeState.getRoutingTable().hasIndex(index));
}
}
});
}
private void ensureGreen(TestCluster testCluster) {
ClusterHealthResponse actionGet = testCluster.client().admin().cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
if (actionGet.isTimedOut()) {
logger.info("ensureGreen timed out, cluster state:\n{}\n{}", testCluster.client().admin().cluster().prepareState().get().getState().prettyPrint(), testCluster.client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
}
assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN));
}
private void awaitSameNodeCounts() throws Exception { private void awaitSameNodeCounts() throws Exception {
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override