From febc7f5d4c2158574d9a22c2e73e8092c5516690 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 22 Aug 2015 18:18:29 +0200 Subject: [PATCH] [TEST] Provide unicast hosts for Tribe nodes in TribeIT --- .../zen/ping/unicast/UnicastZenPing.java | 3 +- .../test/InternalTestCluster.java | 5 --- .../java/org/elasticsearch/tribe/TribeIT.java | 36 ++++++++++++++++++- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 2a29ca5748c..06820a9f066 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -64,6 +64,7 @@ import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.readPing public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPing { public static final String ACTION_NAME = "internal:discovery/zen/unicast"; + public static final String DISCOVERY_ZEN_PING_UNICAST_HOSTS = "discovery.zen.ping.unicast.hosts"; // these limits are per-address public static final int LIMIT_FOREIGN_PORTS_COUNT = 1; @@ -116,7 +117,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen } this.concurrentConnects = this.settings.getAsInt("discovery.zen.ping.unicast.concurrent_connects", 10); - String[] hostArr = this.settings.getAsArray("discovery.zen.ping.unicast.hosts"); + String[] hostArr = this.settings.getAsArray(DISCOVERY_ZEN_PING_UNICAST_HOSTS); // trim the hosts for (int i = 0; i < hostArr.length; i++) { hostArr[i] = hostArr[i].trim(); diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 783021a0429..f24ac1e3645 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -227,11 +227,6 @@ public final class InternalTestCluster extends TestCluster { private ServiceDisruptionScheme activeDisruptionScheme; private String nodeMode; - public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, int minNumDataNodes, int maxNumDataNodes, String clusterName, int numClientNodes, - boolean enableHttpPipelining, String nodePrefix) { - this(nodeMode, clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableHttpPipelining, nodePrefix); - } - public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, int minNumDataNodes, int maxNumDataNodes, String clusterName, SettingsSource settingsSource, int numClientNodes, boolean enableHttpPipelining, String nodePrefix) { diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 0f5e8bfc9a1..4847d2b528e 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableMap; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; @@ -32,11 +34,14 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.SettingsSource; import org.elasticsearch.test.TestCluster; import org.junit.After; import org.junit.AfterClass; @@ -44,6 +49,8 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -69,8 +76,21 @@ public class TribeIT extends ESIntegTestCase { @BeforeClass public static void setupSecondCluster() throws Exception { ESIntegTestCase.beforeClass(); + SettingsSource source = new SettingsSource() { + @Override + public Settings node(int nodeOrdinal) { + final int base = InternalTestCluster.BASE_PORT + 1000; + return Settings.builder().put("transport.tcp.port", base + "-" + (base + 100)).build(); + } + + @Override + public Settings transportClient() { + return node(0); + } + }; // create another cluster - cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, SECOND_CLUSTER_NODE_PREFIX); + cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), 2, 2, Strings.randomBase64UUID(getRandom()), source, 0, false, SECOND_CLUSTER_NODE_PREFIX); + cluster2.beforeTest(getRandom(), 0.1); cluster2.ensureAtLeastNumDataNodes(2); } @@ -109,6 +129,10 @@ public class TribeIT extends ESIntegTestCase { tribe1Defaults.put("tribe.t1." + entry.getKey(), entry.getValue()); tribe2Defaults.put("tribe.t2." + entry.getKey(), entry.getValue()); } + // give each tribe it's unicast hosts to connect to + tribe1Defaults.putArray("tribe.t1." + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS, getUnicastHosts(internalCluster().client())); + tribe1Defaults.putArray("tribe.t2." + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS, getUnicastHosts(cluster2.client())); + Settings merged = Settings.builder() .put("tribe.t1.cluster.name", internalCluster().getClusterName()) .put("tribe.t2.cluster.name", cluster2.getClusterName()) @@ -421,4 +445,14 @@ public class TribeIT extends ESIntegTestCase { } return count; } + + public String[] getUnicastHosts(Client client) { + ArrayList unicastHosts = new ArrayList<>(); + NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().clear().setTransport(true).get(); + for (NodeInfo info : nodeInfos.getNodes()) { + TransportAddress address = info.getTransport().getAddress().publishAddress(); + unicastHosts.add(address.getAddress() + ":" + address.getPort()); + } + return unicastHosts.toArray(new String[unicastHosts.size()]); + } } \ No newline at end of file