Tribe: Index level blocks, index conflict settings

allow to configure on the index level which blocks can optionally be applied using tribe.blocks.indices prefix settings.
allow to control what will be done when a conflict is detected on index names coming from several clusters using the tribe.on_conflict setting. Defaults remains "any", but now support also "drop" and "prefer_[tribeName]".
closes #5501
This commit is contained in:
Shay Banon 2014-03-23 19:02:23 +01:00
parent 029c7b174a
commit 6fce15beec
3 changed files with 275 additions and 29 deletions

View File

@ -36,7 +36,7 @@ indexing, etc.
However, there are a few exceptions: However, there are a few exceptions:
* The merged view cannot handle indices with the same name in multiple * The merged view cannot handle indices with the same name in multiple
clusters. It will pick one of them and discard the other. clusters. By default it will pick one of them, see later for on_conflict options.
* Master level read operations (eg <<cluster-state>>, <<cluster-health>>) * Master level read operations (eg <<cluster-state>>, <<cluster-health>>)
will automatically execute with a local flag set to true since there is will automatically execute with a local flag set to true since there is
@ -56,3 +56,21 @@ tribe:
metadata: true metadata: true
-------------------------------- --------------------------------
coming[1.2.0]
The tribe node can also configure blocks on indices explicitly:
[source,yaml]
--------------------------------
tribe:
blocks:
indices.write: hk*,ldn*
--------------------------------
coming[1.2.0]
When there is a conflict and multiple clusters hold the same index, by default
the tribe node will pick one of them. This can be configured using the `tribe.on_conflict`
setting. It defaults to `any`, but can be set to `drop` (drop indices that have
a conflict), or `prefer_[tribeName]` to prefer the index from a specific tribe.

View File

@ -22,23 +22,28 @@ package org.elasticsearch.tribe;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction; import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.node.NodeBuilder;
@ -47,6 +52,7 @@ import org.elasticsearch.rest.RestStatus;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
/** /**
@ -104,6 +110,13 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
public static final String TRIBE_NAME = "tribe.name"; public static final String TRIBE_NAME = "tribe.name";
private final ClusterService clusterService; private final ClusterService clusterService;
private final String[] blockIndicesWrite;
private final String[] blockIndicesRead;
private final String[] blockIndicesMetadata;
private static final String ON_CONFLICT_ANY = "any", ON_CONFLICT_DROP = "drop", ON_CONFLICT_PREFER = "prefer_";
private final String onConflict;
private final Set<String> droppedIndices = ConcurrentCollections.newConcurrentSet();
private final List<InternalNode> nodes = Lists.newCopyOnWriteArrayList(); private final List<InternalNode> nodes = Lists.newCopyOnWriteArrayList();
@ -113,6 +126,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
this.clusterService = clusterService; this.clusterService = clusterService;
Map<String, Settings> nodesSettings = Maps.newHashMap(settings.getGroups("tribe", true)); Map<String, Settings> nodesSettings = Maps.newHashMap(settings.getGroups("tribe", true));
nodesSettings.remove("blocks"); // remove prefix settings that don't indicate a client nodesSettings.remove("blocks"); // remove prefix settings that don't indicate a client
nodesSettings.remove("on_conflict"); // remove prefix settings that don't indicate a client
for (Map.Entry<String, Settings> entry : nodesSettings.entrySet()) { for (Map.Entry<String, Settings> entry : nodesSettings.entrySet()) {
ImmutableSettings.Builder sb = ImmutableSettings.builder().put(entry.getValue()); ImmutableSettings.Builder sb = ImmutableSettings.builder().put(entry.getValue());
sb.put("node.name", settings.get("name") + "/" + entry.getKey()); sb.put("node.name", settings.get("name") + "/" + entry.getKey());
@ -123,6 +137,9 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
nodes.add((InternalNode) NodeBuilder.nodeBuilder().settings(sb).client(true).build()); nodes.add((InternalNode) NodeBuilder.nodeBuilder().settings(sb).client(true).build());
} }
String[] blockIndicesWrite = Strings.EMPTY_ARRAY;
String[] blockIndicesRead = Strings.EMPTY_ARRAY;
String[] blockIndicesMetadata = Strings.EMPTY_ARRAY;
if (!nodes.isEmpty()) { if (!nodes.isEmpty()) {
// remove the initial election / recovery blocks since we are not going to have a // remove the initial election / recovery blocks since we are not going to have a
// master elected in this single tribe node local "cluster" // master elected in this single tribe node local "cluster"
@ -131,13 +148,21 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
if (settings.getAsBoolean("tribe.blocks.write", false)) { if (settings.getAsBoolean("tribe.blocks.write", false)) {
clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK); clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK);
} }
blockIndicesWrite = settings.getAsArray("tribe.blocks.write.indices", Strings.EMPTY_ARRAY);
if (settings.getAsBoolean("tribe.blocks.metadata", false)) { if (settings.getAsBoolean("tribe.blocks.metadata", false)) {
clusterService.addInitialStateBlock(TRIBE_METADATA_BLOCK); clusterService.addInitialStateBlock(TRIBE_METADATA_BLOCK);
} }
blockIndicesMetadata = settings.getAsArray("tribe.blocks.metadata.indices", Strings.EMPTY_ARRAY);
blockIndicesRead = settings.getAsArray("tribe.blocks.read.indices", Strings.EMPTY_ARRAY);
for (InternalNode node : nodes) { for (InternalNode node : nodes) {
node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node)); node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node));
} }
} }
this.blockIndicesMetadata = blockIndicesMetadata;
this.blockIndicesRead = blockIndicesRead;
this.blockIndicesWrite = blockIndicesWrite;
this.onConflict = settings.get("tribe.on_conflict", ON_CONFLICT_ANY);
} }
@Override @Override
@ -255,6 +280,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
} }
// -- merge metadata // -- merge metadata
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
MetaData.Builder metaData = MetaData.builder(currentState.metaData()); MetaData.Builder metaData = MetaData.builder(currentState.metaData());
RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
// go over existing indices, and see if they need to be removed // go over existing indices, and see if they need to be removed
@ -264,8 +290,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
IndexMetaData tribeIndex = tribeState.metaData().index(index.index()); IndexMetaData tribeIndex = tribeState.metaData().index(index.index());
if (tribeIndex == null) { if (tribeIndex == null) {
logger.info("[{}] removing index [{}]", tribeName, index.index()); logger.info("[{}] removing index [{}]", tribeName, index.index());
metaData.remove(index.index()); removeIndex(blocks, metaData, routingTable, index);
routingTable.remove(index.index());
} else { } else {
// always make sure to update the metadata and routing table, in case // always make sure to update the metadata and routing table, in case
// there are changes in them (new mapping, shards moving from initializing to started) // there are changes in them (new mapping, shards moving from initializing to started)
@ -277,16 +302,62 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
} }
// go over tribe one, and see if they need to be added // go over tribe one, and see if they need to be added
for (IndexMetaData tribeIndex : tribeState.metaData()) { for (IndexMetaData tribeIndex : tribeState.metaData()) {
if (!currentState.metaData().hasIndex(tribeIndex.index())) { // if there is no routing table yet, do nothing with it...
IndexRoutingTable table = tribeState.routingTable().index(tribeIndex.index());
if (table == null) {
continue;
}
if (!currentState.metaData().hasIndex(tribeIndex.index()) && !droppedIndices.contains(tribeIndex.index())) {
// a new index, add it, and add the tribe name as a setting // a new index, add it, and add the tribe name as a setting
logger.info("[{}] adding index [{}]", tribeName, tribeIndex.index()); logger.info("[{}] adding index [{}]", tribeName, tribeIndex.index());
Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build(); addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); } else {
routingTable.add(tribeState.routingTable().index(tribeIndex.index())); String existingFromTribe = currentState.metaData().index(tribeIndex.index()).getSettings().get(TRIBE_NAME);
if (!tribeName.equals(existingFromTribe)) {
// we have a potential conflict on index names, decide what to do...
if (ON_CONFLICT_ANY.equals(onConflict)) {
// we chose any tribe, carry on
} else if (ON_CONFLICT_DROP.equals(onConflict)) {
// drop the indices, there is a conflict
logger.info("[{}] dropping index [{}] due to conflict with [{}]", tribeName, tribeIndex.index(), existingFromTribe);
removeIndex(blocks, metaData, routingTable, tribeIndex);
droppedIndices.add(tribeIndex.index());
} else if (onConflict.startsWith(ON_CONFLICT_PREFER)) {
// on conflict, prefer a tribe...
String preferredTribeName = onConflict.substring(ON_CONFLICT_PREFER.length());
if (tribeName.equals(preferredTribeName)) {
// the new one is hte preferred one, replace...
logger.info("[{}] adding index [{}], preferred over [{}]", tribeName, tribeIndex.index(), existingFromTribe);
removeIndex(blocks, metaData, routingTable, tribeIndex);
addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
} // else: either the existing one is the preferred one, or we haven't seen one, carry on
}
}
} }
} }
return ClusterState.builder(currentState).nodes(nodes).metaData(metaData).routingTable(routingTable).build(); return ClusterState.builder(currentState).blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable).build();
}
private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) {
metaData.remove(index.index());
routingTable.remove(index.index());
blocks.removeIndexBlocks(index.index());
}
private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build();
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
routingTable.add(tribeState.routingTable().index(tribeIndex.index()));
if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.index())) {
blocks.addIndexBlock(tribeIndex.index(), IndexMetaData.INDEX_METADATA_BLOCK);
}
if (Regex.simpleMatch(blockIndicesRead, tribeIndex.index())) {
blocks.addIndexBlock(tribeIndex.index(), IndexMetaData.INDEX_READ_BLOCK);
}
if (Regex.simpleMatch(blockIndicesWrite, tribeIndex.index())) {
blocks.addIndexBlock(tribeIndex.index(), IndexMetaData.INDEX_WRITE_BLOCK);
}
} }
@Override @Override

View File

@ -23,8 +23,10 @@ import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.discovery.MasterNotDiscoveredException;
@ -33,9 +35,12 @@ import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.TestCluster;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -45,39 +50,180 @@ import static org.hamcrest.Matchers.equalTo;
*/ */
public class TribeTests extends ElasticsearchIntegrationTest { public class TribeTests extends ElasticsearchIntegrationTest {
private TestCluster cluster2; private static TestCluster cluster2;
private Node tribeNode; private Node tribeNode;
private Client tribeClient; private Client tribeClient;
@Before @BeforeClass
public void setupSecondCluster() { public static void setupSecondCluster() throws Exception {
ElasticsearchIntegrationTest.beforeClass();
// create another cluster // create another cluster
cluster2 = new TestCluster(randomLong(), 2, 2, cluster().getClusterName() + "-2"); cluster2 = new TestCluster(randomLong(), 2, 2, Strings.randomBase64UUID(getRandom()));
cluster2.beforeTest(getRandom(), getPerTestTransportClientRatio()); cluster2.beforeTest(getRandom(), 0.1);
cluster2.ensureAtLeastNumNodes(2); cluster2.ensureAtLeastNumNodes(2);
}
Settings settings = ImmutableSettings.builder() @AfterClass
public static void tearDownSecondCluster() {
if (cluster2 != null) {
cluster2.afterTest();
cluster2.close();
}
}
@After
public void tearDownTribeNode() {
if (cluster2 != null) {
cluster2.client().admin().indices().prepareDelete("_all").execute().actionGet();
}
if (tribeNode != null) {
tribeNode.close();
}
}
private void setupTribeNode(Settings settings) {
Settings merged = ImmutableSettings.builder()
.put("tribe.t1.cluster.name", cluster().getClusterName()) .put("tribe.t1.cluster.name", cluster().getClusterName())
.put("tribe.t2.cluster.name", cluster2.getClusterName()) .put("tribe.t2.cluster.name", cluster2.getClusterName())
.put("tribe.blocks.write", false) .put("tribe.blocks.write", false)
.put("tribe.blocks.read", false) .put("tribe.blocks.read", false)
.put(settings)
.build(); .build();
tribeNode = NodeBuilder.nodeBuilder() tribeNode = NodeBuilder.nodeBuilder()
.settings(settings) .settings(merged)
.node(); .node();
tribeClient = tribeNode.client(); tribeClient = tribeNode.client();
} }
@After @Test
public void tearDownSecondCluster() { public void testGlobalReadWriteBlocks() throws Exception {
tribeNode.close(); logger.info("create 2 indices, test1 on t1, and test2 on t2");
cluster2.afterTest(); cluster().client().admin().indices().prepareCreate("test1").get();
cluster2.close(); cluster2.client().admin().indices().prepareCreate("test2").get();
setupTribeNode(ImmutableSettings.builder()
.put("tribe.blocks.write", true)
.put("tribe.blocks.metadata", true)
.build());
logger.info("wait till tribe has the same nodes as the 2 clusters");
awaitSameNodeCounts();
// wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state
logger.info("wait till test1 and test2 exists in the tribe node state");
awaitIndicesInClusterState("test1", "test2");
try {
tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").execute().actionGet();
fail("cluster block should be thrown");
} catch (ClusterBlockException e) {
// all is well!
}
try {
tribeClient.admin().indices().prepareOptimize("test1").execute().actionGet();
fail("cluster block should be thrown");
} catch (ClusterBlockException e) {
// all is well!
}
try {
tribeClient.admin().indices().prepareOptimize("test2").execute().actionGet();
fail("cluster block should be thrown");
} catch (ClusterBlockException e) {
// all is well!
}
}
@Test
public void testIndexWriteBlocks() throws Exception {
logger.info("create 2 indices, test1 on t1, and test2 on t2");
cluster().client().admin().indices().prepareCreate("test1").get();
cluster().client().admin().indices().prepareCreate("block_test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
cluster2.client().admin().indices().prepareCreate("block_test2").get();
setupTribeNode(ImmutableSettings.builder()
.put("tribe.blocks.write.indices", "block_*")
.build());
logger.info("wait till tribe has the same nodes as the 2 clusters");
awaitSameNodeCounts();
// wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state
logger.info("wait till test1 and test2 exists in the tribe node state");
awaitIndicesInClusterState("test1", "test2", "block_test1", "block_test2");
tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").get();
try {
tribeClient.prepareIndex("block_test1", "type1", "1").setSource("field1", "value1").get();
fail("cluster block should be thrown");
} catch (ClusterBlockException e) {
// all is well!
}
tribeClient.prepareIndex("test2", "type1", "1").setSource("field1", "value1").get();
try {
tribeClient.prepareIndex("block_test2", "type1", "1").setSource("field1", "value1").get();
fail("cluster block should be thrown");
} catch (ClusterBlockException e) {
// all is well!
}
}
@Test
public void testOnConflictDrop() throws Exception {
logger.info("create 2 indices, test1 on t1, and test2 on t2");
cluster().client().admin().indices().prepareCreate("conflict").get();
cluster2.client().admin().indices().prepareCreate("conflict").get();
cluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
setupTribeNode(ImmutableSettings.builder()
.put("tribe.on_conflict", "drop")
.build());
logger.info("wait till tribe has the same nodes as the 2 clusters");
awaitSameNodeCounts();
// wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state
logger.info("wait till test1 and test2 exists in the tribe node state");
awaitIndicesInClusterState("test1", "test2");
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get(TribeService.TRIBE_NAME), equalTo("t1"));
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get(TribeService.TRIBE_NAME), equalTo("t2"));
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().hasIndex("conflict"), equalTo(false));
}
@Test
public void testOnConflictPrefer() throws Exception {
testOnConflictPrefer(randomBoolean() ? "t1" : "t2");
}
private void testOnConflictPrefer(String tribe) throws Exception {
logger.info("testing preference for tribe {}", tribe);
logger.info("create 2 indices, test1 on t1, and test2 on t2");
cluster().client().admin().indices().prepareCreate("conflict").get();
cluster2.client().admin().indices().prepareCreate("conflict").get();
cluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
setupTribeNode(ImmutableSettings.builder()
.put("tribe.on_conflict", "prefer_" + tribe)
.build());
logger.info("wait till tribe has the same nodes as the 2 clusters");
awaitSameNodeCounts();
// wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state
logger.info("wait till test1 and test2 exists in the tribe node state");
awaitIndicesInClusterState("test1", "test2", "conflict");
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get(TribeService.TRIBE_NAME), equalTo("t1"));
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get(TribeService.TRIBE_NAME), equalTo("t2"));
assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("conflict").getSettings().get(TribeService.TRIBE_NAME), equalTo(tribe));
} }
@Test @Test
public void testTribeOnOneCluster() throws Exception { public void testTribeOnOneCluster() throws Exception {
setupTribeNode(ImmutableSettings.EMPTY);
logger.info("create 2 indices, test1 on t1, and test2 on t2"); logger.info("create 2 indices, test1 on t1, and test2 on t2");
cluster().client().admin().indices().prepareCreate("test1").get(); cluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get(); cluster2.client().admin().indices().prepareCreate("test2").get();
@ -85,14 +231,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
// wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state
logger.info("wait till test1 and test2 exists in the tribe node state"); logger.info("wait till test1 and test2 exists in the tribe node state");
awaitBusy(new Predicate<Object>() { awaitIndicesInClusterState("test1", "test2");
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
return tribeState.getMetaData().hasIndex("test1") && tribeState.getMetaData().hasIndex("test2") &&
tribeState.getRoutingTable().hasIndex("test1") && tribeState.getRoutingTable().hasIndex("test2");
}
});
logger.info("wait till tribe has the same nodes as the 2 clusters"); logger.info("wait till tribe has the same nodes as the 2 clusters");
awaitSameNodeCounts(); awaitSameNodeCounts();
@ -159,6 +298,24 @@ public class TribeTests extends ElasticsearchIntegrationTest {
awaitSameNodeCounts(); awaitSameNodeCounts();
} }
private void awaitIndicesInClusterState(final String... indices) throws Exception {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
if (!tribeState.getMetaData().hasIndex(index)) {
return false;
}
if (!tribeState.getRoutingTable().hasIndex(index)) {
return false;
}
}
return true;
}
});
}
private void awaitSameNodeCounts() throws Exception { private void awaitSameNodeCounts() throws Exception {
awaitBusy(new Predicate<Object>() { awaitBusy(new Predicate<Object>() {
@Override @Override