add another test for local gateway, fix race when loading the current state of a node

kimchy 2010-08-30 19:29:00 +03:00
parent 908fba44e7
commit 61764c5b69
2 changed files with 80 additions and 25 deletions

@@ -77,6 +77,8 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
     private volatile LocalGatewayState currentState;
+    private volatile boolean initialized = false;
     @Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
                                 NodeEnvironment nodeEnv, TransportNodesListGatewayState listGatewayState) {
         super(settings);
@@ -91,28 +93,12 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
     }
     public LocalGatewayState currentState() {
+        lazyInitialize();
         return this.currentState;
     }
     @Override protected void doStart() throws ElasticSearchException {
-        // if this is not a possible master node or data node, bail, we won't save anything here...
-        if (!clusterService.state().nodes().localNode().masterNode() || !clusterService.state().nodes().localNode().dataNode()) {
-            location = null;
-            return;
-        }
-        // create the location where the state will be stored
-        this.location = new File(nodeEnv.nodeLocation(), "_state");
-        this.location.mkdirs();
-        try {
-            long version = findLatestStateVersion();
-            if (version != -1) {
-                this.currentState = readState(Streams.copyToByteArray(new FileInputStream(new File(location, "state-" + version))));
-            }
-        } catch (Exception e) {
-            logger.warn("failed to read local state", e);
-        }
+        lazyInitialize();
         clusterService.add(this);
     }
@@ -283,6 +269,37 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
         }
     }
+    /**
+     * We do lazy initialization here, and not only on start(), since we might be called before start by another node
+     * (this really will happen in terms of timing in testing, but still), and we want to return the cluster state
+     * when we can.
+     *
+     * It is synchronized since we want to wait for it to be loaded if called concurrently. There should really be a
+     * nicer solution here, but for now, it's good enough.
+     */
+    private synchronized void lazyInitialize() {
+        if (initialized) {
+            return;
+        }
+        initialized = true;
+        // if this is not a possible master node or data node, bail, we won't save anything here...
+        if (!clusterService.state().nodes().localNode().masterNode() || !clusterService.state().nodes().localNode().dataNode()) {
+            location = null;
+        } else {
+            // create the location where the state will be stored
+            this.location = new File(nodeEnv.nodeLocation(), "_state");
+            this.location.mkdirs();
+            try {
+                long version = findLatestStateVersion();
+                if (version != -1) {
+                    this.currentState = readState(Streams.copyToByteArray(new FileInputStream(new File(location, "state-" + version))));
+                }
+            } catch (Exception e) {
+                logger.warn("failed to read local state", e);
+            }
+        }
+    }
     private long findLatestStateVersion() throws IOException {
         long index = -1;
         for (File stateFile : location.listFiles()) {

@@ -40,18 +40,20 @@ import static org.hamcrest.Matchers.*;
  */
 public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
-    @AfterMethod public void closeNodes() throws Exception {
-        node("node1").stop();
+    @AfterMethod public void cleanAndCloseNodes() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            if (node("node" + i) != null) {
+                node("node" + i).stop();
         // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
-        ((InternalNode) node("node1")).injector().getInstance(Gateway.class).reset();
+                ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
+            }
+        }
         closeAllNodes();
     }
     @Test public void testSingleNode() throws Exception {
         buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
-        // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
-        ((InternalNode) node("node1")).injector().getInstance(Gateway.class).reset();
-        closeAllNodes();
+        cleanAndCloseNodes();
         Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build());
         node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
@@ -72,4 +74,40 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
         assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
     }
+    @Test public void testTwoNodeFirstNodeCleared() throws Exception {
+        // clean two nodes
+        buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
+        buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
+        cleanAndCloseNodes();
+        Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build());
+        Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build());
+        node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
+        node1.client().admin().indices().prepareFlush().execute().actionGet();
+        node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
+        node1.client().admin().indices().prepareRefresh().execute().actionGet();
+        assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
+        logger.info("--> closing nodes");
+        closeNode("node1");
+        closeNode("node2");
+        logger.info("--> cleaning node1 gateway");
+        buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
+        cleanAndCloseNodes();
+        node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("gateway.recover_after_nodes", 2).build());
+        node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("gateway.recover_after_nodes", 2).build());
+        logger.info("Running Cluster Health (wait for the shards to startup)");
+        ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
+        logger.info("Done Cluster Health, status " + clusterHealth.status());
+        assertThat(clusterHealth.timedOut(), equalTo(false));
+        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
+        assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
+    }
 }