cut over more tests to AbstractIntegrationTest

This commit is contained in:
Simon Willnauer 2013-09-23 18:18:27 +02:00
parent 291cb54c95
commit e4af8c720c
5 changed files with 182 additions and 172 deletions

View File

@ -23,8 +23,9 @@ import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Priority;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import static org.elasticsearch.client.Requests.createIndexRequest;
@ -35,40 +36,36 @@ import static org.hamcrest.Matchers.equalTo;
/**
*
*/
public class SimpleDataNodesTests extends AbstractNodesTests {
@After
public void closeNodes() {
closeAllNodes();
}
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class SimpleDataNodesTests extends AbstractIntegrationTest {
@Test
public void testDataNodes() throws Exception {
startNode("nonData1", settingsBuilder().put("node.data", false).build());
client("nonData1").admin().indices().create(createIndexRequest("test")).actionGet();
cluster().startNode(settingsBuilder().put("node.data", false).build());
client().admin().indices().create(createIndexRequest("test")).actionGet();
try {
client("nonData1").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet();
assert false : "no allocation should happen";
client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet();
fail("no allocation should happen");
} catch (UnavailableShardsException e) {
// all is well
}
startNode("nonData2", settingsBuilder().put("node.data", false).build());
assertThat(client("nonData2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setLocal(true).execute().actionGet().isTimedOut(), equalTo(false));
cluster().startNode(settingsBuilder().put("node.data", false).build());
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setLocal(true).execute().actionGet().isTimedOut(), equalTo(false));
// still no shard should be allocated
try {
client("nonData2").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet();
assert false : "no allocation should happen";
client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet();
fail("no allocation should happen");
} catch (UnavailableShardsException e) {
// all is well
}
// now, start a node data, and see that it gets with shards
startNode("data1", settingsBuilder().put("node.data", true).build());
assertThat(client("nonData2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setLocal(true).execute().actionGet().isTimedOut(), equalTo(false));
cluster().startNode(settingsBuilder().put("node.data", true).build());
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setLocal(true).execute().actionGet().isTimedOut(), equalTo(false));
IndexResponse indexResponse = client("nonData2").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
IndexResponse indexResponse = client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
assertThat(indexResponse.getId(), equalTo("1"));
assertThat(indexResponse.getType(), equalTo("type1"));
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.indexlifecycle;
import com.carrotsearch.randomizedtesting.annotations.Nightly;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
@ -27,13 +29,11 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import java.util.Map;
@ -50,16 +50,21 @@ import static org.hamcrest.Matchers.*;
/**
*
*/
public class IndexLifecycleActionTests extends AbstractNodesTests {
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class IndexLifecycleActionTests extends AbstractIntegrationTest {
private final ESLogger logger = Loggers.getLogger(IndexLifecycleActionTests.class);
@After
public void closeNodes() {
closeAllNodes();
@Slow
@Test
public void testIndexLifecycleActions() throws Exception {
if (randomBoolean()) { // both run with @Nightly
testIndexLifecycleActionsWith11Shards0Backup();
} else {
testIndexLifecycleActionsWith11Shards1Backup();
}
}
@Slow
@Nightly
@Test
public void testIndexLifecycleActionsWith11Shards1Backup() throws Exception {
Settings settings = settingsBuilder()
@ -70,18 +75,15 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
// start one server
logger.info("Starting sever1");
startNode("server1", settings);
final String node1 = getLocalNodeId("server1");
wipeIndices(client());
final String server_1 = cluster().startNode(settings);
final String node1 = getLocalNodeId(server_1);
logger.info("Creating index [test]");
CreateIndexResponse createIndexResponse = client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test")).actionGet();
assertThat(createIndexResponse.isAcknowledged(), equalTo(true));
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
@ -92,22 +94,22 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Starting server2");
// start another server
startNode("server2", settings);
String server_2 = cluster().startNode(settings);
// first wait for 2 nodes in the cluster
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
final String node2 = getLocalNodeId("server2");
final String node2 = getLocalNodeId(server_2);
// explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
client("server1").admin().cluster().prepareReroute().execute().actionGet();
client().admin().cluster().prepareReroute().execute().actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2));
@ -129,21 +131,21 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Starting server3");
// start another server
startNode("server3", settings);
String server_3 = cluster().startNode(settings);
// first wait for 3 nodes in the cluster
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3")).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3")).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
final String node3 = getLocalNodeId("server3");
final String node3 = getLocalNodeId(server_3);
// explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
client("server1").admin().cluster().prepareReroute().execute().actionGet();
client().admin().cluster().prepareReroute().execute().actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(3));
@ -175,17 +177,22 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Closing server1");
// kill the first server
closeNode("server1");
cluster().stopRandomNode(new Predicate<Settings>() {
public boolean apply(Settings settings) {
return server_1.equals(settings.get("name"));
}
});
// verify health
logger.info("Running Cluster Health");
clusterHealth = client("server2").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
client().admin().cluster().prepareReroute().get();
clusterHealth = client("server2").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getRelocatingShards(), equalTo(0));
@ -208,7 +215,7 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Deleting index [test]");
// last, lets delete the index
DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().prepareDelete("test").execute().actionGet();
DeleteIndexResponse deleteIndexResponse = client().admin().indices().prepareDelete("test").execute().actionGet();
assertThat(deleteIndexResponse.isAcknowledged(), equalTo(true));
clusterState = client().admin().cluster().prepareState().get().getState();
@ -221,13 +228,14 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
}
private String getLocalNodeId(String name) {
assert node(name) != null : "no node for name: " + name;
Discovery discovery = ((InternalNode) node(name)).injector().getInstance(Discovery.class);
Discovery discovery = cluster().getInstance(Discovery.class, name);
String nodeId = discovery.localNode().getId();
assertThat(nodeId, not(nullValue()));
return nodeId;
}
@Slow
@Nightly
@Test
public void testIndexLifecycleActionsWith11Shards0Backup() throws Exception {
@ -239,17 +247,16 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
// start one server
logger.info("Starting server1");
startNode("server1", settings);
final String server_1 = cluster().startNode(settings);
final String node1 = getLocalNodeId("server1");
wipeIndices(client());
final String node1 = getLocalNodeId(server_1);
logger.info("Creating index [test]");
CreateIndexResponse createIndexResponse = client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test")).actionGet();
assertThat(createIndexResponse.isAcknowledged(), equalTo(true));
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
@ -264,19 +271,19 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
// start another server
logger.info("Starting server2");
startNode("server2", settings);
final String server_2 = cluster().startNode(settings);
// first wait for 2 nodes in the cluster
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
final String node2 = getLocalNodeId("server2");
final String node2 = getLocalNodeId(server_2);
// explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
client("server1").admin().cluster().prepareReroute().execute().actionGet();
client().admin().cluster().prepareReroute().execute().actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2));
@ -298,18 +305,18 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
// start another server
logger.info("Starting server3");
startNode("server3");
final String server_3 = cluster().startNode();
// first wait for 3 nodes in the cluster
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3")).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3")).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
final String node3 = getLocalNodeId("server3");
final String node3 = getLocalNodeId(server_3);
// explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
client("server1").admin().cluster().prepareReroute().execute().actionGet();
client().admin().cluster().prepareReroute().execute().actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(3));
@ -340,10 +347,15 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Closing server1");
// kill the first server
closeNode("server1");
cluster().stopRandomNode(new Predicate<Settings>() {
public boolean apply(Settings settings) {
return server_1.equals(settings.get("name"));
}
});
logger.info("Running Cluster Health");
clusterHealth = client("server3").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
@ -351,7 +363,7 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
client().admin().cluster().prepareReroute().get();
logger.info("Running Cluster Health");
clusterHealth = client("server3").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
@ -377,7 +389,7 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Deleting index [test]");
// last, lets delete the index
DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet();
DeleteIndexResponse deleteIndexResponse = client().admin().indices().delete(deleteIndexRequest("test")).actionGet();
assertThat(deleteIndexResponse.isAcknowledged(), equalTo(true));
clusterState = client().admin().cluster().prepareState().get().getState();

View File

@ -28,28 +28,29 @@ import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.test.AbstractNodesTests;
import org.elasticsearch.test.ElasticSearchTestCase;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.hamcrest.Matchers.is;
/**
*
*/
public class InternalNodeTests extends AbstractNodesTests {
public class InternalNodeTests extends ElasticSearchTestCase {
@Test
public void testDefaultPluginConfiguration() throws Exception {
Settings settings = settingsBuilder()
.put("plugin.types", TestPlugin.class.getName())
.put("name", "test")
.build();
InternalNode node = (InternalNode) buildNode("test", settings);
InternalNode node = (InternalNode) nodeBuilder()
.settings(settings)
.build();
TestService service = node.injector().getInstance(TestService.class);
assertThat(service.state.initialized(), is(true));

View File

@ -31,11 +31,11 @@ import org.elasticsearch.action.admin.cluster.node.info.PluginInfo;
import org.elasticsearch.action.admin.cluster.node.info.PluginsInfo;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.nodesinfo.plugin.dummy1.TestPlugin;
import org.elasticsearch.nodesinfo.plugin.dummy2.TestNoVersionPlugin;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import java.io.File;
@ -54,7 +54,8 @@ import static org.hamcrest.Matchers.*;
/**
*
*/
public class SimpleNodesInfoTests extends AbstractNodesTests {
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class SimpleNodesInfoTests extends AbstractIntegrationTest {
static final class Fields {
static final String SITE_PLUGIN = "dummy";
@ -62,46 +63,42 @@ public class SimpleNodesInfoTests extends AbstractNodesTests {
static final String SITE_PLUGIN_NO_DESCRIPTION = "No description found for dummy.";
}
@After
public void closeNodes() {
closeAllNodes();
}
@Test
public void testNodesInfos() {
startNode("server1");
startNode("server2");
final String node_1 = cluster().startNode();
final String node_2 = cluster().startNode();
ClusterHealthResponse clusterHealth = client("server2").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
String server1NodeId = ((InternalNode) node("server1")).injector().getInstance(ClusterService.class).state().nodes().localNodeId();
String server2NodeId = ((InternalNode) node("server2")).injector().getInstance(ClusterService.class).state().nodes().localNodeId();
String server1NodeId = cluster().getInstance(ClusterService.class, node_1).state().nodes().localNodeId();
String server2NodeId = cluster().getInstance(ClusterService.class, node_2).state().nodes().localNodeId();
logger.info("--> started nodes: " + server1NodeId + " and " + server2NodeId);
NodesInfoResponse response = client("server1").admin().cluster().prepareNodesInfo().execute().actionGet();
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet();
assertThat(response.getNodes().length, is(2));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
response = client("server2").admin().cluster().nodesInfo(nodesInfoRequest()).actionGet();
response = client().admin().cluster().nodesInfo(nodesInfoRequest()).actionGet();
assertThat(response.getNodes().length, is(2));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
response = client("server1").admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet();
response = client().admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet();
assertThat(response.getNodes().length, is(1));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
response = client("server2").admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet();
response = client().admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet();
assertThat(response.getNodes().length, is(1));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
response = client("server1").admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet();
response = client().admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet();
assertThat(response.getNodes().length, is(1));
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
response = client("server2").admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet();
response = client().admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet();
assertThat(response.getNodes().length, is(1));
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
}
@ -121,18 +118,18 @@ public class SimpleNodesInfoTests extends AbstractNodesTests {
public void testNodeInfoPlugin() throws URISyntaxException {
// We start four nodes
// The first has no plugin
String server1NodeId = startNodeWithPlugins("node1");
String server1NodeId = startNodeWithPlugins(1);
// The second has one site plugin with a es-plugin.properties file (description and version)
String server2NodeId = startNodeWithPlugins("node2");
String server2NodeId = startNodeWithPlugins(2);
// The third has one java plugin
String server3NodeId = startNodeWithPlugins("node3", TestPlugin.class.getName());
String server3NodeId = startNodeWithPlugins(3,TestPlugin.class.getName());
// The fourth has one java plugin and one site plugin
String server4NodeId = startNodeWithPlugins("node4", TestNoVersionPlugin.class.getName());
String server4NodeId = startNodeWithPlugins(4,TestNoVersionPlugin.class.getName());
ClusterHealthResponse clusterHealth = client("node4").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
NodesInfoResponse response = client("node1").admin().cluster().prepareNodesInfo().setPlugin(true).execute().actionGet();
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().setPlugin(true).execute().actionGet();
logger.info("--> full json answer, status " + response.toString());
assertNodeContainsPlugins(response, server1NodeId, Collections.EMPTY_LIST, Collections.EMPTY_LIST,
@ -194,8 +191,8 @@ public class SimpleNodesInfoTests extends AbstractNodesTests {
assertThat(sitePluginUrls, not(contains(nullValue())));
}
private String startNodeWithPlugins(String name, String ... pluginClassNames) throws URISyntaxException {
URL resource = SimpleNodesInfoTests.class.getResource("/org/elasticsearch/nodesinfo/" + name + "/");
private String startNodeWithPlugins(int nodeId, String ... pluginClassNames) throws URISyntaxException {
URL resource = SimpleNodesInfoTests.class.getResource("/org/elasticsearch/nodesinfo/node" + Integer.toString(nodeId) + "/");
ImmutableSettings.Builder settings = settingsBuilder();
if (resource != null) {
settings.put("path.plugins", new File(resource.toURI()).getAbsolutePath());
@ -205,13 +202,12 @@ public class SimpleNodesInfoTests extends AbstractNodesTests {
settings.putArray("plugin.types", pluginClassNames);
}
startNode(name, settings);
String nodeName = cluster().startNode(settings);
// We wait for a Green status
client(name).admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
String serverNodeId = ((InternalNode) node(name)).injector()
.getInstance(ClusterService.class).state().nodes().localNodeId();
String serverNodeId = cluster().getInstance(ClusterService.class, nodeName).state().nodes().localNodeId();
logger.debug("--> server {} started" + serverNodeId);
return serverNodeId;
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -35,8 +36,9 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
@ -51,21 +53,18 @@ import static org.hamcrest.Matchers.equalTo;
/**
*/
public class RelocationTests extends AbstractNodesTests {
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class RelocationTests extends AbstractIntegrationTest {
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
@After
public void shutdownNodes() {
closeAllNodes(true);
}
@Test
public void testSimpleRelocationNoIndexing() {
logger.info("--> starting [node1] ...");
startNode("node1");
final String node_1 = cluster().startNode();
logger.info("--> creating test index ...");
client("node1").admin().indices().prepareCreate("test")
client().admin().indices().prepareCreate("test")
.setSettings(ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
@ -74,37 +73,37 @@ public class RelocationTests extends AbstractNodesTests {
logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client("node1").prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have an actual index");
client("node1").admin().indices().prepareFlush().execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client("node1").prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> verifying count");
client("node1").admin().indices().prepareRefresh().execute().actionGet();
assertThat(client("node1").prepareCount("test").execute().actionGet().getCount(), equalTo(20l));
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(client().prepareCount("test").execute().actionGet().getCount(), equalTo(20l));
logger.info("--> start another node");
startNode("node2");
ClusterHealthResponse clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
final String node_2 = cluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
logger.info("--> relocate the shard from node1 to node2");
client("node1").admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId("test", 0), "node1", "node2"))
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2))
.execute().actionGet();
clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
logger.info("--> verifying count again...");
client("node1").admin().indices().prepareRefresh().execute().actionGet();
assertThat(client("node1").prepareCount("test").execute().actionGet().getCount(), equalTo(20l));
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(client().prepareCount("test").execute().actionGet().getCount(), equalTo(20l));
}
@Test
@ -121,18 +120,19 @@ public class RelocationTests extends AbstractNodesTests {
private void testPrimaryRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception {
String[] nodes = new String[2];
logger.info("--> starting [node1] ...");
startNode("node1");
nodes[0] = cluster().startNode();
logger.info("--> creating test index ...");
client("node1").admin().indices().prepareCreate("test")
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
).execute().actionGet();
logger.info("--> starting [node2] ...");
startNode("node2");
nodes[1] = cluster().startNode();
final AtomicLong idGenerator = new AtomicLong();
final AtomicLong indexCounter = new AtomicLong();
@ -142,6 +142,7 @@ public class RelocationTests extends AbstractNodesTests {
logger.info("--> starting {} indexing threads", writers.length);
for (int i = 0; i < writers.length; i++) {
final Client perThreadClient = client();
final int indexerId = i;
writers[i] = new Thread() {
@Override
@ -150,13 +151,13 @@ public class RelocationTests extends AbstractNodesTests {
logger.info("**** starting indexing thread {}", indexerId);
while (!stop.get()) {
if (batch) {
BulkRequestBuilder bulkRequest = client("node1").prepareBulk();
BulkRequestBuilder bulkRequest = perThreadClient.prepareBulk();
for (int i = 0; i < 100; i++) {
long id = idGenerator.incrementAndGet();
if (id % 1000 == 0) {
client("node1").admin().indices().prepareFlush().execute().actionGet();
perThreadClient.admin().indices().prepareFlush().execute().actionGet();
}
bulkRequest.add(client("node1").prepareIndex("test", "type1", Long.toString(id))
bulkRequest.add(perThreadClient.prepareIndex("test", "type1", Long.toString(id))
.setSource("test", "value" + id));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
@ -170,9 +171,9 @@ public class RelocationTests extends AbstractNodesTests {
} else {
long id = idGenerator.incrementAndGet();
if (id % 1000 == 0) {
client("node1").admin().indices().prepareFlush().execute().actionGet();
perThreadClient.admin().indices().prepareFlush().execute().actionGet();
}
client("node1").prepareIndex("test", "type1", Long.toString(id))
perThreadClient.prepareIndex("test", "type1", Long.toString(id))
.setSource("test", "value" + id).execute().actionGet();
indexCounter.incrementAndGet();
}
@ -189,23 +190,23 @@ public class RelocationTests extends AbstractNodesTests {
}
logger.info("--> waiting for 2000 docs to be indexed ...");
while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) {
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) {
Thread.sleep(100);
client("node1").admin().indices().prepareRefresh().execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
}
logger.info("--> 2000 docs indexed");
logger.info("--> starting relocations...");
for (int i = 0; i < numberOfRelocations; i++) {
String fromNode = "node" + (1 + (i % 2));
String toNode = "node1".equals(fromNode) ? "node2" : "node1";
logger.info("--> START relocate the shard from {} to {}", fromNode, toNode);
client("node1").admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId("test", 0), fromNode, toNode))
int fromNode = (i % 2);
int toNode = fromNode == 0 ? 1 : 0;
logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode]))
.execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
}
@ -217,13 +218,13 @@ public class RelocationTests extends AbstractNodesTests {
logger.info("--> indexing threads stopped");
logger.info("--> refreshing the index");
client("node1").admin().indices().prepareRefresh("test").execute().actionGet();
client().admin().indices().prepareRefresh("test").execute().actionGet();
logger.info("--> searching the index");
boolean ranOnce = false;
for (int i = 0; i < 10; i++) {
try {
logger.info("--> START search test round {}", i + 1);
SearchHits hits = client("node1").prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits();
SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits();
ranOnce = true;
if (hits.totalHits() != indexCounter.get()) {
int[] hitIds = new int[(int) indexCounter.get()];
@ -269,25 +270,26 @@ public class RelocationTests extends AbstractNodesTests {
private void testReplicaRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception {
logger.info("--> starting [node1] ...");
startNode("node1");
String[] nodes = new String[3];
nodes[0] = cluster().startNode();
logger.info("--> creating test index ...");
client("node1").admin().indices().prepareCreate("test")
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
).execute().actionGet();
logger.info("--> starting [node2] ...");
startNode("node2");
nodes[1] = cluster().startNode();
ClusterHealthResponse healthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet();
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
logger.info("--> starting [node3] ...");
startNode("node3");
nodes[2] = cluster().startNode();
healthResponse = client("node3").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setWaitForGreenStatus().execute().actionGet();
healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
final AtomicLong idGenerator = new AtomicLong();
@ -298,21 +300,23 @@ public class RelocationTests extends AbstractNodesTests {
logger.info("--> starting {} indexing threads", writers.length);
for (int i = 0; i < writers.length; i++) {
final Client perThreadClient = client();
final int indexerId = i;
writers[i] = new Thread() {
@Override
public void run() {
try {
logger.info("**** starting indexing thread {}", indexerId);
while (!stop.get()) {
if (batch) {
BulkRequestBuilder bulkRequest = client("node1").prepareBulk();
BulkRequestBuilder bulkRequest = perThreadClient.prepareBulk();
for (int i = 0; i < 100; i++) {
long id = idGenerator.incrementAndGet();
if (id % 1000 == 0) {
client("node1").admin().indices().prepareFlush().execute().actionGet();
perThreadClient.admin().indices().prepareFlush().execute().actionGet();
}
bulkRequest.add(client("node1").prepareIndex("test", "type1", Long.toString(id))
bulkRequest.add(perThreadClient.prepareIndex("test", "type1", Long.toString(id))
.setSource("test", "value" + id));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
@ -326,9 +330,9 @@ public class RelocationTests extends AbstractNodesTests {
} else {
long id = idGenerator.incrementAndGet();
if (id % 1000 == 0) {
client("node1").admin().indices().prepareFlush().execute().actionGet();
perThreadClient.admin().indices().prepareFlush().execute().actionGet();
}
client("node1").prepareIndex("test", "type1", Long.toString(id))
perThreadClient.prepareIndex("test", "type1", Long.toString(id))
.setSource("test", "value" + id).execute().actionGet();
indexCounter.incrementAndGet();
}
@ -345,23 +349,23 @@ public class RelocationTests extends AbstractNodesTests {
}
logger.info("--> waiting for 2000 docs to be indexed ...");
while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) {
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) {
Thread.sleep(100);
client("node1").admin().indices().prepareRefresh().execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
}
logger.info("--> 2000 docs indexed");
logger.info("--> starting relocations...");
for (int i = 0; i < numberOfRelocations; i++) {
String fromNode = "node" + (2 + (i % 2));
String toNode = "node2".equals(fromNode) ? "node3" : "node2";
logger.info("--> START relocate the shard from {} to {}", fromNode, toNode);
client("node1").admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId("test", 0), fromNode, toNode))
int fromNode = (1 + (i % 2));
int toNode = fromNode == 1 ? 2 : 1;
logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode]))
.execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
}
@ -373,13 +377,13 @@ public class RelocationTests extends AbstractNodesTests {
logger.info("--> indexing threads stopped");
logger.info("--> refreshing the index");
client("node1").admin().indices().prepareRefresh("test").execute().actionGet();
client().admin().indices().prepareRefresh("test").execute().actionGet();
logger.info("--> searching the index");
boolean ranOnce = false;
for (int i = 0; i < 10; i++) {
try {
logger.info("--> START search test round {}", i + 1);
SearchHits hits = client("node1").prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits();
SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits();
ranOnce = true;
if (hits.totalHits() != indexCounter.get()) {
int[] hitIds = new int[(int) indexCounter.get()];