Tribe Node

The tribes feature allowed to create a tribe node that can act as a federated client across multiple clusters.

The tribe node configuration looks something like this:

```
tribe.t1.cluster.name: cluster1
tribe.t2.cluster.name: cluster2
```

The configuration above configure connections to 2 clusters, named `t1`, `t2`. It creates a "node" client to each (so by default, above, multicast discovery is used). The settings for each node client is extracted from the `tribe.[tribe_name]` prefix.

The way the tribe node works is by merging the cluster state from each cluster, and creating a merged view of all clusters. This means all operations work the same, distributed search, suggest, percolation, indexing, ... .

The merged view drops conflicted indices and picks one of them if there are 2 indices with the same name across multiple clusters.

By default, read and write operations are allowed. Master level read operations (cluster state for example), require setting the local flag to true (since there is no elected master). Master level write operations are not allowed (create index, ...).

The tribe node can be configured to block write operations `tribe.blocks.write` to `true`, and metadata operations by setting `tribe.blocks.metadata` to `true`.
closes #4708
This commit is contained in:
Shay Banon 2014-01-13 11:08:18 -08:00
parent 541059a4d1
commit 76319b0cd2
13 changed files with 612 additions and 98 deletions

View File

@ -51,6 +51,8 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
*/
void addInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException;
void removeInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException;
/**
* The operation routing.
*/

View File

@ -115,6 +115,14 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
initialBlocks.addGlobalBlock(block);
}
@Override
public void removeInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException {
if (lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't set initial block when started");
}
initialBlocks.removeGlobalBlock(block);
}
@Override
protected void doStart() throws ElasticsearchException {
add(localNodeMasterListeners);

View File

@ -492,6 +492,11 @@ public class ImmutableSettings implements Settings {
@Override
public Map<String, Settings> getGroups(String settingPrefix) throws SettingsException {
return getGroups(settingPrefix, false);
}
@Override
public Map<String, Settings> getGroups(String settingPrefix, boolean ignoreNonGrouped) throws SettingsException {
if (settingPrefix.charAt(settingPrefix.length() - 1) != '.') {
settingPrefix = settingPrefix + ".";
}
@ -503,6 +508,9 @@ public class ImmutableSettings implements Settings {
String nameValue = setting.substring(settingPrefix.length());
int dotIndex = nameValue.indexOf('.');
if (dotIndex == -1) {
if (ignoreNonGrouped) {
continue;
}
throw new SettingsException("Failed to get setting group for [" + settingPrefix + "] setting prefix and setting [" + setting + "] because of a missing '.'");
}
String name = nameValue.substring(0, dotIndex);

View File

@ -109,6 +109,11 @@ public interface Settings extends ToXContent {
*/
Map<String, Settings> getGroups(String settingPrefix) throws SettingsException;
/**
* Returns group settings for the given setting prefix.
*/
Map<String, Settings> getGroups(String settingPrefix, boolean ignoreNonGrouped) throws SettingsException;
/**
* Returns the setting value (as float) associated with the setting key. If it does not exists,
* returns the default value provided.

View File

@ -22,11 +22,13 @@ package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -60,17 +62,19 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
discovery.addListener(listener);
try {
discovery.start();
try {
logger.trace("waiting for {} for the initial state to be set by the discovery", initialStateTimeout);
if (latch.await(initialStateTimeout.millis(), TimeUnit.MILLISECONDS)) {
logger.trace("initial state set from discovery");
initialStateReceived = true;
} else {
initialStateReceived = false;
logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout);
if (initialStateTimeout.millis() > 0) {
try {
logger.trace("waiting for {} for the initial state to be set by the discovery", initialStateTimeout);
if (latch.await(initialStateTimeout.millis(), TimeUnit.MILLISECONDS)) {
logger.trace("initial state set from discovery");
initialStateReceived = true;
} else {
initialStateReceived = false;
logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout);
}
} catch (InterruptedException e) {
// ignore
}
} catch (InterruptedException e) {
// ignore
}
} finally {
discovery.removeListener(listener);
@ -107,7 +111,7 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
/**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
* process should not publish this state to the master as well! (the master is sending it...).
*
* <p/>
* The {@link org.elasticsearch.discovery.Discovery.AckListener} allows to acknowledge the publish
* event based on the response gotten from all nodes
*/
@ -116,4 +120,12 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
discovery.publish(clusterState, ackListener);
}
}
public static String generateNodeId(Settings settings) {
String seed = settings.get("discovery.id.seed");
if (seed != null) {
Strings.randomBase64UUID(new Random(Long.parseLong(seed)));
}
return Strings.randomBase64UUID();
}
}

View File

@ -35,10 +35,7 @@ import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.ClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.discovery.*;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.transport.TransportService;
@ -47,7 +44,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.collect.Sets.newHashSet;
import static org.elasticsearch.cluster.ClusterState.Builder;
@ -78,8 +74,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private static final ConcurrentMap<ClusterName, ClusterGroup> clusterGroups = ConcurrentCollections.newConcurrentMap();
private static final AtomicLong nodeIdGenerator = new AtomicLong();
@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService, Version version) {
@ -112,7 +106,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
clusterGroups.put(clusterName, clusterGroup);
}
logger.debug("Connected to cluster [{}]", clusterName);
this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(),
this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(),
discoveryNodeService.buildAttributes(), version);
clusterGroup.members().add(this);

View File

@ -35,7 +35,6 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
@ -45,6 +44,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
@ -62,7 +62,6 @@ import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@ -168,7 +167,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
protected void doStart() throws ElasticsearchException {
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
final String nodeId = getNodeUUID(settings);
final String nodeId = DiscoveryService.generateNodeId(settings);
localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version);
latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
nodesFD.updateNodes(latestDiscoNodes);
@ -902,14 +901,4 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
}
private final String getNodeUUID(Settings settings) {
String seed = settings.get("discovery.id.seed");
if (seed != null) {
logger.trace("using stable discover node UUIDs with seed: [{}]", seed);
Strings.randomBase64UUID(new Random(Long.parseLong(seed)));
}
return Strings.randomBase64UUID();
}
}

View File

@ -93,6 +93,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.tribe.TribeModule;
import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.watcher.ResourceWatcherModule;
import org.elasticsearch.watcher.ResourceWatcherService;
@ -122,6 +124,7 @@ public final class InternalNode implements Node {
public InternalNode(Settings pSettings, boolean loadConfigSettings) throws ElasticsearchException {
Tuple<Settings, Environment> tuple = InternalSettingsPreparer.prepareSettings(pSettings, loadConfigSettings);
tuple = new Tuple<Settings, Environment>(TribeService.processSettings(tuple.v1()), tuple.v2());
Version version = Version.CURRENT;
@ -139,7 +142,7 @@ public final class InternalNode implements Node {
this.pluginsService = new PluginsService(tuple.v1(), tuple.v2());
this.settings = pluginsService.updatedSettings();
this.environment = tuple.v2();
this.environment = new Environment(this.settings());
CompressorFactory.configure(settings);
@ -178,6 +181,7 @@ public final class InternalNode implements Node {
modules.add(new PercolatorModule());
modules.add(new ResourceWatcherModule());
modules.add(new RepositoriesModule());
modules.add(new TribeModule());
injector = modules.createInjector();
@ -232,6 +236,7 @@ public final class InternalNode implements Node {
}
injector.getInstance(BulkUdpService.class).start();
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(TribeService.class).start();
logger.info("started");
@ -246,6 +251,7 @@ public final class InternalNode implements Node {
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
logger.info("stopping ...");
injector.getInstance(TribeService.class).stop();
injector.getInstance(BulkUdpService.class).stop();
injector.getInstance(ResourceWatcherService.class).stop();
if (settings.getAsBoolean("http.enabled", true)) {
@ -296,7 +302,9 @@ public final class InternalNode implements Node {
logger.info("closing ...");
StopWatch stopWatch = new StopWatch("node_close");
stopWatch.start("bulk.udp");
stopWatch.start("tribe");
injector.getInstance(TribeService.class).close();
stopWatch.stop().start("bulk.udp");
injector.getInstance(BulkUdpService.class).close();
stopWatch.stop().start("http");
if (settings.getAsBoolean("http.enabled", true)) {

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tribe;
import org.elasticsearch.common.inject.AbstractModule;
/**
*/
public class TribeModule extends AbstractModule {
@Override
protected void configure() {
bind(TribeService.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,267 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tribe;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.rest.RestStatus;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
/**
* The tribe service holds a list of node clients connected to a list of tribe members, and uses their
* cluster state events to update this local node cluster state with the merged view of it.
* <p/>
* The {@link #processSettings(org.elasticsearch.common.settings.Settings)} method should be called before
* starting the node, so it will make sure to configure this current node properly with the relevant tribe node
* settings.
* <p/>
* The tribe node settings make sure the discovery used is "local", but with no master elected. This means no
* write level master node operations will work ({@link org.elasticsearch.discovery.MasterNotDiscoveredException}
* will be thrown), and state level metadata operations should use the local flag.
* <p/>
* The state merged from different clusters include the list of nodes, metadata, and routing table. Each node merged
* will have in its tribe which tribe member it came from. Each index merged will have in its settings which tribe
* member it came from. In case an index has already been merged from one cluster, and the same name index is discovered
* in another cluster, the conflict one will be discarded. This happens because we need to have the correct index name
* to propagate to the relevant cluster.
*/
public class TribeService extends AbstractLifecycleComponent<TribeService> {
public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false, RestStatus.BAD_REQUEST, ClusterBlockLevel.METADATA);
public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false, RestStatus.BAD_REQUEST, ClusterBlockLevel.WRITE);
public static Settings processSettings(Settings settings) {
if (settings.get(TRIBE_NAME) != null) {
// if its a node client started by this service as tribe, remove any tribe group setting
// to avoid recursive configuration
ImmutableSettings.Builder sb = ImmutableSettings.builder().put(settings);
for (String s : settings.getAsMap().keySet()) {
if (s.startsWith("tribe.") && !s.equals(TRIBE_NAME)) {
sb.remove(s);
}
}
return sb.build();
}
Map<String, Settings> nodesSettings = settings.getGroups("tribe", true);
if (nodesSettings.isEmpty()) {
return settings;
}
// its a tribe configured node..., force settings
ImmutableSettings.Builder sb = ImmutableSettings.builder().put(settings);
sb.put("node.client", true); // this node should just act as a node client
sb.put("discovery.type", "local"); // a tribe node should not use zen discovery
sb.put("discovery.initial_state_timeout", 0); // nothing is going to be discovered, since no master will be elected
if (sb.get("cluster.name") == null) {
sb.put("cluster.name", "tribe_" + Strings.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM
}
sb.put("gateway.type", "none"); // we shouldn't store anything locally...
return sb.build();
}
public static final String TRIBE_NAME = "tribe.name";
private final ClusterService clusterService;
private final List<InternalNode> nodes = Lists.newCopyOnWriteArrayList();
@Inject
public TribeService(Settings settings, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
Map<String, Settings> nodesSettings = settings.getGroups("tribe", true);
for (Map.Entry<String, Settings> entry : nodesSettings.entrySet()) {
ImmutableSettings.Builder sb = ImmutableSettings.builder().put(entry.getValue());
sb.put("node.name", settings.get("name") + "/" + entry.getKey());
sb.put(TRIBE_NAME, entry.getKey());
if (sb.get("http.enabled") == null) {
sb.put("http.enabled", false);
}
nodes.add((InternalNode) NodeBuilder.nodeBuilder().settings(sb).client(true).build());
}
if (!nodes.isEmpty()) {
// remove the initial election / recovery blocks since we are not going to have a
// master elected in this single tribe node local "cluster"
clusterService.removeInitialStateBlock(Discovery.NO_MASTER_BLOCK);
clusterService.removeInitialStateBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
if (settings.getAsBoolean("tribe.blocks.write", false)) {
clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK);
}
if (settings.getAsBoolean("tribe.blocks.metadata", false)) {
clusterService.addInitialStateBlock(TRIBE_METADATA_BLOCK);
}
for (InternalNode node : nodes) {
node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node));
}
}
}
@Override
protected void doStart() throws ElasticsearchException {
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("updating local node id", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// add our local node to the mix...
return ClusterState.builder(currentState)
.nodes(DiscoveryNodes.builder(currentState.nodes()).put(clusterService.localNode()).localNodeId(clusterService.localNode().id()))
.build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("{}", t, source);
latch.countDown();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
// ignore
}
for (InternalNode node : nodes) {
node.start();
}
}
@Override
protected void doStop() throws ElasticsearchException {
for (InternalNode node : nodes) {
node.stop();
}
}
@Override
protected void doClose() throws ElasticsearchException {
for (InternalNode node : nodes) {
node.close();
}
}
class TribeClusterStateListener implements ClusterStateListener {
private final InternalNode tribeNode;
private final String tribeName;
TribeClusterStateListener(InternalNode tribeNode) {
this.tribeNode = tribeNode;
this.tribeName = tribeNode.settings().get(TRIBE_NAME);
}
@Override
public void clusterChanged(final ClusterChangedEvent event) {
logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState tribeState = event.state();
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes());
// -- merge nodes
// go over existing nodes, and see if they need to be removed
for (DiscoveryNode discoNode : currentState.nodes()) {
String markedTribeName = discoNode.attributes().get(TRIBE_NAME);
if (markedTribeName != null && markedTribeName.equals(tribeName)) {
if (tribeState.nodes().get(discoNode.id()) == null) {
logger.info("[{}] removing node [{}]", tribeName, discoNode);
nodes.remove(discoNode.id());
}
}
}
// go over tribe nodes, and see if they need to be added
for (DiscoveryNode tribe : tribeState.nodes()) {
if (currentState.nodes().get(tribe.id()) == null) {
// a new node, add it, but also add the tribe name to the attributes
ImmutableMap<String, String> tribeAttr = MapBuilder.newMapBuilder(tribe.attributes()).put(TRIBE_NAME, tribeName).immutableMap();
DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), tribeAttr, tribe.version());
logger.info("[{}] adding node [{}]", tribeName, discoNode);
nodes.put(discoNode);
}
}
// -- merge metadata
MetaData.Builder metaData = MetaData.builder(currentState.metaData());
RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
// go over existing indices, and see if they need to be removed
for (IndexMetaData index : currentState.metaData()) {
String markedTribeName = index.settings().get(TRIBE_NAME);
if (markedTribeName != null && markedTribeName.equals(tribeName)) {
IndexMetaData tribeIndex = tribeState.metaData().index(index.index());
if (tribeIndex == null) {
logger.info("[{}] removing index [{}]", tribeName, index.index());
metaData.remove(index.index());
routingTable.remove(index.index());
} else {
// always make sure to update the metadata and routing table, in case
// there are changes in them (new mapping, shards moving from initializing to started)
routingTable.add(tribeState.routingTable().index(index.index()));
Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build();
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
}
}
}
// go over tribe one, and see if they need to be added
for (IndexMetaData tribeIndex : tribeState.metaData()) {
if (!currentState.metaData().hasIndex(tribeIndex.index())) {
// a new index, add it, and add the tribe name as a setting
logger.info("[{}] adding index [{}]", tribeName, tribeIndex.index());
Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build();
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
routingTable.add(tribeState.routingTable().index(tribeIndex.index()));
}
}
return ClusterState.builder(currentState).nodes(nodes).metaData(metaData).routingTable(routingTable).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.warn("failed to process [{}]", t, source);
}
});
}
}
}

View File

@ -99,25 +99,25 @@ import static org.hamcrest.Matchers.equalTo;
* or {@link Scope#SUITE} should be used. To configure a scope for the test cluster the {@link ClusterScope} annotation
* should be used, here is an example:
* <pre>
* @ClusterScope(scope=Scope.TEST)
* public class SomeIntegrationTest extends ElasticsearchIntegrationTest {
* @Test
* public void testMethod() {}
*
* @ClusterScope(scope=Scope.TEST) public class SomeIntegrationTest extends ElasticsearchIntegrationTest {
* @Test
* public void testMethod() {}
* }
* </pre>
*
* <p/>
* If no {@link ClusterScope} annotation is present on an integration test the default scope it {@link Scope#GLOBAL}
* <p/>
* A test cluster creates a set of nodes in the background before the test starts. The number of nodes in the cluster is
* determined at random and can change across tests. The minimum number of nodes in the shared global cluster is <code>2</code>.
* For other scopes the {@link ClusterScope} allows configuring the initial number of nodes that are created before
* the tests start.
*
* <p/>
* <pre>
* @ClusterScope(scope=Scope.SUITE, numNodes=3)
* public class SomeIntegrationTest extends ElasticsearchIntegrationTest {
* @Test
* public void testMethod() {}
* @Test
* public void testMethod() {}
* }
* </pre>
* <p/>
@ -125,16 +125,16 @@ import static org.hamcrest.Matchers.equalTo;
* each test might use different directory implementation for each test or will return a random client to one of the
* nodes in the cluster for each call to {@link #client()}. Test failures might only be reproducible if the correct
* system properties are passed to the test execution environment.
*
* <p/>
* <p>
* This class supports the following system properties (passed with -Dkey=value to the application)
* <ul>
* <li>-D{@value #TESTS_CLIENT_RATIO} - a double value in the interval [0..1] which defines the ration between node and transport clients used</li>
* <li>-D{@value TestCluster#TESTS_CLUSTER_SEED} - a random seed used to initialize the clusters random context.
* <li>-D{@value TestCluster#TESTS_ENABLE_MOCK_MODULES} - a boolean value to enable or disable mock modules. This is
* useful to test the system without asserting modules that to make sure they don't hide any bugs in production.</li>
* <li>-D{@value #INDEX_SEED_SETTING} - a random seed used to initialize the index random context.
* </ul>
* This class supports the following system properties (passed with -Dkey=value to the application)
* <ul>
* <li>-D{@value #TESTS_CLIENT_RATIO} - a double value in the interval [0..1] which defines the ration between node and transport clients used</li>
* <li>-D{@value TestCluster#TESTS_CLUSTER_SEED} - a random seed used to initialize the clusters random context.
* <li>-D{@value TestCluster#TESTS_ENABLE_MOCK_MODULES} - a boolean value to enable or disable mock modules. This is
* useful to test the system without asserting modules that to make sure they don't hide any bugs in production.</li>
* <li>-D{@value #INDEX_SEED_SETTING} - a random seed used to initialize the index random context.
* </ul>
* </p>
*/
@Ignore
@ -165,25 +165,25 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
private static final double TRANSPORT_CLIENT_RATIO = transportClientRatio();
private static final Map<Class<?>, TestCluster> clusters = new IdentityHashMap<Class<?>, TestCluster>();
@Before
public final void before() throws IOException {
assert Thread.getDefaultUncaughtExceptionHandler() instanceof ElasticsearchUncaughtExceptionHandler;
try {
final Scope currentClusterScope = getCurrentClusterScope();
switch (currentClusterScope) {
case GLOBAL:
clearClusters();
currentCluster = GLOBAL_CLUSTER;
break;
case SUITE:
currentCluster = buildAndPutCluster(currentClusterScope, false);
break;
case TEST:
currentCluster = buildAndPutCluster(currentClusterScope, true);
break;
default:
assert false : "Unknown Scope: [" + currentClusterScope + "]";
case GLOBAL:
clearClusters();
currentCluster = GLOBAL_CLUSTER;
break;
case SUITE:
currentCluster = buildAndPutCluster(currentClusterScope, false);
break;
case TEST:
currentCluster = buildAndPutCluster(currentClusterScope, true);
break;
default:
assert false : "Unknown Scope: [" + currentClusterScope + "]";
}
currentCluster.beforeTest(getRandom(), getPerTestTransportClientRatio());
wipeIndices("_all");
@ -204,15 +204,15 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
testCluster = buildTestCluster(currentClusterScope);
} else {
clusters.remove(this.getClass());
}
}
clearClusters();
clusters.put(this.getClass(), testCluster);
return testCluster;
}
private void clearClusters() throws IOException {
if (!clusters.isEmpty()) {
for(TestCluster cluster : clusters.values()) {
for (TestCluster cluster : clusters.values()) {
cluster.close();
}
clusters.clear();
@ -232,10 +232,10 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
.persistentSettings().getAsMap().size(), equalTo(0));
assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData
.transientSettings().getAsMap().size(), equalTo(0));
}
wipeIndices("_all"); // wipe after to make sure we fail in the test that
// didn't ack the delete
// didn't ack the delete
wipeTemplates();
ensureAllSearchersClosed();
ensureAllFilesClosed();
@ -254,7 +254,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
public static TestCluster cluster() {
return currentCluster;
}
public ClusterService clusterService() {
return cluster().clusterService();
}
@ -271,10 +271,10 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
// TODO move settings for random directory etc here into the index based randomized settings.
if (cluster().size() > 0) {
client().admin().indices().preparePutTemplate("random_index_template")
.setTemplate("*")
.setOrder(0)
.setSettings(setRandomNormsLoading(setRandomMergePolicy(getRandom(), ImmutableSettings.builder())
.put(INDEX_SEED_SETTING, randomLong())))
.setTemplate("*")
.setOrder(0)
.setSettings(setRandomNormsLoading(setRandomMergePolicy(getRandom(), ImmutableSettings.builder())
.put(INDEX_SEED_SETTING, randomLong())))
.execute().actionGet();
}
}
@ -292,15 +292,15 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
random.nextBoolean() ? random.nextDouble() : random.nextBoolean());
}
Class<? extends MergePolicyProvider<?>> clazz = TieredMergePolicyProvider.class;
switch(random.nextInt(5)) {
case 4:
clazz = LogByteSizeMergePolicyProvider.class;
break;
case 3:
clazz = LogDocMergePolicyProvider.class;
break;
case 0:
return builder; // don't set the setting at all
switch (random.nextInt(5)) {
case 4:
clazz = LogByteSizeMergePolicyProvider.class;
break;
case 3:
clazz = LogDocMergePolicyProvider.class;
break;
case 0:
return builder; // don't set the setting at all
}
assert clazz != null;
builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, clazz.getName());
@ -319,6 +319,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
public Settings indexSettings() {
return ImmutableSettings.EMPTY;
}
/**
* Deletes the given indices from the tests cluster. If no index name is passed to this method
* all indices are removed.
@ -456,7 +457,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
* It is useful to ensure that all action on the cluster have finished and all shards that were currently relocating
* are now allocated and started.
*/
public ClusterHealthStatus ensureGreen(String...indices) {
public ClusterHealthStatus ensureGreen(String... indices) {
ClusterHealthResponse actionGet = client().admin().cluster()
.health(Requests.clusterHealthRequest(indices).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
if (actionGet.isTimedOut()) {
@ -498,7 +499,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
/**
* Ensures the cluster has a yellow state via the cluster health API.
*/
public ClusterHealthStatus ensureYellow(String...indices) {
public ClusterHealthStatus ensureYellow(String... indices) {
ClusterHealthResponse actionGet = client().admin().cluster()
.health(Requests.clusterHealthRequest(indices).waitForRelocatingShards(0).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet();
if (actionGet.isTimedOut()) {
@ -513,7 +514,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
* Ensures the cluster is in a searchable state for the given indices. This means a searchable copy of each
* shard is available on the cluster.
*/
protected ClusterHealthStatus ensureSearchable(String...indices) {
protected ClusterHealthStatus ensureSearchable(String... indices) {
// this is just a temporary thing but it's easier to change if it is encapsulated.
return ensureGreen(indices);
}
@ -570,6 +571,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
/**
* Waits for relocations and refreshes all indices in the cluster.
*
* @see #waitForRelocation()
*/
protected final RefreshResponse refresh() {
@ -634,7 +636,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return client().admin();
}
/** Convenience method that forwards to {@link #indexRandom(boolean, List)}. */
/**
* Convenience method that forwards to {@link #indexRandom(boolean, List)}.
*/
public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) throws InterruptedException, ExecutionException {
indexRandom(forceRefresh, Arrays.asList(builders));
}
@ -650,7 +654,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
if (builders.size() == 0) {
return;
}
Random random = getRandom();
Set<String> indicesSet = new HashSet<String>();
for (IndexRequestBuilder builder : builders) {
@ -744,12 +748,12 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
latch.countDown();
}
}
protected void addError(Throwable t) {
}
}
private class PayloadLatchedActionListener<Response, T> extends LatchedActionListener<Response> {
private final CopyOnWriteArrayList<Tuple<T, Throwable>> errors;
private final T builder;
@ -795,7 +799,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
*/
TEST
}
private ClusterScope getAnnotation(Class<?> clazz) {
if (clazz == Object.class || clazz == ElasticsearchIntegrationTest.class) {
return null;
@ -806,13 +810,13 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
return getAnnotation(clazz.getSuperclass());
}
private Scope getCurrentClusterScope() {
ClusterScope annotation = getAnnotation(this.getClass());
// if we are not annotated assume global!
return annotation == null ? Scope.GLOBAL : annotation.scope();
}
private int getNumNodes() {
ClusterScope annotation = getAnnotation(this.getClass());
return annotation == null ? -1 : annotation.numNodes();
@ -828,7 +832,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.EMPTY;
}
private TestCluster buildTestCluster(Scope scope) {
long currentClusterSeed = randomLong();
int numNodes = getNumNodes();
@ -876,9 +880,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
*/
double transportClientRatio() default -1;
}
/**
* Returns the client ratio configured via
* Returns the client ratio configured via
*/
private static double transportClientRatio() {
String property = System.getProperty(TESTS_CLIENT_RATIO);
@ -893,7 +897,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
* {@link System#getProperty(String)} if available. If both are not available this will
* return a random ratio in the interval <tt>[0..1]</tt>
*/
private double getPerTestTransportClientRatio() {
protected double getPerTestTransportClientRatio() {
final ClusterScope annotation = getAnnotation(this.getClass());
double perTestRatio = -1;
if (annotation != null) {

View File

@ -185,6 +185,10 @@ public final class TestCluster implements Iterable<Client> {
}
public String getClusterName() {
return clusterName;
}
private static boolean isLocalTransportConfigured() {
if ("local".equals(System.getProperty("es.node.mode", "network"))) {
return true;
@ -360,7 +364,7 @@ public final class TestCluster implements Iterable<Client> {
return "node_" + id;
}
synchronized Client client() {
public synchronized Client client() {
ensureOpen();
/* Randomly return a client to one of the nodes in the cluster */
return getOrBuildRandomNode().client(random);

View File

@ -0,0 +1,181 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tribe;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.TestCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class TribeTests extends ElasticsearchIntegrationTest {
private TestCluster cluster2;
private Node tribeNode;
private Client tribeClient;
@Before
public void setupSecondCluster() {
// create another cluster
cluster2 = new TestCluster(randomLong(), 2, cluster().getClusterName() + "-2");
cluster2.beforeTest(getRandom(), getPerTestTransportClientRatio());
cluster2.ensureAtLeastNumNodes(2);
Settings settings = ImmutableSettings.builder()
.put("tribe.t1.cluster.name", cluster().getClusterName())
.put("tribe.t2.cluster.name", cluster2.getClusterName())
.build();
tribeNode = NodeBuilder.nodeBuilder()
.settings(settings)
.node();
tribeClient = tribeNode.client();
}
@After
public void tearDownSecondCluster() {
tribeNode.close();
cluster2.afterTest();
cluster2.close();
}
@Test
public void testTribeOnOneCluster() throws Exception {
logger.info("create 2 indices, test1 on t1, and test2 on t2");
cluster().client().admin().indices().prepareCreate("test1").get();
cluster2.client().admin().indices().prepareCreate("test2").get();
// wait till the tribe node connected to the cluster, by checking if the index exists in the cluster state
logger.info("wait till test1 and test2 exists in the tribe node state");
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState();
return tribeState.getMetaData().hasIndex("test1") && tribeState.getMetaData().hasIndex("test2") &&
tribeState.getRoutingTable().hasIndex("test1") && tribeState.getRoutingTable().hasIndex("test2");
}
});
logger.info("wait till tribe has the same nodes as the 2 clusters");
awaitSameNodeCounts();
assertThat(tribeClient.admin().cluster().prepareHealth().setLocal(true).setWaitForGreenStatus().get().getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.info("create 2 docs through the tribe node");
tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").get();
tribeClient.prepareIndex("test2", "type1", "1").setSource("field1", "value1").get();
tribeClient.admin().indices().prepareRefresh().get();
logger.info("verify they are there");
assertHitCount(tribeClient.prepareCount().get(), 2l);
assertHitCount(tribeClient.prepareSearch().get(), 2l);
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState();
return tribeState.getMetaData().index("test1").mapping("type1") != null &&
tribeState.getMetaData().index("test2").mapping("type2") != null;
}
});
logger.info("write to another type");
tribeClient.prepareIndex("test1", "type2", "1").setSource("field1", "value1").get();
tribeClient.prepareIndex("test2", "type2", "1").setSource("field1", "value1").get();
tribeClient.admin().indices().prepareRefresh().get();
logger.info("verify they are there");
assertHitCount(tribeClient.prepareCount().get(), 4l);
assertHitCount(tribeClient.prepareSearch().get(), 4l);
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState();
return tribeState.getMetaData().index("test1").mapping("type1") != null && tribeState.getMetaData().index("test1").mapping("type2") != null &&
tribeState.getMetaData().index("test2").mapping("type1") != null && tribeState.getMetaData().index("test2").mapping("type2") != null;
}
});
logger.info("make sure master level write operations fail... (we don't really have a master)");
try {
tribeClient.admin().indices().prepareCreate("tribe_index").setMasterNodeTimeout("10ms").get();
assert false;
} catch (MasterNotDiscoveredException e) {
// all is well!
}
logger.info("delete an index, and make sure its reflected");
cluster2.client().admin().indices().prepareDelete("test2").get();
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState();
return tribeState.getMetaData().hasIndex("test1") && !tribeState.getMetaData().hasIndex("test2") &&
tribeState.getRoutingTable().hasIndex("test1") && !tribeState.getRoutingTable().hasIndex("test2");
}
});
logger.info("stop a node, make sure its reflected");
cluster2.stopRandomNode();
awaitSameNodeCounts();
}
private void awaitSameNodeCounts() throws Exception {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
DiscoveryNodes tribeNodes = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState().getNodes();
return countDataNodesForTribe("t1", tribeNodes) == cluster().client().admin().cluster().prepareState().get().getState().getNodes().dataNodes().size()
&& countDataNodesForTribe("t2", tribeNodes) == cluster2.client().admin().cluster().prepareState().get().getState().getNodes().dataNodes().size();
}
});
}
private int countDataNodesForTribe(String tribeName, DiscoveryNodes nodes) {
int count = 0;
for (DiscoveryNode node : nodes) {
if (!node.dataNode()) {
continue;
}
if (tribeName.equals(node.getAttributes().get(TribeService.TRIBE_NAME))) {
count++;
}
}
return count;
}
}