Cut over more tests to AbstractIntegrationTest

This commit is contained in:
Simon Willnauer 2013-09-20 13:25:12 +02:00
parent 29c0f27a9e
commit 30d7faeba2
9 changed files with 378 additions and 328 deletions

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.AbstractNodesTests;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
/**
*
*/
public abstract class AbstractZenNodesTests extends AbstractNodesTests {
@Override
protected final Settings getClassDefaultSettings() {
// we force zen discovery here since it has specific handling for specific master / data nodes
// and disconnections
return settingsBuilder().put("discovery.type", "zen").build();
}
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.cluster;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -31,10 +30,11 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Test;
import java.util.*;
@ -48,23 +48,19 @@ import static org.hamcrest.Matchers.*;
/**
*
*/
public class ClusterServiceTests extends AbstractZenNodesTests {
@After
public void closeNodes() {
closeAllNodes();
}
@ClusterScope(scope = Scope.TEST, numNodes=0)
public class ClusterServiceTests extends AbstractIntegrationTest {
@Test
public void testTimeoutUpdateTask() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
.put("discovery.zen.minimum_master_nodes", 1)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
.build();
InternalNode node1 = (InternalNode) startNode("node1", settings);
ClusterService clusterService1 = node1.injector().getInstance(ClusterService.class);
cluster().startNode(settings);
ClusterService clusterService1 = cluster().getInstance(ClusterService.class);
final CountDownLatch block = new CountDownLatch(1);
clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
@ -115,10 +111,13 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
@Test
public void testPendingUpdateTask() throws Exception {
InternalNode node1 = (InternalNode) startNode("node1");
Client client = startNode("client-node", settingsBuilder().put("node.client", true).build()).client();
Settings zenSettings = settingsBuilder()
.put("discovery.type", "zen").build();
String node_0 = cluster().startNode(zenSettings);
cluster().startNodeClient(zenSettings);
ClusterService clusterService = node1.injector().getInstance(ClusterService.class);
ClusterService clusterService = cluster().getInstance(ClusterService.class, node_0);
final CountDownLatch block1 = new CountDownLatch(1);
final CountDownLatch invoked1 = new CountDownLatch(1);
clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
@ -166,7 +165,7 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
assertTrue(controlSources.isEmpty());
controlSources = new HashSet<String>(Arrays.asList("2", "3", "4", "5", "6", "7", "8", "9", "10"));
PendingClusterTasksResponse response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet();
PendingClusterTasksResponse response = cluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(9));
for (PendingClusterTask task : response) {
assertTrue(controlSources.remove(task.source().string()));
@ -177,7 +176,7 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks, empty());
response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet();
response = cluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks(), empty());
final CountDownLatch block2 = new CountDownLatch(1);
@ -225,7 +224,7 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
}
assertTrue(controlSources.isEmpty());
response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet();
response = cluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(4));
controlSources = new HashSet<String>(Arrays.asList("2", "3", "4", "5"));
for (PendingClusterTask task : response) {
@ -239,34 +238,35 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
@Test
public void testListenerCallbacks() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
.put("discovery.zen.minimum_master_nodes", 1)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
.put("plugin.types", TestPlugin.class.getName())
.build();
InternalNode node1 = (InternalNode) startNode("node1", settings);
ClusterService clusterService1 = node1.injector().getInstance(ClusterService.class);
MasterAwareService testService1 = node1.injector().getInstance(MasterAwareService.class);
cluster().startNode(settings);
ClusterService clusterService1 = cluster().getInstance(ClusterService.class);
MasterAwareService testService1 = cluster().getInstance(MasterAwareService.class);
// the first node should be a master as the minimum required is 1
assertThat(clusterService1.state().nodes().masterNode(), notNullValue());
assertThat(clusterService1.state().nodes().localNodeMaster(), is(true));
assertThat(testService1.master(), is(true));
InternalNode node2 = (InternalNode) startNode("node2", settings);
ClusterService clusterService2 = node2.injector().getInstance(ClusterService.class);
MasterAwareService testService2 = node2.injector().getInstance(MasterAwareService.class);
String node_1 = cluster().startNode(settings);
ClusterService clusterService2 = cluster().getInstance(ClusterService.class, node_1);
MasterAwareService testService2 = cluster().getInstance(MasterAwareService.class, node_1);
ClusterHealthResponse clusterHealth = node2.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
// the second node should not be the master as node1 is already the master.
assertThat(clusterService2.state().nodes().localNodeMaster(), is(false));
assertThat(testService2.master(), is(false));
closeNode("node1");
clusterHealth = node2.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").execute().actionGet();
cluster().stopCurrentMasterNode();
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").execute().actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
// now that node1 is closed, node2 should be elected as master
@ -275,8 +275,9 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
Settings newSettings = settingsBuilder()
.put("discovery.zen.minimum_master_nodes", 2)
.put("discovery.type", "zen")
.build();
node2.client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet();
Thread.sleep(200);
// there should not be any master as the minimum number of required eligible masters is not met
@ -284,17 +285,17 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
assertThat(testService2.master(), is(false));
node1 = (InternalNode) startNode("node1", settings);
clusterService1 = node1.injector().getInstance(ClusterService.class);
testService1 = node1.injector().getInstance(MasterAwareService.class);
String node_2 = cluster().startNode(settings);
clusterService1 =cluster().getInstance(ClusterService.class, node_2);
testService1 = cluster().getInstance(MasterAwareService.class, node_2);
// make sure both nodes see each other otherwise the masternode below could be null if node 2 is master and node 1 did'r receive the updated cluster state...
assertThat(node2.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false));
assertThat(node1.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false));
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false));
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false));
// now that we started node1 again, a new master should be elected
assertThat(clusterService1.state().nodes().masterNode(), is(notNullValue()));
if ("node1".equals(clusterService1.state().nodes().masterNode().name())) {
if (node_2.equals(clusterService1.state().nodes().masterNode().name())) {
assertThat(testService1.master(), is(true));
assertThat(testService2.master(), is(false));
} else {

View File

@ -20,52 +20,29 @@
package org.elasticsearch.cluster;
import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.internal.InternalNode;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
public class MinimumMasterNodesTests extends AbstractZenNodesTests {
@After
public void cleanAndCloseNodes() throws Exception {
super.tearDown();
for (int i = 0; i < 10; i++) {
if (node("node" + i) != null) {
node("node" + i).stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) {
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
}
}
}
closeAllNodes();
}
@ClusterScope(scope = Scope.TEST, numNodes=0)
public class MinimumMasterNodesTests extends AbstractIntegrationTest {
@Test
public void simpleMinimumMasterNodes() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local"));
buildNode("node2", settingsBuilder().put("gateway.type", "local"));
cleanAndCloseNodes();
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
@ -77,135 +54,116 @@ public class MinimumMasterNodesTests extends AbstractZenNodesTests {
.build();
logger.info("--> start first node");
startNode("node1", settings);
cluster().startNode(settings);
logger.info("--> should be blocked, no master...");
ClusterState state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
logger.info("--> start second node, cluster should be formed");
startNode("node2", settings);
cluster().startNode(settings);
ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(2));
assertThat(state.metaData().indices().containsKey("test"), equalTo(false));
client("node1").admin().indices().prepareCreate("test").execute().actionGet();
client().admin().indices().prepareCreate("test").execute().actionGet();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
client("node1").prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value").execute().actionGet();
}
// make sure that all shards recovered before trying to flush
assertThat(client("node1").admin().cluster().prepareHealth("test").setWaitForActiveShards(2).execute().actionGet().getActiveShards(), equalTo(2));
assertThat(client().admin().cluster().prepareHealth("test").setWaitForActiveShards(2).execute().actionGet().getActiveShards(), equalTo(2));
// flush for simpler debugging
client("node1").admin().indices().prepareFlush().execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
client("node1").admin().indices().prepareRefresh().execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
logger.info("--> verify we the data back");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
}
String masterNodeName = state.nodes().masterNode().name();
String nonMasterNodeName = masterNodeName.equals("node1") ? "node2" : "node1";
logger.info("--> closing master node {}", masterNodeName);
closeNode(masterNodeName);
final String noMasterNode = nonMasterNodeName;
cluster().stopCurrentMasterNode();
awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client(noMasterNode).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
}
});
state = client(nonMasterNodeName).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
logger.info("--> starting the previous master node again...");
startNode(masterNodeName, settings);
cluster().startNode(settings);
clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().setWaitForNodes("2").execute().actionGet();
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().setWaitForNodes("2").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(2));
assertThat(state.metaData().indices().containsKey("test"), equalTo(true));
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.info("--> verify we the data back");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
}
masterNodeName = state.nodes().masterNode().name();
nonMasterNodeName = masterNodeName.equals("node1") ? "node2" : "node1";
logger.info("--> closing non master node {}", nonMasterNodeName);
closeNode(nonMasterNodeName);
final String masterNode = masterNodeName;
awaitBusy(new Predicate<Object>() {
cluster().stopRandomNonMasterNode();
assertThat(awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client(masterNode).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
}
});
state = client(masterNodeName).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
}), equalTo(true));
logger.info("--> starting the previous master node again...");
startNode(nonMasterNodeName, settings);
cluster().startNode(settings);
clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet();
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(2));
assertThat(state.metaData().indices().containsKey("test"), equalTo(true));
logger.info("Running Cluster Health");
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.info("--> verify we the data back");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
}
}
@Test
public void multipleNodesShutdownNonMasterNodes() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local"));
buildNode("node2", settingsBuilder().put("gateway.type", "local"));
buildNode("node3", settingsBuilder().put("gateway.type", "local"));
buildNode("node4", settingsBuilder().put("gateway.type", "local"));
cleanAndCloseNodes();
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
.put("discovery.zen.minimum_master_nodes", 3)
@ -215,108 +173,93 @@ public class MinimumMasterNodesTests extends AbstractZenNodesTests {
.build();
logger.info("--> start first 2 nodes");
startNode("node1", settings);
startNode("node2", settings);
cluster().startNode(settings);
cluster().startNode(settings);
ClusterState state;
awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
}
});
awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
}
});
state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
logger.info("--> start two more nodes");
startNode("node3", settings);
startNode("node4", settings);
cluster().startNode(settings);
cluster().startNode(settings);
ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(4));
final String masterNode = state.nodes().masterNode().name();
LinkedList<String> nonMasterNodes = new LinkedList<String>();
for (DiscoveryNode node : state.nodes()) {
if (!node.name().equals(masterNode)) {
nonMasterNodes.add(node.name());
}
}
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
client("node1").prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value").execute().actionGet();
}
// make sure that all shards recovered before trying to flush
assertThat(client("node1").admin().cluster().prepareHealth("test").setWaitForActiveShards(10).execute().actionGet().isTimedOut(), equalTo(false));
assertThat(client().admin().cluster().prepareHealth("test").setWaitForActiveShards(10).execute().actionGet().isTimedOut(), equalTo(false));
// flush for simpler debugging
client("node1").admin().indices().prepareFlush().execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
client("node1").admin().indices().prepareRefresh().execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
logger.info("--> verify we the data back");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
}
Set<String> nodesToShutdown = Sets.newHashSet();
nodesToShutdown.add(nonMasterNodes.removeLast());
nodesToShutdown.add(nonMasterNodes.removeLast());
logger.info("--> shutting down two master nodes {}", nodesToShutdown);
for (String nodeToShutdown : nodesToShutdown) {
closeNode(nodeToShutdown);
}
cluster().stopRandomNonMasterNode();
cluster().stopRandomNonMasterNode();
final String lastNonMasterNodeUp = nonMasterNodes.removeLast();
logger.info("--> verify that there is no master anymore on remaining nodes");
// spin here to wait till the state is set
awaitBusy(new Predicate<Object>() {
assertThat(awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client(masterNode).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
boolean firstNoMasterLock = state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
state = client(lastNonMasterNodeUp).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
boolean secondNoMasterLock = state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
return firstNoMasterLock && secondNoMasterLock;
boolean success = true;
for(Client client : cluster()) {
ClusterState state = client.admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
success &= state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
if (logger.isDebugEnabled()) {
logger.debug("Checking for NO_MASTER_BLOCL on client: {} NO_MASTER_BLOCK: [{}]", client, state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK));
}
}
return success;
}
}, 20, TimeUnit.SECONDS);
state = client(masterNode).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
state = client(lastNonMasterNodeUp).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
}, 20, TimeUnit.SECONDS), equalTo(true));
logger.info("--> start back the nodes {}", nodesToShutdown);
for (String nodeToShutdown : nodesToShutdown) {
startNode(nodeToShutdown, settings);
}
logger.info("--> start back the 2 nodes ");
cluster().startNode(settings);
cluster().startNode(settings);
clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet();
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(4));
logger.info("--> verify we the data back");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
}
}
}

View File

@ -19,83 +19,80 @@
package org.elasticsearch.cluster;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
/**
*
*/
public class SpecificMasterNodesTests extends AbstractZenNodesTests {
@ClusterScope(scope = Scope.TEST, numNodes=0)
public class SpecificMasterNodesTests extends AbstractIntegrationTest {
@After
public void closeNodes() {
closeAllNodes();
protected final ImmutableSettings.Builder settingsBuilder() {
return ImmutableSettings.builder().put("discovery.type", "zen");
}
@Test
public void simpleOnlyMasterNodeElection() {
logger.info("--> start data node / non master node");
startNode("data1", settingsBuilder().put("node.data", true).put("node.master", false).put("discovery.initial_state_timeout", "1s"));
cluster().startNode(settingsBuilder().put("node.data", true).put("node.master", false).put("discovery.initial_state_timeout", "1s"));
try {
assertThat(client("data1").admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue());
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue());
assert false : "should not be able to find master";
} catch (MasterNotDiscoveredException e) {
// all is well, no master elected
}
logger.info("--> start master node");
startNode("master1", settingsBuilder().put("node.data", false).put("node.master", true));
assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1"));
assertThat(client("master1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1"));
final String masterNodeName = cluster().startNode(settingsBuilder().put("node.data", false).put("node.master", true));
assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName));
assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName));
logger.info("--> stop master node");
closeNode("master1");
cluster().stopCurrentMasterNode();
try {
assertThat(client("data1").admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue());
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue());
assert false : "should not be able to find master";
} catch (MasterNotDiscoveredException e) {
// all is well, no master elected
}
logger.info("--> start master node");
startNode("master1", settingsBuilder().put("node.data", false).put("node.master", true));
assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1"));
assertThat(client("master1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1"));
logger.info("--> stop all nodes");
closeNode("data1");
closeNode("master1");
final String nextMasterEligableNodeName = cluster().startNode(settingsBuilder().put("node.data", false).put("node.master", true));
assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(nextMasterEligableNodeName));
assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(nextMasterEligableNodeName));
}
@Test
public void electOnlyBetweenMasterNodes() {
logger.info("--> start data node / non master node");
startNode("data1", settingsBuilder().put("node.data", true).put("node.master", false).put("discovery.initial_state_timeout", "1s"));
cluster().startNode(settingsBuilder().put("node.data", true).put("node.master", false).put("discovery.initial_state_timeout", "1s"));
try {
assertThat(client("data1").admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue());
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue());
assert false : "should not be able to find master";
} catch (MasterNotDiscoveredException e) {
// all is well, no master elected
}
logger.info("--> start master node (1)");
startNode("master1", settingsBuilder().put("node.data", false).put("node.master", true));
assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1"));
assertThat(client("master1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1"));
final String masterNodeName = cluster().startNode(settingsBuilder().put("node.data", false).put("node.master", true));
assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName));
assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName));
logger.info("--> start master node (2)");
startNode("master2", settingsBuilder().put("node.data", false).put("node.master", true));
assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1"));
assertThat(client("master1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1"));
assertThat(client("master2").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1"));
final String nextMasterEligableNodeName = cluster().startNode(settingsBuilder().put("node.data", false).put("node.master", true));
assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName));
assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName));
assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName));
logger.info("--> closing master node (1)");
closeNode("master1");
assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master2"));
assertThat(client("master2").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master2"));
cluster().stopCurrentMasterNode();
assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(nextMasterEligableNodeName));
assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(nextMasterEligableNodeName));
}
}

View File

@ -25,20 +25,16 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.instanceOf;
public class ShardsAllocatorModuleTests extends AbstractNodesTests {
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class ShardsAllocatorModuleTests extends AbstractIntegrationTest {
@After
public void cleanAndCloseNodes() throws Exception {
closeAllNodes();
}
public void testLoadDefaultShardsAllocator() {
assertAllocatorInstance(ImmutableSettings.Builder.EMPTY_SETTINGS, BalancedShardsAllocator.class);
@ -62,11 +58,11 @@ public class ShardsAllocatorModuleTests extends AbstractNodesTests {
}
private void assertAllocatorInstance(Settings settings, Class<? extends ShardsAllocator> clazz) {
closeNode("node");
Node _node = startNode("node", settings);
InternalNode node = (InternalNode) _node;
ShardsAllocator instance = node.injector().getInstance(ShardsAllocator.class);
node.close();
while (cluster().numNodes() != 0) {
cluster().stopRandomNode();
}
cluster().startNode(settings);
ShardsAllocator instance = cluster().getInstance(ShardsAllocator.class);
assertThat(instance, instanceOf(clazz));
}
}

View File

@ -171,6 +171,7 @@ public abstract class AbstractIntegrationTest extends ElasticSearchTestCase {
ensureAllFilesClosed();
logger.info("[{}#{}]: cleaned up after test", getTestClass().getSimpleName(), getTestName());
} finally {
currentCluster.afterTest();
currentCluster = null;
}
}
@ -199,7 +200,7 @@ public abstract class AbstractIntegrationTest extends ElasticSearchTestCase {
}
public static Iterable<Client> clients() {
return cluster().clients();
return cluster();
}
public ImmutableSettings.Builder randomSettingsBuilder() {
@ -559,14 +560,25 @@ public abstract class AbstractIntegrationTest extends ElasticSearchTestCase {
GLOBAL, SUITE, TEST;
}
private ClusterScope getAnnotation(Class<?> clazz) {
if (clazz == Object.class || clazz == AbstractIntegrationTest.class) {
return null;
}
ClusterScope annotation = clazz.getAnnotation(ClusterScope.class);
if (annotation != null) {
return annotation;
}
return getAnnotation(clazz.getSuperclass());
}
private Scope getCurrentClusterScope() {
ClusterScope annotation = this.getClass().getAnnotation(ClusterScope.class);
ClusterScope annotation = getAnnotation(this.getClass());
// if we are not annotated assume global!
return annotation == null ? Scope.GLOBAL : annotation.scope();
}
private int getNumNodes() {
ClusterScope annotation = this.getClass().getAnnotation(ClusterScope.class);
ClusterScope annotation = getAnnotation(this.getClass());
return annotation == null ? -1 : annotation.numNodes();
}

View File

@ -62,8 +62,8 @@ public abstract class ElasticSearchTestCase extends AbstractRandomizedTest {
}
public void awaitBusy(Predicate<?> breakPredicate) throws InterruptedException {
awaitBusy(breakPredicate, 10, TimeUnit.SECONDS);
public boolean awaitBusy(Predicate<?> breakPredicate) throws InterruptedException {
return awaitBusy(breakPredicate, 10, TimeUnit.SECONDS);
}
public boolean awaitBusy(Predicate<?> breakPredicate, long maxWaitTime, TimeUnit unit) throws InterruptedException {

View File

@ -21,6 +21,8 @@ package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.SeedUtils;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
@ -28,11 +30,13 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkUtils;
@ -41,6 +45,7 @@ import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.store.mock.MockFSIndexStoreModule;
import org.elasticsearch.node.Node;
@ -48,6 +53,7 @@ import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.File;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -56,12 +62,14 @@ import static com.google.common.collect.Maps.newTreeMap;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
public class TestCluster implements Closeable {
public class TestCluster implements Closeable, Iterable<Client> {
protected final ESLogger logger = Loggers.getLogger(getClass());
/* sorted map to make traverse order reproducible */
private final TreeMap<String, NodeAndClient> nodes = newTreeMap();
private final Set<File> dataDirToClean = new HashSet<File>();
private final String clusterName;
@ -69,8 +77,6 @@ public class TestCluster implements Closeable {
private final Settings defaultSettings;
private NodeAndClient clientNode; // currently unused
private Random random;
private AtomicInteger nextNodeId = new AtomicInteger(0);
@ -154,29 +160,38 @@ public class TestCluster implements Closeable {
return randomNodeAndClient.node();
}
NodeAndClient buildNode = buildNode();
nodes.put(buildNode.name, buildNode);
return buildNode.node().start();
buildNode.node().start();
publishNode(buildNode);
return buildNode.node();
}
private synchronized NodeAndClient getRandomNodeAndClient() {
Predicate<NodeAndClient> all = Predicates.alwaysTrue();
return getRandomNodeAndClient(all);
}
private synchronized NodeAndClient getRandomNodeAndClient(Predicate<NodeAndClient> predicate) {
ensureOpen();
Collection<NodeAndClient> values = nodes.values();
int whichOne = random.nextInt(values.size());
for (NodeAndClient nodeAndClient : values) {
if (whichOne-- == 0) {
return nodeAndClient;
Collection<NodeAndClient> values = Collections2.filter(nodes.values(), predicate) ;
if (!values.isEmpty()) {
int whichOne = random.nextInt(values.size());
for (NodeAndClient nodeAndClient : values) {
if (whichOne-- == 0) {
return nodeAndClient;
}
}
}
return null;
}
public synchronized void ensureAtLeastNumNodes(int num) {
int size = nodes.size();
for (int i = size; i < num; i++) {
logger.info("increasing cluster size from {} to {}", size, num);
NodeAndClient buildNode = buildNode();
buildNode.node().start();
nodes.put(buildNode.name, buildNode);
publishNode(buildNode);
}
}
@ -227,13 +242,43 @@ public class TestCluster implements Closeable {
return getOrBuildRandomNode().client();
}
private synchronized Client masterClient() { // should we expose this?
Collection<NodeAndClient> values = nodes.values();
for (NodeAndClient nodeAndClient : values) {
ClusterService instance = getInstance(ClusterService.class, (InternalNode)nodeAndClient.node());
if (instance.state().getNodes().getMasterNode().id().equals(instance.localNode().id())) {
return nodeAndClient.client(random);
public synchronized Client masterClient() {
ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new MasterNodePredicate(getMasterName()));
if (randomNodeAndClient != null) {
return randomNodeAndClient.client(random);
}
return null;
}
public synchronized Client nonMasterClient() {
ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
if (randomNodeAndClient != null) {
return randomNodeAndClient.client(random);
}
return null;
}
public synchronized Client clientNodeClient() {
ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new ClientNodePredicate());
if (randomNodeAndClient != null) {
return randomNodeAndClient.client(random);
}
return null;
}
public synchronized Client client(final Predicate<Settings> filterPredicate) {
ensureOpen();
final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new Predicate<NodeAndClient>() {
@Override
public boolean apply(NodeAndClient nodeAndClient) {
return filterPredicate.apply(nodeAndClient.node.settings());
}
});
if (randomNodeAndClient != null) {
return randomNodeAndClient.client(random);
}
return null;
}
@ -243,9 +288,6 @@ public class TestCluster implements Closeable {
if (this.open.compareAndSet(true, false)) {
IOUtils.closeWhileHandlingException(nodes.values());
nodes.clear();
if (clientNode != null) {
IOUtils.closeWhileHandlingException(clientNode);
}
}
}
@ -261,14 +303,14 @@ public class TestCluster implements Closeable {
}
private final class NodeAndClient implements Closeable {
private final Node node;
private final InternalNode node;
private Client client;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ClientFactory clientFactory;
private final String name;
NodeAndClient(String name, Node node, ClientFactory factory) {
this.node = node;
this.node = (InternalNode)node;
this.name = name;
this.clientFactory = factory;
}
@ -357,6 +399,7 @@ public class TestCluster implements Closeable {
public synchronized void beforeTest(Random random) {
this.random = new Random(random.nextLong());
resetClients(); /* reset all clients - each test gets it's own client based on the Random instance created above. */
wipeDataDirectories();
if (nextNodeId.get() == sharedNodesSeeds.length && nodes.size() == sharedNodesSeeds.length) {
logger.debug("Cluster hasn't changed - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length);
return;
@ -396,7 +439,7 @@ public class TestCluster implements Closeable {
}
nodes.clear();
for (NodeAndClient nodeAndClient : sharedNodes) {
nodes.put(nodeAndClient.name, nodeAndClient);
publishNode(nodeAndClient);
}
nextNodeId.set(sharedNodesSeeds.length);
assert numNodes() == sharedNodesSeeds.length;
@ -406,21 +449,53 @@ public class TestCluster implements Closeable {
logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length);
}
public synchronized void afterTest() {
wipeDataDirectories();
}
private void resetClients() {
final Collection<NodeAndClient> nodesAndClients = nodes.values();
for (NodeAndClient nodeAndClient : nodesAndClients) {
nodeAndClient.resetClient();
}
}
private void wipeDataDirectories() {
if (!dataDirToClean.isEmpty()) {
logger.info("Wipe data directory for all nodes locations: {}", this.dataDirToClean);
try {
FileSystemUtils.deleteRecursively(dataDirToClean.toArray(new File[0]));
} finally {
this.dataDirToClean.clear();
}
}
}
public synchronized ClusterService clusterService() {
return getInstance(ClusterService.class);
}
public synchronized <T> T getInstance(Class<T> clazz) {
return getInstance(clazz, ((InternalNode) getOrBuildRandomNode()));
public synchronized <T> T getInstance(Class<T> clazz, final String node) {
final Predicate<TestCluster.NodeAndClient> predicate;
if (node != null) {
predicate = new Predicate<TestCluster.NodeAndClient>() {
public boolean apply(NodeAndClient nodeAndClient) {
return node.equals(nodeAndClient.name);
}
};
} else {
predicate = Predicates.alwaysTrue();
}
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(predicate);
assert randomNodeAndClient != null;
return getInstanceFromNode(clazz, randomNodeAndClient.node);
}
private synchronized <T> T getInstance(Class<T> clazz, InternalNode node) {
public synchronized <T> T getInstance(Class<T> clazz) {
return getInstance(clazz, null);
}
private synchronized <T> T getInstanceFromNode(Class<T> clazz, InternalNode node) {
return node.injector().getInstance(clazz);
}
@ -432,40 +507,54 @@ public class TestCluster implements Closeable {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient();
if (nodeAndClient != null) {
logger.info("Closing random node [{}] ", nodeAndClient.name);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
}
public synchronized Iterable<Client> clients() {
final Map<String, NodeAndClient> nodes = this.nodes;
return new Iterable<Client>() {
public synchronized void stopRandomNode(final Predicate<Settings> filter) {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient(new Predicate<TestCluster.NodeAndClient>() {
@Override
public Iterator<Client> iterator() {
final Iterator<NodeAndClient> iterator = nodes.values().iterator();
return new Iterator<Client>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public Client next() {
return iterator.next().client(random);
}
@Override
public void remove() {
throw new UnsupportedOperationException("");
}
};
public boolean apply(NodeAndClient nodeAndClient) {
return filter.apply(nodeAndClient.node.settings());
}
};
});
if (nodeAndClient != null) {
logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
}
public synchronized void stopCurrentMasterNode() {
ensureOpen();
assert numNodes() > 0;
String masterNodeName = getMasterName();
assert nodes.containsKey(masterNodeName);
logger.info("Closing master node [{}] ", masterNodeName);
NodeAndClient remove = nodes.remove(masterNodeName);
remove.close();
}
public void stopRandomNonMasterNode() {
NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
if (nodeAndClient != null) {
logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
}
private String getMasterName() {
try {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
return state.nodes().masterNode().name();
} catch (Throwable e) {
logger.warn("Can't fetch cluster state" , e);
throw new RuntimeException("Can't get master node " + e.getMessage(), e);
}
}
public synchronized Set<String> allButN(int numNodes) {
@ -477,19 +566,9 @@ public class TestCluster implements Closeable {
return Sets.newHashSet(Iterators.limit(this.nodes.keySet().iterator(), numNodes));
}
private synchronized Client nodeClient() {
public synchronized void startNodeClient(Settings settings) {
ensureOpen(); // currently unused
if (clientNode == null) {
String name = "client_node";
Settings finalSettings = settingsBuilder().put(defaultSettings).put("name", name)
.build();
Node node = nodeBuilder().settings(finalSettings).client(true).build();
node.start();
this.clientNode = new NodeAndClient(name, node, new ClientFactory());
}
return clientNode.client(random);
startNode(settingsBuilder().put(settings).put("node.client", true));
}
public synchronized Set<String> nodesInclude(String index) {
@ -528,22 +607,83 @@ public class TestCluster implements Closeable {
public String startNode() {
return startNode(ImmutableSettings.EMPTY);
}
public String startNode(Settings.Builder settings) {
return startNode(settings.build());
}
public String startNode(Settings settings) {
NodeAndClient buildNode = buildNode(settings);
nodes.put(buildNode.name, buildNode);
buildNode.node().start();
publishNode(buildNode);
return buildNode.name;
}
private void publishNode(NodeAndClient nodeAndClient) {
assert !nodeAndClient.node().isClosed();
NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class, nodeAndClient.node);
if (nodeEnv.hasNodeFile()) {
dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataLocations()));
}
nodes.put(nodeAndClient.name, nodeAndClient);
}
public void resetAllGateways() throws Exception {
Collection<NodeAndClient> values = this.nodes.values();
for (NodeAndClient nodeAndClient : values) {
getInstance(Gateway.class, ((InternalNode) nodeAndClient.node)).reset();
getInstanceFromNode(Gateway.class, ((InternalNode) nodeAndClient.node)).reset();
}
}
public void closeAllNodesAndReset() {
beforeTest(random);
}
private static final class MasterNodePredicate implements Predicate<NodeAndClient> {
private final String masterNodeName;
public MasterNodePredicate(String masterNodeName) {
this.masterNodeName = masterNodeName;
}
@Override
public boolean apply(NodeAndClient nodeAndClient) {
return masterNodeName.equals(nodeAndClient.name);
}
}
private static final class ClientNodePredicate implements Predicate<NodeAndClient> {
@Override
public boolean apply(NodeAndClient nodeAndClient) {
return nodeAndClient.node.settings().getAsBoolean("node.client", false);
}
}
@Override
public synchronized Iterator<Client> iterator() {
ensureOpen();
final Iterator<NodeAndClient> iterator = nodes.values().iterator();
return new Iterator<Client>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public Client next() {
return iterator.next().client(random);
}
@Override
public void remove() {
throw new UnsupportedOperationException("");
}
};
}
}

View File

@ -203,7 +203,7 @@ public class SimpleValidateQueryTests extends AbstractIntegrationTest {
for (Client client : cluster().clients()) {
for (Client client : cluster()) {
ValidateQueryResponse response = client.admin().indices().prepareValidateQuery("test")
.setQuery("foo".getBytes(Charsets.UTF_8))
.setExplain(true)
@ -215,7 +215,7 @@ public class SimpleValidateQueryTests extends AbstractIntegrationTest {
}
for (Client client : cluster().clients()) {
for (Client client : cluster()) {
ValidateQueryResponse response = client.admin().indices().prepareValidateQuery("test")
.setQuery(QueryBuilders.queryString("foo"))
.setExplain(true)