From 8eab51047f0345fb59297a45618e9c4a78b61fde Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 23 Sep 2013 00:19:33 +0200 Subject: [PATCH] Cut RecoveryPercolatorTests over to AbstractIntegrationTest Added node restart capabilities to TestCluster Trigger retry mechanism (onFailure method) instead of invoking transport service with null DiscoveryNode when no DiscoveryNode can be found for a ShardRouting. --- .../TransportShardSingleOperationAction.java | 40 ++-- .../percolator/RecoveryPercolatorTests.java | 196 ++++++++---------- .../org/elasticsearch/test/TestCluster.java | 30 ++- 3 files changed, 133 insertions(+), 133 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java b/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java index 5b7e68146b2..7fe7e15e5d0 100644 --- a/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java @@ -174,28 +174,32 @@ public abstract class TransportShardSingleOperationAction() { + if (node == null) { + onFailure(shardRouting, new NoShardAvailableActionException(shardIt.shardId())); + } else { + transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(request, shardRouting.id()), new BaseTransportResponseHandler() { - @Override - public Response newInstance() { - return newResponse(); - } + @Override + public Response newInstance() { + return newResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public String executor() { + return ThreadPool.Names.SAME; + } - @Override - public void handleResponse(final Response response) { - listener.onResponse(response); - } + @Override + public void handleResponse(final Response response) { + listener.onResponse(response); + } - @Override - public void handleException(TransportException exp) { - onFailure(shardRouting, exp); - } - }); + @Override + public void handleException(TransportException exp) { + onFailure(shardRouting, exp); + } + }); + } } } } diff --git a/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java b/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java index 4c646e7a8e4..52ec89f0b74 100644 --- a/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java +++ b/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.percolator; +import com.google.common.base.Predicate; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; @@ -34,11 +35,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.gateway.Gateway; -import org.elasticsearch.node.internal.InternalNode; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; +import org.elasticsearch.test.AbstractIntegrationTest; import org.junit.Test; import java.util.concurrent.CountDownLatch; @@ -51,46 +48,28 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.*; import static org.elasticsearch.percolator.PercolatorTests.convertFromTextArray; +import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import static org.elasticsearch.test.AbstractIntegrationTest.Scope; import static org.elasticsearch.test.hamcrest.ElasticSearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.*; -public class RecoveryPercolatorTests extends AbstractNodesTests { - - @After - public void cleanAndCloseNodes() throws Exception { - for (int i = 0; i < 10; i++) { - if (node("node" + i) != null) { - node("node" + i).stop(); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) { - ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset(); - } - } - } - closeAllNodes(); - } - - - @Override - protected Settings getClassDefaultSettings() { - return settingsBuilder().put("gateway.type", "local").build(); - } +@ClusterScope(scope = Scope.TEST, numNodes = 0) +public class RecoveryPercolatorTests extends AbstractIntegrationTest { @Test @Slow public void testRestartNodePercolator1() throws Exception { - logger.info("--> cleaning nodes"); - buildNode("node1"); - cleanAndCloseNodes(); - - logger.info("--> starting 1 nodes"); - startNode("node1"); - - Client client = client("node1"); - client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); + Settings settings = settingsBuilder() + .put(super.getSettings()) + .put("gateway.type", "local") + .build(); + cluster().startNode(settings); + client().admin().indices().prepareCreate("test").setSettings( + settingsBuilder().put("index.number_of_shards", 1).put() + ).execute().actionGet(); logger.info("--> register a query"); - client.prepareIndex("test", "_percolator", "kuku") + client().prepareIndex("test", "_percolator", "kuku") .setSource(jsonBuilder().startObject() .field("color", "blue") .field("query", termQuery("field1", "value1")) @@ -98,7 +77,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { .setRefresh(true) .execute().actionGet(); - PercolateResponse percolate = client.preparePercolate() + PercolateResponse percolate = client().preparePercolate() .setIndices("test").setDocumentType("type1") .setSource(jsonBuilder().startObject().startObject("doc") .field("field1", "value1") @@ -106,19 +85,15 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { .execute().actionGet(); assertThat(percolate.getMatches(), arrayWithSize(1)); - client.close(); - closeNode("node1"); - - startNode("node1"); - client = client("node1"); + cluster().restartAllNodes(); logger.info("Running Cluster Health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - percolate = client.preparePercolate() + percolate = client().preparePercolate() .setIndices("test").setDocumentType("type1") .setSource(jsonBuilder().startObject().startObject("doc") .field("field1", "value1") @@ -130,19 +105,16 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { @Test @Slow public void testRestartNodePercolator2() throws Exception { - logger.info("--> cleaning nodes"); - buildNode("node1"); - cleanAndCloseNodes(); - - logger.info("--> starting 1 nodes"); - startNode("node1"); - - Client client = client("node1"); - client.admin().indices().prepareCreate("test") - .setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); + Settings settings = settingsBuilder() + .put(super.getSettings()) + .put("gateway.type", "local") + .build(); + cluster().startNode(settings); + client().admin().indices().prepareCreate("test") + .setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); logger.info("--> register a query"); - client.prepareIndex("test", "_percolator", "kuku") + client().prepareIndex("test", "_percolator", "kuku") .setSource(jsonBuilder().startObject() .field("color", "blue") .field("query", termQuery("field1", "value1")) @@ -150,9 +122,9 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { .setRefresh(true) .execute().actionGet(); - assertThat(client.prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); - PercolateResponse percolate = client.preparePercolate() + PercolateResponse percolate = client().preparePercolate() .setIndices("test").setDocumentType("type1") .setSource(jsonBuilder().startObject().startObject("doc") .field("field1", "value1") @@ -160,30 +132,26 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { .execute().actionGet(); assertThat(percolate.getMatches(), arrayWithSize(1)); - client.close(); - closeNode("node1"); - - startNode("node1"); - client = client("node1"); + cluster().restartAllNodes(); logger.info("Running Cluster Health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - assertThat(client.prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); - DeleteIndexResponse actionGet = client.admin().indices().prepareDelete("test").execute().actionGet(); + DeleteIndexResponse actionGet = client().admin().indices().prepareDelete("test").execute().actionGet(); assertThat(actionGet.isAcknowledged(), equalTo(true)); - client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + client().admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - assertThat(client.prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(0l)); + assertThat(client().prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(0l)); - percolate = client.preparePercolate() + percolate = client().preparePercolate() .setIndices("test").setDocumentType("type1") .setSource(jsonBuilder().startObject().startObject("doc") .field("field1", "value1") @@ -192,7 +160,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { assertThat(percolate.getMatches(), emptyArray()); logger.info("--> register a query"); - client.prepareIndex("test", "_percolator", "kuku") + client().prepareIndex("test", "_percolator", "kuku") .setSource(jsonBuilder().startObject() .field("color", "blue") .field("query", termQuery("field1", "value1")) @@ -200,9 +168,9 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { .setRefresh(true) .execute().actionGet(); - assertThat(client.prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); + assertThat(client().prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l)); - percolate = client.preparePercolate() + percolate = client().preparePercolate() .setIndices("test").setDocumentType("type1") .setSource(jsonBuilder().startObject().startObject("doc") .field("field1", "value1") @@ -215,27 +183,28 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { @Slow public void testLoadingPercolateQueriesDuringCloseAndOpen() throws Exception { Settings settings = settingsBuilder() - .put("gateway.type", "local").build(); - logger.info("--> starting 2 nodes"); - startNode("node1", settings); - startNode("node2", settings); + .put(super.getSettings()) + .put("gateway.type", "local") + .build(); + logger.info("--> Starting 2 nodes"); + cluster().startNode(settings); + cluster().startNode(settings); - Client client = client("node1"); - client.admin().indices().prepareDelete().execute().actionGet(); - ensureGreen(client); + client().admin().indices().prepareDelete().execute().actionGet(); + ensureGreen(); - client.admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test") .setSettings(settingsBuilder().put("index.number_of_shards", 2)) .execute().actionGet(); - ensureGreen(client); + ensureGreen(); logger.info("--> Add dummy docs"); - client.prepareIndex("test", "type1", "1").setSource("field1", 0).execute().actionGet(); - client.prepareIndex("test", "type2", "1").setSource("field1", "0").execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource("field1", 0).execute().actionGet(); + client().prepareIndex("test", "type2", "1").setSource("field1", "0").execute().actionGet(); logger.info("--> register a queries"); for (int i = 1; i <= 100; i++) { - client.prepareIndex("test", "_percolator", Integer.toString(i)) + client().prepareIndex("test", "_percolator", Integer.toString(i)) .setSource(jsonBuilder().startObject() .field("query", rangeQuery("field1").from(0).to(i)) // The type must be set now, because two fields with the same name exist in different types. @@ -246,7 +215,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { } logger.info("--> Percolate doc with field1=95"); - PercolateResponse response = client.preparePercolate() + PercolateResponse response = client().preparePercolate() .setIndices("test").setDocumentType("type1") .setSource(jsonBuilder().startObject().startObject("doc").field("field1", 95).endObject().endObject()) .execute().actionGet(); @@ -254,13 +223,13 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { assertThat(convertFromTextArray(response.getMatches(), "test"), arrayContainingInAnyOrder("95", "96", "97", "98", "99", "100")); logger.info("--> Close and open index to trigger percolate queries loading..."); - client.admin().indices().prepareClose("test").execute().actionGet(); - ensureGreen(client); - client.admin().indices().prepareOpen("test").execute().actionGet(); - ensureGreen(client); + client().admin().indices().prepareClose("test").execute().actionGet(); + ensureGreen(); + client().admin().indices().prepareOpen("test").execute().actionGet(); + ensureGreen(); logger.info("--> Percolate doc with field1=100"); - response = client.preparePercolate() + response = client().preparePercolate() .setIndices("test").setDocumentType("type1") .setSource(jsonBuilder().startObject().startObject("doc").field("field1", 100).endObject().endObject()) .execute().actionGet(); @@ -283,25 +252,29 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { // 3 nodes, 2 primary + 2 replicas per primary, so each node should have a copy of the data. // We only start and stop nodes 2 and 3, so all requests should succeed and never be partial. private void percolatorRecovery(final boolean multiPercolate) throws Exception { - Settings settings = settingsBuilder() - .put("gateway.type", "none").build(); - logger.info("--> starting 3 nodes"); - startNode("node1", settings); - startNode("node2", settings); - startNode("node3", settings); + logger.info("--> ensuring exactly 2 nodes"); + cluster().ensureAtLeastNumNodes(2); + cluster().ensureAtMostNumNodes(2); + logger.info("--> Adding 3th node"); + cluster().startNode(settingsBuilder().put("node.stay", true)); - final Client client = client("node1"); - client.admin().indices().prepareDelete().execute().actionGet(); - ensureGreen(client); + client().admin().indices().prepareDelete().execute().actionGet(); + ensureGreen(); - client.admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test") .setSettings(settingsBuilder() .put("index.number_of_shards", 2) .put("index.number_of_replicas", 2) ) .execute().actionGet(); - ensureGreen(client); + ensureGreen(); + final Client client = cluster().client(new Predicate() { + @Override + public boolean apply(Settings input) { + return input.getAsBoolean("node.stay", false); + } + }); final int numQueries = randomIntBetween(50, 100); logger.info("--> register a queries"); for (int i = 0; i < numQueries; i++) { @@ -411,10 +384,16 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { }; new Thread(r).start(); + Predicate nodePredicate = new Predicate() { + @Override + public boolean apply(Settings input) { + return !input.getAsBoolean("node.stay", false); + } + }; try { // 1 index, 2 primaries, 2 replicas per primary for (int i = 0; i < 4; i++) { - closeNode("node3"); + cluster().stopRandomNode(nodePredicate); client.admin().cluster().prepareHealth("test") .setWaitForEvents(Priority.LANGUID) .setTimeout(TimeValue.timeValueMinutes(2)) @@ -422,7 +401,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { .setWaitForActiveShards(4) // 2 nodes, so 4 shards (2 primaries, 2 replicas) .execute().actionGet(); assertThat(error.get(), nullValue()); - closeNode("node2"); + cluster().stopRandomNode(nodePredicate); client.admin().cluster().prepareHealth("test") .setWaitForEvents(Priority.LANGUID) .setTimeout(TimeValue.timeValueMinutes(2)) @@ -430,7 +409,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { .setWaitForActiveShards(2) // 1 node, so 2 shards (2 primaries, 0 replicas) .execute().actionGet(); assertThat(error.get(), nullValue()); - startNode("node3"); + cluster().startNode(); client.admin().cluster().prepareHealth("test") .setWaitForEvents(Priority.LANGUID) .setTimeout(TimeValue.timeValueMinutes(2)) @@ -438,7 +417,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { .setWaitForActiveShards(4) // 2 nodes, so 4 shards (2 primaries, 2 replicas) .execute().actionGet(); assertThat(error.get(), nullValue()); - startNode("node2"); + cluster().startNode(); client.admin().cluster().prepareHealth("test") .setWaitForEvents(Priority.LANGUID) .setTimeout(TimeValue.timeValueMinutes(2)) @@ -454,11 +433,4 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { assertThat(error.get(), nullValue()); } - public static void ensureGreen(Client client) { - ClusterHealthResponse actionGet = client.admin().cluster() - .health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); - assertThat(actionGet.isTimedOut(), equalTo(false)); - assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - } - } diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index 8de8380e1fe..8c1c3f78e70 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -318,7 +318,7 @@ public class TestCluster implements Closeable, Iterable { } private final class NodeAndClient implements Closeable { - private final InternalNode node; + private InternalNode node; private Client client; private Client nodeClient; private final AtomicBoolean closed = new AtomicBoolean(false); @@ -326,7 +326,7 @@ public class TestCluster implements Closeable, Iterable { private final String name; NodeAndClient(String name, Node node, ClientFactory factory) { - this.node = (InternalNode)node; + this.node = (InternalNode) node; this.name = name; this.clientFactory = factory; } @@ -372,6 +372,12 @@ public class TestCluster implements Closeable, Iterable { } } + void restart() { + node.close(); + node = (InternalNode) nodeBuilder().settings(node.settings()).node(); + resetClient(); + } + @Override public void close() { closed.set(true); @@ -605,7 +611,25 @@ public class TestCluster implements Closeable, Iterable { nodeAndClient.close(); } } - + + public void restartRandomNode() { + ensureOpen(); + NodeAndClient nodeAndClient = getRandomNodeAndClient(); + if (nodeAndClient != null) { + logger.info("Restarting random node [{}] ", nodeAndClient.name); + nodeAndClient.restart(); + } + } + + public void restartAllNodes() { + ensureOpen(); + logger.info("Restarting all nodes"); + for (NodeAndClient nodeAndClient : nodes.values()) { + logger.info("Restarting node [{}] ", nodeAndClient.name); + nodeAndClient.restart(); + } + } + private String getMasterName() { try { ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();