From e4af8c720c5d1e66b8ee313ea0d20d1453899db4 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 23 Sep 2013 18:18:27 +0200 Subject: [PATCH] cut over more tests to AbstractIntegrationTest --- .../cluster/SimpleDataNodesTests.java | 35 +++-- .../IndexLifecycleActionTests.java | 118 +++++++++------- .../elasticsearch/node/InternalNodeTests.java | 13 +- .../nodesinfo/SimpleNodesInfoTests.java | 58 ++++---- .../recovery/RelocationTests.java | 130 +++++++++--------- 5 files changed, 182 insertions(+), 172 deletions(-) diff --git a/src/test/java/org/elasticsearch/cluster/SimpleDataNodesTests.java b/src/test/java/org/elasticsearch/cluster/SimpleDataNodesTests.java index af3e0f152b9..7b12040364e 100644 --- a/src/test/java/org/elasticsearch/cluster/SimpleDataNodesTests.java +++ b/src/test/java/org/elasticsearch/cluster/SimpleDataNodesTests.java @@ -23,8 +23,9 @@ import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Priority; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; import org.junit.Test; import static org.elasticsearch.client.Requests.createIndexRequest; @@ -35,40 +36,36 @@ import static org.hamcrest.Matchers.equalTo; /** * */ -public class SimpleDataNodesTests extends AbstractNodesTests { - - @After - public void closeNodes() { - closeAllNodes(); - } +@ClusterScope(scope=Scope.TEST, numNodes=0) +public class SimpleDataNodesTests extends AbstractIntegrationTest { @Test public void testDataNodes() throws Exception { - startNode("nonData1", settingsBuilder().put("node.data", false).build()); - client("nonData1").admin().indices().create(createIndexRequest("test")).actionGet(); + cluster().startNode(settingsBuilder().put("node.data", false).build()); + client().admin().indices().create(createIndexRequest("test")).actionGet(); try { - client("nonData1").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet(); - assert false : "no allocation should happen"; + client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet(); + fail("no allocation should happen"); } catch (UnavailableShardsException e) { // all is well } - startNode("nonData2", settingsBuilder().put("node.data", false).build()); - assertThat(client("nonData2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setLocal(true).execute().actionGet().isTimedOut(), equalTo(false)); + cluster().startNode(settingsBuilder().put("node.data", false).build()); + assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setLocal(true).execute().actionGet().isTimedOut(), equalTo(false)); // still no shard should be allocated try { - client("nonData2").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet(); - assert false : "no allocation should happen"; + client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet(); + fail("no allocation should happen"); } catch (UnavailableShardsException e) { // all is well } // now, start a node data, and see that it gets with shards - startNode("data1", settingsBuilder().put("node.data", true).build()); - assertThat(client("nonData2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setLocal(true).execute().actionGet().isTimedOut(), equalTo(false)); + cluster().startNode(settingsBuilder().put("node.data", true).build()); + assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setLocal(true).execute().actionGet().isTimedOut(), equalTo(false)); - IndexResponse indexResponse = client("nonData2").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); + IndexResponse indexResponse = client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); assertThat(indexResponse.getId(), equalTo("1")); assertThat(indexResponse.getType(), equalTo("type1")); } diff --git a/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionTests.java b/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionTests.java index 047ceef5a48..42684388e07 100644 --- a/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionTests.java +++ b/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.indexlifecycle; +import com.carrotsearch.randomizedtesting.annotations.Nightly; +import com.google.common.base.Predicate; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; @@ -27,13 +29,11 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.node.internal.InternalNode; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; import org.junit.Test; import java.util.Map; @@ -50,16 +50,21 @@ import static org.hamcrest.Matchers.*; /** * */ -public class IndexLifecycleActionTests extends AbstractNodesTests { +@ClusterScope(scope=Scope.TEST, numNodes=0) +public class IndexLifecycleActionTests extends AbstractIntegrationTest { - private final ESLogger logger = Loggers.getLogger(IndexLifecycleActionTests.class); - - @After - public void closeNodes() { - closeAllNodes(); + @Slow + @Test + public void testIndexLifecycleActions() throws Exception { + if (randomBoolean()) { // both run with @Nightly + testIndexLifecycleActionsWith11Shards0Backup(); + } else { + testIndexLifecycleActionsWith11Shards1Backup(); + } } @Slow + @Nightly @Test public void testIndexLifecycleActionsWith11Shards1Backup() throws Exception { Settings settings = settingsBuilder() @@ -70,18 +75,15 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { // start one server logger.info("Starting sever1"); - startNode("server1", settings); - - final String node1 = getLocalNodeId("server1"); - - wipeIndices(client()); + final String server_1 = cluster().startNode(settings); + final String node1 = getLocalNodeId(server_1); logger.info("Creating index [test]"); - CreateIndexResponse createIndexResponse = client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test")).actionGet(); assertThat(createIndexResponse.isAcknowledged(), equalTo(true)); logger.info("Running Cluster Health"); - ClusterHealthResponse clusterHealth = client("server1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); @@ -92,22 +94,22 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Starting server2"); // start another server - startNode("server2", settings); + String server_2 = cluster().startNode(settings); // first wait for 2 nodes in the cluster logger.info("Running Cluster Health"); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - final String node2 = getLocalNodeId("server2"); + final String node2 = getLocalNodeId(server_2); // explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join) - client("server1").admin().cluster().prepareReroute().execute().actionGet(); + client().admin().cluster().prepareReroute().execute().actionGet(); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2)); @@ -129,21 +131,21 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Starting server3"); // start another server - startNode("server3", settings); + String server_3 = cluster().startNode(settings); // first wait for 3 nodes in the cluster - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3")).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3")).actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - final String node3 = getLocalNodeId("server3"); + final String node3 = getLocalNodeId(server_3); // explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join) - client("server1").admin().cluster().prepareReroute().execute().actionGet(); + client().admin().cluster().prepareReroute().execute().actionGet(); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(3)); @@ -175,17 +177,22 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Closing server1"); // kill the first server - closeNode("server1"); + cluster().stopRandomNode(new Predicate() { + public boolean apply(Settings settings) { + return server_1.equals(settings.get("name")); + + } + }); // verify health logger.info("Running Cluster Health"); - clusterHealth = client("server2").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); client().admin().cluster().prepareReroute().get(); - clusterHealth = client("server2").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.getRelocatingShards(), equalTo(0)); @@ -208,7 +215,7 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Deleting index [test]"); // last, lets delete the index - DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().prepareDelete("test").execute().actionGet(); + DeleteIndexResponse deleteIndexResponse = client().admin().indices().prepareDelete("test").execute().actionGet(); assertThat(deleteIndexResponse.isAcknowledged(), equalTo(true)); clusterState = client().admin().cluster().prepareState().get().getState(); @@ -221,13 +228,14 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { } private String getLocalNodeId(String name) { - assert node(name) != null : "no node for name: " + name; - Discovery discovery = ((InternalNode) node(name)).injector().getInstance(Discovery.class); + Discovery discovery = cluster().getInstance(Discovery.class, name); String nodeId = discovery.localNode().getId(); assertThat(nodeId, not(nullValue())); return nodeId; } + @Slow + @Nightly @Test public void testIndexLifecycleActionsWith11Shards0Backup() throws Exception { @@ -239,17 +247,16 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { // start one server logger.info("Starting server1"); - startNode("server1", settings); + final String server_1 = cluster().startNode(settings); - final String node1 = getLocalNodeId("server1"); - wipeIndices(client()); + final String node1 = getLocalNodeId(server_1); logger.info("Creating index [test]"); - CreateIndexResponse createIndexResponse = client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test")).actionGet(); assertThat(createIndexResponse.isAcknowledged(), equalTo(true)); logger.info("Running Cluster Health"); - ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -264,19 +271,19 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { // start another server logger.info("Starting server2"); - startNode("server2", settings); + final String server_2 = cluster().startNode(settings); // first wait for 2 nodes in the cluster - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - final String node2 = getLocalNodeId("server2"); + final String node2 = getLocalNodeId(server_2); // explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join) - client("server1").admin().cluster().prepareReroute().execute().actionGet(); + client().admin().cluster().prepareReroute().execute().actionGet(); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2)); @@ -298,18 +305,18 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { // start another server logger.info("Starting server3"); - startNode("server3"); + final String server_3 = cluster().startNode(); // first wait for 3 nodes in the cluster - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3")).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3")).actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - final String node3 = getLocalNodeId("server3"); + final String node3 = getLocalNodeId(server_3); // explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join) - client("server1").admin().cluster().prepareReroute().execute().actionGet(); + client().admin().cluster().prepareReroute().execute().actionGet(); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(3)); @@ -340,10 +347,15 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Closing server1"); // kill the first server - closeNode("server1"); + cluster().stopRandomNode(new Predicate() { + public boolean apply(Settings settings) { + return server_1.equals(settings.get("name")); + + } + }); logger.info("Running Cluster Health"); - clusterHealth = client("server3").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -351,7 +363,7 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { client().admin().cluster().prepareReroute().get(); logger.info("Running Cluster Health"); - clusterHealth = client("server3").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -377,7 +389,7 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Deleting index [test]"); // last, lets delete the index - DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet(); + DeleteIndexResponse deleteIndexResponse = client().admin().indices().delete(deleteIndexRequest("test")).actionGet(); assertThat(deleteIndexResponse.isAcknowledged(), equalTo(true)); clusterState = client().admin().cluster().prepareState().get().getState(); diff --git a/src/test/java/org/elasticsearch/node/InternalNodeTests.java b/src/test/java/org/elasticsearch/node/InternalNodeTests.java index 77fe6f2578e..906d9951a9f 100644 --- a/src/test/java/org/elasticsearch/node/InternalNodeTests.java +++ b/src/test/java/org/elasticsearch/node/InternalNodeTests.java @@ -28,28 +28,29 @@ import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.plugins.AbstractPlugin; -import org.elasticsearch.test.AbstractNodesTests; +import org.elasticsearch.test.ElasticSearchTestCase; import org.junit.Test; import java.util.ArrayList; import java.util.Collection; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; import static org.hamcrest.Matchers.is; -/** - * - */ -public class InternalNodeTests extends AbstractNodesTests { +public class InternalNodeTests extends ElasticSearchTestCase { @Test public void testDefaultPluginConfiguration() throws Exception { Settings settings = settingsBuilder() .put("plugin.types", TestPlugin.class.getName()) + .put("name", "test") .build(); - InternalNode node = (InternalNode) buildNode("test", settings); + InternalNode node = (InternalNode) nodeBuilder() + .settings(settings) + .build(); TestService service = node.injector().getInstance(TestService.class); assertThat(service.state.initialized(), is(true)); diff --git a/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoTests.java b/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoTests.java index 7b946b32031..5bf56fa3ab0 100644 --- a/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoTests.java +++ b/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoTests.java @@ -31,11 +31,11 @@ import org.elasticsearch.action.admin.cluster.node.info.PluginInfo; import org.elasticsearch.action.admin.cluster.node.info.PluginsInfo; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.nodesinfo.plugin.dummy1.TestPlugin; import org.elasticsearch.nodesinfo.plugin.dummy2.TestNoVersionPlugin; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; import org.junit.Test; import java.io.File; @@ -54,7 +54,8 @@ import static org.hamcrest.Matchers.*; /** * */ -public class SimpleNodesInfoTests extends AbstractNodesTests { +@ClusterScope(scope=Scope.TEST, numNodes=0) +public class SimpleNodesInfoTests extends AbstractIntegrationTest { static final class Fields { static final String SITE_PLUGIN = "dummy"; @@ -62,46 +63,42 @@ public class SimpleNodesInfoTests extends AbstractNodesTests { static final String SITE_PLUGIN_NO_DESCRIPTION = "No description found for dummy."; } - @After - public void closeNodes() { - closeAllNodes(); - } @Test public void testNodesInfos() { - startNode("server1"); - startNode("server2"); + final String node_1 = cluster().startNode(); + final String node_2 = cluster().startNode(); - ClusterHealthResponse clusterHealth = client("server2").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); - String server1NodeId = ((InternalNode) node("server1")).injector().getInstance(ClusterService.class).state().nodes().localNodeId(); - String server2NodeId = ((InternalNode) node("server2")).injector().getInstance(ClusterService.class).state().nodes().localNodeId(); + String server1NodeId = cluster().getInstance(ClusterService.class, node_1).state().nodes().localNodeId(); + String server2NodeId = cluster().getInstance(ClusterService.class, node_2).state().nodes().localNodeId(); logger.info("--> started nodes: " + server1NodeId + " and " + server2NodeId); - NodesInfoResponse response = client("server1").admin().cluster().prepareNodesInfo().execute().actionGet(); + NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet(); assertThat(response.getNodes().length, is(2)); assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); - response = client("server2").admin().cluster().nodesInfo(nodesInfoRequest()).actionGet(); + response = client().admin().cluster().nodesInfo(nodesInfoRequest()).actionGet(); assertThat(response.getNodes().length, is(2)); assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); - response = client("server1").admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet(); + response = client().admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet(); assertThat(response.getNodes().length, is(1)); assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); - response = client("server2").admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet(); + response = client().admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet(); assertThat(response.getNodes().length, is(1)); assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); - response = client("server1").admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet(); + response = client().admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet(); assertThat(response.getNodes().length, is(1)); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); - response = client("server2").admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet(); + response = client().admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet(); assertThat(response.getNodes().length, is(1)); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); } @@ -121,18 +118,18 @@ public class SimpleNodesInfoTests extends AbstractNodesTests { public void testNodeInfoPlugin() throws URISyntaxException { // We start four nodes // The first has no plugin - String server1NodeId = startNodeWithPlugins("node1"); + String server1NodeId = startNodeWithPlugins(1); // The second has one site plugin with a es-plugin.properties file (description and version) - String server2NodeId = startNodeWithPlugins("node2"); + String server2NodeId = startNodeWithPlugins(2); // The third has one java plugin - String server3NodeId = startNodeWithPlugins("node3", TestPlugin.class.getName()); + String server3NodeId = startNodeWithPlugins(3,TestPlugin.class.getName()); // The fourth has one java plugin and one site plugin - String server4NodeId = startNodeWithPlugins("node4", TestNoVersionPlugin.class.getName()); + String server4NodeId = startNodeWithPlugins(4,TestNoVersionPlugin.class.getName()); - ClusterHealthResponse clusterHealth = client("node4").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); - NodesInfoResponse response = client("node1").admin().cluster().prepareNodesInfo().setPlugin(true).execute().actionGet(); + NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().setPlugin(true).execute().actionGet(); logger.info("--> full json answer, status " + response.toString()); assertNodeContainsPlugins(response, server1NodeId, Collections.EMPTY_LIST, Collections.EMPTY_LIST, @@ -194,8 +191,8 @@ public class SimpleNodesInfoTests extends AbstractNodesTests { assertThat(sitePluginUrls, not(contains(nullValue()))); } - private String startNodeWithPlugins(String name, String ... pluginClassNames) throws URISyntaxException { - URL resource = SimpleNodesInfoTests.class.getResource("/org/elasticsearch/nodesinfo/" + name + "/"); + private String startNodeWithPlugins(int nodeId, String ... pluginClassNames) throws URISyntaxException { + URL resource = SimpleNodesInfoTests.class.getResource("/org/elasticsearch/nodesinfo/node" + Integer.toString(nodeId) + "/"); ImmutableSettings.Builder settings = settingsBuilder(); if (resource != null) { settings.put("path.plugins", new File(resource.toURI()).getAbsolutePath()); @@ -205,13 +202,12 @@ public class SimpleNodesInfoTests extends AbstractNodesTests { settings.putArray("plugin.types", pluginClassNames); } - startNode(name, settings); + String nodeName = cluster().startNode(settings); // We wait for a Green status - client(name).admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); - String serverNodeId = ((InternalNode) node(name)).injector() - .getInstance(ClusterService.class).state().nodes().localNodeId(); + String serverNodeId = cluster().getInstance(ClusterService.class, nodeName).state().nodes().localNodeId(); logger.debug("--> server {} started" + serverNodeId); return serverNodeId; } diff --git a/src/test/java/org/elasticsearch/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/recovery/RelocationTests.java index ef56ba6d439..ecfe4f3d864 100644 --- a/src/test/java/org/elasticsearch/recovery/RelocationTests.java +++ b/src/test/java/org/elasticsearch/recovery/RelocationTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; @@ -35,8 +36,9 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; import org.junit.Test; import java.util.concurrent.CountDownLatch; @@ -51,21 +53,18 @@ import static org.hamcrest.Matchers.equalTo; /** */ -public class RelocationTests extends AbstractNodesTests { +@ClusterScope(scope=Scope.TEST, numNodes=0) +public class RelocationTests extends AbstractIntegrationTest { private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); - @After - public void shutdownNodes() { - closeAllNodes(true); - } @Test public void testSimpleRelocationNoIndexing() { logger.info("--> starting [node1] ..."); - startNode("node1"); + final String node_1 = cluster().startNode(); logger.info("--> creating test index ..."); - client("node1").admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test") .setSettings(ImmutableSettings.settingsBuilder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) @@ -74,37 +73,37 @@ public class RelocationTests extends AbstractNodesTests { logger.info("--> index 10 docs"); for (int i = 0; i < 10; i++) { - client("node1").prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } logger.info("--> flush so we have an actual index"); - client("node1").admin().indices().prepareFlush().execute().actionGet(); + client().admin().indices().prepareFlush().execute().actionGet(); logger.info("--> index more docs so we have something in the translog"); for (int i = 10; i < 20; i++) { - client("node1").prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } logger.info("--> verifying count"); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - assertThat(client("node1").prepareCount("test").execute().actionGet().getCount(), equalTo(20l)); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertThat(client().prepareCount("test").execute().actionGet().getCount(), equalTo(20l)); logger.info("--> start another node"); - startNode("node2"); - ClusterHealthResponse clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet(); + final String node_2 = cluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> relocate the shard from node1 to node2"); - client("node1").admin().cluster().prepareReroute() - .add(new MoveAllocationCommand(new ShardId("test", 0), "node1", "node2")) + client().admin().cluster().prepareReroute() + .add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2)) .execute().actionGet(); - clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); + clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); + clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> verifying count again..."); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - assertThat(client("node1").prepareCount("test").execute().actionGet().getCount(), equalTo(20l)); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertThat(client().prepareCount("test").execute().actionGet().getCount(), equalTo(20l)); } @Test @@ -121,18 +120,19 @@ public class RelocationTests extends AbstractNodesTests { private void testPrimaryRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception { + String[] nodes = new String[2]; logger.info("--> starting [node1] ..."); - startNode("node1"); + nodes[0] = cluster().startNode(); logger.info("--> creating test index ..."); - client("node1").admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test") .setSettings(settingsBuilder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) ).execute().actionGet(); logger.info("--> starting [node2] ..."); - startNode("node2"); + nodes[1] = cluster().startNode(); final AtomicLong idGenerator = new AtomicLong(); final AtomicLong indexCounter = new AtomicLong(); @@ -142,6 +142,7 @@ public class RelocationTests extends AbstractNodesTests { logger.info("--> starting {} indexing threads", writers.length); for (int i = 0; i < writers.length; i++) { + final Client perThreadClient = client(); final int indexerId = i; writers[i] = new Thread() { @Override @@ -150,13 +151,13 @@ public class RelocationTests extends AbstractNodesTests { logger.info("**** starting indexing thread {}", indexerId); while (!stop.get()) { if (batch) { - BulkRequestBuilder bulkRequest = client("node1").prepareBulk(); + BulkRequestBuilder bulkRequest = perThreadClient.prepareBulk(); for (int i = 0; i < 100; i++) { long id = idGenerator.incrementAndGet(); if (id % 1000 == 0) { - client("node1").admin().indices().prepareFlush().execute().actionGet(); + perThreadClient.admin().indices().prepareFlush().execute().actionGet(); } - bulkRequest.add(client("node1").prepareIndex("test", "type1", Long.toString(id)) + bulkRequest.add(perThreadClient.prepareIndex("test", "type1", Long.toString(id)) .setSource("test", "value" + id)); } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); @@ -170,9 +171,9 @@ public class RelocationTests extends AbstractNodesTests { } else { long id = idGenerator.incrementAndGet(); if (id % 1000 == 0) { - client("node1").admin().indices().prepareFlush().execute().actionGet(); + perThreadClient.admin().indices().prepareFlush().execute().actionGet(); } - client("node1").prepareIndex("test", "type1", Long.toString(id)) + perThreadClient.prepareIndex("test", "type1", Long.toString(id)) .setSource("test", "value" + id).execute().actionGet(); indexCounter.incrementAndGet(); } @@ -189,23 +190,23 @@ public class RelocationTests extends AbstractNodesTests { } logger.info("--> waiting for 2000 docs to be indexed ..."); - while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) { + while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) { Thread.sleep(100); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); } logger.info("--> 2000 docs indexed"); logger.info("--> starting relocations..."); for (int i = 0; i < numberOfRelocations; i++) { - String fromNode = "node" + (1 + (i % 2)); - String toNode = "node1".equals(fromNode) ? "node2" : "node1"; - logger.info("--> START relocate the shard from {} to {}", fromNode, toNode); - client("node1").admin().cluster().prepareReroute() - .add(new MoveAllocationCommand(new ShardId("test", 0), fromNode, toNode)) + int fromNode = (i % 2); + int toNode = fromNode == 0 ? 1 : 0; + logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]); + client().admin().cluster().prepareReroute() + .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode])) .execute().actionGet(); - ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); + clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); } @@ -217,13 +218,13 @@ public class RelocationTests extends AbstractNodesTests { logger.info("--> indexing threads stopped"); logger.info("--> refreshing the index"); - client("node1").admin().indices().prepareRefresh("test").execute().actionGet(); + client().admin().indices().prepareRefresh("test").execute().actionGet(); logger.info("--> searching the index"); boolean ranOnce = false; for (int i = 0; i < 10; i++) { try { logger.info("--> START search test round {}", i + 1); - SearchHits hits = client("node1").prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits(); + SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits(); ranOnce = true; if (hits.totalHits() != indexCounter.get()) { int[] hitIds = new int[(int) indexCounter.get()]; @@ -269,25 +270,26 @@ public class RelocationTests extends AbstractNodesTests { private void testReplicaRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception { logger.info("--> starting [node1] ..."); - startNode("node1"); + String[] nodes = new String[3]; + nodes[0] = cluster().startNode(); logger.info("--> creating test index ..."); - client("node1").admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test") .setSettings(settingsBuilder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 1) ).execute().actionGet(); logger.info("--> starting [node2] ..."); - startNode("node2"); + nodes[1] = cluster().startNode(); - ClusterHealthResponse healthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet(); + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet(); assertThat(healthResponse.isTimedOut(), equalTo(false)); logger.info("--> starting [node3] ..."); - startNode("node3"); + nodes[2] = cluster().startNode(); - healthResponse = client("node3").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setWaitForGreenStatus().execute().actionGet(); + healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setWaitForGreenStatus().execute().actionGet(); assertThat(healthResponse.isTimedOut(), equalTo(false)); final AtomicLong idGenerator = new AtomicLong(); @@ -298,21 +300,23 @@ public class RelocationTests extends AbstractNodesTests { logger.info("--> starting {} indexing threads", writers.length); for (int i = 0; i < writers.length; i++) { + final Client perThreadClient = client(); final int indexerId = i; writers[i] = new Thread() { @Override public void run() { + try { logger.info("**** starting indexing thread {}", indexerId); while (!stop.get()) { if (batch) { - BulkRequestBuilder bulkRequest = client("node1").prepareBulk(); + BulkRequestBuilder bulkRequest = perThreadClient.prepareBulk(); for (int i = 0; i < 100; i++) { long id = idGenerator.incrementAndGet(); if (id % 1000 == 0) { - client("node1").admin().indices().prepareFlush().execute().actionGet(); + perThreadClient.admin().indices().prepareFlush().execute().actionGet(); } - bulkRequest.add(client("node1").prepareIndex("test", "type1", Long.toString(id)) + bulkRequest.add(perThreadClient.prepareIndex("test", "type1", Long.toString(id)) .setSource("test", "value" + id)); } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); @@ -326,9 +330,9 @@ public class RelocationTests extends AbstractNodesTests { } else { long id = idGenerator.incrementAndGet(); if (id % 1000 == 0) { - client("node1").admin().indices().prepareFlush().execute().actionGet(); + perThreadClient.admin().indices().prepareFlush().execute().actionGet(); } - client("node1").prepareIndex("test", "type1", Long.toString(id)) + perThreadClient.prepareIndex("test", "type1", Long.toString(id)) .setSource("test", "value" + id).execute().actionGet(); indexCounter.incrementAndGet(); } @@ -345,23 +349,23 @@ public class RelocationTests extends AbstractNodesTests { } logger.info("--> waiting for 2000 docs to be indexed ..."); - while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) { + while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) { Thread.sleep(100); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); } logger.info("--> 2000 docs indexed"); logger.info("--> starting relocations..."); for (int i = 0; i < numberOfRelocations; i++) { - String fromNode = "node" + (2 + (i % 2)); - String toNode = "node2".equals(fromNode) ? "node3" : "node2"; - logger.info("--> START relocate the shard from {} to {}", fromNode, toNode); - client("node1").admin().cluster().prepareReroute() - .add(new MoveAllocationCommand(new ShardId("test", 0), fromNode, toNode)) + int fromNode = (1 + (i % 2)); + int toNode = fromNode == 1 ? 2 : 1; + logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]); + client().admin().cluster().prepareReroute() + .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode])) .execute().actionGet(); - ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); + clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); } @@ -373,13 +377,13 @@ public class RelocationTests extends AbstractNodesTests { logger.info("--> indexing threads stopped"); logger.info("--> refreshing the index"); - client("node1").admin().indices().prepareRefresh("test").execute().actionGet(); + client().admin().indices().prepareRefresh("test").execute().actionGet(); logger.info("--> searching the index"); boolean ranOnce = false; for (int i = 0; i < 10; i++) { try { logger.info("--> START search test round {}", i + 1); - SearchHits hits = client("node1").prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits(); + SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits(); ranOnce = true; if (hits.totalHits() != indexCounter.get()) { int[] hitIds = new int[(int) indexCounter.get()];