diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 59f9b5c67b5..fcce3642298 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -77,6 +77,8 @@ public class LocalGateway extends AbstractLifecycleComponent implements private volatile LocalGatewayState currentState; + private volatile boolean initialized = false; + @Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, NodeEnvironment nodeEnv, TransportNodesListGatewayState listGatewayState) { super(settings); @@ -91,28 +93,12 @@ public class LocalGateway extends AbstractLifecycleComponent implements } public LocalGatewayState currentState() { + lazyInitialize(); return this.currentState; } @Override protected void doStart() throws ElasticSearchException { - // if this is not a possible master node or data node, bail, we won't save anything here... - if (!clusterService.state().nodes().localNode().masterNode() || !clusterService.state().nodes().localNode().dataNode()) { - location = null; - return; - } - // create the location where the state will be stored - this.location = new File(nodeEnv.nodeLocation(), "_state"); - this.location.mkdirs(); - - try { - long version = findLatestStateVersion(); - if (version != -1) { - this.currentState = readState(Streams.copyToByteArray(new FileInputStream(new File(location, "state-" + version)))); - } - } catch (Exception e) { - logger.warn("failed to read local state", e); - } - + lazyInitialize(); clusterService.add(this); } @@ -283,6 +269,37 @@ public class LocalGateway extends AbstractLifecycleComponent implements } } + /** + * We do here lazy initialization on not only on start(), since we might be called before start by another node (really will + * happen in term of timing in testing, but still), and we want to return the cluster state when we can. + * + * It is synchronized since we want to wait for it to be loaded if called concurrently. There should really be a nicer + * solution here, but for now, its good enough. + */ + private synchronized void lazyInitialize() { + if (initialized) { + return; + } + initialized = true; + + // if this is not a possible master node or data node, bail, we won't save anything here... + if (!clusterService.state().nodes().localNode().masterNode() || !clusterService.state().nodes().localNode().dataNode()) { + location = null; + } else { + // create the location where the state will be stored + this.location = new File(nodeEnv.nodeLocation(), "_state"); + this.location.mkdirs(); + try { + long version = findLatestStateVersion(); + if (version != -1) { + this.currentState = readState(Streams.copyToByteArray(new FileInputStream(new File(location, "state-" + version)))); + } + } catch (Exception e) { + logger.warn("failed to read local state", e); + } + } + } + private long findLatestStateVersion() throws IOException { long index = -1; for (File stateFile : location.listFiles()) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java index 6eae0f0df70..bbf01e27077 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java @@ -40,18 +40,20 @@ import static org.hamcrest.Matchers.*; */ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests { - @AfterMethod public void closeNodes() throws Exception { - node("node1").stop(); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - ((InternalNode) node("node1")).injector().getInstance(Gateway.class).reset(); + @AfterMethod public void cleanAndCloseNodes() throws Exception { + for (int i = 0; i < 10; i++) { + if (node("node" + i) != null) { + node("node" + i).stop(); + // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well + ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset(); + } + } closeAllNodes(); } @Test public void testSingleNode() throws Exception { buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - ((InternalNode) node("node1")).injector().getInstance(Gateway.class).reset(); - closeAllNodes(); + cleanAndCloseNodes(); Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build()); node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); @@ -72,4 +74,40 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests { assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l)); } + + @Test public void testTwoNodeFirstNodeCleared() throws Exception { + // clean two nodes + buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); + buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); + cleanAndCloseNodes(); + + Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build()); + Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build()); + + node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); + node1.client().admin().indices().prepareFlush().execute().actionGet(); + node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); + node1.client().admin().indices().prepareRefresh().execute().actionGet(); + + assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l)); + + logger.info("--> closing nodes"); + closeNode("node1"); + closeNode("node2"); + + logger.info("--> cleaning node1 gateway"); + buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); + cleanAndCloseNodes(); + + node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("gateway.recover_after_nodes", 2).build()); + node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("gateway.recover_after_nodes", 2).build()); + + logger.info("Running Cluster Health (wait for the shards to startup)"); + ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + + assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l)); + } }