diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 35826f5461d..3ac70271ce4 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -972,7 +972,6 @@ <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]geo[/\\]RandomShapeGenerator.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]hamcrest[/\\]ElasticsearchGeoAssertions.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]timestamp[/\\]SimpleTimestampIT.java" checks="LineLength" /> - <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeIT.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ttl[/\\]SimpleTTLIT.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]update[/\\]UpdateIT.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]validate[/\\]SimpleValidateQueryIT.java" checks="LineLength" /> diff --git a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index 3928c95bcf8..56e18d5335e 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -109,7 +109,7 @@ public class SimpleThreadPoolIT extends ESIntegTestCase { String nodePrefix = "(" + Pattern.quote(InternalTestCluster.TRANSPORT_CLIENT_PREFIX) + ")?(" + Pattern.quote(ESIntegTestCase.SUITE_CLUSTER_NODE_PREFIX) + "|" + Pattern.quote(ESIntegTestCase.TEST_CLUSTER_NODE_PREFIX) + "|" + - Pattern.quote(TribeIT.SECOND_CLUSTER_NODE_PREFIX) + ")"; + Pattern.quote("node_tribe2") + ")"; assertThat(threadName, RegexMatcher.matches("\\[" + nodePrefix + "\\d+\\]")); } } diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 8a5bbc54031..440859dce44 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -19,433 +19,492 @@ package org.elasticsearch.tribe; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Priority; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; -import org.elasticsearch.test.TestCluster; +import org.elasticsearch.transport.Transport; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; -import java.util.Map; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import static java.util.stream.Collectors.toSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.core.Is.is; /** * Note, when talking to tribe client, no need to set the local flag on master read operations, it * does it by default. */ -@LuceneTestCase.SuppressFileSystems("ExtrasFS") // doesn't work with potential multi data path from test cluster yet +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) public class TribeIT extends ESIntegTestCase { - public static final String SECOND_CLUSTER_NODE_PREFIX = "node_tribe2"; + private static final String TRIBE_NODE = "tribe_node"; + private static InternalTestCluster cluster1; private static InternalTestCluster cluster2; - private Node tribeNode; - private Client tribeClient; + /** + * A predicate that is used to select none of the remote clusters + **/ + private static final Predicate<InternalTestCluster> NONE = c -> false; - @Before - public void setupSecondCluster() throws Exception { - if (cluster2 == null) { - final NodeConfigurationSource configSource = getNodeConfigSource(); - cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, 2, 2, - UUIDs.randomBase64UUID(random()), configSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, getMockPlugins(), - Function.identity()); - cluster2.beforeTest(random(), 0.1); - cluster2.ensureAtLeastNumDataNodes(2); - } + /** + * A predicate that is used to select the remote cluster 1 only + **/ + private static final Predicate<InternalTestCluster> CLUSTER1_ONLY = c -> c.getClusterName().equals(cluster1.getClusterName()); + + /** + * A predicate that is used to select the remote cluster 2 only + **/ + private static final Predicate<InternalTestCluster> CLUSTER2_ONLY = c -> c.getClusterName().equals(cluster2.getClusterName()); + + /** + * A predicate that is used to select the the two remote clusters + **/ + private static final Predicate<InternalTestCluster> ALL = c -> true; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + // Required to delete _all indices on remote clusters + .put(DestructiveOperations.REQUIRES_NAME_SETTING.getKey(), false) + .build(); } - @AfterClass - public static void tearDownSecondCluster() { - if (cluster2 != null) { - try { - cluster2.close(); - } finally { - cluster2 = null; - } + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return getMockPlugins(); + } + + @Before + public void startRemoteClusters() { + final int minNumDataNodes = 2; + final int maxNumDataNodes = 4; + final NodeConfigurationSource nodeConfigurationSource = getNodeConfigSource(); + final Collection<Class<? extends Plugin>> plugins = nodePlugins(); + + if (cluster1 == null) { + cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes, + UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_1", + plugins, Function.identity()); } + + if (cluster2 == null) { + cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes, + UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_2", + plugins, Function.identity()); + } + + doWithAllClusters(c -> { + try { + c.beforeTest(random(), 0.1); + c.ensureAtLeastNumDataNodes(minNumDataNodes); + } catch (Exception e) { + throw new RuntimeException("Failed to set up remote cluster [" + c.getClusterName() + "]", e); + } + }); } @After - public void tearDownTribeNode() throws IOException { - if (cluster2 != null) { + public void wipeRemoteClusters() { + doWithAllClusters(c -> { + final String clusterName = c.getClusterName(); try { - cluster2.wipe(Collections.<String>emptySet()); - } finally { - cluster2.afterTest(); + c.client().admin().indices().prepareDelete(MetaData.ALL).get(); + c.afterTest(); + } catch (IOException e) { + throw new RuntimeException("Failed to clean up remote cluster [" + clusterName + "]", e); } - } - if (tribeNode != null) { - tribeNode.close(); - tribeNode = null; + }); + } + + @AfterClass + public static void stopRemoteClusters() { + try { + doWithAllClusters(InternalTestCluster::close); + } finally { + cluster1 = null; + cluster2 = null; } } - private void setupTribeNode(Settings settings) throws NodeValidationException { - Map<String,String> asMap = internalCluster().getDefaultSettings().getAsMap(); - Settings.Builder tribe1Defaults = Settings.builder(); - Settings.Builder tribe2Defaults = Settings.builder(); - for (Map.Entry<String, String> entry : asMap.entrySet()) { - if (entry.getKey().startsWith("path.")) { - continue; + private Releasable startTribeNode() throws Exception { + return startTribeNode(ALL, Settings.EMPTY); + } + + private Releasable startTribeNode(Predicate<InternalTestCluster> filter, Settings settings) throws Exception { + final String node = internalCluster().startNode(createTribeSettings(filter).put(settings).build()); + return () -> { + try { + while(internalCluster().getNodeNames().length > 0) { + internalCluster().stopRandomNode(s -> true); + } + } catch (Exception e) { + throw new RuntimeException("Failed to close tribe node [" + node + "]", e); } - tribe1Defaults.put("tribe.t1." + entry.getKey(), entry.getValue()); - tribe2Defaults.put("tribe.t2." + entry.getKey(), entry.getValue()); - } - // give each tribe it's unicast hosts to connect to - tribe1Defaults.putArray("tribe.t1." + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), getUnicastHosts(internalCluster().client())); - tribe1Defaults.putArray("tribe.t2." + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), getUnicastHosts(cluster2.client())); + }; + } - Settings merged = Settings.builder() - .put(internalCluster().getDefaultSettings()) - .put("tribe.t1.cluster.name", internalCluster().getClusterName()) - .put("tribe.t2.cluster.name", cluster2.getClusterName()) - .put("tribe.t1.transport.type", "local") - .put("tribe.t2.transport.type", "local") - .put("tribe.t1.discovery.type", "local") - .put("tribe.t2.discovery.type", "local") - .put("transport.type", "local") - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") - .put("tribe.blocks.write", false) - .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put(settings) + private Settings.Builder createTribeSettings(Predicate<InternalTestCluster> filter) { + assertNotNull(filter); - .put(tribe1Defaults.build()) - .put(tribe2Defaults.build()) - .put("node.name", "tribe_node") // make sure we can identify threads from this node - .build(); + final Settings.Builder settings = Settings.builder(); + settings.put(Node.NODE_NAME_SETTING.getKey(), TRIBE_NODE); + settings.put(Node.NODE_DATA_SETTING.getKey(), false); + settings.put(Node.NODE_MASTER_SETTING.getKey(), true); + settings.put(NetworkModule.HTTP_ENABLED.getKey(), false); + settings.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), NetworkModule.LOCAL_TRANSPORT); + settings.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), NetworkModule.LOCAL_TRANSPORT); - tribeNode = new Node(merged).start(); - tribeClient = tribeNode.client(); + doWithAllClusters(filter, c -> { + String tribeSetting = "tribe." + c.getClusterName() + "."; + settings.put(tribeSetting + ClusterName.CLUSTER_NAME_SETTING.getKey(), c.getClusterName()); + settings.put(tribeSetting + DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "100ms"); + settings.put(tribeSetting + NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), NetworkModule.LOCAL_TRANSPORT); + settings.put(tribeSetting + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), NetworkModule.LOCAL_TRANSPORT); + + Set<String> hosts = new HashSet<>(); + for (Transport transport : c.getInstances(Transport.class)) { + TransportAddress address = transport.boundAddress().publishAddress(); + hosts.add(address.getHost() + ":" + address.getPort()); + } + settings.putArray(tribeSetting + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), + hosts.toArray(new String[hosts.size()])); + }); + + return settings; } public void testGlobalReadWriteBlocks() throws Exception { - logger.info("create 2 indices, test1 on t1, and test2 on t2"); - internalCluster().client().admin().indices().prepareCreate("test1").get(); - assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); - - - setupTribeNode(Settings.builder() + Settings additionalSettings = Settings.builder() .put("tribe.blocks.write", true) .put("tribe.blocks.metadata", true) - .build()); + .build(); - logger.info("wait till tribe has the same nodes as the 2 clusters"); - awaitSameNodeCounts(); - // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state - logger.info("wait till test1 and test2 exists in the tribe node state"); - awaitIndicesInClusterState("test1", "test2"); + try (Releasable tribeNode = startTribeNode(ALL, additionalSettings)) { + // Creates 2 indices, test1 on cluster1 and test2 on cluster2 + assertAcked(cluster1.client().admin().indices().prepareCreate("test1")); + ensureGreen(cluster1.client()); - try { - tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").execute().actionGet(); - fail("cluster block should be thrown"); - } catch (ClusterBlockException e) { - // all is well! - } - try { - tribeClient.admin().indices().prepareForceMerge("test1").execute().actionGet(); - fail("cluster block should be thrown"); - } catch (ClusterBlockException e) { - // all is well! - } - try { - tribeClient.admin().indices().prepareForceMerge("test2").execute().actionGet(); - fail("cluster block should be thrown"); - } catch (ClusterBlockException e) { - // all is well! + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + ensureGreen(cluster2.client()); + + // Wait for the tribe node to connect to the two remote clusters + assertNodes(ALL); + + // Wait for the tribe node to retrieve the indices into its cluster state + assertIndicesExist(client(), "test1", "test2"); + + // Writes not allowed through the tribe node + ClusterBlockException e = expectThrows(ClusterBlockException.class, () -> { + client().prepareIndex("test1", "type1").setSource("field", "value").get(); + }); + assertThat(e.getMessage(), containsString("blocked by: [BAD_REQUEST/11/tribe node, write not allowed]")); + + e = expectThrows(ClusterBlockException.class, () -> client().prepareIndex("test2", "type2").setSource("field", "value").get()); + assertThat(e.getMessage(), containsString("blocked by: [BAD_REQUEST/11/tribe node, write not allowed]")); + + e = expectThrows(ClusterBlockException.class, () -> client().admin().indices().prepareForceMerge("test1").get()); + assertThat(e.getMessage(), containsString("blocked by: [BAD_REQUEST/10/tribe node, metadata not allowed]")); + + e = expectThrows(ClusterBlockException.class, () -> client().admin().indices().prepareForceMerge("test2").get()); + assertThat(e.getMessage(), containsString("blocked by: [BAD_REQUEST/10/tribe node, metadata not allowed]")); } } public void testIndexWriteBlocks() throws Exception { - logger.info("create 2 indices, test1 on t1, and test2 on t2"); - assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); - assertAcked(internalCluster().client().admin().indices().prepareCreate("block_test1")); - assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); - assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2")); - - setupTribeNode(Settings.builder() + Settings additionalSettings = Settings.builder() .put("tribe.blocks.write.indices", "block_*") - .build()); - logger.info("wait till tribe has the same nodes as the 2 clusters"); - awaitSameNodeCounts(); - // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state - logger.info("wait till test1 and test2 exists in the tribe node state"); - awaitIndicesInClusterState("test1", "test2", "block_test1", "block_test2"); + .build(); - tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").get(); - try { - tribeClient.prepareIndex("block_test1", "type1", "1").setSource("field1", "value1").get(); - fail("cluster block should be thrown"); - } catch (ClusterBlockException e) { - // all is well! - } + try (Releasable tribeNode = startTribeNode(ALL, additionalSettings)) { + // Creates 2 indices on each remote cluster, test1 and block_test1 on cluster1 and test2 and block_test2 on cluster2 + assertAcked(cluster1.client().admin().indices().prepareCreate("test1")); + assertAcked(cluster1.client().admin().indices().prepareCreate("block_test1")); + ensureGreen(cluster1.client()); - tribeClient.prepareIndex("test2", "type1", "1").setSource("field1", "value1").get(); - try { - tribeClient.prepareIndex("block_test2", "type1", "1").setSource("field1", "value1").get(); - fail("cluster block should be thrown"); - } catch (ClusterBlockException e) { - // all is well! + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2")); + ensureGreen(cluster2.client()); + + // Wait for the tribe node to connect to the two remote clusters + assertNodes(ALL); + + // Wait for the tribe node to retrieve the indices into its cluster state + assertIndicesExist(client(), "test1", "test2", "block_test1", "block_test2"); + + // Writes allowed through the tribe node for test1/test2 indices + client().prepareIndex("test1", "type1").setSource("field", "value").get(); + client().prepareIndex("test2", "type2").setSource("field", "value").get(); + + ClusterBlockException e; + e = expectThrows(ClusterBlockException.class, () -> client().prepareIndex("block_test1", "type1").setSource("foo", 0).get()); + assertThat(e.getMessage(), containsString("blocked by: [FORBIDDEN/8/index write (api)]")); + + e = expectThrows(ClusterBlockException.class, () -> client().prepareIndex("block_test2", "type2").setSource("foo", 0).get()); + assertThat(e.getMessage(), containsString("blocked by: [FORBIDDEN/8/index write (api)]")); } } public void testOnConflictDrop() throws Exception { - logger.info("create 2 indices, test1 on t1, and test2 on t2"); - assertAcked(cluster().client().admin().indices().prepareCreate("conflict")); - assertAcked(cluster2.client().admin().indices().prepareCreate("conflict")); - assertAcked(cluster().client().admin().indices().prepareCreate("test1")); - assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); - - setupTribeNode(Settings.builder() + Settings additionalSettings = Settings.builder() .put("tribe.on_conflict", "drop") - .build()); + .build(); - logger.info("wait till tribe has the same nodes as the 2 clusters"); - awaitSameNodeCounts(); + try (Releasable tribeNode = startTribeNode(ALL, additionalSettings)) { + // Creates 2 indices on each remote cluster, test1 and conflict on cluster1 and test2 and also conflict on cluster2 + assertAcked(cluster1.client().admin().indices().prepareCreate("test1")); + assertAcked(cluster1.client().admin().indices().prepareCreate("conflict")); + ensureGreen(cluster1.client()); - // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state - logger.info("wait till test1 and test2 exists in the tribe node state"); - awaitIndicesInClusterState("test1", "test2"); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + assertAcked(cluster2.client().admin().indices().prepareCreate("conflict")); + ensureGreen(cluster2.client()); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get("tribe.name"), equalTo("t1")); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get("tribe.name"), equalTo("t2")); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().hasIndex("conflict"), equalTo(false)); + // Wait for the tribe node to connect to the two remote clusters + assertNodes(ALL); + + // Wait for the tribe node to retrieve the indices into its cluster state + assertIndicesExist(client(), "test1", "test2"); + + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().hasIndex("test1"), is(true)); + assertThat(clusterState.getMetaData().index("test1").getSettings().get("tribe.name"), equalTo(cluster1.getClusterName())); + assertThat(clusterState.getMetaData().hasIndex("test2"), is(true)); + assertThat(clusterState.getMetaData().index("test2").getSettings().get("tribe.name"), equalTo(cluster2.getClusterName())); + assertThat(clusterState.getMetaData().hasIndex("conflict"), is(false)); + } } public void testOnConflictPrefer() throws Exception { - testOnConflictPrefer(randomBoolean() ? "t1" : "t2"); - } + final String preference = randomFrom(cluster1, cluster2).getClusterName(); + Settings additionalSettings = Settings.builder() + .put("tribe.on_conflict", "prefer_" + preference) + .build(); - private void testOnConflictPrefer(String tribe) throws Exception { - logger.info("testing preference for tribe {}", tribe); + try (Releasable tribeNode = startTribeNode(ALL, additionalSettings)) { + assertAcked(cluster1.client().admin().indices().prepareCreate("test1")); + assertAcked(cluster1.client().admin().indices().prepareCreate("shared")); + ensureGreen(cluster1.client()); - logger.info("create 2 indices, test1 on t1, and test2 on t2"); - assertAcked(internalCluster().client().admin().indices().prepareCreate("conflict")); - assertAcked(cluster2.client().admin().indices().prepareCreate("conflict")); - assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); - assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + assertAcked(cluster2.client().admin().indices().prepareCreate("shared")); + ensureGreen(cluster2.client()); - setupTribeNode(Settings.builder() - .put("tribe.on_conflict", "prefer_" + tribe) - .build()); - logger.info("wait till tribe has the same nodes as the 2 clusters"); - awaitSameNodeCounts(); - // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state - logger.info("wait till test1 and test2 exists in the tribe node state"); - awaitIndicesInClusterState("test1", "test2", "conflict"); + // Wait for the tribe node to connect to the two remote clusters + assertNodes(ALL); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test1").getSettings().get("tribe.name"), equalTo("t1")); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("test2").getSettings().get("tribe.name"), equalTo("t2")); - assertThat(tribeClient.admin().cluster().prepareState().get().getState().getMetaData().index("conflict").getSettings().get("tribe.name"), equalTo(tribe)); + // Wait for the tribe node to retrieve the indices into its cluster state + assertIndicesExist(client(), "test1", "test2", "shared"); + + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().hasIndex("test1"), is(true)); + assertThat(clusterState.getMetaData().index("test1").getSettings().get("tribe.name"), equalTo(cluster1.getClusterName())); + assertThat(clusterState.getMetaData().hasIndex("test2"), is(true)); + assertThat(clusterState.getMetaData().index("test2").getSettings().get("tribe.name"), equalTo(cluster2.getClusterName())); + assertThat(clusterState.getMetaData().hasIndex("shared"), is(true)); + assertThat(clusterState.getMetaData().index("shared").getSettings().get("tribe.name"), equalTo(preference)); + } } public void testTribeOnOneCluster() throws Exception { - setupTribeNode(Settings.EMPTY); - logger.info("create 2 indices, test1 on t1, and test2 on t2"); - assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); - assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + try (Releasable tribeNode = startTribeNode()) { + // Creates 2 indices, test1 on cluster1 and test2 on cluster2 + assertAcked(cluster1.client().admin().indices().prepareCreate("test1")); + ensureGreen(cluster1.client()); + assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); + ensureGreen(cluster2.client()); - // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state - logger.info("wait till test1 and test2 exists in the tribe node state"); - awaitIndicesInClusterState("test1", "test2"); + // Wait for the tribe node to connect to the two remote clusters + assertNodes(ALL); - logger.info("wait till tribe has the same nodes as the 2 clusters"); - awaitSameNodeCounts(); + // Wait for the tribe node to retrieve the indices into its cluster state + assertIndicesExist(client(), "test1", "test2"); - assertThat(tribeClient.admin().cluster().prepareHealth().setWaitForGreenStatus().get().getStatus(), equalTo(ClusterHealthStatus.GREEN)); + // Creates two docs using the tribe node + indexRandom(true, + client().prepareIndex("test1", "type1", "1").setSource("field1", "value1"), + client().prepareIndex("test2", "type1", "1").setSource("field1", "value1") + ); - logger.info("create 2 docs through the tribe node"); - tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").get(); - tribeClient.prepareIndex("test2", "type1", "1").setSource("field1", "value1").get(); - tribeClient.admin().indices().prepareRefresh().get(); + // Verify that documents are searchable using the tribe node + assertHitCount(client().prepareSearch().get(), 2L); - logger.info("verify they are there"); - assertHitCount(tribeClient.prepareSearch().setSize(0).get(), 2L); - assertHitCount(tribeClient.prepareSearch().get(), 2L); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); - assertThat(tribeState.getMetaData().index("test1").mapping("type1"), notNullValue()); - assertThat(tribeState.getMetaData().index("test2").mapping("type1"), notNullValue()); - } - }); + // Using assertBusy to check that the mappings are in the tribe node cluster state + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().index("test1").mapping("type1"), notNullValue()); + assertThat(clusterState.getMetaData().index("test2").mapping("type1"), notNullValue()); + }); + // More documents with another type + indexRandom(true, + client().prepareIndex("test1", "type2", "1").setSource("field1", "value1"), + client().prepareIndex("test2", "type2", "1").setSource("field1", "value1") + ); + assertHitCount(client().prepareSearch().get(), 4L); + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().index("test1").mapping("type1"), notNullValue()); + assertThat(clusterState.getMetaData().index("test1").mapping("type2"), notNullValue()); - logger.info("write to another type"); - tribeClient.prepareIndex("test1", "type2", "1").setSource("field1", "value1").get(); - tribeClient.prepareIndex("test2", "type2", "1").setSource("field1", "value1").get(); - assertNoFailures(tribeClient.admin().indices().prepareRefresh().get()); + assertThat(clusterState.getMetaData().index("test2").mapping("type1"), notNullValue()); + assertThat(clusterState.getMetaData().index("test2").mapping("type2"), notNullValue()); + }); + // Make sure master level write operations fail... (we don't really have a master) + expectThrows(MasterNotDiscoveredException.class, () -> { + client().admin().indices().prepareCreate("tribe_index").setMasterNodeTimeout("10ms").get(); + }); - logger.info("verify they are there"); - assertHitCount(tribeClient.prepareSearch().setSize(0).get(), 4L); - assertHitCount(tribeClient.prepareSearch().get(), 4L); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); - assertThat(tribeState.getMetaData().index("test1").mapping("type1"), notNullValue()); - assertThat(tribeState.getMetaData().index("test1").mapping("type2"), notNullValue()); - assertThat(tribeState.getMetaData().index("test2").mapping("type1"), notNullValue()); - assertThat(tribeState.getMetaData().index("test2").mapping("type2"), notNullValue()); - } - }); - - logger.info("make sure master level write operations fail... (we don't really have a master)"); - try { - tribeClient.admin().indices().prepareCreate("tribe_index").setMasterNodeTimeout("10ms").get(); - fail(); - } catch (MasterNotDiscoveredException e) { - // all is well! - } - - logger.info("delete an index, and make sure its reflected"); - cluster2.client().admin().indices().prepareDelete("test2").get(); - awaitIndicesNotInClusterState("test2"); - - try { - logger.info("stop a node, make sure its reflected"); - cluster2.stopRandomDataNode(); - awaitSameNodeCounts(); - } finally { - cluster2.startNode(); - awaitSameNodeCounts(); + // Now delete an index and makes sure it's reflected in cluster state + cluster2.client().admin().indices().prepareDelete("test2").get(); + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + assertFalse(clusterState.getMetaData().hasIndex("test2")); + assertFalse(clusterState.getRoutingTable().hasIndex("test2")); + }); } } public void testCloseAndOpenIndex() throws Exception { - //create an index and close it even before starting the tribe node - assertAcked(internalCluster().client().admin().indices().prepareCreate("test1")); - ensureGreen(internalCluster()); - assertAcked(internalCluster().client().admin().indices().prepareClose("test1")); + // Creates an index on remote cluster 1 + assertTrue(cluster1.client().admin().indices().prepareCreate("first").get().isAcknowledged()); + ensureGreen(cluster1.client()); - setupTribeNode(Settings.EMPTY); - awaitSameNodeCounts(); + // Closes the index + assertTrue(cluster1.client().admin().indices().prepareClose("first").get().isAcknowledged()); - //the closed index is not part of the tribe node cluster state - ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); - assertThat(tribeState.getMetaData().hasIndex("test1"), equalTo(false)); + try (Releasable tribeNode = startTribeNode()) { + // Wait for the tribe node to connect to the two remote clusters + assertNodes(ALL); - //open the index, it becomes part of the tribe node cluster state - assertAcked(internalCluster().client().admin().indices().prepareOpen("test1")); - awaitIndicesInClusterState("test1"); - ensureGreen(internalCluster()); + // The closed index is not part of the tribe node cluster state + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + assertFalse(clusterState.getMetaData().hasIndex("first")); - //create a second index, wait till it is seen from within the tribe node - assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); - awaitIndicesInClusterState("test1", "test2"); - ensureGreen(cluster2); + // Open the index, it becomes part of the tribe node cluster state + assertTrue(cluster1.client().admin().indices().prepareOpen("first").get().isAcknowledged()); + assertIndicesExist(client(), "first"); - //close the second index, wait till it gets removed from the tribe node cluster state - assertAcked(cluster2.client().admin().indices().prepareClose("test2")); - awaitIndicesNotInClusterState("test2"); + // Create a second index, wait till it is seen from within the tribe node + assertTrue(cluster2.client().admin().indices().prepareCreate("second").get().isAcknowledged()); + assertIndicesExist(client(), "first", "second"); + ensureGreen(cluster2.client()); - //open the second index, wait till it gets added back to the tribe node cluster state - assertAcked(cluster2.client().admin().indices().prepareOpen("test2")); - awaitIndicesInClusterState("test1", "test2"); - ensureGreen(cluster2); + // Close the second index, wait till it gets removed from the tribe node cluster state + assertTrue(cluster2.client().admin().indices().prepareClose("second").get().isAcknowledged()); + assertIndicesExist(client(), "first"); + + // Open the second index, wait till it gets added back to the tribe node cluster state + assertTrue(cluster2.client().admin().indices().prepareOpen("second").get().isAcknowledged()); + assertIndicesExist(client(), "first", "second"); + ensureGreen(cluster2.client()); + } } - private void awaitIndicesInClusterState(final String... indices) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); - for (String index : indices) { - assertTrue(tribeState.getMetaData().hasIndex(index)); - assertTrue(tribeState.getRoutingTable().hasIndex(index)); - } + /** + * Test that the tribe node's cluster state correctly reflect the number of nodes + * of the remote clusters the tribe node is connected to. + */ + public void testClusterStateNodes() throws Exception { + List<Predicate<InternalTestCluster>> predicates = Arrays.asList(NONE, CLUSTER1_ONLY, CLUSTER2_ONLY, ALL); + Collections.shuffle(predicates, random()); + + for (Predicate<InternalTestCluster> predicate : predicates) { + try (Releasable tribeNode = startTribeNode(predicate, Settings.EMPTY)) { + assertNodes(predicate); + } + } + } + + private void assertIndicesExist(Client client, String... indices) throws Exception { + assertBusy(() -> { + ClusterState state = client.admin().cluster().prepareState().setRoutingTable(true).setMetaData(true).get().getState(); + assertThat(state.getMetaData().getIndices().size(), equalTo(indices.length)); + for (String index : indices) { + assertTrue(state.getMetaData().hasIndex(index)); + assertTrue(state.getRoutingTable().hasIndex(index)); } }); } - private void awaitIndicesNotInClusterState(final String... indices) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState(); - for (String index : indices) { - assertFalse(tribeState.getMetaData().hasIndex(index)); - assertFalse(tribeState.getRoutingTable().hasIndex(index)); - } - } + private void ensureGreen(Client client) throws Exception { + assertBusy(() -> { + ClusterHealthResponse clusterHealthResponse = client.admin().cluster() .prepareHealth() + .setWaitForActiveShards(0) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .get(); + assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + assertFalse(clusterHealthResponse.isTimedOut()); }); } - private void ensureGreen(TestCluster testCluster) { - ClusterHealthResponse actionGet = testCluster.client().admin().cluster() - .health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForNoRelocatingShards(true)).actionGet(); - if (actionGet.isTimedOut()) { - logger.info("ensureGreen timed out, cluster state:\n{}\n{}", testCluster.client().admin().cluster().prepareState().get().getState().prettyPrint(), testCluster.client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); - assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false)); - } - assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - } - - private void awaitSameNodeCounts() throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - DiscoveryNodes tribeNodes = tribeNode.client().admin().cluster().prepareState().get().getState().getNodes(); - assertThat(countDataNodesForTribe("t1", tribeNodes), equalTo(internalCluster().client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size())); - assertThat(countDataNodesForTribe("t2", tribeNodes), equalTo(cluster2.client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size())); + private static void assertNodes(Predicate<InternalTestCluster> filter) throws Exception { + final Set<String> expectedNodes = Sets.newHashSet(internalCluster().getNodeNames()); + doWithAllClusters(filter, c -> { + // Adds the tribe client node dedicated to this remote cluster + for (String tribeNode : internalCluster().getNodeNames()) { + expectedNodes.add(tribeNode + "/" + c.getClusterName()); } + // Adds the remote clusters nodes names + Collections.addAll(expectedNodes, c.getNodeNames()); + }); + + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState(); + Set<String> nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet()); + assertThat(nodes.containsAll(expectedNodes), is(true)); }); } - private int countDataNodesForTribe(String tribeName, DiscoveryNodes nodes) { - int count = 0; - for (DiscoveryNode node : nodes) { - if (!node.isDataNode()) { - continue; - } - if (tribeName.equals(node.getAttributes().get("tribe.name"))) { - count++; - } - } - return count; + private static void doWithAllClusters(Consumer<InternalTestCluster> consumer) { + doWithAllClusters(cluster -> cluster != null, consumer); } - public String[] getUnicastHosts(Client client) { - ArrayList<String> unicastHosts = new ArrayList<>(); - NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().clear().setTransport(true).get(); - for (NodeInfo info : nodeInfos.getNodes()) { - TransportAddress address = info.getTransport().getAddress().publishAddress(); - unicastHosts.add(address.getAddress() + ":" + address.getPort()); - } - return unicastHosts.toArray(new String[unicastHosts.size()]); + private static void doWithAllClusters(Predicate<InternalTestCluster> predicate, Consumer<InternalTestCluster> consumer) { + Stream.of(cluster1, cluster2).filter(predicate).forEach(consumer); } }