Gateway: Allow to configure a delay till index creation from gateway will occur, closes #223.

This commit is contained in:
kimchy 2010-06-15 01:25:55 +03:00
parent e0d20af743
commit 98df1b3433
1 changed files with 40 additions and 15 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.gateway;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -29,6 +28,7 @@ import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import javax.annotation.Nullable;
@ -44,12 +44,10 @@ import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class GatewayService extends AbstractLifecycleComponent<GatewayService> implements ClusterStateListener {
private final TimeValue initialStateTimeout;
private final Gateway gateway;
private final ThreadPool threadPool;
@ -62,6 +60,12 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final MetaDataService metaDataService;
private final TimeValue initialStateTimeout;
private final TimeValue delayIndexCreation;
private final AtomicBoolean firstMasterRead = new AtomicBoolean();
@Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService,
@ -73,6 +77,10 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
this.threadPool = threadPool;
this.metaDataService = metaDataService;
this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30));
// allow to control a delay of when indices will get created
// TODO we need to maintain, on the cluster state, a flag that states if it was read from the gateway or not
// so if we delay, and the first master failed to start, others will load it
this.delayIndexCreation = componentSettings.getAsTime("delay_index_creation", null);
}
@Override protected void doStart() throws ElasticSearchException {
@ -83,8 +91,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
if (discoveryService.initialStateReceived()) {
if (discoveryService.firstMaster()) {
if (firstMasterRead.compareAndSet(false, true)) {
boolean waited = readFromGateway(initialStateTimeout);
if (!waited) {
Boolean waited = readFromGateway(initialStateTimeout);
if (waited != null && !waited) {
logger.warn("Waited for {} for indices to be created from the gateway, and not all have been created", initialStateTimeout);
}
}
@ -146,7 +154,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
* have been created from the meta data read from the gateway. Return value only applicable
* when waiting, and indicates that everything was created within teh wait timeout.
*/
private boolean readFromGateway(@Nullable TimeValue waitTimeout) {
private Boolean readFromGateway(@Nullable TimeValue waitTimeout) {
// we are the first master, go ahead and read and create indices
logger.debug("First master in the cluster, reading state from gateway");
MetaData metaData;
@ -162,6 +170,31 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
}
final MetaData fMetaData = metaData;
final CountDownLatch latch = new CountDownLatch(fMetaData.indices().size());
if (delayIndexCreation != null) {
logger.debug("Delaying initial state index creation for [{}]", delayIndexCreation);
threadPool.schedule(new Runnable() {
@Override public void run() {
updateClusterStateFromGateway(fMetaData, latch);
}
}, delayIndexCreation);
} else {
updateClusterStateFromGateway(fMetaData, latch);
}
// if we delay indices creation, then waiting for them does not make sense
if (delayIndexCreation != null) {
return null;
}
if (waitTimeout != null) {
try {
return latch.await(waitTimeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
}
return null;
}
private void updateClusterStateFromGateway(final MetaData fMetaData, final CountDownLatch latch) {
clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
@ -184,13 +217,5 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build();
}
});
if (waitTimeout != null) {
try {
return latch.await(waitTimeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
}
return false;
}
}