From 76319b0cd2f382e34526e448617897ab8b00e5cd Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 13 Jan 2014 11:08:18 -0800 Subject: [PATCH] Tribe Node The tribes feature allowed to create a tribe node that can act as a federated client across multiple clusters. The tribe node configuration looks something like this: ``` tribe.t1.cluster.name: cluster1 tribe.t2.cluster.name: cluster2 ``` The configuration above configure connections to 2 clusters, named `t1`, `t2`. It creates a "node" client to each (so by default, above, multicast discovery is used). The settings for each node client is extracted from the `tribe.[tribe_name]` prefix. The way the tribe node works is by merging the cluster state from each cluster, and creating a merged view of all clusters. This means all operations work the same, distributed search, suggest, percolation, indexing, ... . The merged view drops conflicted indices and picks one of them if there are 2 indices with the same name across multiple clusters. By default, read and write operations are allowed. Master level read operations (cluster state for example), require setting the local flag to true (since there is no elected master). Master level write operations are not allowed (create index, ...). The tribe node can be configured to block write operations `tribe.blocks.write` to `true`, and metadata operations by setting `tribe.blocks.metadata` to `true`. closes #4708 --- .../elasticsearch/cluster/ClusterService.java | 2 + .../service/InternalClusterService.java | 8 + .../common/settings/ImmutableSettings.java | 8 + .../common/settings/Settings.java | 5 + .../discovery/DiscoveryService.java | 34 ++- .../discovery/local/LocalDiscovery.java | 10 +- .../discovery/zen/ZenDiscovery.java | 15 +- .../node/internal/InternalNode.java | 12 +- .../org/elasticsearch/tribe/TribeModule.java | 32 +++ .../org/elasticsearch/tribe/TribeService.java | 267 ++++++++++++++++++ .../test/ElasticsearchIntegrationTest.java | 130 ++++----- .../org/elasticsearch/test/TestCluster.java | 6 +- .../org/elasticsearch/tribe/TribeTests.java | 181 ++++++++++++ 13 files changed, 612 insertions(+), 98 deletions(-) create mode 100644 src/main/java/org/elasticsearch/tribe/TribeModule.java create mode 100644 src/main/java/org/elasticsearch/tribe/TribeService.java create mode 100644 src/test/java/org/elasticsearch/tribe/TribeTests.java diff --git a/src/main/java/org/elasticsearch/cluster/ClusterService.java b/src/main/java/org/elasticsearch/cluster/ClusterService.java index 04584fc2909..bdef076d0be 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -51,6 +51,8 @@ public interface ClusterService extends LifecycleComponent { */ void addInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException; + void removeInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException; + /** * The operation routing. */ diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index a3671fd9752..5a237fd7e3c 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -115,6 +115,14 @@ public class InternalClusterService extends AbstractLifecycleComponent getGroups(String settingPrefix) throws SettingsException { + return getGroups(settingPrefix, false); + } + + @Override + public Map getGroups(String settingPrefix, boolean ignoreNonGrouped) throws SettingsException { if (settingPrefix.charAt(settingPrefix.length() - 1) != '.') { settingPrefix = settingPrefix + "."; } @@ -503,6 +508,9 @@ public class ImmutableSettings implements Settings { String nameValue = setting.substring(settingPrefix.length()); int dotIndex = nameValue.indexOf('.'); if (dotIndex == -1) { + if (ignoreNonGrouped) { + continue; + } throw new SettingsException("Failed to get setting group for [" + settingPrefix + "] setting prefix and setting [" + setting + "] because of a missing '.'"); } String name = nameValue.substring(0, dotIndex); diff --git a/src/main/java/org/elasticsearch/common/settings/Settings.java b/src/main/java/org/elasticsearch/common/settings/Settings.java index 4faac622e59..f5a0cc3dd6a 100644 --- a/src/main/java/org/elasticsearch/common/settings/Settings.java +++ b/src/main/java/org/elasticsearch/common/settings/Settings.java @@ -109,6 +109,11 @@ public interface Settings extends ToXContent { */ Map getGroups(String settingPrefix) throws SettingsException; + /** + * Returns group settings for the given setting prefix. + */ + Map getGroups(String settingPrefix, boolean ignoreNonGrouped) throws SettingsException; + /** * Returns the setting value (as float) associated with the setting key. If it does not exists, * returns the default value provided. diff --git a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java index f58ddd66c5e..f4fff6f7874 100644 --- a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java +++ b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java @@ -22,11 +22,13 @@ package org.elasticsearch.discovery; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -60,17 +62,19 @@ public class DiscoveryService extends AbstractLifecycleComponent 0) { + try { + logger.trace("waiting for {} for the initial state to be set by the discovery", initialStateTimeout); + if (latch.await(initialStateTimeout.millis(), TimeUnit.MILLISECONDS)) { + logger.trace("initial state set from discovery"); + initialStateReceived = true; + } else { + initialStateReceived = false; + logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout); + } + } catch (InterruptedException e) { + // ignore } - } catch (InterruptedException e) { - // ignore } } finally { discovery.removeListener(listener); @@ -107,7 +111,7 @@ public class DiscoveryService extends AbstractLifecycleComponent * The {@link org.elasticsearch.discovery.Discovery.AckListener} allows to acknowledge the publish * event based on the response gotten from all nodes */ @@ -116,4 +120,12 @@ public class DiscoveryService extends AbstractLifecycleComponent implem private static final ConcurrentMap clusterGroups = ConcurrentCollections.newConcurrentMap(); - private static final AtomicLong nodeIdGenerator = new AtomicLong(); - @Inject public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService, DiscoveryNodeService discoveryNodeService, Version version) { @@ -112,7 +106,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem clusterGroups.put(clusterName, clusterGroup); } logger.debug("Connected to cluster [{}]", clusterName); - this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), + this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(), discoveryNodeService.buildAttributes(), version); clusterGroup.members().add(this); diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 03bd71dc744..89a8da10764 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -35,7 +35,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; @@ -45,6 +44,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.InitialStateDiscoveryListener; import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; @@ -62,7 +62,6 @@ import org.elasticsearch.transport.*; import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -168,7 +167,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen protected void doStart() throws ElasticsearchException { Map nodeAttributes = discoveryNodeService.buildAttributes(); // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling - final String nodeId = getNodeUUID(settings); + final String nodeId = DiscoveryService.generateNodeId(settings); localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version); latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build(); nodesFD.updateNodes(latestDiscoNodes); @@ -902,14 +901,4 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } } - - private final String getNodeUUID(Settings settings) { - String seed = settings.get("discovery.id.seed"); - if (seed != null) { - logger.trace("using stable discover node UUIDs with seed: [{}]", seed); - Strings.randomBase64UUID(new Random(Long.parseLong(seed))); - } - return Strings.randomBase64UUID(); - } - } diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 3a407180f4d..33e5f336c95 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -93,6 +93,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.tribe.TribeModule; +import org.elasticsearch.tribe.TribeService; import org.elasticsearch.watcher.ResourceWatcherModule; import org.elasticsearch.watcher.ResourceWatcherService; @@ -122,6 +124,7 @@ public final class InternalNode implements Node { public InternalNode(Settings pSettings, boolean loadConfigSettings) throws ElasticsearchException { Tuple tuple = InternalSettingsPreparer.prepareSettings(pSettings, loadConfigSettings); + tuple = new Tuple(TribeService.processSettings(tuple.v1()), tuple.v2()); Version version = Version.CURRENT; @@ -139,7 +142,7 @@ public final class InternalNode implements Node { this.pluginsService = new PluginsService(tuple.v1(), tuple.v2()); this.settings = pluginsService.updatedSettings(); - this.environment = tuple.v2(); + this.environment = new Environment(this.settings()); CompressorFactory.configure(settings); @@ -178,6 +181,7 @@ public final class InternalNode implements Node { modules.add(new PercolatorModule()); modules.add(new ResourceWatcherModule()); modules.add(new RepositoriesModule()); + modules.add(new TribeModule()); injector = modules.createInjector(); @@ -232,6 +236,7 @@ public final class InternalNode implements Node { } injector.getInstance(BulkUdpService.class).start(); injector.getInstance(ResourceWatcherService.class).start(); + injector.getInstance(TribeService.class).start(); logger.info("started"); @@ -246,6 +251,7 @@ public final class InternalNode implements Node { ESLogger logger = Loggers.getLogger(Node.class, settings.get("name")); logger.info("stopping ..."); + injector.getInstance(TribeService.class).stop(); injector.getInstance(BulkUdpService.class).stop(); injector.getInstance(ResourceWatcherService.class).stop(); if (settings.getAsBoolean("http.enabled", true)) { @@ -296,7 +302,9 @@ public final class InternalNode implements Node { logger.info("closing ..."); StopWatch stopWatch = new StopWatch("node_close"); - stopWatch.start("bulk.udp"); + stopWatch.start("tribe"); + injector.getInstance(TribeService.class).close(); + stopWatch.stop().start("bulk.udp"); injector.getInstance(BulkUdpService.class).close(); stopWatch.stop().start("http"); if (settings.getAsBoolean("http.enabled", true)) { diff --git a/src/main/java/org/elasticsearch/tribe/TribeModule.java b/src/main/java/org/elasticsearch/tribe/TribeModule.java new file mode 100644 index 00000000000..fb642d1b034 --- /dev/null +++ b/src/main/java/org/elasticsearch/tribe/TribeModule.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.tribe; + +import org.elasticsearch.common.inject.AbstractModule; + +/** + */ +public class TribeModule extends AbstractModule { + + @Override + protected void configure() { + bind(TribeService.class).asEagerSingleton(); + } +} diff --git a/src/main/java/org/elasticsearch/tribe/TribeService.java b/src/main/java/org/elasticsearch/tribe/TribeService.java new file mode 100644 index 00000000000..810e03de44b --- /dev/null +++ b/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -0,0 +1,267 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.tribe; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.rest.RestStatus; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +/** + * The tribe service holds a list of node clients connected to a list of tribe members, and uses their + * cluster state events to update this local node cluster state with the merged view of it. + *

+ * The {@link #processSettings(org.elasticsearch.common.settings.Settings)} method should be called before + * starting the node, so it will make sure to configure this current node properly with the relevant tribe node + * settings. + *

+ * The tribe node settings make sure the discovery used is "local", but with no master elected. This means no + * write level master node operations will work ({@link org.elasticsearch.discovery.MasterNotDiscoveredException} + * will be thrown), and state level metadata operations should use the local flag. + *

+ * The state merged from different clusters include the list of nodes, metadata, and routing table. Each node merged + * will have in its tribe which tribe member it came from. Each index merged will have in its settings which tribe + * member it came from. In case an index has already been merged from one cluster, and the same name index is discovered + * in another cluster, the conflict one will be discarded. This happens because we need to have the correct index name + * to propagate to the relevant cluster. + */ +public class TribeService extends AbstractLifecycleComponent { + + public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false, RestStatus.BAD_REQUEST, ClusterBlockLevel.METADATA); + public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false, RestStatus.BAD_REQUEST, ClusterBlockLevel.WRITE); + + public static Settings processSettings(Settings settings) { + if (settings.get(TRIBE_NAME) != null) { + // if its a node client started by this service as tribe, remove any tribe group setting + // to avoid recursive configuration + ImmutableSettings.Builder sb = ImmutableSettings.builder().put(settings); + for (String s : settings.getAsMap().keySet()) { + if (s.startsWith("tribe.") && !s.equals(TRIBE_NAME)) { + sb.remove(s); + } + } + return sb.build(); + } + Map nodesSettings = settings.getGroups("tribe", true); + if (nodesSettings.isEmpty()) { + return settings; + } + // its a tribe configured node..., force settings + ImmutableSettings.Builder sb = ImmutableSettings.builder().put(settings); + sb.put("node.client", true); // this node should just act as a node client + sb.put("discovery.type", "local"); // a tribe node should not use zen discovery + sb.put("discovery.initial_state_timeout", 0); // nothing is going to be discovered, since no master will be elected + if (sb.get("cluster.name") == null) { + sb.put("cluster.name", "tribe_" + Strings.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM + } + sb.put("gateway.type", "none"); // we shouldn't store anything locally... + return sb.build(); + } + + public static final String TRIBE_NAME = "tribe.name"; + + private final ClusterService clusterService; + + private final List nodes = Lists.newCopyOnWriteArrayList(); + + @Inject + public TribeService(Settings settings, ClusterService clusterService) { + super(settings); + this.clusterService = clusterService; + Map nodesSettings = settings.getGroups("tribe", true); + for (Map.Entry entry : nodesSettings.entrySet()) { + ImmutableSettings.Builder sb = ImmutableSettings.builder().put(entry.getValue()); + sb.put("node.name", settings.get("name") + "/" + entry.getKey()); + sb.put(TRIBE_NAME, entry.getKey()); + if (sb.get("http.enabled") == null) { + sb.put("http.enabled", false); + } + nodes.add((InternalNode) NodeBuilder.nodeBuilder().settings(sb).client(true).build()); + } + + if (!nodes.isEmpty()) { + // remove the initial election / recovery blocks since we are not going to have a + // master elected in this single tribe node local "cluster" + clusterService.removeInitialStateBlock(Discovery.NO_MASTER_BLOCK); + clusterService.removeInitialStateBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK); + if (settings.getAsBoolean("tribe.blocks.write", false)) { + clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK); + } + if (settings.getAsBoolean("tribe.blocks.metadata", false)) { + clusterService.addInitialStateBlock(TRIBE_METADATA_BLOCK); + } + for (InternalNode node : nodes) { + node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node)); + } + } + } + + @Override + protected void doStart() throws ElasticsearchException { + final CountDownLatch latch = new CountDownLatch(1); + clusterService.submitStateUpdateTask("updating local node id", new ProcessedClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // add our local node to the mix... + return ClusterState.builder(currentState) + .nodes(DiscoveryNodes.builder(currentState.nodes()).put(clusterService.localNode()).localNodeId(clusterService.localNode().id())) + .build(); + } + + @Override + public void onFailure(String source, Throwable t) { + logger.error("{}", t, source); + latch.countDown(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + }); + try { + latch.await(); + } catch (InterruptedException e) { + // ignore + } + for (InternalNode node : nodes) { + node.start(); + } + } + + @Override + protected void doStop() throws ElasticsearchException { + for (InternalNode node : nodes) { + node.stop(); + } + } + + @Override + protected void doClose() throws ElasticsearchException { + for (InternalNode node : nodes) { + node.close(); + } + } + + class TribeClusterStateListener implements ClusterStateListener { + + private final InternalNode tribeNode; + private final String tribeName; + + TribeClusterStateListener(InternalNode tribeNode) { + this.tribeNode = tribeNode; + this.tribeName = tribeNode.settings().get(TRIBE_NAME); + } + + @Override + public void clusterChanged(final ClusterChangedEvent event) { + logger.debug("[{}] received cluster event, [{}]", tribeName, event.source()); + clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + ClusterState tribeState = event.state(); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes()); + // -- merge nodes + // go over existing nodes, and see if they need to be removed + for (DiscoveryNode discoNode : currentState.nodes()) { + String markedTribeName = discoNode.attributes().get(TRIBE_NAME); + if (markedTribeName != null && markedTribeName.equals(tribeName)) { + if (tribeState.nodes().get(discoNode.id()) == null) { + logger.info("[{}] removing node [{}]", tribeName, discoNode); + nodes.remove(discoNode.id()); + } + } + } + // go over tribe nodes, and see if they need to be added + for (DiscoveryNode tribe : tribeState.nodes()) { + if (currentState.nodes().get(tribe.id()) == null) { + // a new node, add it, but also add the tribe name to the attributes + ImmutableMap tribeAttr = MapBuilder.newMapBuilder(tribe.attributes()).put(TRIBE_NAME, tribeName).immutableMap(); + DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), tribeAttr, tribe.version()); + logger.info("[{}] adding node [{}]", tribeName, discoNode); + nodes.put(discoNode); + } + } + + // -- merge metadata + MetaData.Builder metaData = MetaData.builder(currentState.metaData()); + RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); + // go over existing indices, and see if they need to be removed + for (IndexMetaData index : currentState.metaData()) { + String markedTribeName = index.settings().get(TRIBE_NAME); + if (markedTribeName != null && markedTribeName.equals(tribeName)) { + IndexMetaData tribeIndex = tribeState.metaData().index(index.index()); + if (tribeIndex == null) { + logger.info("[{}] removing index [{}]", tribeName, index.index()); + metaData.remove(index.index()); + routingTable.remove(index.index()); + } else { + // always make sure to update the metadata and routing table, in case + // there are changes in them (new mapping, shards moving from initializing to started) + routingTable.add(tribeState.routingTable().index(index.index())); + Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build(); + metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); + } + } + } + // go over tribe one, and see if they need to be added + for (IndexMetaData tribeIndex : tribeState.metaData()) { + if (!currentState.metaData().hasIndex(tribeIndex.index())) { + // a new index, add it, and add the tribe name as a setting + logger.info("[{}] adding index [{}]", tribeName, tribeIndex.index()); + Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build(); + metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); + routingTable.add(tribeState.routingTable().index(tribeIndex.index())); + } + } + + return ClusterState.builder(currentState).nodes(nodes).metaData(metaData).routingTable(routingTable).build(); + } + + @Override + public void onFailure(String source, Throwable t) { + logger.warn("failed to process [{}]", t, source); + } + }); + } + } +} diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 21a609a1a45..f50345e1c73 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -99,25 +99,25 @@ import static org.hamcrest.Matchers.equalTo; * or {@link Scope#SUITE} should be used. To configure a scope for the test cluster the {@link ClusterScope} annotation * should be used, here is an example: *

- * @ClusterScope(scope=Scope.TEST)
- * public class SomeIntegrationTest extends ElasticsearchIntegrationTest {
- *   @Test
- *   public void testMethod() {}
+ *
+ * @ClusterScope(scope=Scope.TEST) public class SomeIntegrationTest extends ElasticsearchIntegrationTest {
+ * @Test
+ * public void testMethod() {}
  * }
  * 
- * + *

* If no {@link ClusterScope} annotation is present on an integration test the default scope it {@link Scope#GLOBAL} *

* A test cluster creates a set of nodes in the background before the test starts. The number of nodes in the cluster is * determined at random and can change across tests. The minimum number of nodes in the shared global cluster is 2. * For other scopes the {@link ClusterScope} allows configuring the initial number of nodes that are created before * the tests start. - * + *

*

  * @ClusterScope(scope=Scope.SUITE, numNodes=3)
  * public class SomeIntegrationTest extends ElasticsearchIntegrationTest {
- *   @Test
- *   public void testMethod() {}
+ * @Test
+ * public void testMethod() {}
  * }
  * 
*

@@ -125,16 +125,16 @@ import static org.hamcrest.Matchers.equalTo; * each test might use different directory implementation for each test or will return a random client to one of the * nodes in the cluster for each call to {@link #client()}. Test failures might only be reproducible if the correct * system properties are passed to the test execution environment. - * + *

*

- * This class supports the following system properties (passed with -Dkey=value to the application) - *

    - *
  • -D{@value #TESTS_CLIENT_RATIO} - a double value in the interval [0..1] which defines the ration between node and transport clients used
  • - *
  • -D{@value TestCluster#TESTS_CLUSTER_SEED} - a random seed used to initialize the clusters random context. - *
  • -D{@value TestCluster#TESTS_ENABLE_MOCK_MODULES} - a boolean value to enable or disable mock modules. This is - * useful to test the system without asserting modules that to make sure they don't hide any bugs in production.
  • - *
  • -D{@value #INDEX_SEED_SETTING} - a random seed used to initialize the index random context. - *
+ * This class supports the following system properties (passed with -Dkey=value to the application) + *
    + *
  • -D{@value #TESTS_CLIENT_RATIO} - a double value in the interval [0..1] which defines the ration between node and transport clients used
  • + *
  • -D{@value TestCluster#TESTS_CLUSTER_SEED} - a random seed used to initialize the clusters random context. + *
  • -D{@value TestCluster#TESTS_ENABLE_MOCK_MODULES} - a boolean value to enable or disable mock modules. This is + * useful to test the system without asserting modules that to make sure they don't hide any bugs in production.
  • + *
  • -D{@value #INDEX_SEED_SETTING} - a random seed used to initialize the index random context. + *
*

*/ @Ignore @@ -165,25 +165,25 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase private static final double TRANSPORT_CLIENT_RATIO = transportClientRatio(); private static final Map, TestCluster> clusters = new IdentityHashMap, TestCluster>(); - + @Before public final void before() throws IOException { assert Thread.getDefaultUncaughtExceptionHandler() instanceof ElasticsearchUncaughtExceptionHandler; try { final Scope currentClusterScope = getCurrentClusterScope(); switch (currentClusterScope) { - case GLOBAL: - clearClusters(); - currentCluster = GLOBAL_CLUSTER; - break; - case SUITE: - currentCluster = buildAndPutCluster(currentClusterScope, false); - break; - case TEST: - currentCluster = buildAndPutCluster(currentClusterScope, true); - break; - default: - assert false : "Unknown Scope: [" + currentClusterScope + "]"; + case GLOBAL: + clearClusters(); + currentCluster = GLOBAL_CLUSTER; + break; + case SUITE: + currentCluster = buildAndPutCluster(currentClusterScope, false); + break; + case TEST: + currentCluster = buildAndPutCluster(currentClusterScope, true); + break; + default: + assert false : "Unknown Scope: [" + currentClusterScope + "]"; } currentCluster.beforeTest(getRandom(), getPerTestTransportClientRatio()); wipeIndices("_all"); @@ -204,15 +204,15 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase testCluster = buildTestCluster(currentClusterScope); } else { clusters.remove(this.getClass()); - } + } clearClusters(); clusters.put(this.getClass(), testCluster); return testCluster; } - + private void clearClusters() throws IOException { if (!clusters.isEmpty()) { - for(TestCluster cluster : clusters.values()) { + for (TestCluster cluster : clusters.values()) { cluster.close(); } clusters.clear(); @@ -232,10 +232,10 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase .persistentSettings().getAsMap().size(), equalTo(0)); assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData .transientSettings().getAsMap().size(), equalTo(0)); - + } wipeIndices("_all"); // wipe after to make sure we fail in the test that - // didn't ack the delete + // didn't ack the delete wipeTemplates(); ensureAllSearchersClosed(); ensureAllFilesClosed(); @@ -254,7 +254,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase public static TestCluster cluster() { return currentCluster; } - + public ClusterService clusterService() { return cluster().clusterService(); } @@ -271,10 +271,10 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase // TODO move settings for random directory etc here into the index based randomized settings. if (cluster().size() > 0) { client().admin().indices().preparePutTemplate("random_index_template") - .setTemplate("*") - .setOrder(0) - .setSettings(setRandomNormsLoading(setRandomMergePolicy(getRandom(), ImmutableSettings.builder()) - .put(INDEX_SEED_SETTING, randomLong()))) + .setTemplate("*") + .setOrder(0) + .setSettings(setRandomNormsLoading(setRandomMergePolicy(getRandom(), ImmutableSettings.builder()) + .put(INDEX_SEED_SETTING, randomLong()))) .execute().actionGet(); } } @@ -292,15 +292,15 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase random.nextBoolean() ? random.nextDouble() : random.nextBoolean()); } Class> clazz = TieredMergePolicyProvider.class; - switch(random.nextInt(5)) { - case 4: - clazz = LogByteSizeMergePolicyProvider.class; - break; - case 3: - clazz = LogDocMergePolicyProvider.class; - break; - case 0: - return builder; // don't set the setting at all + switch (random.nextInt(5)) { + case 4: + clazz = LogByteSizeMergePolicyProvider.class; + break; + case 3: + clazz = LogDocMergePolicyProvider.class; + break; + case 0: + return builder; // don't set the setting at all } assert clazz != null; builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, clazz.getName()); @@ -319,6 +319,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase public Settings indexSettings() { return ImmutableSettings.EMPTY; } + /** * Deletes the given indices from the tests cluster. If no index name is passed to this method * all indices are removed. @@ -456,7 +457,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase * It is useful to ensure that all action on the cluster have finished and all shards that were currently relocating * are now allocated and started. */ - public ClusterHealthStatus ensureGreen(String...indices) { + public ClusterHealthStatus ensureGreen(String... indices) { ClusterHealthResponse actionGet = client().admin().cluster() .health(Requests.clusterHealthRequest(indices).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); if (actionGet.isTimedOut()) { @@ -498,7 +499,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase /** * Ensures the cluster has a yellow state via the cluster health API. */ - public ClusterHealthStatus ensureYellow(String...indices) { + public ClusterHealthStatus ensureYellow(String... indices) { ClusterHealthResponse actionGet = client().admin().cluster() .health(Requests.clusterHealthRequest(indices).waitForRelocatingShards(0).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet(); if (actionGet.isTimedOut()) { @@ -513,7 +514,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase * Ensures the cluster is in a searchable state for the given indices. This means a searchable copy of each * shard is available on the cluster. */ - protected ClusterHealthStatus ensureSearchable(String...indices) { + protected ClusterHealthStatus ensureSearchable(String... indices) { // this is just a temporary thing but it's easier to change if it is encapsulated. return ensureGreen(indices); } @@ -570,6 +571,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase /** * Waits for relocations and refreshes all indices in the cluster. + * * @see #waitForRelocation() */ protected final RefreshResponse refresh() { @@ -634,7 +636,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase return client().admin(); } - /** Convenience method that forwards to {@link #indexRandom(boolean, List)}. */ + /** + * Convenience method that forwards to {@link #indexRandom(boolean, List)}. + */ public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) throws InterruptedException, ExecutionException { indexRandom(forceRefresh, Arrays.asList(builders)); } @@ -650,7 +654,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase if (builders.size() == 0) { return; } - + Random random = getRandom(); Set indicesSet = new HashSet(); for (IndexRequestBuilder builder : builders) { @@ -744,12 +748,12 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase latch.countDown(); } } - + protected void addError(Throwable t) { } } - + private class PayloadLatchedActionListener extends LatchedActionListener { private final CopyOnWriteArrayList> errors; private final T builder; @@ -795,7 +799,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase */ TEST } - + private ClusterScope getAnnotation(Class clazz) { if (clazz == Object.class || clazz == ElasticsearchIntegrationTest.class) { return null; @@ -806,13 +810,13 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } return getAnnotation(clazz.getSuperclass()); } - + private Scope getCurrentClusterScope() { ClusterScope annotation = getAnnotation(this.getClass()); // if we are not annotated assume global! return annotation == null ? Scope.GLOBAL : annotation.scope(); } - + private int getNumNodes() { ClusterScope annotation = getAnnotation(this.getClass()); return annotation == null ? -1 : annotation.numNodes(); @@ -828,7 +832,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase protected Settings nodeSettings(int nodeOrdinal) { return ImmutableSettings.EMPTY; } - + private TestCluster buildTestCluster(Scope scope) { long currentClusterSeed = randomLong(); int numNodes = getNumNodes(); @@ -876,9 +880,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase */ double transportClientRatio() default -1; } - + /** - * Returns the client ratio configured via + * Returns the client ratio configured via */ private static double transportClientRatio() { String property = System.getProperty(TESTS_CLIENT_RATIO); @@ -893,7 +897,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase * {@link System#getProperty(String)} if available. If both are not available this will * return a random ratio in the interval [0..1] */ - private double getPerTestTransportClientRatio() { + protected double getPerTestTransportClientRatio() { final ClusterScope annotation = getAnnotation(this.getClass()); double perTestRatio = -1; if (annotation != null) { diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index 806270915bf..6d389104bf3 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -185,6 +185,10 @@ public final class TestCluster implements Iterable { } + public String getClusterName() { + return clusterName; + } + private static boolean isLocalTransportConfigured() { if ("local".equals(System.getProperty("es.node.mode", "network"))) { return true; @@ -360,7 +364,7 @@ public final class TestCluster implements Iterable { return "node_" + id; } - synchronized Client client() { + public synchronized Client client() { ensureOpen(); /* Randomly return a client to one of the nodes in the cluster */ return getOrBuildRandomNode().client(random); diff --git a/src/test/java/org/elasticsearch/tribe/TribeTests.java b/src/test/java/org/elasticsearch/tribe/TribeTests.java new file mode 100644 index 00000000000..510ca421ac1 --- /dev/null +++ b/src/test/java/org/elasticsearch/tribe/TribeTests.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.tribe; + +import com.google.common.base.Predicate; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.TestCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; + +/** + */ +public class TribeTests extends ElasticsearchIntegrationTest { + + private TestCluster cluster2; + private Node tribeNode; + private Client tribeClient; + + @Before + public void setupSecondCluster() { + // create another cluster + cluster2 = new TestCluster(randomLong(), 2, cluster().getClusterName() + "-2"); + cluster2.beforeTest(getRandom(), getPerTestTransportClientRatio()); + cluster2.ensureAtLeastNumNodes(2); + + Settings settings = ImmutableSettings.builder() + .put("tribe.t1.cluster.name", cluster().getClusterName()) + .put("tribe.t2.cluster.name", cluster2.getClusterName()) + .build(); + + tribeNode = NodeBuilder.nodeBuilder() + .settings(settings) + .node(); + tribeClient = tribeNode.client(); + } + + @After + public void tearDownSecondCluster() { + tribeNode.close(); + cluster2.afterTest(); + cluster2.close(); + } + + @Test + public void testTribeOnOneCluster() throws Exception { + logger.info("create 2 indices, test1 on t1, and test2 on t2"); + cluster().client().admin().indices().prepareCreate("test1").get(); + cluster2.client().admin().indices().prepareCreate("test2").get(); + + + // wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state + logger.info("wait till test1 and test2 exists in the tribe node state"); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object o) { + ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState(); + return tribeState.getMetaData().hasIndex("test1") && tribeState.getMetaData().hasIndex("test2") && + tribeState.getRoutingTable().hasIndex("test1") && tribeState.getRoutingTable().hasIndex("test2"); + } + }); + + logger.info("wait till tribe has the same nodes as the 2 clusters"); + awaitSameNodeCounts(); + + assertThat(tribeClient.admin().cluster().prepareHealth().setLocal(true).setWaitForGreenStatus().get().getStatus(), equalTo(ClusterHealthStatus.GREEN)); + + logger.info("create 2 docs through the tribe node"); + tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").get(); + tribeClient.prepareIndex("test2", "type1", "1").setSource("field1", "value1").get(); + tribeClient.admin().indices().prepareRefresh().get(); + + logger.info("verify they are there"); + assertHitCount(tribeClient.prepareCount().get(), 2l); + assertHitCount(tribeClient.prepareSearch().get(), 2l); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object o) { + ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState(); + return tribeState.getMetaData().index("test1").mapping("type1") != null && + tribeState.getMetaData().index("test2").mapping("type2") != null; + } + }); + + + logger.info("write to another type"); + tribeClient.prepareIndex("test1", "type2", "1").setSource("field1", "value1").get(); + tribeClient.prepareIndex("test2", "type2", "1").setSource("field1", "value1").get(); + tribeClient.admin().indices().prepareRefresh().get(); + + + logger.info("verify they are there"); + assertHitCount(tribeClient.prepareCount().get(), 4l); + assertHitCount(tribeClient.prepareSearch().get(), 4l); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object o) { + ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState(); + return tribeState.getMetaData().index("test1").mapping("type1") != null && tribeState.getMetaData().index("test1").mapping("type2") != null && + tribeState.getMetaData().index("test2").mapping("type1") != null && tribeState.getMetaData().index("test2").mapping("type2") != null; + } + }); + + logger.info("make sure master level write operations fail... (we don't really have a master)"); + try { + tribeClient.admin().indices().prepareCreate("tribe_index").setMasterNodeTimeout("10ms").get(); + assert false; + } catch (MasterNotDiscoveredException e) { + // all is well! + } + + logger.info("delete an index, and make sure its reflected"); + cluster2.client().admin().indices().prepareDelete("test2").get(); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object o) { + ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState(); + return tribeState.getMetaData().hasIndex("test1") && !tribeState.getMetaData().hasIndex("test2") && + tribeState.getRoutingTable().hasIndex("test1") && !tribeState.getRoutingTable().hasIndex("test2"); + } + }); + + logger.info("stop a node, make sure its reflected"); + cluster2.stopRandomNode(); + awaitSameNodeCounts(); + } + + private void awaitSameNodeCounts() throws Exception { + awaitBusy(new Predicate() { + @Override + public boolean apply(Object o) { + DiscoveryNodes tribeNodes = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState().getNodes(); + return countDataNodesForTribe("t1", tribeNodes) == cluster().client().admin().cluster().prepareState().get().getState().getNodes().dataNodes().size() + && countDataNodesForTribe("t2", tribeNodes) == cluster2.client().admin().cluster().prepareState().get().getState().getNodes().dataNodes().size(); + } + }); + } + + private int countDataNodesForTribe(String tribeName, DiscoveryNodes nodes) { + int count = 0; + for (DiscoveryNode node : nodes) { + if (!node.dataNode()) { + continue; + } + if (tribeName.equals(node.getAttributes().get(TribeService.TRIBE_NAME))) { + count++; + } + } + return count; + } +}