cut over more tests to AbstractIntegrationTest

Simon Willnauer 2013-09-23 15:53:42 +02:00
parent 8eab51047f
commit ac5120e722
5 changed files with 133 additions and 174 deletions
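
All five files below follow the same migration pattern: the per-test node bookkeeping of AbstractNodesTests (startNode("..."), client("..."), closeAllNodes()) is replaced by a class-level ClusterScope annotation plus the shared client() and cluster() accessors. A minimal sketch of a migrated test, using only API that appears in this commit; the class name and index name are illustrative, not part of the change:

import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;

// Hypothetical example class: the scope annotation tells the framework to
// build a fresh one-node cluster per test, so no manual node cleanup is needed.
@ClusterScope(scope = Scope.TEST, numNodes = 1)
public class ExampleMigratedTests extends AbstractIntegrationTest {

    @Test
    public void testAgainstSharedCluster() {
        // client() talks to the framework-managed cluster; per-node handles
        // such as client("node1") are no longer needed.
        client().admin().indices().prepareCreate("example-index").execute().actionGet();
        client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
    }
}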

View File: IndicesLeaksTests.java

@@ -27,10 +27,9 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import java.lang.ref.WeakReference;
@@ -41,33 +40,28 @@ import static org.hamcrest.Matchers.nullValue;
/**
*/
public class IndicesLeaksTests extends AbstractNodesTests {
@ClusterScope(scope=Scope.TEST, numNodes=1)
public class IndicesLeaksTests extends AbstractIntegrationTest {
@After
public void closeNodes() {
closeAllNodes();
}
@SuppressWarnings({"ConstantConditions", "unchecked"})
@Test
@BadApple
public void testIndexShardLifecycleLeak() throws Exception {
Node node = startNode("node1");
node.client().admin().indices().prepareCreate("test")
client().admin().indices().prepareCreate("test")
.setSettings(ImmutableSettings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.execute().actionGet();
node.client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
IndicesService indicesService = ((InternalNode) node).injector().getInstance(IndicesService.class);
IndicesService indicesService = cluster().getInstance(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe("test");
Injector indexInjector = indexService.injector();
IndexShard shard = indexService.shardSafe(0);
Injector shardInjector = indexService.shardInjector(0);
performCommonOperations(node);
performCommonOperations();
List<WeakReference> indexReferences = new ArrayList<WeakReference>();
List<WeakReference> shardReferences = new ArrayList<WeakReference>();
@@ -97,7 +91,7 @@ public class IndicesLeaksTests extends AbstractNodesTests {
shard = null;
shardInjector = null;
node.client().admin().indices().prepareDelete().execute().actionGet();
client().admin().indices().prepareDelete().execute().actionGet();
for (int i = 0; i < 100; i++) {
System.gc();
@@ -130,10 +124,10 @@ public class IndicesLeaksTests extends AbstractNodesTests {
}
}
private void performCommonOperations(Node node) {
node.client().prepareIndex("test", "type", "1").setSource("field1", "value", "field2", 2, "field3", 3.0f).execute().actionGet();
node.client().admin().indices().prepareRefresh().execute().actionGet();
node.client().prepareSearch("test").setQuery(QueryBuilders.queryString("field1:value")).execute().actionGet();
node.client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field1", "value")).execute().actionGet();
private void performCommonOperations() {
client().prepareIndex("test", "type", "1").setSource("field1", "value", "field2", 2, "field3", 3.0f).execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
client().prepareSearch("test").setQuery(QueryBuilders.queryString("field1:value")).execute().actionGet();
client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field1", "value")).execute().actionGet();
}
}
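
The substantive change in IndicesLeaksTests is how node-internal services are obtained: instead of casting the started Node to InternalNode and querying its injector, the test asks the shared test cluster. A sketch of the two lookup forms used in this commit, assumed to live inside a class extending AbstractIntegrationTest; the method name and nodeName parameter are illustrative:

// Sketch only: resolves node-level services from the test cluster instead of
// via ((InternalNode) node).injector().getInstance(...).
private void resolveServices(String nodeName) {
    // some node of the cluster (the single node under ClusterScope numNodes = 1)
    IndicesService indicesService = cluster().getInstance(IndicesService.class);
    // a specific node, addressed by the name cluster().startNode() returned
    NodeEnvironment env = cluster().getInstance(NodeEnvironment.class, nodeName);
}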

View File: IndicesStoreTests.java

@@ -21,12 +21,12 @@ package org.elasticsearch.indices.store;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import java.io.File;
@@ -39,28 +39,14 @@ import static org.hamcrest.Matchers.equalTo;
/**
*
*/
public class IndicesStoreTests extends AbstractNodesTests {
@Override
protected Settings getClassDefaultSettings() {
// The default (none) gateway cleans the shards on closing
return settingsBuilder().put("gateway.type", "local").build();
}
@Override
protected void beforeClass() {
startNode("server1");
startNode("server2");
}
@Override
public Client client() {
return client("server1");
}
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class IndicesStoreTests extends AbstractIntegrationTest {
private static final Settings SETTINGS = settingsBuilder().put("gateway.type", "local").build();
@Test
public void shardsCleanup() throws Exception {
final String node_1 = cluster().startNode(SETTINGS);
final String node_2 = cluster().startNode(SETTINGS);
logger.info("--> creating index [test] with one shard and on replica");
client().admin().indices().create(createIndexRequest("test")
.settings(settingsBuilder().put("index.numberOfReplicas", 1).put("index.numberOfShards", 1))).actionGet();
@@ -71,19 +57,24 @@ public class IndicesStoreTests extends AbstractNodesTests {
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
logger.info("--> making sure that shard and it's replica are allocated on server1 and server2");
assertThat(shardDirectory("server1", "test", 0).exists(), equalTo(true));
assertThat(shardDirectory("server2", "test", 0).exists(), equalTo(true));
logger.info("--> making sure that shard and it's replica are allocated on node_1 and node_2");
assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true));
assertThat(shardDirectory(node_2, "test", 0).exists(), equalTo(true));
logger.info("--> starting node server3");
startNode("server3");
String node_3 = cluster().startNode(SETTINGS);
logger.info("--> making sure that shard is not allocated on server3");
assertThat(waitForShardDeletion("server3", "test", 0), equalTo(false));
assertThat(waitForShardDeletion(node_3, "test", 0), equalTo(false));
File server2Shard = shardDirectory("server2", "test", 0);
logger.info("--> stopping node server2");
closeNode("server2");
File server2Shard = shardDirectory(node_2, "test", 0);
logger.info("--> stopping node node_2");
cluster().stopRandomNode(new Predicate<Settings>() {
public boolean apply(Settings settings) {
return settings.get("name").equals(node_2);
}
});
assertThat(server2Shard.exists(), equalTo(true));
logger.info("--> running cluster_health");
@@ -92,27 +83,26 @@ public class IndicesStoreTests extends AbstractNodesTests {
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
logger.info("--> making sure that shard and it's replica exist on server1, server2 and server3");
assertThat(shardDirectory("server1", "test", 0).exists(), equalTo(true));
assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true));
assertThat(server2Shard.exists(), equalTo(true));
assertThat(shardDirectory("server3", "test", 0).exists(), equalTo(true));
assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true));
logger.info("--> starting node server2");
startNode("server2");
logger.info("--> starting node node_4");
final String node_4 = cluster().startNode(SETTINGS);
logger.info("--> running cluster_health");
clusterHealth = client("server2").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
logger.info("--> making sure that shard and it's replica are allocated on server1 and server3 but not on server2");
assertThat(shardDirectory("server1", "test", 0).exists(), equalTo(true));
assertThat(shardDirectory("server3", "test", 0).exists(), equalTo(true));
assertThat(waitForShardDeletion("server2", "test", 0), equalTo(false));
assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true));
assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true));
assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false));
}
private File shardDirectory(String server, String index, int shard) {
InternalNode node = ((InternalNode) node(server));
NodeEnvironment env = node.injector().getInstance(NodeEnvironment.class);
NodeEnvironment env = cluster().getInstance(NodeEnvironment.class, server);
return env.shardLocations(new ShardId(index, shard))[0];
}
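
IndicesStoreTests moves to scope=Scope.TEST with numNodes=0, so the test drives the node lifecycle itself: it starts nodes with explicit settings, keeps the generated names returned by startNode(), and stops one particular node by matching its "name" setting. A sketch of that idiom, assumed to run inside a test method of such a class; nodeA and nodeB are illustrative names:

// Nodes are started explicitly; SETTINGS is the gateway.type=local settings
// constant defined above.
final String nodeA = cluster().startNode(SETTINGS);
final String nodeB = cluster().startNode(SETTINGS);

// Stop exactly nodeB by matching its node name in the settings it was started with.
cluster().stopRandomNode(new Predicate<Settings>() {   // com.google.common.base.Predicate
    @Override
    public boolean apply(Settings settings) {
        return settings.get("name").equals(nodeB);
    }
});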

View File: SimpleDistributorTests.java

@@ -22,12 +22,10 @@ package org.elasticsearch.indices.store;
import org.apache.lucene.store.Directory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.junit.Before;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import java.io.File;
@@ -39,86 +37,73 @@ import static org.hamcrest.Matchers.*;
/**
*
*/
public class SimpleDistributorTests extends AbstractNodesTests {
protected Environment environment;
@Before
public void getTestEnvironment() {
environment = ((InternalNode) startNode("node0")).injector().getInstance(Environment.class);
closeNode("node0");
}
@After
public void closeNodes() {
closeAllNodes();
}
@ClusterScope(scope=Scope.TEST, numNodes = 1)
public class SimpleDistributorTests extends AbstractIntegrationTest {
public final static String[] STORE_TYPES = {"fs", "simplefs", "niofs", "mmapfs"};
@Test
public void testAvailableSpaceDetection() {
File dataRoot = environment.dataFiles()[0];
startNode("node1", settingsBuilder().putArray("path.data", new File(dataRoot, "data1").getAbsolutePath(), new File(dataRoot, "data2").getAbsolutePath()));
File dataRoot = cluster().getInstance(Environment.class).dataFiles()[0];
cluster().stopRandomNode();
cluster().startNode(settingsBuilder().putArray("path.data", new File(dataRoot, "data1").getAbsolutePath(), new File(dataRoot, "data2").getAbsolutePath()));
for (String store : STORE_TYPES) {
createIndexWithStoreType("node1", "test", store, StrictDistributor.class.getCanonicalName());
createIndexWithStoreType("test", store, StrictDistributor.class.getCanonicalName());
}
}
@Test
public void testDirectoryToString() throws IOException {
File dataRoot = environment.dataFiles()[0];
File dataRoot = cluster().getInstance(Environment.class).dataFiles()[0];
String dataPath1 = new File(dataRoot, "data1").getCanonicalPath();
String dataPath2 = new File(dataRoot, "data2").getCanonicalPath();
startNode("node1", settingsBuilder().putArray("path.data", dataPath1, dataPath2));
cluster().stopRandomNode();
cluster().startNode(settingsBuilder().putArray("path.data", dataPath1, dataPath2));
createIndexWithStoreType("node1", "test", "niofs", "least_used");
String storeString = getStoreDirectory("node1", "test", 0).toString();
createIndexWithStoreType("test", "niofs", "least_used");
String storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(least_used[rate_limited(niofs(" + dataPath1));
assertThat(storeString, containsString("), rate_limited(niofs(" + dataPath2));
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("node1", "test", "niofs", "random");
storeString = getStoreDirectory("node1", "test", 0).toString();
createIndexWithStoreType("test", "niofs", "random");
storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(random[rate_limited(niofs(" + dataPath1));
assertThat(storeString, containsString("), rate_limited(niofs(" + dataPath2));
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("node1", "test", "mmapfs", "least_used");
storeString = getStoreDirectory("node1", "test", 0).toString();
createIndexWithStoreType("test", "mmapfs", "least_used");
storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(least_used[rate_limited(mmapfs(" + dataPath1));
assertThat(storeString, containsString("), rate_limited(mmapfs(" + dataPath2));
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("node1", "test", "simplefs", "least_used");
storeString = getStoreDirectory("node1", "test", 0).toString();
createIndexWithStoreType("test", "simplefs", "least_used");
storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(least_used[rate_limited(simplefs(" + dataPath1));
assertThat(storeString, containsString("), rate_limited(simplefs(" + dataPath2));
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("node1", "test", "memory", "least_used");
storeString = getStoreDirectory("node1", "test", 0).toString();
createIndexWithStoreType("test", "memory", "least_used");
storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString);
assertThat(storeString, equalTo("store(least_used[byte_buffer])"));
createIndexWithoutRateLimitingStoreType("node1", "test", "niofs", "least_used");
storeString = getStoreDirectory("node1", "test", 0).toString();
createIndexWithoutRateLimitingStoreType("test", "niofs", "least_used");
storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(least_used[niofs(" + dataPath1));
assertThat(storeString, containsString("), niofs(" + dataPath2));
assertThat(storeString, endsWith(")])"));
}
private void createIndexWithStoreType(String nodeId, String index, String storeType, String distributor) {
try {
client(nodeId).admin().indices().prepareDelete(index).execute().actionGet();
} catch (IndexMissingException ex) {
// Ignore
}
client(nodeId).admin().indices().prepareCreate(index)
private void createIndexWithStoreType(String index, String storeType, String distributor) {
wipeIndex(index);
client().admin().indices().prepareCreate(index)
.setSettings(settingsBuilder()
.put("index.store.distributor", distributor)
.put("index.store.type", storeType)
@@ -126,16 +111,12 @@ public class SimpleDistributorTests extends AbstractNodesTests {
.put("index.number_of_shards", 1)
)
.execute().actionGet();
assertThat(client("node1").admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
}
private void createIndexWithoutRateLimitingStoreType(String nodeId, String index, String storeType, String distributor) {
try {
client(nodeId).admin().indices().prepareDelete(index).execute().actionGet();
} catch (IndexMissingException ex) {
// Ignore
}
client(nodeId).admin().indices().prepareCreate(index)
private void createIndexWithoutRateLimitingStoreType(String index, String storeType, String distributor) {
wipeIndex(index);
client().admin().indices().prepareCreate(index)
.setSettings(settingsBuilder()
.put("index.store.distributor", distributor)
.put("index.store.type", storeType)
@@ -144,11 +125,11 @@ public class SimpleDistributorTests extends AbstractNodesTests {
.put("index.number_of_shards", 1)
)
.execute().actionGet();
assertThat(client("node1").admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
}
private Directory getStoreDirectory(String nodeId, String index, int shardId) {
IndicesService indicesService = ((InternalNode) node(nodeId)).injector().getInstance(IndicesService.class);
private Directory getStoreDirectory(String index, int shardId) {
IndicesService indicesService = cluster().getInstance(IndicesService.class);
InternalIndexShard indexShard = (InternalIndexShard) (indicesService.indexService(index).shard(shardId));
return indexShard.store().directory();
}
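
SimpleDistributorTests keeps a one-node cluster but needs custom path.data directories, so it swaps the framework-started node for a reconfigured one and replaces the old try/catch around prepareDelete with the framework's wipeIndex helper. A sketch of the idiom, assumed to run inside a test method; variable names are illustrative:

// Read the data root from the node the framework started, then replace that
// node with one whose path.data spans two sub-directories.
File dataRoot = cluster().getInstance(Environment.class).dataFiles()[0];
cluster().stopRandomNode();
cluster().startNode(settingsBuilder().putArray("path.data",
        new File(dataRoot, "data1").getAbsolutePath(),
        new File(dataRoot, "data2").getAbsolutePath()));

// wipeIndex replaces the old try { prepareDelete } catch (IndexMissingException) block.
wipeIndex("test");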

View File: PluginManagerTests.java

@@ -25,14 +25,14 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.PluginManager;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.helper.HttpClient;
import org.elasticsearch.rest.helper.HttpClientResponse;
import org.elasticsearch.test.AbstractNodesTests;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -43,10 +43,13 @@ import java.net.URL;
import static org.hamcrest.CoreMatchers.*;
public class PluginManagerTests extends AbstractNodesTests {
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class PluginManagerTests extends AbstractIntegrationTest {
private static final Settings SETTINGS = ImmutableSettings.settingsBuilder()
.put("discovery.zen.ping.multicast.enabled", false).build();
private static final String PLUGIN_DIR = "plugins";
private static final String NODE_NAME = "plugin-test-node";
@Test
public void testLocalPluginInstallSingleFolder() throws Exception {
@@ -55,10 +58,10 @@ public class PluginManagerTests extends AbstractNodesTests {
URL url = PluginManagerTests.class.getResource("plugin_single_folder.zip");
downloadAndExtract(pluginName, "file://" + url.getFile());
Node node = startNode();
String nodeName = cluster().startNode(SETTINGS);
assertPluginLoaded(pluginName);
assertPluginAvailable(node, pluginName);
assertPluginAvailable(nodeName, pluginName);
}
@Test
@@ -69,10 +72,10 @@ public class PluginManagerTests extends AbstractNodesTests {
URL url = PluginManagerTests.class.getResource("plugin_folder_site.zip");
downloadAndExtract(pluginName, "file://" + url.getFile());
Node node = startNode();
String nodeName = cluster().startNode(SETTINGS);
assertPluginLoaded(pluginName);
assertPluginAvailable(node, pluginName);
assertPluginAvailable(nodeName, pluginName);
}
@Test
@@ -82,10 +85,10 @@ public class PluginManagerTests extends AbstractNodesTests {
URL url = PluginManagerTests.class.getResource("plugin_without_folders.zip");
downloadAndExtract(pluginName, "file://" + url.getFile());
Node node = startNode();
String nodeName = cluster().startNode(SETTINGS);
assertPluginLoaded(pluginName);
assertPluginAvailable(node, pluginName);
assertPluginAvailable(nodeName, pluginName);
}
@Test
@@ -95,10 +98,10 @@ public class PluginManagerTests extends AbstractNodesTests {
URL url = PluginManagerTests.class.getResource("plugin_folder_file.zip");
downloadAndExtract(pluginName, "file://" + url.getFile());
Node node = startNode();
String nodeName = cluster().startNode(SETTINGS);
assertPluginLoaded(pluginName);
assertPluginAvailable(node, pluginName);
assertPluginAvailable(nodeName, pluginName);
}
private static PluginManager pluginManager(String pluginUrl) {
@@ -114,11 +117,6 @@ public class PluginManagerTests extends AbstractNodesTests {
pluginManager(pluginUrl).downloadAndExtract(pluginName);
}
private Node startNode() {
return startNode(NODE_NAME, ImmutableSettings.settingsBuilder()
.put("discovery.zen.ping.multicast.enabled", false));
}
private void assertPluginLoaded(String pluginName) {
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().clear().setPlugin(true).get();
assertThat(nodesInfoResponse.getNodes().length, equalTo(1));
@@ -128,27 +126,26 @@ public class PluginManagerTests extends AbstractNodesTests {
assertThat(nodesInfoResponse.getNodes()[0].getPlugins().getInfos().get(0).isSite(), equalTo(true));
}
private void assertPluginAvailable(Node node, String pluginName) {
HttpServerTransport httpServerTransport = ((InternalNode) node).injector().getInstance(HttpServerTransport.class);
private void assertPluginAvailable(String nodeName, String pluginName) {
HttpServerTransport httpServerTransport = cluster().getInstance(HttpServerTransport.class);
HttpClient httpClient = new HttpClient(httpServerTransport.boundAddress().publishAddress());
//checking that the http connector is working properly
HttpClientResponse response = httpClient.request("");
assertThat(response.errorCode(), equalTo(RestStatus.OK.getStatus()));
assertThat(response.response(), containsString(NODE_NAME));
assertThat(response.response(), containsString(nodeName));
//checking now that the plugin is available
response = httpClient.request("_plugin/" + pluginName + "/");
assertThat(response.errorCode(), equalTo(RestStatus.OK.getStatus()));
}
@Before
public void before() {
public void beforeTest() {
deletePluginsFolder();
}
@After
public void after() {
public void afterTest() {
deletePluginsFolder();
closeAllNodes();
}
private void deletePluginsFolder() {
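
PluginManagerTests now starts its node only after the plugin archive has been extracted, and asserts against the generated node name returned by startNode() instead of the hard-coded NODE_NAME. A sketch of the HTTP-level check, assuming the test cluster holds a single node; variable names are illustrative:

// Resolve the HTTP transport from the test cluster rather than from a Node handle.
String nodeName = cluster().startNode(SETTINGS);
HttpServerTransport transport = cluster().getInstance(HttpServerTransport.class);
HttpClient httpClient = new HttpClient(transport.boundAddress().publishAddress());

// The root response should mention the started node's generated name.
HttpClientResponse response = httpClient.request("");
assertThat(response.errorCode(), equalTo(RestStatus.OK.getStatus()));
assertThat(response.response(), containsString(nodeName));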

View File: FullRollingRestartTests.java

@@ -22,8 +22,9 @@ package org.elasticsearch.recovery;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -32,73 +33,69 @@ import static org.hamcrest.Matchers.equalTo;
/**
*
*/
public class FullRollingRestartTests extends AbstractNodesTests {
@After
public void shutdownNodes() {
closeAllNodes();
}
@ClusterScope(scope=Scope.TEST, numNodes = 0)
public class FullRollingRestartTests extends AbstractIntegrationTest {
@Test
@Slow
public void testFullRollingRestart() throws Exception {
startNode("node1");
client("node1").admin().indices().prepareCreate("test").execute().actionGet();
cluster().startNode();
client().admin().indices().prepareCreate("test").execute().actionGet();
for (int i = 0; i < 1000; i++) {
client("node1").prepareIndex("test", "type1", Long.toString(i))
client().prepareIndex("test", "type1", Long.toString(i))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map()).execute().actionGet();
}
client("node1").admin().indices().prepareFlush().execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
for (int i = 1000; i < 2000; i++) {
client("node1").prepareIndex("test", "type1", Long.toString(i))
client().prepareIndex("test", "type1", Long.toString(i))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map()).execute().actionGet();
}
// now start adding nodes
startNode("node2");
startNode("node3");
cluster().startNode();
cluster().startNode();
// make sure the cluster state is green, and all has been recovered
assertThat(client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3").execute().actionGet().isTimedOut(), equalTo(false));
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3").execute().actionGet().isTimedOut(), equalTo(false));
// now start adding nodes
startNode("node4");
startNode("node5");
cluster().startNode();
cluster().startNode();
// make sure the cluster state is green, and all has been recovered
assertThat(client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("5").execute().actionGet().isTimedOut(), equalTo(false));
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("5").execute().actionGet().isTimedOut(), equalTo(false));
client("node1").admin().indices().prepareRefresh().execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(2000l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(2000l));
}
// now start shutting nodes down
closeNode("node1");
cluster().stopRandomNode();
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("4").execute().actionGet().isTimedOut(), equalTo(false));
closeNode("node2");
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("4").execute().actionGet().isTimedOut(), equalTo(false));
cluster().stopRandomNode();
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3").execute().actionGet().isTimedOut(), equalTo(false));
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3").execute().actionGet().isTimedOut(), equalTo(false));
client("node5").admin().indices().prepareRefresh().execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
assertThat(client("node5").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(2000l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(2000l));
}
closeNode("node3");
cluster().stopRandomNode();
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("2").execute().actionGet().isTimedOut(), equalTo(false));
closeNode("node4");
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("2").execute().actionGet().isTimedOut(), equalTo(false));
cluster().stopRandomNode();
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForYellowStatus().setWaitForRelocatingShards(0).setWaitForNodes("1").execute().actionGet().isTimedOut(), equalTo(false));
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForYellowStatus().setWaitForRelocatingShards(0).setWaitForNodes("1").execute().actionGet().isTimedOut(), equalTo(false));
client("node5").admin().indices().prepareRefresh().execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
assertThat(client("node5").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(2000l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(2000l));
}
}
}
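
FullRollingRestartTests repeats the same shutdown-and-verify step for node counts "4", "3", "2" and "1". A hypothetical helper, not part of the commit, that captures one such step; it assumes the ClusterHealthRequestBuilder returned by prepareHealth() and the fixed 2000-document count used above:

// Stop one node, wait until the remaining nodes settle at the expected cluster
// size and status, then verify that all documents are still there.
private void stopNodeAndVerify(String expectedNodes, boolean waitForGreen) {
    cluster().stopRandomNode();
    ClusterHealthRequestBuilder health = client().admin().cluster().prepareHealth()
            .setWaitForEvents(Priority.LANGUID).setTimeout("1m")
            .setWaitForRelocatingShards(0).setWaitForNodes(expectedNodes);
    if (waitForGreen) {
        health.setWaitForGreenStatus();
    } else {
        health.setWaitForYellowStatus();
    }
    assertThat(health.execute().actionGet().isTimedOut(), equalTo(false));
    client().admin().indices().prepareRefresh().execute().actionGet();
    for (int i = 0; i < 10; i++) {
        assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(2000l));
    }
}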