diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 9a59c4be0f7..1335d4739c6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -19,7 +19,6 @@ package org.elasticsearch.gateway; -import org.elasticsearch.util.inject.Inject; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -29,6 +28,7 @@ import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.component.AbstractLifecycleComponent; +import org.elasticsearch.util.inject.Inject; import org.elasticsearch.util.settings.Settings; import javax.annotation.Nullable; @@ -44,12 +44,10 @@ import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.concurrent.DynamicExecutors.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class GatewayService extends AbstractLifecycleComponent<GatewayService> implements ClusterStateListener { - private final TimeValue initialStateTimeout; - private final Gateway gateway; private final ThreadPool threadPool; @@ -62,6 +60,12 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i private final MetaDataService metaDataService; + + private final TimeValue initialStateTimeout; + + private final TimeValue delayIndexCreation; + + private final AtomicBoolean firstMasterRead = new AtomicBoolean(); @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, @@ -73,6 +77,10 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i this.threadPool = threadPool; this.metaDataService = metaDataService; this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30)); + // allow to control a delay of when indices will get created + // TODO we need to maintain, on the cluster state, a flag that states if it was read from the gateway or not + // so if we delay, and the first master failed to start, others will load it + this.delayIndexCreation = componentSettings.getAsTime("delay_index_creation", null); } @Override protected void doStart() throws ElasticSearchException { @@ -83,8 +91,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i if (discoveryService.initialStateReceived()) { if (discoveryService.firstMaster()) { if (firstMasterRead.compareAndSet(false, true)) { - boolean waited = readFromGateway(initialStateTimeout); - if (!waited) { + Boolean waited = readFromGateway(initialStateTimeout); + if (waited != null && !waited) { logger.warn("Waited for {} for indices to be created from the gateway, and not all have been created", initialStateTimeout); } } @@ -146,7 +154,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i * have been created from the meta data read from the gateway. Return value only applicable * when waiting, and indicates that everything was created within teh wait timeout. */ - private boolean readFromGateway(@Nullable TimeValue waitTimeout) { + private Boolean readFromGateway(@Nullable TimeValue waitTimeout) { // we are the first master, go ahead and read and create indices logger.debug("First master in the cluster, reading state from gateway"); MetaData metaData; @@ -162,6 +170,31 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i } final MetaData fMetaData = metaData; final CountDownLatch latch = new CountDownLatch(fMetaData.indices().size()); + if (delayIndexCreation != null) { + logger.debug("Delaying initial state index creation for [{}]", delayIndexCreation); + threadPool.schedule(new Runnable() { + @Override public void run() { + updateClusterStateFromGateway(fMetaData, latch); + } + }, delayIndexCreation); + } else { + updateClusterStateFromGateway(fMetaData, latch); + } + // if we delay indices creation, then waiting for them does not make sense + if (delayIndexCreation != null) { + return null; + } + if (waitTimeout != null) { + try { + return latch.await(waitTimeout.millis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // ignore + } + } + return null; + } + + private void updateClusterStateFromGateway(final MetaData fMetaData, final CountDownLatch latch) { clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { MetaData.Builder metaDataBuilder = newMetaDataBuilder() @@ -184,13 +217,5 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build(); } }); - if (waitTimeout != null) { - try { - return latch.await(waitTimeout.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore - } - } - return false; } }