diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryService.java index cdf4b03637f..56b123bec3c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryService.java @@ -38,6 +38,8 @@ public class DiscoveryService extends AbstractLifecycleComponenttrue if the initial state was received within the timeout waiting for it + * on {@link #doStart()}. + */ + public boolean initialStateReceived() { + return initialStateReceived; + } + public String nodeDescription() { return discovery.nodeDescription(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 0a2415608a3..7f0dbab72cc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -25,10 +25,14 @@ import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataService; +import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.util.Nullable; +import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.settings.Settings; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,6 +48,8 @@ import static org.elasticsearch.util.concurrent.DynamicExecutors.*; */ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener { + private final TimeValue initialStateTimeout; + private final Gateway gateway; private final ThreadPool threadPool; @@ -52,22 +58,40 @@ public class GatewayService extends AbstractLifecycleComponent i private final ClusterService clusterService; + private final DiscoveryService discoveryService; + private final MetaDataService metaDataService; private final AtomicBoolean firstMasterRead = new AtomicBoolean(); - @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, + @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, ThreadPool threadPool, MetaDataService metaDataService) { super(settings); this.gateway = gateway; this.clusterService = clusterService; + this.discoveryService = discoveryService; this.threadPool = threadPool; this.metaDataService = metaDataService; + this.initialStateTimeout = componentSettings.getAsTime("initialStateTimeout", TimeValue.timeValueSeconds(30)); } @Override protected void doStart() throws ElasticSearchException { gateway.start(); this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway")); + // if we received initial state, see if we can recover within the start phase, so we hold the + // server from starting until we recovered properly + if (discoveryService.initialStateReceived()) { + if (discoveryService.firstMaster()) { + if (firstMasterRead.compareAndSet(false, true)) { + boolean waited = readFromGateway(initialStateTimeout); + if (!waited) { + logger.warn("Waited for {} for indices to be created from the gateway, and not all have been created", initialStateTimeout); + } + } + } + } else { + logger.debug("Can't wait on start for (possibly) reading state from gateway, will do it asynchronously"); + } clusterService.add(this); } @@ -89,7 +113,11 @@ public class GatewayService extends AbstractLifecycleComponent i @Override public void clusterChanged(final ClusterChangedEvent event) { if (lifecycle.started() && event.localNodeMaster()) { if (event.firstMaster() && firstMasterRead.compareAndSet(false, true)) { - readFromGateway(); + executor.execute(new Runnable() { + @Override public void run() { + readFromGateway(null); + } + }); } else { writeToGateway(event); } @@ -113,44 +141,56 @@ public class GatewayService extends AbstractLifecycleComponent i }); } - private void readFromGateway() { + /** + * Reads from the gateway. If the waitTimeout is set, will wait till all the indices + * have been created from the meta data read from the gateway. Return value only applicable + * when waiting, and indicates that everything was created within teh wait timeout. + */ + private boolean readFromGateway(@Nullable TimeValue waitTimeout) { // we are the first master, go ahead and read and create indices logger.debug("First master in the cluster, reading state from gateway"); - executor.execute(new Runnable() { - @Override public void run() { - MetaData metaData; - try { - metaData = gateway.read(); - } catch (Exception e) { - logger.error("Failed to read from gateway", e); - return; - } - if (metaData == null) { - logger.debug("No state read from gateway"); - return; - } - final MetaData fMetaData = metaData; - clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder metaDataBuilder = newMetaDataBuilder() - .metaData(currentState.metaData()).maxNumberOfShardsPerNode(fMetaData.maxNumberOfShardsPerNode()); - // go over the meta data and create indices, we don't really need to copy over - // the meta data per index, since we create the index and it will be added automatically - for (final IndexMetaData indexMetaData : fMetaData) { - threadPool.execute(new Runnable() { - @Override public void run() { - try { - metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueMillis(10)); - } catch (Exception e) { - logger.error("Failed to create index [" + indexMetaData.index() + "]", e); - } - } - }); + MetaData metaData; + try { + metaData = gateway.read(); + } catch (Exception e) { + logger.error("Failed to read from gateway", e); + return false; + } + if (metaData == null) { + logger.debug("No state read from gateway"); + return false; + } + final MetaData fMetaData = metaData; + final CountDownLatch latch = new CountDownLatch(fMetaData.indices().size()); + clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + MetaData.Builder metaDataBuilder = newMetaDataBuilder() + .metaData(currentState.metaData()).maxNumberOfShardsPerNode(fMetaData.maxNumberOfShardsPerNode()); + // go over the meta data and create indices, we don't really need to copy over + // the meta data per index, since we create the index and it will be added automatically + for (final IndexMetaData indexMetaData : fMetaData) { + threadPool.execute(new Runnable() { + @Override public void run() { + try { + metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueMillis(initialStateTimeout.millis() - 1000)); + } catch (Exception e) { + logger.error("Failed to create index [" + indexMetaData.index() + "]", e); + } finally { + latch.countDown(); + } } - return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build(); - } - }); + }); + } + return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build(); } }); + if (waitTimeout != null) { + try { + return latch.await(waitTimeout.millis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // ignore + } + } + return false; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java index 69473600791..fab010e1c47 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java @@ -159,7 +159,6 @@ public final class InternalServer implements Server { } injector.getInstance(IndicesService.class).start(); - injector.getInstance(GatewayService.class).start(); injector.getInstance(ClusterService.class).start(); injector.getInstance(RoutingService.class).start(); injector.getInstance(SearchService.class).start(); @@ -167,6 +166,10 @@ public final class InternalServer implements Server { injector.getInstance(RestController.class).start(); injector.getInstance(TransportService.class).start(); DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start(); + + // gateway should start after disco, so it can try and recovery from gateway on "start" + injector.getInstance(GatewayService.class).start(); + if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).start(); } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java index add30fbe4ad..1c2c160dac0 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.test.integration.gateway; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.get.GetResponse; @@ -86,10 +88,14 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractServersTes logger.info("Closing the server"); closeServer("server1"); - Thread.sleep(500); logger.info("Starting the server, should recover from the gateway (only translog should be populated)"); startServer("server1"); - Thread.sleep(1000); + + logger.info("Running Cluster Health (wait for the shards to startup)"); + ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); // verify that mapping is there clusterState = client("server1").admin().cluster().state(clusterState()).actionGet(); @@ -115,10 +121,14 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractServersTes logger.info("Closing the server"); closeServer("server1"); - Thread.sleep(500); logger.info("Starting the server, should recover from the gateway (both index and translog)"); startServer("server1"); - Thread.sleep(1000); + + logger.info("Running Cluster Health (wait for the shards to startup)"); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); logger.info("Getting #1, should not exists"); getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet(); @@ -140,10 +150,14 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractServersTes logger.info("Closing the server"); closeServer("server1"); - Thread.sleep(500); logger.info("Starting the server, should recover from the gateway (just from the index, nothing in the translog)"); startServer("server1"); - Thread.sleep(1000); + + logger.info("Running Cluster Health (wait for the shards to startup)"); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); logger.info("Getting #1, should not exists"); getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet(); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java index e588f749c2a..9c9e428f996 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java @@ -56,10 +56,7 @@ public class FsMetaDataGatewayTests extends AbstractServersTests { closeServer("server1"); - Thread.sleep(1000); - startServer("server1"); - Thread.sleep(3000); try { client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); assert false : "index should exists"; diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/SimpleFsIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/SimpleFsIndexGatewayTests.java index 905d0f6e9bf..e651e729fa1 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/SimpleFsIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/SimpleFsIndexGatewayTests.java @@ -22,7 +22,7 @@ package org.elasticsearch.test.integration.gateway.fs; import org.elasticsearch.test.integration.gateway.AbstractSimpleIndexGatewayTests; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class SimpleFsIndexGatewayTests extends AbstractSimpleIndexGatewayTests { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/SimpleFsIndexInRamIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/SimpleFsIndexInRamIndexGatewayTests.java index e717a256198..3f3dc12cbec 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/SimpleFsIndexInRamIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/SimpleFsIndexInRamIndexGatewayTests.java @@ -22,7 +22,7 @@ package org.elasticsearch.test.integration.gateway.fs; import org.elasticsearch.test.integration.gateway.AbstractSimpleIndexGatewayTests; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class SimpleFsIndexInRamIndexGatewayTests extends AbstractSimpleIndexGatewayTests {