Cut over remaining tests to 'AbstractIntegrationTest'

This commit is contained in:
Simon Willnauer 2013-09-27 15:23:37 +02:00
parent 8b69035fa0
commit 9cb55138c7
10 changed files with 656 additions and 959 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.gateway.fs;
import com.carrotsearch.randomizedtesting.annotations.Nightly;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -36,12 +37,10 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.junit.Before;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.junit.Test;
import static org.elasticsearch.client.Requests.*;
@ -51,65 +50,51 @@ import static org.hamcrest.Matchers.*;
/**
*
*/
public class IndexGatewayTests extends AbstractNodesTests {
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class IndexGatewayTests extends AbstractIntegrationTest {
private Settings defaultSettings;
private String storeType;
@After
public void closeNodes() throws Exception {
tearDown();
node("server1").stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset();
closeAllNodes();
}
@Before
public void buildNode1() throws Exception {
super.setUp();
Builder builder = ImmutableSettings.builder();
builder.put("cluster.routing.schedule", "100ms");
builder.put("gateway.type", "fs");
if (between(0, 5) == 0) {
builder.put("gateway.fs.buffer_size", between(1, 100) + "kb");
}
if (between(0, 5) == 0) {
builder.put("gateway.fs.chunk_size", between(1, 100) + "kb");
}
builder.put("index.number_of_replicas", "1");
builder.put("index.number_of_shards", rarely() ? Integer.toString(between(2, 6)) : "1");
storeType = rarely() ? "ram" : "fs";
builder.put("index.store.type", storeType);
defaultSettings = builder.build();
buildNode("server1");
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset();
closeAllNodes();
}
private final SetOnce<Settings> settings = new SetOnce<Settings>();
@Override
protected final Settings getClassDefaultSettings() {
return defaultSettings;
protected Settings nodeSettings(int nodeOrdinal) {
if (settings.get() == null) {
Builder builder = ImmutableSettings.builder();
builder.put("cluster.routing.schedule", "100ms");
builder.put("gateway.type", "fs");
if (between(0, 5) == 0) {
builder.put("gateway.fs.buffer_size", between(1, 100) + "kb");
}
if (between(0, 5) == 0) {
builder.put("gateway.fs.chunk_size", between(1, 100) + "kb");
}
builder.put("index.number_of_replicas", "1");
builder.put("index.number_of_shards", rarely() ? Integer.toString(between(2, 6)) : "1");
storeType = rarely() ? "ram" : "fs";
builder.put("index.store.type", storeType);
settings.set(builder.build());
}
return settings.get();
}
protected boolean isPersistentStorage() {
assert storeType != null;
return "fs".equals(storeType);
return "fs".equals(settings.get().get("index.store.type"));
}
@Test
@Slow
public void testSnapshotOperations() throws Exception {
startNode("server1", getClassDefaultSettings());
cluster().startNode(nodeSettings(0));
// get the environment, so we can clear the work dir when needed
Environment environment = ((InternalNode) node("server1")).injector().getInstance(Environment.class);
Environment environment = cluster().getInstance(Environment.class);
logger.info("Running Cluster Health (waiting for node to startup properly)");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
@ -117,145 +102,145 @@ public class IndexGatewayTests extends AbstractNodesTests {
// Translog tests
logger.info("Creating index [{}]", "test");
client("server1").admin().indices().prepareCreate("test").execute().actionGet();
client().admin().indices().prepareCreate("test").execute().actionGet();
// create a mapping
PutMappingResponse putMappingResponse = client("server1").admin().indices().preparePutMapping("test").setType("type1").setSource(mappingSource()).execute().actionGet();
PutMappingResponse putMappingResponse = client().admin().indices().preparePutMapping("test").setType("type1").setSource(mappingSource()).execute().actionGet();
assertThat(putMappingResponse.isAcknowledged(), equalTo(true));
// verify that mapping is there
ClusterStateResponse clusterState = client("server1").admin().cluster().state(clusterStateRequest()).actionGet();
ClusterStateResponse clusterState = client().admin().cluster().state(clusterStateRequest()).actionGet();
assertThat(clusterState.getState().metaData().index("test").mapping("type1"), notNullValue());
// create two and delete the first
logger.info("Indexing #1");
client("server1").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
logger.info("Indexing #2");
client("server1").index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
client().index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
// perform snapshot to the index
logger.info("Gateway Snapshot");
client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Deleting #1");
client("server1").delete(deleteRequest("test").type("type1").id("1")).actionGet();
client().delete(deleteRequest("test").type("type1").id("1")).actionGet();
// perform snapshot to the index
logger.info("Gateway Snapshot");
client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Gateway Snapshot (should be a no op)");
// do it again, it should be a no op
client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Closing the server");
closeNode("server1");
cluster().stopRandomNode();
logger.info("Starting the server, should recover from the gateway (only translog should be populated)");
startNode("server1");
cluster().startNode(nodeSettings(0));
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
// verify that mapping is there
clusterState = client("server1").admin().cluster().state(clusterStateRequest()).actionGet();
clusterState = client().admin().cluster().state(clusterStateRequest()).actionGet();
assertThat(clusterState.getState().metaData().index("test").mapping("type1"), notNullValue());
logger.info("Getting #1, should not exists");
GetResponse getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet();
GetResponse getResponse = client().get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResponse.isExists(), equalTo(false));
logger.info("Getting #2");
getResponse = client("server1").get(getRequest("test").type("type1").id("2")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("2")).actionGet();
assertThat(getResponse.getSourceAsString(), equalTo(source("2", "test")));
// Now flush and add some data (so we have index recovery as well)
logger.info("Flushing, so we have actual content in the index files (#2 should be in the index)");
client("server1").admin().indices().flush(flushRequest("test")).actionGet();
client().admin().indices().flush(flushRequest("test")).actionGet();
logger.info("Indexing #3, so we have something in the translog as well");
client("server1").index(Requests.indexRequest("test").type("type1").id("3").source(source("3", "test"))).actionGet();
client().index(Requests.indexRequest("test").type("type1").id("3").source(source("3", "test"))).actionGet();
logger.info("Gateway Snapshot");
client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Gateway Snapshot (should be a no op)");
client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Closing the server");
closeNode("server1");
cluster().stopRandomNode();
logger.info("Starting the server, should recover from the gateway (both index and translog) and reuse work dir");
startNode("server1");
cluster().startNode(nodeSettings(0));
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
logger.info("Getting #1, should not exists");
getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResponse.isExists(), equalTo(false));
logger.info("Getting #2 (not from the translog, but from the index)");
getResponse = client("server1").get(getRequest("test").type("type1").id("2")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("2")).actionGet();
assertThat(getResponse.getSourceAsString(), equalTo(source("2", "test")));
logger.info("Getting #3 (from the translog)");
getResponse = client("server1").get(getRequest("test").type("type1").id("3")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("3")).actionGet();
assertThat(getResponse.getSourceAsString(), equalTo(source("3", "test")));
logger.info("Closing the server");
closeNode("server1");
cluster().stopRandomNode();
logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway");
FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles());
logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir");
startNode("server1");
cluster().startNode(nodeSettings(0));
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
logger.info("Getting #1, should not exists");
getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResponse.isExists(), equalTo(false));
logger.info("Getting #2 (not from the translog, but from the index)");
getResponse = client("server1").get(getRequest("test").type("type1").id("2")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("2")).actionGet();
assertThat(getResponse.getSourceAsString(), equalTo(source("2", "test")));
logger.info("Getting #3 (from the translog)");
getResponse = client("server1").get(getRequest("test").type("type1").id("3")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("3")).actionGet();
assertThat(getResponse.getSourceAsString(), equalTo(source("3", "test")));
logger.info("Flushing, so we have actual content in the index files (#3 should be in the index now as well)");
client("server1").admin().indices().flush(flushRequest("test")).actionGet();
client().admin().indices().flush(flushRequest("test")).actionGet();
logger.info("Gateway Snapshot");
client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Gateway Snapshot (should be a no op)");
client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Closing the server");
closeNode("server1");
cluster().stopRandomNode();
logger.info("Starting the server, should recover from the gateway (just from the index, nothing in the translog)");
startNode("server1");
cluster().startNode(nodeSettings(0));
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
logger.info("Getting #1, should not exists");
getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResponse.isExists(), equalTo(false));
logger.info("Getting #2 (not from the translog, but from the index)");
getResponse = client("server1").get(getRequest("test").type("type1").id("2")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("2")).actionGet();
assertThat(getResponse.getSourceAsString(), equalTo(source("2", "test")));
logger.info("Getting #3 (not from the translog, but from the index)");
getResponse = client("server1").get(getRequest("test").type("type1").id("3")).actionGet();
getResponse = client().get(getRequest("test").type("type1").id("3")).actionGet();
assertThat(getResponse.getSourceAsString(), equalTo(source("3", "test")));
logger.info("Deleting the index");
client("server1").admin().indices().delete(deleteIndexRequest("test")).actionGet();
client().admin().indices().delete(deleteIndexRequest("test")).actionGet();
}
@Test
@ -273,73 +258,73 @@ public class IndexGatewayTests extends AbstractNodesTests {
private void testLoad(boolean fullRecovery) {
logger.info("Running with fullRecover [{}]", fullRecovery);
startNode("server1");
cluster().startNode(nodeSettings(0));
logger.info("Running Cluster Health (waiting for node to startup properly)");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
// get the environment, so we can clear the work dir when needed
Environment environment = ((InternalNode) node("server1")).injector().getInstance(Environment.class);
Environment environment = cluster().getInstance(Environment.class);
logger.info("--> creating test index ...");
client("server1").admin().indices().prepareCreate("test").execute().actionGet();
client().admin().indices().prepareCreate("test").execute().actionGet();
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
logger.info("--> refreshing and checking count");
client("server1").admin().indices().prepareRefresh().execute().actionGet();
assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(0l));
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(0l));
logger.info("--> indexing 1234 docs");
for (long i = 0; i < 1234; i++) {
client("server1").prepareIndex("test", "type1", Long.toString(i))
client().prepareIndex("test", "type1", Long.toString(i))
.setCreate(true) // make sure we use create, so if we recover wrongly, we will get increments...
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map()).execute().actionGet();
// snapshot every 100 so we get some actions going on in the gateway
if ((i % 11) == 0) {
client("server1").admin().indices().prepareGatewaySnapshot().execute().actionGet();
client().admin().indices().prepareGatewaySnapshot().execute().actionGet();
}
// flush every once is a while, so we get different data
if ((i % 55) == 0) {
client("server1").admin().indices().prepareFlush().execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
}
}
logger.info("--> refreshing and checking count");
client("server1").admin().indices().prepareRefresh().execute().actionGet();
assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1234l));
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1234l));
logger.info("--> closing the server");
closeNode("server1");
cluster().stopRandomNode();
if (fullRecovery) {
logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway");
FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles());
logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir");
}
startNode("server1");
cluster().startNode(nodeSettings(0));
logger.info("--> running Cluster Health (wait for the shards to startup)");
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("--> done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
logger.info("--> checking count");
assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1234l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1234l));
logger.info("--> checking reuse / recovery status");
IndicesStatusResponse statusResponse = client("server1").admin().indices().prepareStatus().setRecovery(true).execute().actionGet();
IndicesStatusResponse statusResponse = client().admin().indices().prepareStatus().setRecovery(true).execute().actionGet();
for (IndexShardStatus indexShardStatus : statusResponse.getIndex("test")) {
for (ShardStatus shardStatus : indexShardStatus) {
if (shardStatus.getShardRouting().primary()) {
@ -370,22 +355,22 @@ public class IndexGatewayTests extends AbstractNodesTests {
@Test
@Slow
public void testIndexActions() throws Exception {
startNode("server1");
cluster().startNode(nodeSettings(0));
logger.info("Running Cluster Health (waiting for node to startup properly)");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
client().admin().indices().create(createIndexRequest("test")).actionGet();
closeNode("server1");
cluster().stopRandomNode();
startNode("server1");
cluster().startNode(nodeSettings(0));
Thread.sleep(500);
try {
client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
client().admin().indices().create(createIndexRequest("test")).actionGet();
assert false : "index should exists";
} catch (IndexAlreadyExistsException e) {
// all is well

View File

@ -33,11 +33,11 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.*;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.elasticsearch.test.TestCluster.RestartCallback;
import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
@ -48,281 +48,242 @@ import static org.hamcrest.Matchers.nullValue;
/**
*
*/
public class LocalGatewayIndexStateTests extends AbstractNodesTests {
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class LocalGatewayIndexStateTests extends AbstractIntegrationTest {
private final ESLogger logger = Loggers.getLogger(LocalGatewayIndexStateTests.class);
//TODO Randomize this test - lots of tests are duplicates with settings that can be randomized
@After
public void cleanAndCloseNodes() throws Exception {
for (int i = 0; i < 10; i++) {
if (node("node" + i) != null) {
node("node" + i).stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) {
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
}
}
}
closeAllNodes();
}
@Test
public void testMappingMetaDataParsed() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local"));
buildNode("node2", settingsBuilder().put("gateway.type", "local"));
cleanAndCloseNodes();
logger.info("--> starting 1 nodes");
startNode("node1", settingsBuilder().put("gateway.type", "local"));
cluster().startNode(settingsBuilder().put("gateway.type", "local"));
logger.info("--> creating test index, with meta routing");
client("node1").admin().indices().prepareCreate("test")
client().admin().indices().prepareCreate("test")
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject())
.execute().actionGet();
logger.info("--> waiting for yellow status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet();
if (health.isTimedOut()) {
ClusterStateResponse response = client("node1").admin().cluster().prepareState().execute().actionGet();
ClusterStateResponse response = client().admin().cluster().prepareState().execute().actionGet();
System.out.println("" + response);
}
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify meta _routing required exists");
MappingMetaData mappingMd = client("node1").admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1");
MappingMetaData mappingMd = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1");
assertThat(mappingMd.routing().required(), equalTo(true));
logger.info("--> close node");
closeNode("node1");
logger.info("--> starting node again...");
startNode("node1", settingsBuilder().put("gateway.type", "local"));
logger.info("--> restarting nodes...");
cluster().fullRestart();
logger.info("--> waiting for yellow status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet();
if (health.isTimedOut()) {
ClusterStateResponse response = client("node1").admin().cluster().prepareState().execute().actionGet();
ClusterStateResponse response = client().admin().cluster().prepareState().execute().actionGet();
System.out.println("" + response);
}
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify meta _routing required exists");
mappingMd = client("node1").admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1");
mappingMd = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1");
assertThat(mappingMd.routing().required(), equalTo(true));
}
@Test
public void testSimpleOpenClose() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting 2 nodes");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
logger.info("--> creating test index");
client("node1").admin().indices().prepareCreate("test").execute().actionGet();
client().admin().indices().prepareCreate("test").execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ClusterStateResponse stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet();
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN));
assertThat(stateResponse.getState().routingTable().index("test").shards().size(), equalTo(2));
assertThat(stateResponse.getState().routingTable().index("test").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4));
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet();
logger.info("--> closing test index...");
client("node1").admin().indices().prepareClose("test").execute().actionGet();
client().admin().indices().prepareClose("test").execute().actionGet();
stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet();
stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
assertThat(stateResponse.getState().routingTable().index("test"), nullValue());
logger.info("--> verifying that the state is green");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.info("--> trying to index into a closed index ...");
try {
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet();
assert false;
} catch (ClusterBlockException e) {
// all is well
}
logger.info("--> creating another index (test2) by indexing into it");
client("node1").prepareIndex("test2", "type1", "1").setSource("field1", "value1").execute().actionGet();
client().prepareIndex("test2", "type1", "1").setSource("field1", "value1").execute().actionGet();
logger.info("--> verifying that the state is green");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.info("--> opening the first index again...");
client("node1").admin().indices().prepareOpen("test").execute().actionGet();
client().admin().indices().prepareOpen("test").execute().actionGet();
logger.info("--> verifying that the state is green");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet();
stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN));
assertThat(stateResponse.getState().routingTable().index("test").shards().size(), equalTo(2));
assertThat(stateResponse.getState().routingTable().index("test").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4));
logger.info("--> trying to get the indexed document on the first index");
GetResponse getResponse = client("node1").prepareGet("test", "type1", "1").execute().actionGet();
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(true));
logger.info("--> closing test index...");
client("node1").admin().indices().prepareClose("test").execute().actionGet();
stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet();
client().admin().indices().prepareClose("test").execute().actionGet();
stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
assertThat(stateResponse.getState().routingTable().index("test"), nullValue());
logger.info("--> closing nodes...");
closeNode("node2");
closeNode("node1");
logger.info("--> starting nodes again...");
startNode("node1", settingsBuilder().put("gateway.type", "local").build());
startNode("node2", settingsBuilder().put("gateway.type", "local").build());
logger.info("--> restarting nodes...");
cluster().fullRestart();
logger.info("--> waiting for two nodes and green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet();
stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
assertThat(stateResponse.getState().routingTable().index("test"), nullValue());
logger.info("--> trying to index into a closed index ...");
try {
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimeout("1s").execute().actionGet();
assert false;
} catch (ClusterBlockException e) {
// all is well
}
logger.info("--> opening index...");
client("node1").admin().indices().prepareOpen("test").execute().actionGet();
client().admin().indices().prepareOpen("test").execute().actionGet();
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet();
stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN));
assertThat(stateResponse.getState().routingTable().index("test").shards().size(), equalTo(2));
assertThat(stateResponse.getState().routingTable().index("test").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4));
logger.info("--> trying to get the indexed document on the first round (before close and shutdown)");
getResponse = client("node1").prepareGet("test", "type1", "1").execute().actionGet();
getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(true));
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "2").setSource("field1", "value1").execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource("field1", "value1").execute().actionGet();
}
@Test
public void testJustMasterNode() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting 1 master node non data");
startNode("node1", settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
cluster().startNode(settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
logger.info("--> create an index");
client("node1").admin().indices().prepareCreate("test").execute().actionGet();
client().admin().indices().prepareCreate("test").execute().actionGet();
logger.info("--> closing master node");
closeNode("node1");
cluster().closeNonSharedNodes(false);
logger.info("--> starting 1 master node non data again");
startNode("node1", settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
cluster().startNode(settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
logger.info("--> waiting for test index to be created");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setIndices("test").execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setIndices("test").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify we have an index");
ClusterStateResponse clusterStateResponse = client("node1").admin().cluster().prepareState().setFilterIndices("test").execute().actionGet();
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setFilterIndices("test").execute().actionGet();
assertThat(clusterStateResponse.getState().metaData().hasIndex("test"), equalTo(true));
}
@Test
public void testJustMasterNodeAndJustDataNode() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting 1 master node non data");
startNode("node1", settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("node.master", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
cluster().startNode(settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
cluster().startNode(settingsBuilder().put("node.master", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
logger.info("--> create an index");
client("node1").admin().indices().prepareCreate("test").execute().actionGet();
client().admin().indices().prepareCreate("test").execute().actionGet();
logger.info("--> waiting for test index to be created");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setIndices("test").setWaitForYellowStatus().execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setIndices("test").setWaitForYellowStatus().execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
client("node1").prepareIndex("test", "type1").setSource("field1", "value1").setTimeout("100ms").execute().actionGet();
client().prepareIndex("test", "type1").setSource("field1", "value1").setTimeout("100ms").execute().actionGet();
}
@Test
public void testTwoNodesSingleDoc() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting 2 nodes");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build());
cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build());
cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build());
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
}
logger.info("--> closing test index...");
client("node1").admin().indices().prepareClose("test").execute().actionGet();
client().admin().indices().prepareClose("test").execute().actionGet();
ClusterStateResponse stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet();
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
assertThat(stateResponse.getState().routingTable().index("test"), nullValue());
logger.info("--> opening the index...");
client("node1").admin().indices().prepareOpen("test").execute().actionGet();
client().admin().indices().prepareOpen("test").execute().actionGet();
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
}
}
@ -332,60 +293,56 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
.put("gateway.type", "local").put("gateway.local.auto_import_dangled", "yes")
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.build();
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settings);
startNode("node2", settings);
final String node_1 = cluster().startNode(settings);
cluster().startNode(settings);
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
}
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
logger.info("--> shutting down the nodes");
Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class);
closeNode("node1");
closeNode("node2");
logger.info("--> deleting the data for the first node");
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settings);
startNode("node2", settings);
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
logger.info("--> restarting the nodes");
final Gateway gateway1 = cluster().getInstance(Gateway.class, node_1);
cluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
if (node_1.equals(nodeName)) {
logger.info("--> deleting the data for the first node");
gateway1.reset();
}
return null;
}
});
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
// spin a bit waiting for the index to exists
long time = System.currentTimeMillis();
while ((System.currentTimeMillis() - time) < TimeValue.timeValueSeconds(10).millis()) {
if (client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists()) {
if (client().admin().indices().prepareExists("test").execute().actionGet().isExists()) {
break;
}
}
logger.info("--> verify that the dangling index exists");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify the doc is there");
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
}
@Test
@ -394,68 +351,66 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
.put("gateway.type", "local").put("gateway.local.auto_import_dangled", "closed")
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.build();
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settings);
startNode("node2", settings);
final String node_1 = cluster().startNode(settings);
cluster().startNode(settings);
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
}
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
logger.info("--> shutting down the nodes");
Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class);
closeNode("node1");
closeNode("node2");
logger.info("--> deleting the data for the first node");
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settings);
startNode("node2", settings);
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
logger.info("--> restarting the nodes");
final Gateway gateway1 = cluster().getInstance(Gateway.class, node_1);
cluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
if (node_1.equals(nodeName)) {
logger.info("--> deleting the data for the first node");
gateway1.reset();
}
return null;
}
});
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
// spin a bit waiting for the index to exists
long time = System.currentTimeMillis();
while ((System.currentTimeMillis() - time) < TimeValue.timeValueSeconds(10).millis()) {
if (client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists()) {
if (client().admin().indices().prepareExists("test").execute().actionGet().isExists()) {
break;
}
}
logger.info("--> verify that the dangling index exists");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify the index state is closed");
assertThat(client("node1").admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
assertThat(client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
logger.info("--> open the index");
client("node1").admin().indices().prepareOpen("test").execute().actionGet();
client().admin().indices().prepareOpen("test").execute().actionGet();
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify the doc is there");
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
}
@Test
@ -464,42 +419,39 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
.put("gateway.type", "local").put("gateway.local.auto_import_dangled", "no")
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.build();
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settings);
startNode("node2", settings);
final String node_1 = cluster().startNode(settings);
cluster().startNode(settings);
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
}
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
logger.info("--> shutting down the nodes");
Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class);
closeNode("node1");
closeNode("node2");
logger.info("--> deleting the data for the first node");
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> restarting the nodes");
final Gateway gateway1 = cluster().getInstance(Gateway.class, node_1);
cluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
if (node_1.equals(nodeName)) {
logger.info("--> deleting the data for the first node");
gateway1.reset();
}
return null;
}
});
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
// we need to wait for the allocate dangled to kick in (even though in this case its disabled)
@ -507,24 +459,24 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
Thread.sleep(500);
logger.info("--> verify that the dangling index does not exists");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(false));
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(false));
logger.info("--> shutdown the nodes");
closeNode("node1");
closeNode("node2");
logger.info("--> start the nodes back, but make sure we do recovery only after we have 2 nodes in the cluster");
startNode("node1", settingsBuilder().put(settings).put("gateway.recover_after_nodes", 2).build());
startNode("node2", settingsBuilder().put(settings).put("gateway.recover_after_nodes", 2).build());
logger.info("--> restart start the nodes, but make sure we do recovery only after we have 2 nodes in the cluster");
cluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
return settingsBuilder().put("gateway.recover_after_nodes", 2).build();
}
});
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify that the dangling index does exists now!");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
logger.info("--> verify the doc is there");
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
}
@Test
@ -534,53 +486,50 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.build();
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settings);
startNode("node2", settings);
final String node_1 = cluster().startNode(settings);
cluster().startNode(settings);
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
}
logger.info("--> shutting down the nodes");
Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class);
closeNode("node1");
closeNode("node2");
logger.info("--> deleting the data for the first node");
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settings);
startNode("node2", settings);
logger.info("--> restarting the nodes");
final Gateway gateway1 = cluster().getInstance(Gateway.class, node_1);
cluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
if (node_1.equals(nodeName)) {
logger.info("--> deleting the data for the first node");
gateway1.reset();
}
return null;
}
});
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
logger.info("--> verify that the dangling index does not exists");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(false));
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(false));
logger.info("--> close the first node, so we remain with the second that has the dangling index");
closeNode("node1");
cluster().stopRandomNode(TestCluster.nameFilter(node_1));
logger.info("--> index a different doc");
client("node2").prepareIndex("test", "type1", "2").setSource("field1", "value2").setRefresh(true).execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource("field1", "value2").setRefresh(true).execute().actionGet();
assertThat(client("node2").prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(false));
assertThat(client("node2").prepareGet("test", "type1", "2").execute().actionGet().isExists(), equalTo(true));
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(false));
assertThat(client().prepareGet("test", "type1", "2").execute().actionGet().isExists(), equalTo(true));
}
}

View File

@ -24,11 +24,12 @@ import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.elasticsearch.test.TestCluster.RestartCallback;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
@ -44,155 +45,137 @@ import static org.hamcrest.Matchers.equalTo;
/**
*
*/
public class QuorumLocalGatewayTests extends AbstractNodesTests {
@After
public void cleanAndCloseNodes() throws Exception {
for (int i = 0; i < 10; i++) {
if (node("node" + i) != null) {
node("node" + i).stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
}
}
closeAllNodes();
}
@ClusterScope(numNodes=0, scope=Scope.TEST)
public class QuorumLocalGatewayTests extends AbstractIntegrationTest {
@Test
@Slow
public void testChangeInitialShardsRecovery() throws Exception {
// clean three nodes
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
buildNode("node3", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting 3 nodes");
Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
Node node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
final String[] nodes = new String[3];
nodes[0] = cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
nodes[1] = cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
nodes[2] = cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
logger.info("--> indexing...");
node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
//We don't check for failures in the flush response: if we do we might get the following:
// FlushNotAllowedEngineException[[test][1] recovery is in progress, flush [COMMIT_TRANSLOG] is not allowed]
node1.client().admin().indices().prepareFlush().get();
node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get();
assertNoFailures(node1.client().admin().indices().prepareRefresh().execute().get());
client().admin().indices().prepareFlush().get();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get();
assertNoFailures(client().admin().indices().prepareRefresh().execute().get());
logger.info("--> running cluster_health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).get(), 2l);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), 2l);
}
logger.info("--> closing nodes");
closeAllNodes();
logger.info("--> starting 2 nodes back, should not do any recovery (less than quorum)");
node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").build());
node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").build());
final String nodeToRemove = nodes[between(0,2)];
logger.info("--> restarting 2 nodes -- kill 1");
cluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
return settingsBuilder().put("gateway.type", "local").build();
}
@Override
public boolean doRestart(String nodeName) {
return !nodeToRemove.equals(nodeName);
}
});
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterStateResponse clusterStateResponse = client("node1").admin().cluster().prepareState().setMasterNodeTimeout("500ms").get();
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setMasterNodeTimeout("500ms").get();
return !clusterStateResponse.getState().routingTable().index("test").allPrimaryShardsActive();
}
}, 30, TimeUnit.SECONDS), equalTo(true));
logger.info("--> change the recovery.initial_shards setting, and make sure its recovered");
client("node1").admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("recovery.initial_shards", 1)).get();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("recovery.initial_shards", 1)).get();
logger.info("--> running cluster_health (wait for the shards to startup), 4 shards since we only have 2 nodes");
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(4)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(4)).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).get(), 2l);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), 2l);
}
}
@Test
@Slow
public void testQuorumRecovery() throws Exception {
// clean three nodes
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
buildNode("node3", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting 3 nodes");
Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
Node node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
cluster().startNode(settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
logger.info("--> indexing...");
node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
//We don't check for failures in the flush response: if we do we might get the following:
// FlushNotAllowedEngineException[[test][1] recovery is in progress, flush [COMMIT_TRANSLOG] is not allowed]
node1.client().admin().indices().prepareFlush().get();
node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get();
assertNoFailures(node1.client().admin().indices().prepareRefresh().get());
client().admin().indices().prepareFlush().get();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get();
assertNoFailures(client().admin().indices().prepareRefresh().get());
logger.info("--> running cluster_health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).get(), 2l);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), 2l);
}
logger.info("--> closing first node, and indexing more data to the second node");
closeNode("node1");
assertThat(awaitBusy(new Predicate<Object>() {
logger.info("--> restart all nodes");
cluster().fullRestart(new RestartCallback() {
@Override
public boolean apply(Object input) {
logger.info("--> running cluster_health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node2").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("2").waitForActiveShards(4)).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
return clusterHealth.isTimedOut() && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW;
public Settings onNodeStopped(String nodeName) throws Exception {
return null;
}
}, 30, TimeUnit.SECONDS), equalTo(false));
node2.client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).get();
assertNoFailures(node2.client().admin().indices().prepareRefresh().get());
for (int i = 0; i < 10; i++) {
assertHitCount(node2.client().prepareCount().setQuery(matchAllQuery()).get(), 3l);
}
logger.info("--> closing the second node and third node");
closeNode("node2");
closeNode("node3");
logger.info("--> starting the nodes back, verifying we got the latest version");
node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").build());
node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").build());
node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").build());
@Override
public void doAfterNodes(int numNodes, final Client activeClient) throws Exception {
if (numNodes == 1) {
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
logger.info("--> running cluster_health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("2").waitForActiveShards(4)).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
return (!clusterHealth.isTimedOut()) && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW;
}
}, 30, TimeUnit.SECONDS), equalTo(true));
logger.info("--> one node is closed -- index 1 document into the remaining nodes");
activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).get();
assertNoFailures(activeClient.admin().indices().prepareRefresh().get());
for (int i = 0; i < 10; i++) {
assertHitCount(activeClient.prepareCount().setQuery(matchAllQuery()).get(), 3l);
}
}
}
});
logger.info("--> all nodes are started back, verifying we got the latest version");
logger.info("--> running cluster_health (wait for the shards to startup)");
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).get(), 3l);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), 3l);
}
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.status.IndexShardStatus;
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
import org.elasticsearch.action.admin.indices.status.ShardStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
@ -32,17 +33,15 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.AbstractNodesTests;
import org.junit.After;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.elasticsearch.test.TestCluster.RestartCallback;
import org.junit.Test;
import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@ -52,137 +51,119 @@ import static org.hamcrest.Matchers.*;
/**
*
*/
public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
@ClusterScope(numNodes=0, scope=Scope.TEST)
public class SimpleRecoveryLocalGatewayTests extends AbstractIntegrationTest {
@After
public void cleanAndCloseNodes() throws Exception {
for (int i = 0; i < 10; i++) {
if (node("node" + i) != null) {
node("node" + i).stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
}
}
closeAllNodes();
}
@Override
protected Settings getClassDefaultSettings() {
return settingsBuilder().put("gateway.type", "local").build();
private ImmutableSettings.Builder settingsBuilder() {
return ImmutableSettings.settingsBuilder().put("gateway.type", "local");
}
@Test
@Slow
public void testX() throws Exception {
buildNode("node1");
cleanAndCloseNodes();
Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).build());
cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build());
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("properties").startObject("appAccountIds").field("type", "string").endObject().endObject()
.endObject().endObject().string();
node1.client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet();
client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet();
node1.client().prepareIndex("test", "type1", "10990239").setSource(jsonBuilder().startObject()
client().prepareIndex("test", "type1", "10990239").setSource(jsonBuilder().startObject()
.field("_id", "10990239")
.startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
node1.client().prepareIndex("test", "type1", "10990473").setSource(jsonBuilder().startObject()
client().prepareIndex("test", "type1", "10990473").setSource(jsonBuilder().startObject()
.field("_id", "10990473")
.startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
node1.client().prepareIndex("test", "type1", "10990513").setSource(jsonBuilder().startObject()
client().prepareIndex("test", "type1", "10990513").setSource(jsonBuilder().startObject()
.field("_id", "10990513")
.startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
node1.client().prepareIndex("test", "type1", "10990695").setSource(jsonBuilder().startObject()
client().prepareIndex("test", "type1", "10990695").setSource(jsonBuilder().startObject()
.field("_id", "10990695")
.startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
node1.client().prepareIndex("test", "type1", "11026351").setSource(jsonBuilder().startObject()
client().prepareIndex("test", "type1", "11026351").setSource(jsonBuilder().startObject()
.field("_id", "11026351")
.startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
node1.client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(node1.client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
closeNode("node1");
node1 = startNode("node1");
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
cluster().fullRestart();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
node1.client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(node1.client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
cluster().fullRestart();
closeNode("node1");
node1 = startNode("node1");
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = node1.client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
node1.client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(node1.client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
}
@Test
@Slow
public void testSingleNodeNoFlush() throws Exception {
buildNode("node1");
cleanAndCloseNodes();
Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).build());
cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build());
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("properties").startObject("field").field("type", "string").endObject().startObject("num").field("type", "integer").endObject().endObject()
.endObject().endObject().string();
node1.client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet();
client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet();
for (int i = 0; i < 100; i++) {
node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("_id", "1").field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()).execute().actionGet();
node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("_id", "2").field("field", "value2").startArray("num").value(14).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("_id", "1").field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("_id", "2").field("field", "value2").startArray("num").value(14).endArray().endObject()).execute().actionGet();
}
node1.client().admin().indices().prepareRefresh().execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1);
assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1);
assertHitCount(node1.client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1);
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1);
assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1);
}
closeNode("node1");
node1 = startNode("node1");
cluster().fullRestart();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1);
assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1);
assertHitCount(node1.client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1);
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1);
assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1);
}
closeNode("node1");
node1 = startNode("node1");
cluster().fullRestart();
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1);
assertHitCount(node1.client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1);
assertHitCount(node1.client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1);
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1);
assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1);
}
}
@ -190,89 +171,83 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
@Test
@Slow
public void testSingleNodeWithFlush() throws Exception {
buildNode("node1");
cleanAndCloseNodes();
Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).build());
node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
node1.client().admin().indices().prepareFlush().execute().actionGet();
node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
node1.client().admin().indices().prepareRefresh().execute().actionGet();
cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build());
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
closeNode("node1");
node1 = startNode("node1");
cluster().fullRestart();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
}
closeNode("node1");
node1 = startNode("node1");
cluster().fullRestart();
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
}
}
@Test
@Slow
public void testTwoNodeFirstNodeCleared() throws Exception {
// clean two nodes
buildNode("node1");
buildNode("node2");
cleanAndCloseNodes();
Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).build());
Node node2 = startNode("node2", settingsBuilder().put("index.number_of_shards", 1).build());
final String firstNode = cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build());
cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).build());
node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
node1.client().admin().indices().prepareFlush().execute().actionGet();
node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
node1.client().admin().indices().prepareRefresh().execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
}
logger.info("--> closing nodes");
closeNode("node1");
closeNode("node2");
cluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
return settingsBuilder().put("gateway.recover_after_nodes", 2).build();
}
logger.info("--> cleaning node1 gateway");
buildNode("node1");
cleanAndCloseNodes();
node1 = startNode("node1", settingsBuilder().put("gateway.recover_after_nodes", 2).build());
node2 = startNode("node2", settingsBuilder().put("gateway.recover_after_nodes", 2).build());
@Override
public boolean clearData(String nodeName) {
return firstNode.equals(nodeName);
}
});
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
}
}
@ -280,72 +255,70 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
@Slow
public void testLatestVersionLoaded() throws Exception {
// clean two nodes
buildNode("node1");
buildNode("node2");
cleanAndCloseNodes();
Node node1 = startNode("node1", settingsBuilder().put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build());
Node node2 = startNode("node2", settingsBuilder().put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build());
cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build());
cluster().startNode(settingsBuilder().put("index.number_of_shards", 1).put("gateway.recover_after_nodes", 2).build());
node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
node1.client().admin().indices().prepareFlush().execute().actionGet();
node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
node1.client().admin().indices().prepareRefresh().execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
logger.info("--> running cluster_health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
}
logger.info("--> closing first node, and indexing more data to the second node");
closeNode("node1");
cluster().fullRestart(new RestartCallback() {
node2.client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
node2.client().admin().indices().prepareRefresh().execute().actionGet();
@Override
public void doAfterNodes(int numNodes, Client client) throws Exception {
if (numNodes == 1) {
logger.info("--> one node is closed - start indexing data into the second one");
client.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
assertHitCount(node2.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3);
}
for (int i = 0; i < 10; i++) {
assertHitCount(client.prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3);
}
logger.info("--> add some metadata, additional type and template");
node2.client().admin().indices().preparePutMapping("test").setType("type2")
.setSource(jsonBuilder().startObject().startObject("type1").startObject("_source").field("enabled", false).endObject().endObject().endObject())
.execute().actionGet();
node2.client().admin().indices().preparePutTemplate("template_1")
.setTemplate("te*")
.setOrder(0)
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
.startObject("field1").field("type", "string").field("store", "yes").endObject()
.startObject("field2").field("type", "string").field("store", "yes").field("index", "not_analyzed").endObject()
.endObject().endObject().endObject())
.execute().actionGet();
node2.client().admin().indices().prepareAliases().addAlias("test", "test_alias", FilterBuilders.termFilter("field", "value")).execute().actionGet();
logger.info("--> closing the second node");
closeNode("node2");
logger.info("--> starting two nodes back, verifying we got the latest version");
node1 = startNode("node1", settingsBuilder().put("gateway.recover_after_nodes", 2).build());
node2 = startNode("node2", settingsBuilder().put("gateway.recover_after_nodes", 2).build());
logger.info("--> add some metadata, additional type and template");
client.admin().indices().preparePutMapping("test").setType("type2")
.setSource(jsonBuilder().startObject().startObject("type1").startObject("_source").field("enabled", false).endObject().endObject().endObject())
.execute().actionGet();
client.admin().indices().preparePutTemplate("template_1")
.setTemplate("te*")
.setOrder(0)
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
.startObject("field1").field("type", "string").field("store", "yes").endObject()
.startObject("field2").field("type", "string").field("store", "yes").field("index", "not_analyzed").endObject()
.endObject().endObject().endObject())
.execute().actionGet();
client.admin().indices().prepareAliases().addAlias("test", "test_alias", FilterBuilders.termFilter("field", "value")).execute().actionGet();
logger.info("--> starting two nodes back, verifying we got the latest version");
}
}
});
logger.info("--> running cluster_health (wait for the shards to startup)");
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
for (int i = 0; i < 10; i++) {
assertHitCount(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3);
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3);
}
ClusterState state = node1.client().admin().cluster().prepareState().execute().actionGet().getState();
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.metaData().index("test").mapping("type2"), notNullValue());
assertThat(state.metaData().templates().get("template_1").template(), equalTo("te*"));
assertThat(state.metaData().index("test").aliases().get("test_alias"), notNullValue());
@ -355,76 +328,56 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
@Test
@Slow
public void testReusePeerRecovery() throws Exception {
buildNode("node1");
buildNode("node2");
buildNode("node3");
buildNode("node4");
cleanAndCloseNodes();
ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder()
ImmutableSettings.Builder settings = settingsBuilder()
.put("action.admin.cluster.node.shutdown.delay", "10ms")
.put("gateway.recover_after_nodes", 4)
.put(BalancedShardsAllocator.SETTING_THRESHOLD, 1.1f); // use less agressive settings
startNode("node1", settings);
startNode("node2", settings);
startNode("node3", settings);
startNode("node4", settings);
cluster().startNode(settings);
cluster().startNode(settings);
cluster().startNode(settings);
cluster().startNode(settings);
logger.info("--> indexing docs");
for (int i = 0; i < 1000; i++) {
client("node1").prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
if ((i % 200) == 0) {
client("node1").admin().indices().prepareFlush().execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
}
}
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.info("--> shutting down the nodes");
// Disable allocations while we are closing nodes
client("node1").admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet();
for (int i = 1; i < 5; i++) {
closeNode("node" + i);
}
logger.info("--> start the nodes back up");
startNode("node1", settings);
startNode("node2", settings);
startNode("node3", settings);
startNode("node4", settings);
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet();
cluster().fullRestart();
logger.info("Running Cluster Health");
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(10)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(10)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.info("--> shutting down the nodes");
// Disable allocations while we are closing nodes
client("node1").admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet();
for (int i = 1; i < 5; i++) {
closeNode("node" + i);
}
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet();
cluster().fullRestart();
logger.info("--> start the nodes back up");
startNode("node1", settings);
startNode("node2", settings);
startNode("node3", settings);
startNode("node4", settings);
logger.info("Running Cluster Health");
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(10)).actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(10)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
IndicesStatusResponse statusResponse = client("node1").admin().indices().prepareStatus("test").setRecovery(true).execute().actionGet();
IndicesStatusResponse statusResponse = client().admin().indices().prepareStatus("test").setRecovery(true).execute().actionGet();
for (IndexShardStatus indexShardStatus : statusResponse.getIndex("test")) {
for (ShardStatus shardStatus : indexShardStatus) {
if (!shardStatus.getShardRouting().primary()) {
@ -441,29 +394,30 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
@Slow
public void testRecoveryDifferentNodeOrderStartup() throws Exception {
// we need different data paths so we make sure we start the second node fresh
buildNode("node1", settingsBuilder().put("path.data", "data/data1").build());
buildNode("node2", settingsBuilder().put("path.data", "data/data2").build());
cleanAndCloseNodes();
startNode("node1", settingsBuilder().put("path.data", "data/data1").build());
final String node_1 = cluster().startNode(settingsBuilder().put("path.data", "data/data1").build());
client("node1").prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet();
startNode("node2", settingsBuilder().put("path.data", "data/data2").build());
cluster().startNode(settingsBuilder().put("path.data", "data/data2").build());
ClusterHealthResponse health = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
closeNode("node1");
closeNode("node2");
cluster().fullRestart(new RestartCallback() {
startNode("node2", settingsBuilder().put("path.data", "data/data2").build());
@Override
public boolean doRestart(String nodeName) {
return !node_1.equals(nodeName);
}
});
health = client("node2").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
assertThat(client("node2").admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
assertHitCount(client("node2").prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1);
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1);
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.indexlifecycle;
import com.carrotsearch.randomizedtesting.annotations.Nightly;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
@ -34,6 +33,7 @@ import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.elasticsearch.test.TestCluster;
import org.junit.Test;
import java.util.Map;
@ -177,12 +177,7 @@ public class IndexLifecycleActionTests extends AbstractIntegrationTest {
logger.info("Closing server1");
// kill the first server
cluster().stopRandomNode(new Predicate<Settings>() {
public boolean apply(Settings settings) {
return server_1.equals(settings.get("name"));
}
});
cluster().stopRandomNode(TestCluster.nameFilter(server_1));
// verify health
logger.info("Running Cluster Health");
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
@ -347,12 +342,7 @@ public class IndexLifecycleActionTests extends AbstractIntegrationTest {
logger.info("Closing server1");
// kill the first server
cluster().stopRandomNode(new Predicate<Settings>() {
public boolean apply(Settings settings) {
return server_1.equals(settings.get("name"));
}
});
cluster().stopRandomNode(TestCluster.nameFilter(server_1));
logger.info("Running Cluster Health");
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();

View File

@ -27,6 +27,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.elasticsearch.test.TestCluster;
import org.junit.Test;
import java.io.File;
@ -69,12 +70,7 @@ public class IndicesStoreTests extends AbstractIntegrationTest {
File server2Shard = shardDirectory(node_2, "test", 0);
logger.info("--> stopping node node_2");
cluster().stopRandomNode(new Predicate<Settings>() {
public boolean apply(Settings settings) {
return settings.get("name").equals(node_2);
}
});
cluster().stopRandomNode(TestCluster.nameFilter(node_2));
assertThat(server2Shard.exists(), equalTo(true));
logger.info("--> running cluster_health");

View File

@ -25,14 +25,14 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
import org.elasticsearch.test.AbstractNodesTests;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import org.elasticsearch.test.AbstractIntegrationTest.Scope;
import org.elasticsearch.test.TestCluster.RestartCallback;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
@ -40,51 +40,33 @@ import static org.hamcrest.Matchers.equalTo;
/**
*/
public class LocalGatewayIndicesWarmerTests extends AbstractNodesTests {
@ClusterScope(numNodes=0, scope=Scope.TEST)
public class LocalGatewayIndicesWarmerTests extends AbstractIntegrationTest {
private final ESLogger logger = Loggers.getLogger(LocalGatewayIndicesWarmerTests.class);
@After
public void cleanAndCloseNodes() throws Exception {
super.tearDown();
for (int i = 0; i < 10; i++) {
if (node("node" + i) != null) {
node("node" + i).stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) {
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
}
}
}
closeAllNodes(false);
}
@Test
public void testStatePersistence() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local"));
buildNode("node2", settingsBuilder().put("gateway.type", "local"));
cleanAndCloseNodes();
logger.info("--> starting 1 nodes");
startNode("node1", settingsBuilder().put("gateway.type", "local"));
cluster().startNode(settingsBuilder().put("gateway.type", "local"));
logger.info("--> putting two templates");
client("node1").admin().indices().prepareCreate("test")
client().admin().indices().prepareCreate("test")
.setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1))
.execute().actionGet();
client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
client("node1").admin().indices().preparePutWarmer("warmer_1")
.setSearchRequest(client("node1").prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value1")))
client().admin().indices().preparePutWarmer("warmer_1")
.setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value1")))
.execute().actionGet();
client("node1").admin().indices().preparePutWarmer("warmer_2")
.setSearchRequest(client("node1").prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value2")))
client().admin().indices().preparePutWarmer("warmer_2")
.setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value2")))
.execute().actionGet();
logger.info("--> put template with warmer");
client("node1").admin().indices().preparePutTemplate("template_1")
client().admin().indices().preparePutTemplate("template_1")
.setSource("{\n" +
" \"template\" : \"xxx\",\n" +
" \"warmers\" : {\n" +
@ -102,7 +84,7 @@ public class LocalGatewayIndicesWarmerTests extends AbstractNodesTests {
logger.info("--> verify warmers are registered in cluster state");
ClusterState clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
IndexWarmersMetaData warmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE);
assertThat(warmersMetaData, Matchers.notNullValue());
assertThat(warmersMetaData.entries().size(), equalTo(2));
@ -111,17 +93,19 @@ public class LocalGatewayIndicesWarmerTests extends AbstractNodesTests {
assertThat(templateWarmers, Matchers.notNullValue());
assertThat(templateWarmers.entries().size(), equalTo(1));
logger.info("--> close the node");
closeNode("node1");
logger.info("--> restarting the node");
cluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
return settingsBuilder().put("gateway.type", "local").build();
}
});
logger.info("--> starting the node again...");
startNode("node1", settingsBuilder().put("gateway.type", "local"));
ClusterHealthResponse healthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
logger.info("--> verify warmers are recovered");
clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
IndexWarmersMetaData recoveredWarmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE);
assertThat(recoveredWarmersMetaData.entries().size(), equalTo(warmersMetaData.entries().size()));
for (int i = 0; i < warmersMetaData.entries().size(); i++) {
@ -139,25 +123,27 @@ public class LocalGatewayIndicesWarmerTests extends AbstractNodesTests {
logger.info("--> delete warmer warmer_1");
client("node1").admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet();
client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet();
logger.info("--> verify warmers (delete) are registered in cluster state");
clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
warmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE);
assertThat(warmersMetaData, Matchers.notNullValue());
assertThat(warmersMetaData.entries().size(), equalTo(1));
logger.info("--> close the node");
closeNode("node1");
logger.info("--> restarting the node");
cluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
return settingsBuilder().put("gateway.type", "local").build();
}
});
logger.info("--> starting the node again...");
startNode("node1", settingsBuilder().put("gateway.type", "local"));
healthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
logger.info("--> verify warmers are recovered");
clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
recoveredWarmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE);
assertThat(recoveredWarmersMetaData.entries().size(), equalTo(warmersMetaData.entries().size()));
for (int i = 0; i < warmersMetaData.entries().size(); i++) {

View File

@ -85,7 +85,7 @@ public class RecoveryPercolatorTests extends AbstractIntegrationTest {
.execute().actionGet();
assertThat(percolate.getMatches(), arrayWithSize(1));
cluster().restartAllNodes();
cluster().rollingRestart();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
@ -132,7 +132,7 @@ public class RecoveryPercolatorTests extends AbstractIntegrationTest {
.execute().actionGet();
assertThat(percolate.getMatches(), arrayWithSize(1));
cluster().restartAllNodes();
cluster().rollingRestart();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();

View File

@ -1,254 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import com.google.common.collect.ImmutableSet;
import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.node.Node;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
@Ignore
@IntegrationTests
public abstract class AbstractNodesTests extends ElasticSearchTestCase {
private static Map<String, Node> nodes = newHashMap();
private static Map<String, Client> clients = newHashMap();
private static final Settings defaultSettings = ImmutableSettings
.settingsBuilder()
.put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName() + "CHILD_VM=[" + CHILD_VM_ID +"]")
.build();
public Node startNode(String id) {
return buildNode(id).start();
}
public Node startNode(String id, Settings.Builder settings) {
return startNode(id, settings.build());
}
public Node startNode(String id, Settings settings) {
return buildNode(id, settings).start();
}
public Node buildNode(String id) {
return buildNode(id, EMPTY_SETTINGS);
}
public Node buildNode(String id, Settings.Builder settings) {
return buildNode(id, settings.build());
}
public Node buildNode(String id, Settings settings) {
synchronized (AbstractNodesTests.class) {
if (nodes.containsKey(id)) {
throw new IllegalArgumentException("Node with id ["+ id + "] already exists");
}
assert !nodes.containsKey(id);
assert !clients.containsKey(id);
String settingsSource = getClass().getName().replace('.', '/') + ".yml";
Settings finalSettings = settingsBuilder()
.loadFromClasspath(settingsSource)
.put(defaultSettings)
.put(getClassDefaultSettings())
.put(settings)
.put("name", id)
.put("discovery.id.seed", randomLong())
.build();
if (finalSettings.get("gateway.type") == null) {
// default to non gateway
finalSettings = settingsBuilder().put(finalSettings).put("gateway.type", "none").build();
}
if (finalSettings.get("cluster.routing.schedule") != null) {
// decrease the routing schedule so new nodes will be added quickly
finalSettings = settingsBuilder().put(finalSettings).put("cluster.routing.schedule", "50ms").build();
}
Node node = nodeBuilder()
.settings(finalSettings)
.build();
logger.info("Build Node [{}] with settings [{}]", id, finalSettings.toDelimitedString(','));
nodes.put(id, node);
clients.put(id, node.client());
return node;
}
}
public void closeNode(String id) {
Client client;
Node node;
synchronized (AbstractNodesTests.class) {
client = clients.remove(id);
node = nodes.remove(id);
}
if (client != null) {
client.close();
}
if (node != null) {
node.close();
}
}
public List<Node> nodes() {
synchronized (AbstractNodesTests.class) {
return new ArrayList<Node>(nodes.values());
}
}
public Node node(String id) {
synchronized (AbstractNodesTests.class) {
return nodes.get(id);
}
}
public Client client(String id) {
synchronized (AbstractNodesTests.class) {
return clients.get(id);
}
}
public void closeAllNodes() {
closeAllNodes(false);
}
public void closeAllNodes(boolean preventRelocation) {
synchronized (AbstractNodesTests.class) {
if (preventRelocation) {
Settings build = ImmutableSettings.builder().put("cluster.routing.allocation.disable_allocation", true).build();
Client aClient = client();
if (aClient != null) {
aClient.admin().cluster().prepareUpdateSettings().setTransientSettings(build).execute().actionGet();
}
}
for (Client client : clients.values()) {
client.close();
}
clients.clear();
for (Node node : nodes.values()) {
node.close();
}
nodes.clear();
}
}
public ImmutableSet<ClusterBlock> waitForNoBlocks(TimeValue timeout, String node) throws InterruptedException {
long start = System.currentTimeMillis();
ImmutableSet<ClusterBlock> blocks;
do {
blocks = client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA);
}
while (!blocks.isEmpty() && (System.currentTimeMillis() - start) < timeout.millis());
return blocks;
}
public void createIndices(Client client, String... indices) {
for (String index : indices) {
client.admin().indices().prepareCreate(index).execute().actionGet();
}
}
public void wipeIndices(Client client, String... names) {
try {
client.admin().indices().prepareDelete(names).execute().actionGet();
} catch (IndexMissingException e) {
// ignore
}
}
private static volatile AbstractNodesTests testInstance; // this test class only works once per JVM
@AfterClass
public static void tearDownOnce() throws Exception {
synchronized (AbstractNodesTests.class) {
if (testInstance != null) {
testInstance.afterClass();
testInstance.closeAllNodes();
testInstance = null;
}
}
}
@BeforeClass
public static void setUpOnce() throws Exception {
synchronized (AbstractNodesTests.class) {
if (testInstance != null) {
testInstance.afterClass();
testInstance.closeAllNodes();
testInstance = null;
}
}
}
@Before
public final void setUp() throws Exception {
super.setUp();
synchronized (AbstractNodesTests.class) {
if (testInstance == null) {
testInstance = this;
testInstance.beforeClass();
} else {
assert testInstance.getClass() == this.getClass();
}
}
}
public Client client() {
synchronized (AbstractNodesTests.class) {
if (clients.isEmpty()) {
return null;
}
return clients.values().iterator().next();
}
}
protected void afterClass() throws Exception {
}
protected Settings getClassDefaultSettings() {
return ImmutableSettings.EMPTY;
}
protected void beforeClass() throws Exception {
}
}

View File

@ -385,11 +385,26 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
void restart() {
node.close();
node = (InternalNode) nodeBuilder().settings(node.settings()).node();
void restart(RestartCallback callback) throws Exception {
assert callback != null;
if (!node.isClosed()) {
node.close();
}
Settings newSettings = callback.onNodeStopped(name);
if (newSettings == null) {
newSettings = ImmutableSettings.EMPTY;
}
if (callback.clearData(name)) {
NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class, node);
if (nodeEnv.hasNodeFile()) {
FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocations());
}
}
node = (InternalNode) nodeBuilder().settings(node.settings()).settings(newSettings).node();
resetClient();
}
@Override
public void close() {
@ -624,24 +639,79 @@ public class TestCluster implements Closeable, Iterable<Client> {
nodeAndClient.close();
}
}
public void restartRandomNode() throws Exception {
restartRandomNode(EMPTY_CALLBACK);
}
public void restartRandomNode() {
public void restartRandomNode(RestartCallback callback) throws Exception {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient();
if (nodeAndClient != null) {
logger.info("Restarting random node [{}] ", nodeAndClient.name);
nodeAndClient.restart();
nodeAndClient.restart(callback);
}
}
public void restartAllNodes() {
private void restartAllNodes(boolean rollingRestart, RestartCallback callback) throws Exception {
ensureOpen();
logger.info("Restarting all nodes");
for (NodeAndClient nodeAndClient : nodes.values()) {
logger.info("Restarting node [{}] ", nodeAndClient.name);
nodeAndClient.restart();
List<NodeAndClient> toRemove = new ArrayList<TestCluster.NodeAndClient>();
try {
for (NodeAndClient nodeAndClient : nodes.values()) {
if (!callback.doRestart(nodeAndClient.name)) {
logger.info("Closing node [{}] during restart", nodeAndClient.name);
toRemove.add(nodeAndClient);
nodeAndClient.close();
}
}
} finally {
for (NodeAndClient nodeAndClient : toRemove) {
nodes.remove(nodeAndClient.name);
}
}
logger.info("Restarting remaining nodes rollingRestart [{}]", rollingRestart);
if (rollingRestart) {
int numNodesRestarted = 0;
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Restarting node [{}] ", nodeAndClient.name);
nodeAndClient.restart(callback);
}
} else {
int numNodesRestarted = 0;
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Stopping node [{}] ", nodeAndClient.name);
nodeAndClient.node.close();
}
for (NodeAndClient nodeAndClient : nodes.values()) {
logger.info("Starting node [{}] ", nodeAndClient.name);
nodeAndClient.restart(callback);
}
}
}
public static final RestartCallback EMPTY_CALLBACK = new RestartCallback() {
public Settings onNodeStopped(String node) {
return null;
}
};
public void fullRestart() throws Exception {
fullRestart(EMPTY_CALLBACK);
}
public void rollingRestart() throws Exception {
rollingRestart(EMPTY_CALLBACK);
}
public void rollingRestart(RestartCallback function) throws Exception {
restartAllNodes(true, function);
}
public void fullRestart(RestartCallback function) throws Exception {
restartAllNodes(false, function);
}
private String getMasterName() {
try {
@ -781,5 +851,43 @@ public class TestCluster implements Closeable, Iterable<Client> {
};
}
public static Predicate<Settings> nameFilter(String... nodeName) {
return new NodeNamePredicate(new HashSet<String>(Arrays.asList(nodeName)));
}
private static final class NodeNamePredicate implements Predicate<Settings> {
private final HashSet<String> nodeNames;
public NodeNamePredicate(HashSet<String> nodeNames) {
this.nodeNames = nodeNames;
}
@Override
public boolean apply(Settings settings) {
return nodeNames.contains(settings.get("name"));
}
}
public static abstract class RestartCallback {
public Settings onNodeStopped(String nodeName) throws Exception {
return ImmutableSettings.EMPTY;
}
public void doAfterNodes(int numNodes, Client client) throws Exception {
}
public boolean clearData(String nodeName) {
return false;
}
public boolean doRestart(String nodeName) {
return true;
}
}
}