From 9cb55138c77d902a617a7ab5ca9d6b9f8950a5e8 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 27 Sep 2013 15:23:37 +0200 Subject: [PATCH] Cut over remaining tests to 'AbstractIntegrationTest' --- .../gateway/fs/IndexGatewayTests.java | 205 +++++---- .../local/LocalGatewayIndexStateTests.java | 397 ++++++++---------- .../local/QuorumLocalGatewayTests.java | 165 ++++---- .../SimpleRecoveryLocalGatewayTests.java | 350 +++++++-------- .../IndexLifecycleActionTests.java | 16 +- .../indices/store/IndicesStoreTests.java | 8 +- .../LocalGatewayIndicesWarmerTests.java | 86 ++-- .../percolator/RecoveryPercolatorTests.java | 4 +- .../test/AbstractNodesTests.java | 254 ----------- .../org/elasticsearch/test/TestCluster.java | 130 +++++- 10 files changed, 656 insertions(+), 959 deletions(-) delete mode 100644 src/test/java/org/elasticsearch/test/AbstractNodesTests.java diff --git a/src/test/java/org/elasticsearch/gateway/fs/IndexGatewayTests.java b/src/test/java/org/elasticsearch/gateway/fs/IndexGatewayTests.java index e09acead15b..c5c6ffd5956 100644 --- a/src/test/java/org/elasticsearch/gateway/fs/IndexGatewayTests.java +++ b/src/test/java/org/elasticsearch/gateway/fs/IndexGatewayTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.gateway.fs; import com.carrotsearch.randomizedtesting.annotations.Nightly; import org.apache.lucene.util.LuceneTestCase.Slow; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -36,12 +37,10 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings.Builder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.elasticsearch.gateway.Gateway; import org.elasticsearch.indices.IndexAlreadyExistsException; -import org.elasticsearch.node.internal.InternalNode; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; -import org.junit.Before; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; import org.junit.Test; import static org.elasticsearch.client.Requests.*; @@ -51,65 +50,51 @@ import static org.hamcrest.Matchers.*; /** * */ -public class IndexGatewayTests extends AbstractNodesTests { +@ClusterScope(scope=Scope.TEST, numNodes=0) +public class IndexGatewayTests extends AbstractIntegrationTest { - private Settings defaultSettings; private String storeType; - - @After - public void closeNodes() throws Exception { - tearDown(); - node("server1").stop(); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - ((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset(); - closeAllNodes(); - } - - @Before - public void buildNode1() throws Exception { - super.setUp(); - Builder builder = ImmutableSettings.builder(); - builder.put("cluster.routing.schedule", "100ms"); - builder.put("gateway.type", "fs"); - if (between(0, 5) == 0) { - builder.put("gateway.fs.buffer_size", between(1, 100) + "kb"); - } - if (between(0, 5) == 0) { - builder.put("gateway.fs.chunk_size", between(1, 100) + "kb"); - } - - builder.put("index.number_of_replicas", "1"); - builder.put("index.number_of_shards", rarely() ? Integer.toString(between(2, 6)) : "1"); - storeType = rarely() ? "ram" : "fs"; - builder.put("index.store.type", storeType); - defaultSettings = builder.build(); - buildNode("server1"); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - ((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset(); - closeAllNodes(); - } + private final SetOnce settings = new SetOnce(); @Override - protected final Settings getClassDefaultSettings() { - return defaultSettings; + protected Settings nodeSettings(int nodeOrdinal) { + if (settings.get() == null) { + Builder builder = ImmutableSettings.builder(); + builder.put("cluster.routing.schedule", "100ms"); + builder.put("gateway.type", "fs"); + if (between(0, 5) == 0) { + builder.put("gateway.fs.buffer_size", between(1, 100) + "kb"); + } + if (between(0, 5) == 0) { + builder.put("gateway.fs.chunk_size", between(1, 100) + "kb"); + } + + builder.put("index.number_of_replicas", "1"); + builder.put("index.number_of_shards", rarely() ? Integer.toString(between(2, 6)) : "1"); + storeType = rarely() ? "ram" : "fs"; + builder.put("index.store.type", storeType); + settings.set(builder.build()); + } + return settings.get(); } + protected boolean isPersistentStorage() { assert storeType != null; - return "fs".equals(storeType); + return "fs".equals(settings.get().get("index.store.type")); } @Test @Slow public void testSnapshotOperations() throws Exception { - startNode("server1", getClassDefaultSettings()); + cluster().startNode(nodeSettings(0)); // get the environment, so we can clear the work dir when needed - Environment environment = ((InternalNode) node("server1")).injector().getInstance(Environment.class); + Environment environment = cluster().getInstance(Environment.class); logger.info("Running Cluster Health (waiting for node to startup properly)"); - ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); @@ -117,145 +102,145 @@ public class IndexGatewayTests extends AbstractNodesTests { // Translog tests logger.info("Creating index [{}]", "test"); - client("server1").admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().indices().prepareCreate("test").execute().actionGet(); // create a mapping - PutMappingResponse putMappingResponse = client("server1").admin().indices().preparePutMapping("test").setType("type1").setSource(mappingSource()).execute().actionGet(); + PutMappingResponse putMappingResponse = client().admin().indices().preparePutMapping("test").setType("type1").setSource(mappingSource()).execute().actionGet(); assertThat(putMappingResponse.isAcknowledged(), equalTo(true)); // verify that mapping is there - ClusterStateResponse clusterState = client("server1").admin().cluster().state(clusterStateRequest()).actionGet(); + ClusterStateResponse clusterState = client().admin().cluster().state(clusterStateRequest()).actionGet(); assertThat(clusterState.getState().metaData().index("test").mapping("type1"), notNullValue()); // create two and delete the first logger.info("Indexing #1"); - client("server1").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); + client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); logger.info("Indexing #2"); - client("server1").index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet(); + client().index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet(); // perform snapshot to the index logger.info("Gateway Snapshot"); - client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); + client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); logger.info("Deleting #1"); - client("server1").delete(deleteRequest("test").type("type1").id("1")).actionGet(); + client().delete(deleteRequest("test").type("type1").id("1")).actionGet(); // perform snapshot to the index logger.info("Gateway Snapshot"); - client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); + client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); logger.info("Gateway Snapshot (should be a no op)"); // do it again, it should be a no op - client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); + client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); logger.info("Closing the server"); - closeNode("server1"); + cluster().stopRandomNode(); logger.info("Starting the server, should recover from the gateway (only translog should be populated)"); - startNode("server1"); + cluster().startNode(nodeSettings(0)); logger.info("Running Cluster Health (wait for the shards to startup)"); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // verify that mapping is there - clusterState = client("server1").admin().cluster().state(clusterStateRequest()).actionGet(); + clusterState = client().admin().cluster().state(clusterStateRequest()).actionGet(); assertThat(clusterState.getState().metaData().index("test").mapping("type1"), notNullValue()); logger.info("Getting #1, should not exists"); - GetResponse getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet(); + GetResponse getResponse = client().get(getRequest("test").type("type1").id("1")).actionGet(); assertThat(getResponse.isExists(), equalTo(false)); logger.info("Getting #2"); - getResponse = client("server1").get(getRequest("test").type("type1").id("2")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("2")).actionGet(); assertThat(getResponse.getSourceAsString(), equalTo(source("2", "test"))); // Now flush and add some data (so we have index recovery as well) logger.info("Flushing, so we have actual content in the index files (#2 should be in the index)"); - client("server1").admin().indices().flush(flushRequest("test")).actionGet(); + client().admin().indices().flush(flushRequest("test")).actionGet(); logger.info("Indexing #3, so we have something in the translog as well"); - client("server1").index(Requests.indexRequest("test").type("type1").id("3").source(source("3", "test"))).actionGet(); + client().index(Requests.indexRequest("test").type("type1").id("3").source(source("3", "test"))).actionGet(); logger.info("Gateway Snapshot"); - client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); + client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); logger.info("Gateway Snapshot (should be a no op)"); - client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); + client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); logger.info("Closing the server"); - closeNode("server1"); + cluster().stopRandomNode(); logger.info("Starting the server, should recover from the gateway (both index and translog) and reuse work dir"); - startNode("server1"); + cluster().startNode(nodeSettings(0)); logger.info("Running Cluster Health (wait for the shards to startup)"); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); logger.info("Getting #1, should not exists"); - getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("1")).actionGet(); assertThat(getResponse.isExists(), equalTo(false)); logger.info("Getting #2 (not from the translog, but from the index)"); - getResponse = client("server1").get(getRequest("test").type("type1").id("2")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("2")).actionGet(); assertThat(getResponse.getSourceAsString(), equalTo(source("2", "test"))); logger.info("Getting #3 (from the translog)"); - getResponse = client("server1").get(getRequest("test").type("type1").id("3")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("3")).actionGet(); assertThat(getResponse.getSourceAsString(), equalTo(source("3", "test"))); logger.info("Closing the server"); - closeNode("server1"); + cluster().stopRandomNode(); logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway"); FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles()); logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir"); - startNode("server1"); + cluster().startNode(nodeSettings(0)); logger.info("Running Cluster Health (wait for the shards to startup)"); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); logger.info("Getting #1, should not exists"); - getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("1")).actionGet(); assertThat(getResponse.isExists(), equalTo(false)); logger.info("Getting #2 (not from the translog, but from the index)"); - getResponse = client("server1").get(getRequest("test").type("type1").id("2")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("2")).actionGet(); assertThat(getResponse.getSourceAsString(), equalTo(source("2", "test"))); logger.info("Getting #3 (from the translog)"); - getResponse = client("server1").get(getRequest("test").type("type1").id("3")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("3")).actionGet(); assertThat(getResponse.getSourceAsString(), equalTo(source("3", "test"))); logger.info("Flushing, so we have actual content in the index files (#3 should be in the index now as well)"); - client("server1").admin().indices().flush(flushRequest("test")).actionGet(); + client().admin().indices().flush(flushRequest("test")).actionGet(); logger.info("Gateway Snapshot"); - client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); + client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); logger.info("Gateway Snapshot (should be a no op)"); - client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); + client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet(); logger.info("Closing the server"); - closeNode("server1"); + cluster().stopRandomNode(); logger.info("Starting the server, should recover from the gateway (just from the index, nothing in the translog)"); - startNode("server1"); + cluster().startNode(nodeSettings(0)); logger.info("Running Cluster Health (wait for the shards to startup)"); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); logger.info("Getting #1, should not exists"); - getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("1")).actionGet(); assertThat(getResponse.isExists(), equalTo(false)); logger.info("Getting #2 (not from the translog, but from the index)"); - getResponse = client("server1").get(getRequest("test").type("type1").id("2")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("2")).actionGet(); assertThat(getResponse.getSourceAsString(), equalTo(source("2", "test"))); logger.info("Getting #3 (not from the translog, but from the index)"); - getResponse = client("server1").get(getRequest("test").type("type1").id("3")).actionGet(); + getResponse = client().get(getRequest("test").type("type1").id("3")).actionGet(); assertThat(getResponse.getSourceAsString(), equalTo(source("3", "test"))); logger.info("Deleting the index"); - client("server1").admin().indices().delete(deleteIndexRequest("test")).actionGet(); + client().admin().indices().delete(deleteIndexRequest("test")).actionGet(); } @Test @@ -273,73 +258,73 @@ public class IndexGatewayTests extends AbstractNodesTests { private void testLoad(boolean fullRecovery) { logger.info("Running with fullRecover [{}]", fullRecovery); - startNode("server1"); + cluster().startNode(nodeSettings(0)); logger.info("Running Cluster Health (waiting for node to startup properly)"); - ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); // get the environment, so we can clear the work dir when needed - Environment environment = ((InternalNode) node("server1")).injector().getInstance(Environment.class); + Environment environment = cluster().getInstance(Environment.class); logger.info("--> creating test index ..."); - client("server1").admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().indices().prepareCreate("test").execute().actionGet(); logger.info("Running Cluster Health (wait for the shards to startup)"); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); logger.info("--> refreshing and checking count"); - client("server1").admin().indices().prepareRefresh().execute().actionGet(); - assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(0l)); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(0l)); logger.info("--> indexing 1234 docs"); for (long i = 0; i < 1234; i++) { - client("server1").prepareIndex("test", "type1", Long.toString(i)) + client().prepareIndex("test", "type1", Long.toString(i)) .setCreate(true) // make sure we use create, so if we recover wrongly, we will get increments... .setSource(MapBuilder.newMapBuilder().put("test", "value" + i).map()).execute().actionGet(); // snapshot every 100 so we get some actions going on in the gateway if ((i % 11) == 0) { - client("server1").admin().indices().prepareGatewaySnapshot().execute().actionGet(); + client().admin().indices().prepareGatewaySnapshot().execute().actionGet(); } // flush every once is a while, so we get different data if ((i % 55) == 0) { - client("server1").admin().indices().prepareFlush().execute().actionGet(); + client().admin().indices().prepareFlush().execute().actionGet(); } } logger.info("--> refreshing and checking count"); - client("server1").admin().indices().prepareRefresh().execute().actionGet(); - assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1234l)); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1234l)); logger.info("--> closing the server"); - closeNode("server1"); + cluster().stopRandomNode(); if (fullRecovery) { logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway"); FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles()); logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir"); } - startNode("server1"); + cluster().startNode(nodeSettings(0)); logger.info("--> running Cluster Health (wait for the shards to startup)"); - clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("--> done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); logger.info("--> checking count"); - assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1234l)); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1234l)); logger.info("--> checking reuse / recovery status"); - IndicesStatusResponse statusResponse = client("server1").admin().indices().prepareStatus().setRecovery(true).execute().actionGet(); + IndicesStatusResponse statusResponse = client().admin().indices().prepareStatus().setRecovery(true).execute().actionGet(); for (IndexShardStatus indexShardStatus : statusResponse.getIndex("test")) { for (ShardStatus shardStatus : indexShardStatus) { if (shardStatus.getShardRouting().primary()) { @@ -370,22 +355,22 @@ public class IndexGatewayTests extends AbstractNodesTests { @Test @Slow public void testIndexActions() throws Exception { - startNode("server1"); + cluster().startNode(nodeSettings(0)); logger.info("Running Cluster Health (waiting for node to startup properly)"); - ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + client().admin().indices().create(createIndexRequest("test")).actionGet(); - closeNode("server1"); + cluster().stopRandomNode(); - startNode("server1"); + cluster().startNode(nodeSettings(0)); Thread.sleep(500); try { - client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + client().admin().indices().create(createIndexRequest("test")).actionGet(); assert false : "index should exists"; } catch (IndexAlreadyExistsException e) { // all is well diff --git a/src/test/java/org/elasticsearch/gateway/local/LocalGatewayIndexStateTests.java b/src/test/java/org/elasticsearch/gateway/local/LocalGatewayIndexStateTests.java index 92ffacc65a7..373aed5f3e9 100644 --- a/src/test/java/org/elasticsearch/gateway/local/LocalGatewayIndexStateTests.java +++ b/src/test/java/org/elasticsearch/gateway/local/LocalGatewayIndexStateTests.java @@ -33,11 +33,11 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; -import org.elasticsearch.node.internal.InternalNode; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; +import org.elasticsearch.test.*; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; +import org.elasticsearch.test.TestCluster.RestartCallback; import org.junit.Test; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; @@ -48,281 +48,242 @@ import static org.hamcrest.Matchers.nullValue; /** * */ -public class LocalGatewayIndexStateTests extends AbstractNodesTests { +@ClusterScope(scope=Scope.TEST, numNodes=0) +public class LocalGatewayIndexStateTests extends AbstractIntegrationTest { private final ESLogger logger = Loggers.getLogger(LocalGatewayIndexStateTests.class); - //TODO Randomize this test - lots of tests are duplicates with settings that can be randomized - @After - public void cleanAndCloseNodes() throws Exception { - for (int i = 0; i < 10; i++) { - if (node("node" + i) != null) { - node("node" + i).stop(); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) { - ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset(); - } - } - } - closeAllNodes(); - } @Test public void testMappingMetaDataParsed() throws Exception { - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local")); - buildNode("node2", settingsBuilder().put("gateway.type", "local")); - cleanAndCloseNodes(); logger.info("--> starting 1 nodes"); - startNode("node1", settingsBuilder().put("gateway.type", "local")); + cluster().startNode(settingsBuilder().put("gateway.type", "local")); logger.info("--> creating test index, with meta routing"); - client("node1").admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test") .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject()) .execute().actionGet(); logger.info("--> waiting for yellow status"); - ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet(); if (health.isTimedOut()) { - ClusterStateResponse response = client("node1").admin().cluster().prepareState().execute().actionGet(); + ClusterStateResponse response = client().admin().cluster().prepareState().execute().actionGet(); System.out.println("" + response); } assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify meta _routing required exists"); - MappingMetaData mappingMd = client("node1").admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1"); + MappingMetaData mappingMd = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1"); assertThat(mappingMd.routing().required(), equalTo(true)); - logger.info("--> close node"); - closeNode("node1"); - - logger.info("--> starting node again..."); - startNode("node1", settingsBuilder().put("gateway.type", "local")); + logger.info("--> restarting nodes..."); + cluster().fullRestart(); logger.info("--> waiting for yellow status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet(); if (health.isTimedOut()) { - ClusterStateResponse response = client("node1").admin().cluster().prepareState().execute().actionGet(); + ClusterStateResponse response = client().admin().cluster().prepareState().execute().actionGet(); System.out.println("" + response); } assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify meta _routing required exists"); - mappingMd = client("node1").admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1"); + mappingMd = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1"); assertThat(mappingMd.routing().required(), equalTo(true)); } @Test public void testSimpleOpenClose() throws Exception { - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); logger.info("--> starting 2 nodes"); - startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); - startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); logger.info("--> creating test index"); - client("node1").admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().indices().prepareCreate("test").execute().actionGet(); logger.info("--> waiting for green status"); - ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); - ClusterStateResponse stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet(); + ClusterStateResponse stateResponse = client().admin().cluster().prepareState().execute().actionGet(); assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN)); assertThat(stateResponse.getState().routingTable().index("test").shards().size(), equalTo(2)); assertThat(stateResponse.getState().routingTable().index("test").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); logger.info("--> indexing a simple document"); - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet(); logger.info("--> closing test index..."); - client("node1").admin().indices().prepareClose("test").execute().actionGet(); + client().admin().indices().prepareClose("test").execute().actionGet(); - stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet(); + stateResponse = client().admin().cluster().prepareState().execute().actionGet(); assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE)); assertThat(stateResponse.getState().routingTable().index("test"), nullValue()); logger.info("--> verifying that the state is green"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN)); logger.info("--> trying to index into a closed index ..."); try { - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet(); assert false; } catch (ClusterBlockException e) { // all is well } logger.info("--> creating another index (test2) by indexing into it"); - client("node1").prepareIndex("test2", "type1", "1").setSource("field1", "value1").execute().actionGet(); + client().prepareIndex("test2", "type1", "1").setSource("field1", "value1").execute().actionGet(); logger.info("--> verifying that the state is green"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN)); logger.info("--> opening the first index again..."); - client("node1").admin().indices().prepareOpen("test").execute().actionGet(); + client().admin().indices().prepareOpen("test").execute().actionGet(); logger.info("--> verifying that the state is green"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet(); + stateResponse = client().admin().cluster().prepareState().execute().actionGet(); assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN)); assertThat(stateResponse.getState().routingTable().index("test").shards().size(), equalTo(2)); assertThat(stateResponse.getState().routingTable().index("test").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); logger.info("--> trying to get the indexed document on the first index"); - GetResponse getResponse = client("node1").prepareGet("test", "type1", "1").execute().actionGet(); + GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); assertThat(getResponse.isExists(), equalTo(true)); logger.info("--> closing test index..."); - client("node1").admin().indices().prepareClose("test").execute().actionGet(); - stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet(); + client().admin().indices().prepareClose("test").execute().actionGet(); + stateResponse = client().admin().cluster().prepareState().execute().actionGet(); assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE)); assertThat(stateResponse.getState().routingTable().index("test"), nullValue()); - logger.info("--> closing nodes..."); - closeNode("node2"); - closeNode("node1"); - - logger.info("--> starting nodes again..."); - startNode("node1", settingsBuilder().put("gateway.type", "local").build()); - startNode("node2", settingsBuilder().put("gateway.type", "local").build()); - + logger.info("--> restarting nodes..."); + cluster().fullRestart(); logger.info("--> waiting for two nodes and green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); - stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet(); + stateResponse = client().admin().cluster().prepareState().execute().actionGet(); assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE)); assertThat(stateResponse.getState().routingTable().index("test"), nullValue()); logger.info("--> trying to index into a closed index ..."); try { - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet(); assert false; } catch (ClusterBlockException e) { // all is well } logger.info("--> opening index..."); - client("node1").admin().indices().prepareOpen("test").execute().actionGet(); + client().admin().indices().prepareOpen("test").execute().actionGet(); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); - stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet(); + stateResponse = client().admin().cluster().prepareState().execute().actionGet(); assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN)); assertThat(stateResponse.getState().routingTable().index("test").shards().size(), equalTo(2)); assertThat(stateResponse.getState().routingTable().index("test").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); logger.info("--> trying to get the indexed document on the first round (before close and shutdown)"); - getResponse = client("node1").prepareGet("test", "type1", "1").execute().actionGet(); + getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); assertThat(getResponse.isExists(), equalTo(true)); logger.info("--> indexing a simple document"); - client("node1").prepareIndex("test", "type1", "2").setSource("field1", "value1").execute().actionGet(); + client().prepareIndex("test", "type1", "2").setSource("field1", "value1").execute().actionGet(); } @Test public void testJustMasterNode() throws Exception { logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); logger.info("--> starting 1 master node non data"); - startNode("node1", settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + cluster().startNode(settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); logger.info("--> create an index"); - client("node1").admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().indices().prepareCreate("test").execute().actionGet(); logger.info("--> closing master node"); - closeNode("node1"); + cluster().closeNonSharedNodes(false); logger.info("--> starting 1 master node non data again"); - startNode("node1", settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + cluster().startNode(settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); logger.info("--> waiting for test index to be created"); - ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setIndices("test").execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setIndices("test").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify we have an index"); - ClusterStateResponse clusterStateResponse = client("node1").admin().cluster().prepareState().setFilterIndices("test").execute().actionGet(); + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setFilterIndices("test").execute().actionGet(); assertThat(clusterStateResponse.getState().metaData().hasIndex("test"), equalTo(true)); } @Test public void testJustMasterNodeAndJustDataNode() throws Exception { logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); logger.info("--> starting 1 master node non data"); - startNode("node1", settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); - startNode("node2", settingsBuilder().put("node.master", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + cluster().startNode(settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + cluster().startNode(settingsBuilder().put("node.master", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); logger.info("--> create an index"); - client("node1").admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().indices().prepareCreate("test").execute().actionGet(); logger.info("--> waiting for test index to be created"); - ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setIndices("test").setWaitForYellowStatus().execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setIndices("test").setWaitForYellowStatus().execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); - client("node1").prepareIndex("test", "type1").setSource("field1", "value1").setTimeout("100ms").execute().actionGet(); + client().prepareIndex("test", "type1").setSource("field1", "value1").setTimeout("100ms").execute().actionGet(); } @Test public void testTwoNodesSingleDoc() throws Exception { logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); logger.info("--> starting 2 nodes"); - startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); - startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); + cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); + cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); logger.info("--> indexing a simple document"); - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); logger.info("--> waiting for green status"); - ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify 1 doc in the index"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); } logger.info("--> closing test index..."); - client("node1").admin().indices().prepareClose("test").execute().actionGet(); + client().admin().indices().prepareClose("test").execute().actionGet(); - ClusterStateResponse stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet(); + ClusterStateResponse stateResponse = client().admin().cluster().prepareState().execute().actionGet(); assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE)); assertThat(stateResponse.getState().routingTable().index("test"), nullValue()); logger.info("--> opening the index..."); - client("node1").admin().indices().prepareOpen("test").execute().actionGet(); + client().admin().indices().prepareOpen("test").execute().actionGet(); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify 1 doc in the index"); - assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); } } @@ -332,60 +293,56 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { .put("gateway.type", "local").put("gateway.local.auto_import_dangled", "yes") .put("index.number_of_shards", 1).put("index.number_of_replicas", 1) .build(); - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); - logger.info("--> starting two nodes"); - startNode("node1", settings); - startNode("node2", settings); + final String node_1 = cluster().startNode(settings); + cluster().startNode(settings); logger.info("--> indexing a simple document"); - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); logger.info("--> waiting for green status"); - ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify 1 doc in the index"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); } - assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); - - logger.info("--> shutting down the nodes"); - Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class); - closeNode("node1"); - closeNode("node2"); - - logger.info("--> deleting the data for the first node"); - gateway1.reset(); - - logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)"); - startNode("node1", settings); - startNode("node2", settings); + assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); + + logger.info("--> restarting the nodes"); + final Gateway gateway1 = cluster().getInstance(Gateway.class, node_1); + cluster().fullRestart(new RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + if (node_1.equals(nodeName)) { + logger.info("--> deleting the data for the first node"); + gateway1.reset(); + } + return null; + } + }); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); // spin a bit waiting for the index to exists long time = System.currentTimeMillis(); while ((System.currentTimeMillis() - time) < TimeValue.timeValueSeconds(10).millis()) { - if (client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists()) { + if (client().admin().indices().prepareExists("test").execute().actionGet().isExists()) { break; } } logger.info("--> verify that the dangling index exists"); - assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify the doc is there"); - assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); } @Test @@ -394,68 +351,66 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { .put("gateway.type", "local").put("gateway.local.auto_import_dangled", "closed") .put("index.number_of_shards", 1).put("index.number_of_replicas", 1) .build(); - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); + logger.info("--> starting two nodes"); - startNode("node1", settings); - startNode("node2", settings); + final String node_1 = cluster().startNode(settings); + cluster().startNode(settings); logger.info("--> indexing a simple document"); - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); logger.info("--> waiting for green status"); - ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify 1 doc in the index"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); } - assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); - - logger.info("--> shutting down the nodes"); - Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class); - closeNode("node1"); - closeNode("node2"); - - logger.info("--> deleting the data for the first node"); - gateway1.reset(); - - logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)"); - startNode("node1", settings); - startNode("node2", settings); + assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); + + logger.info("--> restarting the nodes"); + final Gateway gateway1 = cluster().getInstance(Gateway.class, node_1); + cluster().fullRestart(new RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + if (node_1.equals(nodeName)) { + logger.info("--> deleting the data for the first node"); + gateway1.reset(); + } + return null; + } + }); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); // spin a bit waiting for the index to exists long time = System.currentTimeMillis(); while ((System.currentTimeMillis() - time) < TimeValue.timeValueSeconds(10).millis()) { - if (client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists()) { + if (client().admin().indices().prepareExists("test").execute().actionGet().isExists()) { break; } } logger.info("--> verify that the dangling index exists"); - assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify the index state is closed"); - assertThat(client("node1").admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE)); + assertThat(client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE)); logger.info("--> open the index"); - client("node1").admin().indices().prepareOpen("test").execute().actionGet(); + client().admin().indices().prepareOpen("test").execute().actionGet(); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify the doc is there"); - assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); } @Test @@ -464,42 +419,39 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { .put("gateway.type", "local").put("gateway.local.auto_import_dangled", "no") .put("index.number_of_shards", 1).put("index.number_of_replicas", 1) .build(); - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); - logger.info("--> starting two nodes"); - startNode("node1", settings); - startNode("node2", settings); + final String node_1 = cluster().startNode(settings); + cluster().startNode(settings); logger.info("--> indexing a simple document"); - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); logger.info("--> waiting for green status"); - ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify 1 doc in the index"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); } - assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); - logger.info("--> shutting down the nodes"); - Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class); - closeNode("node1"); - closeNode("node2"); - - logger.info("--> deleting the data for the first node"); - gateway1.reset(); - - logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)"); - startNode("node1", settings); - startNode("node2", settings); + logger.info("--> restarting the nodes"); + final Gateway gateway1 = cluster().getInstance(Gateway.class, node_1); + cluster().fullRestart(new RestartCallback() { + + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + if (node_1.equals(nodeName)) { + logger.info("--> deleting the data for the first node"); + gateway1.reset(); + } + return null; + } + }); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); // we need to wait for the allocate dangled to kick in (even though in this case its disabled) @@ -507,24 +459,24 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { Thread.sleep(500); logger.info("--> verify that the dangling index does not exists"); - assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(false)); + assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(false)); - logger.info("--> shutdown the nodes"); - closeNode("node1"); - closeNode("node2"); - - logger.info("--> start the nodes back, but make sure we do recovery only after we have 2 nodes in the cluster"); - startNode("node1", settingsBuilder().put(settings).put("gateway.recover_after_nodes", 2).build()); - startNode("node2", settingsBuilder().put(settings).put("gateway.recover_after_nodes", 2).build()); + logger.info("--> restart start the nodes, but make sure we do recovery only after we have 2 nodes in the cluster"); + cluster().fullRestart(new RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + return settingsBuilder().put("gateway.recover_after_nodes", 2).build(); + } + }); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify that the dangling index does exists now!"); - assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); logger.info("--> verify the doc is there"); - assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); } @Test @@ -534,53 +486,50 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { .put("index.number_of_shards", 1).put("index.number_of_replicas", 1) .build(); - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); - logger.info("--> starting two nodes"); - startNode("node1", settings); - startNode("node2", settings); + final String node_1 = cluster().startNode(settings); + cluster().startNode(settings); logger.info("--> indexing a simple document"); - client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); logger.info("--> waiting for green status"); - ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify 1 doc in the index"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); } - logger.info("--> shutting down the nodes"); - Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class); - closeNode("node1"); - closeNode("node2"); - - logger.info("--> deleting the data for the first node"); - gateway1.reset(); - - logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)"); - startNode("node1", settings); - startNode("node2", settings); + logger.info("--> restarting the nodes"); + final Gateway gateway1 = cluster().getInstance(Gateway.class, node_1); + cluster().fullRestart(new RestartCallback() { + + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + if (node_1.equals(nodeName)) { + logger.info("--> deleting the data for the first node"); + gateway1.reset(); + } + return null; + } + }); logger.info("--> waiting for green status"); - health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> verify that the dangling index does not exists"); - assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(false)); + assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(false)); logger.info("--> close the first node, so we remain with the second that has the dangling index"); - closeNode("node1"); + cluster().stopRandomNode(TestCluster.nameFilter(node_1)); logger.info("--> index a different doc"); - client("node2").prepareIndex("test", "type1", "2").setSource("field1", "value2").setRefresh(true).execute().actionGet(); + client().prepareIndex("test", "type1", "2").setSource("field1", "value2").setRefresh(true).execute().actionGet(); - assertThat(client("node2").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(false)); - assertThat(client("node2").prepareGet("test", "type1", "2").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(false)); + assertThat(client().prepareGet("test", "type1", "2").execute().actionGet().isExists(), equalTo(true)); } } diff --git a/src/test/java/org/elasticsearch/gateway/local/QuorumLocalGatewayTests.java b/src/test/java/org/elasticsearch/gateway/local/QuorumLocalGatewayTests.java index 50ef986e118..5164927bb09 100644 --- a/src/test/java/org/elasticsearch/gateway/local/QuorumLocalGatewayTests.java +++ b/src/test/java/org/elasticsearch/gateway/local/QuorumLocalGatewayTests.java @@ -24,11 +24,12 @@ import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.gateway.Gateway; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.internal.InternalNode; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; +import org.elasticsearch.test.TestCluster.RestartCallback; import org.junit.Test; import java.util.concurrent.TimeUnit; @@ -44,155 +45,137 @@ import static org.hamcrest.Matchers.equalTo; /** * */ -public class QuorumLocalGatewayTests extends AbstractNodesTests { - - @After - public void cleanAndCloseNodes() throws Exception { - for (int i = 0; i < 10; i++) { - if (node("node" + i) != null) { - node("node" + i).stop(); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset(); - } - } - closeAllNodes(); - } +@ClusterScope(numNodes=0, scope=Scope.TEST) +public class QuorumLocalGatewayTests extends AbstractIntegrationTest { @Test @Slow public void testChangeInitialShardsRecovery() throws Exception { - // clean three nodes - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node3", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); - - logger.info("--> starting 3 nodes"); - Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); - Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); - Node node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); + final String[] nodes = new String[3]; + nodes[0] = cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); + nodes[1] = cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); + nodes[2] = cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); logger.info("--> indexing..."); - node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get(); + client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get(); //We don't check for failures in the flush response: if we do we might get the following: // FlushNotAllowedEngineException[[test][1] recovery is in progress, flush [COMMIT_TRANSLOG] is not allowed] - node1.client().admin().indices().prepareFlush().get(); - node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get(); - assertNoFailures(node1.client().admin().indices().prepareRefresh().execute().get()); + client().admin().indices().prepareFlush().get(); + client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get(); + assertNoFailures(client().admin().indices().prepareRefresh().execute().get()); logger.info("--> running cluster_health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet(); logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).get(), 2l); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), 2l); } - - logger.info("--> closing nodes"); - closeAllNodes(); - - logger.info("--> starting 2 nodes back, should not do any recovery (less than quorum)"); - - node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").build()); - node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").build()); + + final String nodeToRemove = nodes[between(0,2)]; + logger.info("--> restarting 2 nodes -- kill 1"); + cluster().fullRestart(new RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + return settingsBuilder().put("gateway.type", "local").build(); + } + + @Override + public boolean doRestart(String nodeName) { + return !nodeToRemove.equals(nodeName); + } + }); assertThat(awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - ClusterStateResponse clusterStateResponse = client("node1").admin().cluster().prepareState().setMasterNodeTimeout("500ms").get(); + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setMasterNodeTimeout("500ms").get(); return !clusterStateResponse.getState().routingTable().index("test").allPrimaryShardsActive(); } }, 30, TimeUnit.SECONDS), equalTo(true)); logger.info("--> change the recovery.initial_shards setting, and make sure its recovered"); - client("node1").admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("recovery.initial_shards", 1)).get(); + client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("recovery.initial_shards", 1)).get(); logger.info("--> running cluster_health (wait for the shards to startup), 4 shards since we only have 2 nodes"); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(4)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(4)).actionGet(); logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).get(), 2l); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), 2l); } } @Test @Slow public void testQuorumRecovery() throws Exception { - // clean three nodes - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); - buildNode("node3", settingsBuilder().put("gateway.type", "local").build()); - cleanAndCloseNodes(); logger.info("--> starting 3 nodes"); - Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); - Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); - Node node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); + cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); + cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); + cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); logger.info("--> indexing..."); - node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get(); + client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get(); //We don't check for failures in the flush response: if we do we might get the following: // FlushNotAllowedEngineException[[test][1] recovery is in progress, flush [COMMIT_TRANSLOG] is not allowed] - node1.client().admin().indices().prepareFlush().get(); - node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get(); - assertNoFailures(node1.client().admin().indices().prepareRefresh().get()); + client().admin().indices().prepareFlush().get(); + client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get(); + assertNoFailures(client().admin().indices().prepareRefresh().get()); logger.info("--> running cluster_health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet(); logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).get(), 2l); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), 2l); } - logger.info("--> closing first node, and indexing more data to the second node"); - closeNode("node1"); - - assertThat(awaitBusy(new Predicate() { + logger.info("--> restart all nodes"); + cluster().fullRestart(new RestartCallback() { @Override - public boolean apply(Object input) { - logger.info("--> running cluster_health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node2").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("2").waitForActiveShards(4)).actionGet(); - logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); - return clusterHealth.isTimedOut() && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW; + public Settings onNodeStopped(String nodeName) throws Exception { + return null; } - }, 30, TimeUnit.SECONDS), equalTo(false)); - - node2.client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).get(); - assertNoFailures(node2.client().admin().indices().prepareRefresh().get()); - - for (int i = 0; i < 10; i++) { - assertHitCount(node2.client().prepareCount().setQuery(matchAllQuery()).get(), 3l); - } - - logger.info("--> closing the second node and third node"); - closeNode("node2"); - closeNode("node3"); - - logger.info("--> starting the nodes back, verifying we got the latest version"); - - node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").build()); - node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").build()); - node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").build()); + @Override + public void doAfterNodes(int numNodes, final Client activeClient) throws Exception { + if (numNodes == 1) { + assertThat(awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + logger.info("--> running cluster_health (wait for the shards to startup)"); + ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("2").waitForActiveShards(4)).actionGet(); + logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); + return (!clusterHealth.isTimedOut()) && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW; + } + }, 30, TimeUnit.SECONDS), equalTo(true)); + logger.info("--> one node is closed -- index 1 document into the remaining nodes"); + activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).get(); + assertNoFailures(activeClient.admin().indices().prepareRefresh().get()); + for (int i = 0; i < 10; i++) { + assertHitCount(activeClient.prepareCount().setQuery(matchAllQuery()).get(), 3l); + } + } + } + + }); + logger.info("--> all nodes are started back, verifying we got the latest version"); logger.info("--> running cluster_health (wait for the shards to startup)"); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet(); logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).get(), 3l); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), 3l); } } } diff --git a/src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java b/src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java index 35948dd06e6..39f9d34345b 100644 --- a/src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java +++ b/src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.status.IndexShardStatus; import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse; import org.elasticsearch.action.admin.indices.status.ShardStatus; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider; @@ -32,17 +33,15 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.gateway.Gateway; import org.elasticsearch.index.query.FilterBuilders; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.internal.InternalNode; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; +import org.elasticsearch.test.TestCluster.RestartCallback; import org.junit.Test; import static org.elasticsearch.client.Requests.clusterHealthRequest; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery; @@ -52,137 +51,119 @@ import static org.hamcrest.Matchers.*; /** * */ -public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests { +@ClusterScope(numNodes=0, scope=Scope.TEST) +public class SimpleRecoveryLocalGatewayTests extends AbstractIntegrationTest { - @After - public void cleanAndCloseNodes() throws Exception { - for (int i = 0; i < 10; i++) { - if (node("node" + i) != null) { - node("node" + i).stop(); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset(); - } - } - closeAllNodes(); - } - - @Override - protected Settings getClassDefaultSettings() { - return settingsBuilder().put("gateway.type", "local").build(); + + private ImmutableSettings.Builder settingsBuilder() { + return ImmutableSettings.settingsBuilder().put("gateway.type", "local"); } @Test @Slow public void testX() throws Exception { - buildNode("node1"); - cleanAndCloseNodes(); - Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).build()); + cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build()); String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("appAccountIds").field("type", "string").endObject().endObject() .endObject().endObject().string(); - node1.client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet(); + client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet(); - node1.client().prepareIndex("test", "type1", "10990239").setSource(jsonBuilder().startObject() + client().prepareIndex("test", "type1", "10990239").setSource(jsonBuilder().startObject() .field("_id", "10990239") .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet(); - node1.client().prepareIndex("test", "type1", "10990473").setSource(jsonBuilder().startObject() + client().prepareIndex("test", "type1", "10990473").setSource(jsonBuilder().startObject() .field("_id", "10990473") .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet(); - node1.client().prepareIndex("test", "type1", "10990513").setSource(jsonBuilder().startObject() + client().prepareIndex("test", "type1", "10990513").setSource(jsonBuilder().startObject() .field("_id", "10990513") .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet(); - node1.client().prepareIndex("test", "type1", "10990695").setSource(jsonBuilder().startObject() + client().prepareIndex("test", "type1", "10990695").setSource(jsonBuilder().startObject() .field("_id", "10990695") .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet(); - node1.client().prepareIndex("test", "type1", "11026351").setSource(jsonBuilder().startObject() + client().prepareIndex("test", "type1", "11026351").setSource(jsonBuilder().startObject() .field("_id", "11026351") .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet(); - node1.client().admin().indices().prepareRefresh().execute().actionGet(); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); - - closeNode("node1"); - node1 = startNode("node1"); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount(client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); + cluster().fullRestart(); logger.info("Running Cluster Health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - node1.client().admin().indices().prepareRefresh().execute().actionGet(); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount(client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); + + cluster().fullRestart(); - closeNode("node1"); - node1 = startNode("node1"); logger.info("Running Cluster Health (wait for the shards to startup)"); - clusterHealth = node1.client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - node1.client().admin().indices().prepareRefresh().execute().actionGet(); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount(client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); } @Test @Slow public void testSingleNodeNoFlush() throws Exception { - buildNode("node1"); - cleanAndCloseNodes(); - Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).build()); + cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build()); String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("field").field("type", "string").endObject().startObject("num").field("type", "integer").endObject().endObject() .endObject().endObject().string(); - node1.client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet(); + client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet(); for (int i = 0; i < 100; i++) { - node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("_id", "1").field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()).execute().actionGet(); - node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("_id", "2").field("field", "value2").startArray("num").value(14).endArray().endObject()).execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("_id", "1").field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()).execute().actionGet(); + client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("_id", "2").field("field", "value2").startArray("num").value(14).endArray().endObject()).execute().actionGet(); } - node1.client().admin().indices().prepareRefresh().execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); + assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1); + assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1); + assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1); } - closeNode("node1"); - node1 = startNode("node1"); + cluster().fullRestart(); logger.info("Running Cluster Health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); + assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1); + assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1); + assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1); } - closeNode("node1"); - node1 = startNode("node1"); + cluster().fullRestart(); + logger.info("Running Cluster Health (wait for the shards to startup)"); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1); - assertHitCount(node1.client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); + assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1); + assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1); + assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1); } } @@ -190,89 +171,83 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests { @Test @Slow public void testSingleNodeWithFlush() throws Exception { - buildNode("node1"); - cleanAndCloseNodes(); - Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).build()); - node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); - node1.client().admin().indices().prepareFlush().execute().actionGet(); - node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); - node1.client().admin().indices().prepareRefresh().execute().actionGet(); + cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build()); + client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); + client().admin().indices().prepareFlush().execute().actionGet(); + client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); - closeNode("node1"); - node1 = startNode("node1"); + cluster().fullRestart(); logger.info("Running Cluster Health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); } - closeNode("node1"); - node1 = startNode("node1"); + cluster().fullRestart(); logger.info("Running Cluster Health (wait for the shards to startup)"); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); } } @Test @Slow public void testTwoNodeFirstNodeCleared() throws Exception { - // clean two nodes - buildNode("node1"); - buildNode("node2"); - cleanAndCloseNodes(); - Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).build()); - Node node2 = startNode("node2", settingsBuilder().put("index.number_of_shards", 1).build()); + final String firstNode = cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build()); + cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build()); - node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); - node1.client().admin().indices().prepareFlush().execute().actionGet(); - node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); - node1.client().admin().indices().prepareRefresh().execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); + client().admin().indices().prepareFlush().execute().actionGet(); + client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); logger.info("Running Cluster Health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); } - logger.info("--> closing nodes"); - closeNode("node1"); - closeNode("node2"); + cluster().fullRestart(new RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + return settingsBuilder().put("gateway.recover_after_nodes", 2).build(); + } - logger.info("--> cleaning node1 gateway"); - buildNode("node1"); - cleanAndCloseNodes(); - - node1 = startNode("node1", settingsBuilder().put("gateway.recover_after_nodes", 2).build()); - node2 = startNode("node2", settingsBuilder().put("gateway.recover_after_nodes", 2).build()); + @Override + public boolean clearData(String nodeName) { + return firstNode.equals(nodeName); + } + + }); logger.info("Running Cluster Health (wait for the shards to startup)"); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); } } @@ -280,72 +255,70 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests { @Slow public void testLatestVersionLoaded() throws Exception { // clean two nodes - buildNode("node1"); - buildNode("node2"); - cleanAndCloseNodes(); - Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build()); - Node node2 = startNode("node2", settingsBuilder().put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build()); + cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build()); + cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build()); - node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); - node1.client().admin().indices().prepareFlush().execute().actionGet(); - node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); - node1.client().admin().indices().prepareRefresh().execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); + client().admin().indices().prepareFlush().execute().actionGet(); + client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); logger.info("--> running cluster_health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet(); logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2); } logger.info("--> closing first node, and indexing more data to the second node"); - closeNode("node1"); + cluster().fullRestart(new RestartCallback() { - node2.client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet(); - node2.client().admin().indices().prepareRefresh().execute().actionGet(); + @Override + public void doAfterNodes(int numNodes, Client client) throws Exception { + if (numNodes == 1) { + logger.info("--> one node is closed - start indexing data into the second one"); + client.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet(); + client.admin().indices().prepareRefresh().execute().actionGet(); - for (int i = 0; i < 10; i++) { - assertHitCount(node2.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3); - } + for (int i = 0; i < 10; i++) { + assertHitCount(client.prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3); + } - logger.info("--> add some metadata, additional type and template"); - node2.client().admin().indices().preparePutMapping("test").setType("type2") - .setSource(jsonBuilder().startObject().startObject("type1").startObject("_source").field("enabled", false).endObject().endObject().endObject()) - .execute().actionGet(); - node2.client().admin().indices().preparePutTemplate("template_1") - .setTemplate("te*") - .setOrder(0) - .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties") - .startObject("field1").field("type", "string").field("store", "yes").endObject() - .startObject("field2").field("type", "string").field("store", "yes").field("index", "not_analyzed").endObject() - .endObject().endObject().endObject()) - .execute().actionGet(); - node2.client().admin().indices().prepareAliases().addAlias("test", "test_alias", FilterBuilders.termFilter("field", "value")).execute().actionGet(); - - - logger.info("--> closing the second node"); - closeNode("node2"); - - logger.info("--> starting two nodes back, verifying we got the latest version"); - - node1 = startNode("node1", settingsBuilder().put("gateway.recover_after_nodes", 2).build()); - node2 = startNode("node2", settingsBuilder().put("gateway.recover_after_nodes", 2).build()); + logger.info("--> add some metadata, additional type and template"); + client.admin().indices().preparePutMapping("test").setType("type2") + .setSource(jsonBuilder().startObject().startObject("type1").startObject("_source").field("enabled", false).endObject().endObject().endObject()) + .execute().actionGet(); + client.admin().indices().preparePutTemplate("template_1") + .setTemplate("te*") + .setOrder(0) + .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties") + .startObject("field1").field("type", "string").field("store", "yes").endObject() + .startObject("field2").field("type", "string").field("store", "yes").field("index", "not_analyzed").endObject() + .endObject().endObject().endObject()) + .execute().actionGet(); + client.admin().indices().prepareAliases().addAlias("test", "test_alias", FilterBuilders.termFilter("field", "value")).execute().actionGet(); + logger.info("--> starting two nodes back, verifying we got the latest version"); + } + + } + + }); logger.info("--> running cluster_health (wait for the shards to startup)"); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet(); logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); for (int i = 0; i < 10; i++) { - assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3); + assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3); } - ClusterState state = node1.client().admin().cluster().prepareState().execute().actionGet().getState(); + ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); assertThat(state.metaData().index("test").mapping("type2"), notNullValue()); assertThat(state.metaData().templates().get("template_1").template(), equalTo("te*")); assertThat(state.metaData().index("test").aliases().get("test_alias"), notNullValue()); @@ -355,76 +328,56 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests { @Test @Slow public void testReusePeerRecovery() throws Exception { - buildNode("node1"); - buildNode("node2"); - buildNode("node3"); - buildNode("node4"); - cleanAndCloseNodes(); - ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder() + ImmutableSettings.Builder settings = settingsBuilder() .put("action.admin.cluster.node.shutdown.delay", "10ms") .put("gateway.recover_after_nodes", 4) .put(BalancedShardsAllocator.SETTING_THRESHOLD, 1.1f); // use less agressive settings - startNode("node1", settings); - startNode("node2", settings); - startNode("node3", settings); - startNode("node4", settings); + cluster().startNode(settings); + cluster().startNode(settings); + cluster().startNode(settings); + cluster().startNode(settings); logger.info("--> indexing docs"); for (int i = 0; i < 1000; i++) { - client("node1").prepareIndex("test", "type").setSource("field", "value").execute().actionGet(); + client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet(); if ((i % 200) == 0) { - client("node1").admin().indices().prepareFlush().execute().actionGet(); + client().admin().indices().prepareFlush().execute().actionGet(); } } logger.info("Running Cluster Health"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); logger.info("--> shutting down the nodes"); // Disable allocations while we are closing nodes - client("node1").admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet(); - for (int i = 1; i < 5; i++) { - closeNode("node" + i); - } - - logger.info("--> start the nodes back up"); - startNode("node1", settings); - startNode("node2", settings); - startNode("node3", settings); - startNode("node4", settings); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet(); + cluster().fullRestart(); logger.info("Running Cluster Health"); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(10)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(10)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); logger.info("--> shutting down the nodes"); // Disable allocations while we are closing nodes - client("node1").admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet(); - for (int i = 1; i < 5; i++) { - closeNode("node" + i); - } + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet(); + cluster().fullRestart(); - logger.info("--> start the nodes back up"); - startNode("node1", settings); - startNode("node2", settings); - startNode("node3", settings); - startNode("node4", settings); logger.info("Running Cluster Health"); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(10)).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(10)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - IndicesStatusResponse statusResponse = client("node1").admin().indices().prepareStatus("test").setRecovery(true).execute().actionGet(); + IndicesStatusResponse statusResponse = client().admin().indices().prepareStatus("test").setRecovery(true).execute().actionGet(); for (IndexShardStatus indexShardStatus : statusResponse.getIndex("test")) { for (ShardStatus shardStatus : indexShardStatus) { if (!shardStatus.getShardRouting().primary()) { @@ -441,29 +394,30 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests { @Slow public void testRecoveryDifferentNodeOrderStartup() throws Exception { // we need different data paths so we make sure we start the second node fresh - buildNode("node1", settingsBuilder().put("path.data", "data/data1").build()); - buildNode("node2", settingsBuilder().put("path.data", "data/data2").build()); - cleanAndCloseNodes(); - startNode("node1", settingsBuilder().put("path.data", "data/data1").build()); + final String node_1 = cluster().startNode(settingsBuilder().put("path.data", "data/data1").build()); - client("node1").prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet(); - startNode("node2", settingsBuilder().put("path.data", "data/data2").build()); + cluster().startNode(settingsBuilder().put("path.data", "data/data2").build()); - ClusterHealthResponse health = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); - closeNode("node1"); - closeNode("node2"); + cluster().fullRestart(new RestartCallback() { - startNode("node2", settingsBuilder().put("path.data", "data/data2").build()); + @Override + public boolean doRestart(String nodeName) { + return !node_1.equals(nodeName); + } + }); - health = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); + + health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); - assertThat(client("node2").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); - assertHitCount(client("node2").prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1); + assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); + assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1); } } diff --git a/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionTests.java b/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionTests.java index 42684388e07..53638a7461b 100644 --- a/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionTests.java +++ b/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.indexlifecycle; import com.carrotsearch.randomizedtesting.annotations.Nightly; -import com.google.common.base.Predicate; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; @@ -34,6 +33,7 @@ import org.elasticsearch.discovery.Discovery; import org.elasticsearch.test.AbstractIntegrationTest; import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; import org.elasticsearch.test.AbstractIntegrationTest.Scope; +import org.elasticsearch.test.TestCluster; import org.junit.Test; import java.util.Map; @@ -177,12 +177,7 @@ public class IndexLifecycleActionTests extends AbstractIntegrationTest { logger.info("Closing server1"); // kill the first server - cluster().stopRandomNode(new Predicate() { - public boolean apply(Settings settings) { - return server_1.equals(settings.get("name")); - - } - }); + cluster().stopRandomNode(TestCluster.nameFilter(server_1)); // verify health logger.info("Running Cluster Health"); clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); @@ -347,12 +342,7 @@ public class IndexLifecycleActionTests extends AbstractIntegrationTest { logger.info("Closing server1"); // kill the first server - cluster().stopRandomNode(new Predicate() { - public boolean apply(Settings settings) { - return server_1.equals(settings.get("name")); - - } - }); + cluster().stopRandomNode(TestCluster.nameFilter(server_1)); logger.info("Running Cluster Health"); clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java index b6bf6d6a7c0..3f1e426b1b1 100644 --- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java +++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractIntegrationTest; import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; import org.elasticsearch.test.AbstractIntegrationTest.Scope; +import org.elasticsearch.test.TestCluster; import org.junit.Test; import java.io.File; @@ -69,12 +70,7 @@ public class IndicesStoreTests extends AbstractIntegrationTest { File server2Shard = shardDirectory(node_2, "test", 0); logger.info("--> stopping node node_2"); - cluster().stopRandomNode(new Predicate() { - - public boolean apply(Settings settings) { - return settings.get("name").equals(node_2); - } - }); + cluster().stopRandomNode(TestCluster.nameFilter(node_2)); assertThat(server2Shard.exists(), equalTo(true)); logger.info("--> running cluster_health"); diff --git a/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java b/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java index f98da6fc293..e7940c1bb5f 100644 --- a/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java +++ b/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java @@ -25,14 +25,14 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.search.warmer.IndexWarmersMetaData; -import org.elasticsearch.test.AbstractNodesTests; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; +import org.elasticsearch.test.TestCluster.RestartCallback; import org.hamcrest.Matchers; -import org.junit.After; import org.junit.Test; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; @@ -40,51 +40,33 @@ import static org.hamcrest.Matchers.equalTo; /** */ -public class LocalGatewayIndicesWarmerTests extends AbstractNodesTests { +@ClusterScope(numNodes=0, scope=Scope.TEST) +public class LocalGatewayIndicesWarmerTests extends AbstractIntegrationTest { private final ESLogger logger = Loggers.getLogger(LocalGatewayIndicesWarmerTests.class); - @After - public void cleanAndCloseNodes() throws Exception { - super.tearDown(); - for (int i = 0; i < 10; i++) { - if (node("node" + i) != null) { - node("node" + i).stop(); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) { - ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset(); - } - } - } - closeAllNodes(false); - } - @Test public void testStatePersistence() throws Exception { - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local")); - buildNode("node2", settingsBuilder().put("gateway.type", "local")); - cleanAndCloseNodes(); logger.info("--> starting 1 nodes"); - startNode("node1", settingsBuilder().put("gateway.type", "local")); + cluster().startNode(settingsBuilder().put("gateway.type", "local")); logger.info("--> putting two templates"); - client("node1").admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test") .setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)) .execute().actionGet(); - client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); - client("node1").admin().indices().preparePutWarmer("warmer_1") - .setSearchRequest(client("node1").prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value1"))) + client().admin().indices().preparePutWarmer("warmer_1") + .setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value1"))) .execute().actionGet(); - client("node1").admin().indices().preparePutWarmer("warmer_2") - .setSearchRequest(client("node1").prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value2"))) + client().admin().indices().preparePutWarmer("warmer_2") + .setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value2"))) .execute().actionGet(); logger.info("--> put template with warmer"); - client("node1").admin().indices().preparePutTemplate("template_1") + client().admin().indices().preparePutTemplate("template_1") .setSource("{\n" + " \"template\" : \"xxx\",\n" + " \"warmers\" : {\n" + @@ -102,7 +84,7 @@ public class LocalGatewayIndicesWarmerTests extends AbstractNodesTests { logger.info("--> verify warmers are registered in cluster state"); - ClusterState clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState(); + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); IndexWarmersMetaData warmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE); assertThat(warmersMetaData, Matchers.notNullValue()); assertThat(warmersMetaData.entries().size(), equalTo(2)); @@ -111,17 +93,19 @@ public class LocalGatewayIndicesWarmerTests extends AbstractNodesTests { assertThat(templateWarmers, Matchers.notNullValue()); assertThat(templateWarmers.entries().size(), equalTo(1)); - logger.info("--> close the node"); - closeNode("node1"); + logger.info("--> restarting the node"); + cluster().fullRestart(new RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + return settingsBuilder().put("gateway.type", "local").build(); + } + }); - logger.info("--> starting the node again..."); - startNode("node1", settingsBuilder().put("gateway.type", "local")); - - ClusterHealthResponse healthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); assertThat(healthResponse.isTimedOut(), equalTo(false)); logger.info("--> verify warmers are recovered"); - clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState(); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); IndexWarmersMetaData recoveredWarmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE); assertThat(recoveredWarmersMetaData.entries().size(), equalTo(warmersMetaData.entries().size())); for (int i = 0; i < warmersMetaData.entries().size(); i++) { @@ -139,25 +123,27 @@ public class LocalGatewayIndicesWarmerTests extends AbstractNodesTests { logger.info("--> delete warmer warmer_1"); - client("node1").admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet(); + client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet(); logger.info("--> verify warmers (delete) are registered in cluster state"); - clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState(); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); warmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE); assertThat(warmersMetaData, Matchers.notNullValue()); assertThat(warmersMetaData.entries().size(), equalTo(1)); - logger.info("--> close the node"); - closeNode("node1"); + logger.info("--> restarting the node"); + cluster().fullRestart(new RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + return settingsBuilder().put("gateway.type", "local").build(); + } + }); - logger.info("--> starting the node again..."); - startNode("node1", settingsBuilder().put("gateway.type", "local")); - - healthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); + healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); assertThat(healthResponse.isTimedOut(), equalTo(false)); logger.info("--> verify warmers are recovered"); - clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState(); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); recoveredWarmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE); assertThat(recoveredWarmersMetaData.entries().size(), equalTo(warmersMetaData.entries().size())); for (int i = 0; i < warmersMetaData.entries().size(); i++) { diff --git a/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java b/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java index 52ec89f0b74..e759186f6ab 100644 --- a/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java +++ b/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java @@ -85,7 +85,7 @@ public class RecoveryPercolatorTests extends AbstractIntegrationTest { .execute().actionGet(); assertThat(percolate.getMatches(), arrayWithSize(1)); - cluster().restartAllNodes(); + cluster().rollingRestart(); logger.info("Running Cluster Health (wait for the shards to startup)"); ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); @@ -132,7 +132,7 @@ public class RecoveryPercolatorTests extends AbstractIntegrationTest { .execute().actionGet(); assertThat(percolate.getMatches(), arrayWithSize(1)); - cluster().restartAllNodes(); + cluster().rollingRestart(); logger.info("Running Cluster Health (wait for the shards to startup)"); ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); diff --git a/src/test/java/org/elasticsearch/test/AbstractNodesTests.java b/src/test/java/org/elasticsearch/test/AbstractNodesTests.java deleted file mode 100644 index 590bcdfb117..00000000000 --- a/src/test/java/org/elasticsearch/test/AbstractNodesTests.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.test; - -import com.google.common.collect.ImmutableSet; -import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.block.ClusterBlock; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.common.network.NetworkUtils; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.indices.IndexMissingException; -import org.elasticsearch.node.Node; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static com.google.common.collect.Maps.newHashMap; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; -import static org.elasticsearch.node.NodeBuilder.nodeBuilder; - -@Ignore -@IntegrationTests -public abstract class AbstractNodesTests extends ElasticSearchTestCase { - private static Map nodes = newHashMap(); - - private static Map clients = newHashMap(); - - private static final Settings defaultSettings = ImmutableSettings - .settingsBuilder() - .put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName() + "CHILD_VM=[" + CHILD_VM_ID +"]") - .build(); - - - public Node startNode(String id) { - return buildNode(id).start(); - } - - public Node startNode(String id, Settings.Builder settings) { - return startNode(id, settings.build()); - } - - public Node startNode(String id, Settings settings) { - return buildNode(id, settings).start(); - } - - public Node buildNode(String id) { - return buildNode(id, EMPTY_SETTINGS); - } - - public Node buildNode(String id, Settings.Builder settings) { - return buildNode(id, settings.build()); - } - - public Node buildNode(String id, Settings settings) { - synchronized (AbstractNodesTests.class) { - if (nodes.containsKey(id)) { - throw new IllegalArgumentException("Node with id ["+ id + "] already exists"); - } - assert !nodes.containsKey(id); - assert !clients.containsKey(id); - - String settingsSource = getClass().getName().replace('.', '/') + ".yml"; - Settings finalSettings = settingsBuilder() - .loadFromClasspath(settingsSource) - .put(defaultSettings) - .put(getClassDefaultSettings()) - .put(settings) - .put("name", id) - .put("discovery.id.seed", randomLong()) - .build(); - - if (finalSettings.get("gateway.type") == null) { - // default to non gateway - finalSettings = settingsBuilder().put(finalSettings).put("gateway.type", "none").build(); - } - if (finalSettings.get("cluster.routing.schedule") != null) { - // decrease the routing schedule so new nodes will be added quickly - finalSettings = settingsBuilder().put(finalSettings).put("cluster.routing.schedule", "50ms").build(); - } - Node node = nodeBuilder() - .settings(finalSettings) - .build(); - logger.info("Build Node [{}] with settings [{}]", id, finalSettings.toDelimitedString(',')); - nodes.put(id, node); - clients.put(id, node.client()); - return node; - } - } - - public void closeNode(String id) { - Client client; - Node node; - synchronized (AbstractNodesTests.class) { - client = clients.remove(id); - node = nodes.remove(id); - } - if (client != null) { - client.close(); - } - if (node != null) { - node.close(); - } - - } - - public List nodes() { - synchronized (AbstractNodesTests.class) { - return new ArrayList(nodes.values()); - } - } - - public Node node(String id) { - synchronized (AbstractNodesTests.class) { - return nodes.get(id); - } - } - - public Client client(String id) { - synchronized (AbstractNodesTests.class) { - return clients.get(id); - } - } - public void closeAllNodes() { - closeAllNodes(false); - } - public void closeAllNodes(boolean preventRelocation) { - synchronized (AbstractNodesTests.class) { - if (preventRelocation) { - Settings build = ImmutableSettings.builder().put("cluster.routing.allocation.disable_allocation", true).build(); - Client aClient = client(); - if (aClient != null) { - aClient.admin().cluster().prepareUpdateSettings().setTransientSettings(build).execute().actionGet(); - } - } - for (Client client : clients.values()) { - client.close(); - } - clients.clear(); - for (Node node : nodes.values()) { - node.close(); - } - nodes.clear(); - } - } - - public ImmutableSet waitForNoBlocks(TimeValue timeout, String node) throws InterruptedException { - long start = System.currentTimeMillis(); - ImmutableSet blocks; - do { - blocks = client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet() - .getState().blocks().global(ClusterBlockLevel.METADATA); - } - while (!blocks.isEmpty() && (System.currentTimeMillis() - start) < timeout.millis()); - return blocks; - } - - public void createIndices(Client client, String... indices) { - for (String index : indices) { - client.admin().indices().prepareCreate(index).execute().actionGet(); - } - } - - public void wipeIndices(Client client, String... names) { - try { - client.admin().indices().prepareDelete(names).execute().actionGet(); - } catch (IndexMissingException e) { - // ignore - } - } - - private static volatile AbstractNodesTests testInstance; // this test class only works once per JVM - - @AfterClass - public static void tearDownOnce() throws Exception { - synchronized (AbstractNodesTests.class) { - if (testInstance != null) { - testInstance.afterClass(); - testInstance.closeAllNodes(); - testInstance = null; - } - } - } - - - @BeforeClass - public static void setUpOnce() throws Exception { - synchronized (AbstractNodesTests.class) { - if (testInstance != null) { - testInstance.afterClass(); - testInstance.closeAllNodes(); - testInstance = null; - } - } - } - - @Before - public final void setUp() throws Exception { - super.setUp(); - synchronized (AbstractNodesTests.class) { - if (testInstance == null) { - testInstance = this; - testInstance.beforeClass(); - - } else { - assert testInstance.getClass() == this.getClass(); - } - } - } - - public Client client() { - synchronized (AbstractNodesTests.class) { - if (clients.isEmpty()) { - return null; - } - return clients.values().iterator().next(); - } - } - - protected void afterClass() throws Exception { - } - - protected Settings getClassDefaultSettings() { - return ImmutableSettings.EMPTY; - } - - protected void beforeClass() throws Exception { - } -} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index fb2aeda739c..61ebc81a281 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -385,11 +385,26 @@ public class TestCluster implements Closeable, Iterable { } } - void restart() { - node.close(); - node = (InternalNode) nodeBuilder().settings(node.settings()).node(); + void restart(RestartCallback callback) throws Exception { + assert callback != null; + if (!node.isClosed()) { + node.close(); + } + Settings newSettings = callback.onNodeStopped(name); + if (newSettings == null) { + newSettings = ImmutableSettings.EMPTY; + } + if (callback.clearData(name)) { + NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class, node); + if (nodeEnv.hasNodeFile()) { + FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocations()); + } + } + node = (InternalNode) nodeBuilder().settings(node.settings()).settings(newSettings).node(); resetClient(); } + + @Override public void close() { @@ -624,24 +639,79 @@ public class TestCluster implements Closeable, Iterable { nodeAndClient.close(); } } + public void restartRandomNode() throws Exception { + restartRandomNode(EMPTY_CALLBACK); + + } - public void restartRandomNode() { + public void restartRandomNode(RestartCallback callback) throws Exception { ensureOpen(); NodeAndClient nodeAndClient = getRandomNodeAndClient(); if (nodeAndClient != null) { logger.info("Restarting random node [{}] ", nodeAndClient.name); - nodeAndClient.restart(); + nodeAndClient.restart(callback); } } - - public void restartAllNodes() { + + private void restartAllNodes(boolean rollingRestart, RestartCallback callback) throws Exception { ensureOpen(); - logger.info("Restarting all nodes"); - for (NodeAndClient nodeAndClient : nodes.values()) { - logger.info("Restarting node [{}] ", nodeAndClient.name); - nodeAndClient.restart(); + List toRemove = new ArrayList(); + try { + for (NodeAndClient nodeAndClient : nodes.values()) { + if (!callback.doRestart(nodeAndClient.name)) { + logger.info("Closing node [{}] during restart", nodeAndClient.name); + toRemove.add(nodeAndClient); + nodeAndClient.close(); + } + } + } finally { + for (NodeAndClient nodeAndClient : toRemove) { + nodes.remove(nodeAndClient.name); + } + } + logger.info("Restarting remaining nodes rollingRestart [{}]", rollingRestart); + if (rollingRestart) { + int numNodesRestarted = 0; + for (NodeAndClient nodeAndClient : nodes.values()) { + callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); + logger.info("Restarting node [{}] ", nodeAndClient.name); + nodeAndClient.restart(callback); + } + } else { + int numNodesRestarted = 0; + for (NodeAndClient nodeAndClient : nodes.values()) { + callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); + logger.info("Stopping node [{}] ", nodeAndClient.name); + nodeAndClient.node.close(); + } + for (NodeAndClient nodeAndClient : nodes.values()) { + logger.info("Starting node [{}] ", nodeAndClient.name); + nodeAndClient.restart(callback); + } } } + public static final RestartCallback EMPTY_CALLBACK = new RestartCallback() { + public Settings onNodeStopped(String node) { + return null; + } + }; + + public void fullRestart() throws Exception { + fullRestart(EMPTY_CALLBACK); + } + + public void rollingRestart() throws Exception { + rollingRestart(EMPTY_CALLBACK); + } + + public void rollingRestart(RestartCallback function) throws Exception { + restartAllNodes(true, function); + } + + public void fullRestart(RestartCallback function) throws Exception { + restartAllNodes(false, function); + } + private String getMasterName() { try { @@ -781,5 +851,43 @@ public class TestCluster implements Closeable, Iterable { }; } + + public static Predicate nameFilter(String... nodeName) { + return new NodeNamePredicate(new HashSet(Arrays.asList(nodeName))); + } + + private static final class NodeNamePredicate implements Predicate { + private final HashSet nodeNames; + + + public NodeNamePredicate(HashSet nodeNames) { + this.nodeNames = nodeNames; + } + + @Override + public boolean apply(Settings settings) { + return nodeNames.contains(settings.get("name")); + + } + } + + public static abstract class RestartCallback { + + public Settings onNodeStopped(String nodeName) throws Exception { + return ImmutableSettings.EMPTY; + } + + public void doAfterNodes(int numNodes, Client client) throws Exception { + } + + public boolean clearData(String nodeName) { + return false; + } + + public boolean doRestart(String nodeName) { + return true; + } + + } }