Cut RecoveryPercolatorTests over to AbstractIntegrationTest

Added node restart capabilities to TestCluster
Trigger retry mechanism (onFailure method) instead of invoking transport service with null DiscoveryNode when no DiscoveryNode can be found for a ShardRouting.
This commit is contained in:
Martijn van Groningen 2013-09-23 00:19:33 +02:00
parent e7e39936b8
commit 8eab51047f
3 changed files with 133 additions and 133 deletions

View File

@ -174,6 +174,9 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
} else {
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardIt.shardId()));
} else {
transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(request, shardRouting.id()), new BaseTransportResponseHandler<Response>() {
@Override
@ -199,6 +202,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
}
}
}
private class TransportHandler extends BaseTransportRequestHandler<Request> {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.percolator;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
@ -34,11 +35,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
@ -51,46 +48,28 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.percolator.PercolatorTests.convertFromTextArray;
import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import static org.elasticsearch.test.AbstractIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticSearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.*;
public class RecoveryPercolatorTests extends AbstractNodesTests {
@After
public void cleanAndCloseNodes() throws Exception {
for (int i = 0; i < 10; i++) {
if (node("node" + i) != null) {
node("node" + i).stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) {
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
}
}
}
closeAllNodes();
}
@Override
protected Settings getClassDefaultSettings() {
return settingsBuilder().put("gateway.type", "local").build();
}
@ClusterScope(scope = Scope.TEST, numNodes = 0)
public class RecoveryPercolatorTests extends AbstractIntegrationTest {
@Test
@Slow
public void testRestartNodePercolator1() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1");
cleanAndCloseNodes();
logger.info("--> starting 1 nodes");
startNode("node1");
Client client = client("node1");
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
Settings settings = settingsBuilder()
.put(super.getSettings())
.put("gateway.type", "local")
.build();
cluster().startNode(settings);
client().admin().indices().prepareCreate("test").setSettings(
settingsBuilder().put("index.number_of_shards", 1).put()
).execute().actionGet();
logger.info("--> register a query");
client.prepareIndex("test", "_percolator", "kuku")
client().prepareIndex("test", "_percolator", "kuku")
.setSource(jsonBuilder().startObject()
.field("color", "blue")
.field("query", termQuery("field1", "value1"))
@ -98,7 +77,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
.setRefresh(true)
.execute().actionGet();
PercolateResponse percolate = client.preparePercolate()
PercolateResponse percolate = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(jsonBuilder().startObject().startObject("doc")
.field("field1", "value1")
@ -106,19 +85,15 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
.execute().actionGet();
assertThat(percolate.getMatches(), arrayWithSize(1));
client.close();
closeNode("node1");
startNode("node1");
client = client("node1");
cluster().restartAllNodes();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
percolate = client.preparePercolate()
percolate = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(jsonBuilder().startObject().startObject("doc")
.field("field1", "value1")
@ -130,19 +105,16 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
@Test
@Slow
public void testRestartNodePercolator2() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1");
cleanAndCloseNodes();
logger.info("--> starting 1 nodes");
startNode("node1");
Client client = client("node1");
client.admin().indices().prepareCreate("test")
Settings settings = settingsBuilder()
.put(super.getSettings())
.put("gateway.type", "local")
.build();
cluster().startNode(settings);
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
logger.info("--> register a query");
client.prepareIndex("test", "_percolator", "kuku")
client().prepareIndex("test", "_percolator", "kuku")
.setSource(jsonBuilder().startObject()
.field("color", "blue")
.field("query", termQuery("field1", "value1"))
@ -150,9 +122,9 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
.setRefresh(true)
.execute().actionGet();
assertThat(client.prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
PercolateResponse percolate = client.preparePercolate()
PercolateResponse percolate = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(jsonBuilder().startObject().startObject("doc")
.field("field1", "value1")
@ -160,30 +132,26 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
.execute().actionGet();
assertThat(percolate.getMatches(), arrayWithSize(1));
client.close();
closeNode("node1");
startNode("node1");
client = client("node1");
cluster().restartAllNodes();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(client.prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
DeleteIndexResponse actionGet = client.admin().indices().prepareDelete("test").execute().actionGet();
DeleteIndexResponse actionGet = client().admin().indices().prepareDelete("test").execute().actionGet();
assertThat(actionGet.isAcknowledged(), equalTo(true));
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
client().admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(client.prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(0l));
assertThat(client().prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(0l));
percolate = client.preparePercolate()
percolate = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(jsonBuilder().startObject().startObject("doc")
.field("field1", "value1")
@ -192,7 +160,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
assertThat(percolate.getMatches(), emptyArray());
logger.info("--> register a query");
client.prepareIndex("test", "_percolator", "kuku")
client().prepareIndex("test", "_percolator", "kuku")
.setSource(jsonBuilder().startObject()
.field("color", "blue")
.field("query", termQuery("field1", "value1"))
@ -200,9 +168,9 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
.setRefresh(true)
.execute().actionGet();
assertThat(client.prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setTypes("_percolator").setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
percolate = client.preparePercolate()
percolate = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(jsonBuilder().startObject().startObject("doc")
.field("field1", "value1")
@ -215,27 +183,28 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
@Slow
public void testLoadingPercolateQueriesDuringCloseAndOpen() throws Exception {
Settings settings = settingsBuilder()
.put("gateway.type", "local").build();
logger.info("--> starting 2 nodes");
startNode("node1", settings);
startNode("node2", settings);
.put(super.getSettings())
.put("gateway.type", "local")
.build();
logger.info("--> Starting 2 nodes");
cluster().startNode(settings);
cluster().startNode(settings);
Client client = client("node1");
client.admin().indices().prepareDelete().execute().actionGet();
ensureGreen(client);
client().admin().indices().prepareDelete().execute().actionGet();
ensureGreen();
client.admin().indices().prepareCreate("test")
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder().put("index.number_of_shards", 2))
.execute().actionGet();
ensureGreen(client);
ensureGreen();
logger.info("--> Add dummy docs");
client.prepareIndex("test", "type1", "1").setSource("field1", 0).execute().actionGet();
client.prepareIndex("test", "type2", "1").setSource("field1", "0").execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field1", 0).execute().actionGet();
client().prepareIndex("test", "type2", "1").setSource("field1", "0").execute().actionGet();
logger.info("--> register a queries");
for (int i = 1; i <= 100; i++) {
client.prepareIndex("test", "_percolator", Integer.toString(i))
client().prepareIndex("test", "_percolator", Integer.toString(i))
.setSource(jsonBuilder().startObject()
.field("query", rangeQuery("field1").from(0).to(i))
// The type must be set now, because two fields with the same name exist in different types.
@ -246,7 +215,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
}
logger.info("--> Percolate doc with field1=95");
PercolateResponse response = client.preparePercolate()
PercolateResponse response = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(jsonBuilder().startObject().startObject("doc").field("field1", 95).endObject().endObject())
.execute().actionGet();
@ -254,13 +223,13 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
assertThat(convertFromTextArray(response.getMatches(), "test"), arrayContainingInAnyOrder("95", "96", "97", "98", "99", "100"));
logger.info("--> Close and open index to trigger percolate queries loading...");
client.admin().indices().prepareClose("test").execute().actionGet();
ensureGreen(client);
client.admin().indices().prepareOpen("test").execute().actionGet();
ensureGreen(client);
client().admin().indices().prepareClose("test").execute().actionGet();
ensureGreen();
client().admin().indices().prepareOpen("test").execute().actionGet();
ensureGreen();
logger.info("--> Percolate doc with field1=100");
response = client.preparePercolate()
response = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(jsonBuilder().startObject().startObject("doc").field("field1", 100).endObject().endObject())
.execute().actionGet();
@ -283,25 +252,29 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
// 3 nodes, 2 primary + 2 replicas per primary, so each node should have a copy of the data.
// We only start and stop nodes 2 and 3, so all requests should succeed and never be partial.
private void percolatorRecovery(final boolean multiPercolate) throws Exception {
Settings settings = settingsBuilder()
.put("gateway.type", "none").build();
logger.info("--> starting 3 nodes");
startNode("node1", settings);
startNode("node2", settings);
startNode("node3", settings);
logger.info("--> ensuring exactly 2 nodes");
cluster().ensureAtLeastNumNodes(2);
cluster().ensureAtMostNumNodes(2);
logger.info("--> Adding 3th node");
cluster().startNode(settingsBuilder().put("node.stay", true));
final Client client = client("node1");
client.admin().indices().prepareDelete().execute().actionGet();
ensureGreen(client);
client().admin().indices().prepareDelete().execute().actionGet();
ensureGreen();
client.admin().indices().prepareCreate("test")
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 2)
)
.execute().actionGet();
ensureGreen(client);
ensureGreen();
final Client client = cluster().client(new Predicate<Settings>() {
@Override
public boolean apply(Settings input) {
return input.getAsBoolean("node.stay", false);
}
});
final int numQueries = randomIntBetween(50, 100);
logger.info("--> register a queries");
for (int i = 0; i < numQueries; i++) {
@ -411,10 +384,16 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
};
new Thread(r).start();
Predicate<Settings> nodePredicate = new Predicate<Settings>() {
@Override
public boolean apply(Settings input) {
return !input.getAsBoolean("node.stay", false);
}
};
try {
// 1 index, 2 primaries, 2 replicas per primary
for (int i = 0; i < 4; i++) {
closeNode("node3");
cluster().stopRandomNode(nodePredicate);
client.admin().cluster().prepareHealth("test")
.setWaitForEvents(Priority.LANGUID)
.setTimeout(TimeValue.timeValueMinutes(2))
@ -422,7 +401,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
.setWaitForActiveShards(4) // 2 nodes, so 4 shards (2 primaries, 2 replicas)
.execute().actionGet();
assertThat(error.get(), nullValue());
closeNode("node2");
cluster().stopRandomNode(nodePredicate);
client.admin().cluster().prepareHealth("test")
.setWaitForEvents(Priority.LANGUID)
.setTimeout(TimeValue.timeValueMinutes(2))
@ -430,7 +409,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
.setWaitForActiveShards(2) // 1 node, so 2 shards (2 primaries, 0 replicas)
.execute().actionGet();
assertThat(error.get(), nullValue());
startNode("node3");
cluster().startNode();
client.admin().cluster().prepareHealth("test")
.setWaitForEvents(Priority.LANGUID)
.setTimeout(TimeValue.timeValueMinutes(2))
@ -438,7 +417,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
.setWaitForActiveShards(4) // 2 nodes, so 4 shards (2 primaries, 2 replicas)
.execute().actionGet();
assertThat(error.get(), nullValue());
startNode("node2");
cluster().startNode();
client.admin().cluster().prepareHealth("test")
.setWaitForEvents(Priority.LANGUID)
.setTimeout(TimeValue.timeValueMinutes(2))
@ -454,11 +433,4 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
assertThat(error.get(), nullValue());
}
public static void ensureGreen(Client client) {
ClusterHealthResponse actionGet = client.admin().cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
assertThat(actionGet.isTimedOut(), equalTo(false));
assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN));
}
}

View File

@ -318,7 +318,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
private final class NodeAndClient implements Closeable {
private final InternalNode node;
private InternalNode node;
private Client client;
private Client nodeClient;
private final AtomicBoolean closed = new AtomicBoolean(false);
@ -326,7 +326,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
private final String name;
NodeAndClient(String name, Node node, ClientFactory factory) {
this.node = (InternalNode)node;
this.node = (InternalNode) node;
this.name = name;
this.clientFactory = factory;
}
@ -372,6 +372,12 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
void restart() {
node.close();
node = (InternalNode) nodeBuilder().settings(node.settings()).node();
resetClient();
}
@Override
public void close() {
closed.set(true);
@ -606,6 +612,24 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
public void restartRandomNode() {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient();
if (nodeAndClient != null) {
logger.info("Restarting random node [{}] ", nodeAndClient.name);
nodeAndClient.restart();
}
}
public void restartAllNodes() {
ensureOpen();
logger.info("Restarting all nodes");
for (NodeAndClient nodeAndClient : nodes.values()) {
logger.info("Restarting node [{}] ", nodeAndClient.name);
nodeAndClient.restart();
}
}
private String getMasterName() {
try {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();