move handling of recovered metadata to GatewayService, reducing code duplication in different gateway implementations

This commit is contained in:
kimchy 2011-01-18 14:38:10 +02:00
parent a4c2087511
commit 87d5a92edb
9 changed files with 122 additions and 173 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.gateway;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
@ -36,7 +37,7 @@ public interface Gateway extends LifecycleComponent<Gateway> {
void reset() throws Exception;
interface GatewayStateRecoveredListener {
void onSuccess();
void onSuccess(ClusterState recoveredState);
void onFailure(Throwable t);
}

View File

@ -25,7 +25,10 @@ import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -35,12 +38,16 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* @author kimchy (shay.banon)
@ -57,6 +64,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final DiscoveryService discoveryService;
private final MetaDataCreateIndexService createIndexService;
private final TimeValue initialStateTimeout;
private final TimeValue recoverAfterTime;
private final int recoverAfterNodes;
@ -70,11 +79,12 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final AtomicBoolean recovered = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
@Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, ThreadPool threadPool) {
@Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) {
super(settings);
this.gateway = gateway;
this.clusterService = clusterService;
this.discoveryService = discoveryService;
this.createIndexService = createIndexService;
this.threadPool = threadPool;
this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30));
// allow to control a delay of when indices will get created
@ -176,17 +186,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private void performStateRecovery(@Nullable TimeValue timeout, boolean ignoreTimeout) {
final CountDownLatch latch = new CountDownLatch(1);
final Gateway.GatewayStateRecoveredListener recoveryListener = new Gateway.GatewayStateRecoveredListener() {
@Override public void onSuccess() {
markMetaDataAsReadFromGateway("success");
latch.countDown();
}
@Override public void onFailure(Throwable t) {
markMetaDataAsReadFromGateway("failure [" + t.getMessage() + "]");
latch.countDown();
}
};
final Gateway.GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener(latch);
if (!ignoreTimeout && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
@ -214,6 +214,87 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
}
}
class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {
private final CountDownLatch latch;
GatewayRecoveryListener(CountDownLatch latch) {
this.latch = latch;
}
@Override public void onSuccess(final ClusterState recoveredState) {
final AtomicInteger indicesCounter = new AtomicInteger(recoveredState.metaData().indices().size());
clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData());
// mark the metadata as read from gateway
metaDataBuilder.markAsRecoveredFromGateway();
// add the index templates
for (Map.Entry<String, IndexTemplateMetaData> entry : recoveredState.metaData().templates().entrySet()) {
metaDataBuilder.put(entry.getValue());
}
return newClusterStateBuilder().state(currentState)
.version(recoveredState.version())
.metaData(metaDataBuilder).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
if (recoveredState.metaData().indices().isEmpty()) {
markMetaDataAsReadFromGateway("success");
latch.countDown();
return;
}
// go over the meta data and create indices, we don't really need to copy over
// the meta data per index, since we create the index and it will be added automatically
for (final IndexMetaData indexMetaData : recoveredState.metaData()) {
try {
createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index())
.settings(indexMetaData.settings())
.mappingsMetaData(indexMetaData.mappings())
.state(indexMetaData.state())
.timeout(timeValueSeconds(30)),
new MetaDataCreateIndexService.Listener() {
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
if (indicesCounter.decrementAndGet() == 0) {
markMetaDataAsReadFromGateway("success");
latch.countDown();
}
}
@Override public void onFailure(Throwable t) {
logger.error("failed to create index [{}]", t, indexMetaData.index());
// we report success on index creation failure and do nothing
// should we disable writing the updated metadata?
if (indicesCounter.decrementAndGet() == 0) {
markMetaDataAsReadFromGateway("success");
latch.countDown();
}
}
});
} catch (IOException e) {
logger.error("failed to create index [{}]", e, indexMetaData.index());
// we report success on index creation failure and do nothing
// should we disable writing the updated metadata?
if (indicesCounter.decrementAndGet() == 0) {
markMetaDataAsReadFromGateway("success");
latch.countDown();
}
}
}
}
});
}
@Override public void onFailure(Throwable t) {
// don't remove the block here, we don't want to allow anything in such a case
logger.error("failed recover state, blocking...", t);
}
}
private void markMetaDataAsReadFromGateway(String reason) {
clusterService.submitStateUpdateTask("gateway (marked as read, reason=" + reason + ")", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {

View File

@ -22,7 +22,6 @@ package org.elasticsearch.gateway.blobstore;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.collect.ImmutableMap;
@ -55,8 +54,8 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
private volatile int currentIndex;
protected BlobStoreGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService) {
super(settings, clusterService, createIndexService);
protected BlobStoreGateway(Settings settings, ClusterService clusterService) {
super(settings, clusterService);
}
protected void initialize(BlobStore blobStore, ClusterName clusterName, @Nullable ByteSizeValue defaultChunkSize) throws IOException {

View File

@ -22,7 +22,6 @@ package org.elasticsearch.gateway.fs;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
@ -46,9 +45,9 @@ public class FsGateway extends BlobStoreGateway {
private final ExecutorService concurrentStreamPool;
@Inject public FsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
@Inject public FsGateway(Settings settings, ClusterService clusterService,
Environment environment, ClusterName clusterName, ThreadPool threadPool) throws IOException {
super(settings, clusterService, createIndexService);
super(settings, clusterService);
File gatewayFile;
String location = componentSettings.get("location");

View File

@ -21,11 +21,10 @@ package org.elasticsearch.gateway.local;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting;
@ -45,16 +44,11 @@ import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule;
import java.io.*;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.Executors.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.unit.TimeValue.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
/**
@ -68,8 +62,6 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
private final NodeEnvironment nodeEnv;
private final MetaDataCreateIndexService createIndexService;
private final TransportNodesListGatewayMetaState listGatewayMetaState;
private final TransportNodesListGatewayStartedShards listGatewayStartedShards;
@ -82,11 +74,10 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
private volatile boolean initialized = false;
@Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
NodeEnvironment nodeEnv, TransportNodesListGatewayMetaState listGatewayMetaState, TransportNodesListGatewayStartedShards listGatewayStartedShards) {
@Inject public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv,
TransportNodesListGatewayMetaState listGatewayMetaState, TransportNodesListGatewayStartedShards listGatewayStartedShards) {
super(settings);
this.clusterService = clusterService;
this.createIndexService = createIndexService;
this.nodeEnv = nodeEnv;
this.listGatewayMetaState = listGatewayMetaState.initGateway(this);
this.listGatewayStartedShards = listGatewayStartedShards.initGateway(this);
@ -149,67 +140,11 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
}
if (electedState == null) {
logger.debug("no state elected");
listener.onSuccess();
return;
}
listener.onSuccess(ClusterState.builder().build());
} else {
logger.debug("elected state from [{}]", electedState.node());
final LocalGatewayMetaState state = electedState.state();
final AtomicInteger indicesCounter = new AtomicInteger(state.metaData().indices().size());
clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData());
// mark the metadata as read from gateway
metaDataBuilder.markAsRecoveredFromGateway();
// add the index templates
for (Map.Entry<String, IndexTemplateMetaData> entry : state.metaData().templates().entrySet()) {
metaDataBuilder.put(entry.getValue());
listener.onSuccess(ClusterState.builder().version(electedState.state().version()).metaData(electedState.state().metaData()).build());
}
return newClusterStateBuilder().state(currentState)
.version(state.version())
.metaData(metaDataBuilder).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
if (state.metaData().indices().isEmpty()) {
listener.onSuccess();
return;
}
// go over the meta data and create indices, we don't really need to copy over
// the meta data per index, since we create the index and it will be added automatically
for (final IndexMetaData indexMetaData : state.metaData()) {
try {
createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index())
.settings(indexMetaData.settings())
.mappingsMetaData(indexMetaData.mappings())
.state(indexMetaData.state())
.timeout(timeValueSeconds(30)),
new MetaDataCreateIndexService.Listener() {
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
if (indicesCounter.decrementAndGet() == 0) {
listener.onSuccess();
}
}
@Override public void onFailure(Throwable t) {
logger.error("failed to create index [{}]", t, indexMetaData.index());
// we report success on index creation failure and do nothing
// should we disable writing the updated metadata?
if (indicesCounter.decrementAndGet() == 0) {
listener.onSuccess();
}
}
});
} catch (IOException e) {
logger.error("failed to create index [{}]", e, indexMetaData.index());
}
}
}
});
}
@Override public Class<? extends Module> suggestIndexGateway() {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.gateway.none;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
@ -58,7 +59,7 @@ public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements
@Override public void performStateRecovery(GatewayStateRecoveredListener listener) throws GatewayException {
logger.debug("performing state recovery");
listener.onSuccess();
listener.onSuccess(ClusterState.builder().build());
}
@Override public Class<? extends Module> suggestIndexGateway() {

View File

@ -20,27 +20,21 @@
package org.elasticsearch.gateway.shared;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.Executors.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.unit.TimeValue.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
/**
@ -50,16 +44,13 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
private final ClusterService clusterService;
private final MetaDataCreateIndexService createIndexService;
private volatile boolean performedStateRecovery = false;
private volatile ExecutorService executor;
public SharedStorageGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService) {
public SharedStorageGateway(Settings settings, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
this.createIndexService = createIndexService;
}
@Override protected void doStart() throws ElasticSearchException {
@ -92,9 +83,9 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
logger.debug("read state from gateway {}, took {}", this, stopWatch.stop().totalTime());
if (metaData == null) {
logger.debug("no state read from gateway");
listener.onSuccess();
listener.onSuccess(ClusterState.builder().build());
} else {
updateClusterStateFromGateway(metaData, listener);
listener.onSuccess(ClusterState.builder().metaData(metaData).build());
}
} catch (Exception e) {
logger.error("failed to read from gateway", e);
@ -131,62 +122,6 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
}
}
private void updateClusterStateFromGateway(final MetaData fMetaData, final GatewayStateRecoveredListener listener) {
final AtomicInteger indicesCounter = new AtomicInteger(fMetaData.indices().size());
clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData());
// mark the metadata as read from gateway
metaDataBuilder.markAsRecoveredFromGateway();
// add the index templates
for (Map.Entry<String, IndexTemplateMetaData> entry : fMetaData.templates().entrySet()) {
metaDataBuilder.put(entry.getValue());
}
return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
if (fMetaData.indices().isEmpty()) {
listener.onSuccess();
return;
}
// go over the meta data and create indices, we don't really need to copy over
// the meta data per index, since we create the index and it will be added automatically
for (final IndexMetaData indexMetaData : fMetaData) {
try {
createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index())
.settings(indexMetaData.settings())
.mappingsMetaData(indexMetaData.mappings())
.state(indexMetaData.state())
.timeout(timeValueSeconds(30)),
new MetaDataCreateIndexService.Listener() {
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
if (indicesCounter.decrementAndGet() == 0) {
listener.onSuccess();
}
}
@Override public void onFailure(Throwable t) {
logger.error("failed to create index [{}]", t, indexMetaData.index());
// we report success on index creation failure and do nothing
// should we disable writing the updated metadata?
if (indicesCounter.decrementAndGet() == 0) {
listener.onSuccess();
}
}
});
} catch (IOException e) {
logger.error("failed to create index [{}]", e, indexMetaData.index());
}
}
}
});
}
protected abstract MetaData read() throws ElasticSearchException;
protected abstract void write(MetaData metaData) throws ElasticSearchException;

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
@ -48,9 +47,9 @@ public class S3Gateway extends BlobStoreGateway {
private final ExecutorService concurrentStreamPool;
@Inject public S3Gateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
@Inject public S3Gateway(Settings settings, ClusterService clusterService,
ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException {
super(settings, clusterService, createIndexService);
super(settings, clusterService);
String bucket = componentSettings.get("bucket");
if (bucket == null) {

View File

@ -26,7 +26,6 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.blobstore.hdfs.HdfsBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
@ -53,9 +52,9 @@ public class HdfsGateway extends BlobStoreGateway {
private final ExecutorService concurrentStreamPool;
@Inject public HdfsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
@Inject public HdfsGateway(Settings settings, ClusterService clusterService,
ClusterName clusterName, ThreadPool threadPool) throws IOException {
super(settings, clusterService, createIndexService);
super(settings, clusterService);
this.closeFileSystem = componentSettings.getAsBoolean("close_fs", true);
String uri = componentSettings.get("uri");