From 87d5a92edb16a07138e7df90ac57bb6f84d53c4c Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 18 Jan 2011 14:38:10 +0200 Subject: [PATCH] move handling of recovered metadata to GatewayService, reducing code duplication in different gateway implementations --- .../org/elasticsearch/gateway/Gateway.java | 3 +- .../elasticsearch/gateway/GatewayService.java | 105 ++++++++++++++++-- .../gateway/blobstore/BlobStoreGateway.java | 5 +- .../elasticsearch/gateway/fs/FsGateway.java | 5 +- .../gateway/local/LocalGateway.java | 85 ++------------ .../gateway/none/NoneGateway.java | 3 +- .../gateway/shared/SharedStorageGateway.java | 79 ++----------- .../elasticsearch/gateway/s3/S3Gateway.java | 5 +- .../gateway/hdfs/HdfsGateway.java | 5 +- 9 files changed, 122 insertions(+), 173 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java index f9775d333cf..76f26143297 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java @@ -19,6 +19,7 @@ package org.elasticsearch.gateway; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; @@ -36,7 +37,7 @@ public interface Gateway extends LifecycleComponent { void reset() throws Exception; interface GatewayStateRecoveredListener { - void onSuccess(); + void onSuccess(ClusterState recoveredState); void onFailure(Throwable t); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index c0ea48d2ab8..784d83a0834 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -25,7 +25,10 @@ import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -35,12 +38,16 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.common.unit.TimeValue.*; /** * @author kimchy (shay.banon) @@ -57,6 +64,8 @@ public class GatewayService extends AbstractLifecycleComponent i private final DiscoveryService discoveryService; + private final MetaDataCreateIndexService createIndexService; + private final TimeValue initialStateTimeout; private final TimeValue recoverAfterTime; private final int recoverAfterNodes; @@ -70,11 +79,12 @@ public class GatewayService extends AbstractLifecycleComponent i private final AtomicBoolean recovered = new AtomicBoolean(); private final AtomicBoolean scheduledRecovery = new AtomicBoolean(); - @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, ThreadPool threadPool) { + @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) { super(settings); this.gateway = gateway; this.clusterService = clusterService; this.discoveryService = discoveryService; + this.createIndexService = createIndexService; this.threadPool = threadPool; this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30)); // allow to control a delay of when indices will get created @@ -176,17 +186,7 @@ public class GatewayService extends AbstractLifecycleComponent i private void performStateRecovery(@Nullable TimeValue timeout, boolean ignoreTimeout) { final CountDownLatch latch = new CountDownLatch(1); - final Gateway.GatewayStateRecoveredListener recoveryListener = new Gateway.GatewayStateRecoveredListener() { - @Override public void onSuccess() { - markMetaDataAsReadFromGateway("success"); - latch.countDown(); - } - - @Override public void onFailure(Throwable t) { - markMetaDataAsReadFromGateway("failure [" + t.getMessage() + "]"); - latch.countDown(); - } - }; + final Gateway.GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener(latch); if (!ignoreTimeout && recoverAfterTime != null) { if (scheduledRecovery.compareAndSet(false, true)) { @@ -214,6 +214,87 @@ public class GatewayService extends AbstractLifecycleComponent i } } + class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener { + + private final CountDownLatch latch; + + GatewayRecoveryListener(CountDownLatch latch) { + this.latch = latch; + } + + @Override public void onSuccess(final ClusterState recoveredState) { + final AtomicInteger indicesCounter = new AtomicInteger(recoveredState.metaData().indices().size()); + clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + MetaData.Builder metaDataBuilder = newMetaDataBuilder() + .metaData(currentState.metaData()); + // mark the metadata as read from gateway + metaDataBuilder.markAsRecoveredFromGateway(); + + // add the index templates + for (Map.Entry entry : recoveredState.metaData().templates().entrySet()) { + metaDataBuilder.put(entry.getValue()); + } + + return newClusterStateBuilder().state(currentState) + .version(recoveredState.version()) + .metaData(metaDataBuilder).build(); + } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + if (recoveredState.metaData().indices().isEmpty()) { + markMetaDataAsReadFromGateway("success"); + latch.countDown(); + return; + } + // go over the meta data and create indices, we don't really need to copy over + // the meta data per index, since we create the index and it will be added automatically + for (final IndexMetaData indexMetaData : recoveredState.metaData()) { + try { + createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index()) + .settings(indexMetaData.settings()) + .mappingsMetaData(indexMetaData.mappings()) + .state(indexMetaData.state()) + .timeout(timeValueSeconds(30)), + + new MetaDataCreateIndexService.Listener() { + @Override public void onResponse(MetaDataCreateIndexService.Response response) { + if (indicesCounter.decrementAndGet() == 0) { + markMetaDataAsReadFromGateway("success"); + latch.countDown(); + } + } + + @Override public void onFailure(Throwable t) { + logger.error("failed to create index [{}]", t, indexMetaData.index()); + // we report success on index creation failure and do nothing + // should we disable writing the updated metadata? + if (indicesCounter.decrementAndGet() == 0) { + markMetaDataAsReadFromGateway("success"); + latch.countDown(); + } + } + }); + } catch (IOException e) { + logger.error("failed to create index [{}]", e, indexMetaData.index()); + // we report success on index creation failure and do nothing + // should we disable writing the updated metadata? + if (indicesCounter.decrementAndGet() == 0) { + markMetaDataAsReadFromGateway("success"); + latch.countDown(); + } + } + } + } + }); + } + + @Override public void onFailure(Throwable t) { + // don't remove the block here, we don't want to allow anything in such a case + logger.error("failed recover state, blocking...", t); + } + } + private void markMetaDataAsReadFromGateway(String reason) { clusterService.submitStateUpdateTask("gateway (marked as read, reason=" + reason + ")", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java index c805f21ab95..874f16a74ec 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java @@ -22,7 +22,6 @@ package org.elasticsearch.gateway.blobstore; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.collect.ImmutableMap; @@ -55,8 +54,8 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { private volatile int currentIndex; - protected BlobStoreGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService) { - super(settings, clusterService, createIndexService); + protected BlobStoreGateway(Settings settings, ClusterService clusterService) { + super(settings, clusterService); } protected void initialize(BlobStore blobStore, ClusterName clusterName, @Nullable ByteSizeValue defaultChunkSize) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java index e0a28286c8a..6ee119c1e15 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java @@ -22,7 +22,6 @@ package org.elasticsearch.gateway.fs; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.blobstore.fs.FsBlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; @@ -46,9 +45,9 @@ public class FsGateway extends BlobStoreGateway { private final ExecutorService concurrentStreamPool; - @Inject public FsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, + @Inject public FsGateway(Settings settings, ClusterService clusterService, Environment environment, ClusterName clusterName, ThreadPool threadPool) throws IOException { - super(settings, clusterService, createIndexService); + super(settings, clusterService); File gatewayFile; String location = componentSettings.get("location"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index fb58aa2ddec..b58289ecc9f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -21,11 +21,10 @@ package org.elasticsearch.gateway.local; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.cluster.*; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.MutableShardRouting; @@ -45,16 +44,11 @@ import org.elasticsearch.gateway.GatewayException; import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule; import java.io.*; -import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static java.util.concurrent.Executors.*; -import static org.elasticsearch.cluster.ClusterState.*; -import static org.elasticsearch.cluster.metadata.MetaData.*; -import static org.elasticsearch.common.unit.TimeValue.*; import static org.elasticsearch.common.util.concurrent.EsExecutors.*; /** @@ -68,8 +62,6 @@ public class LocalGateway extends AbstractLifecycleComponent implements private final NodeEnvironment nodeEnv; - private final MetaDataCreateIndexService createIndexService; - private final TransportNodesListGatewayMetaState listGatewayMetaState; private final TransportNodesListGatewayStartedShards listGatewayStartedShards; @@ -82,11 +74,10 @@ public class LocalGateway extends AbstractLifecycleComponent implements private volatile boolean initialized = false; - @Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, - NodeEnvironment nodeEnv, TransportNodesListGatewayMetaState listGatewayMetaState, TransportNodesListGatewayStartedShards listGatewayStartedShards) { + @Inject public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, + TransportNodesListGatewayMetaState listGatewayMetaState, TransportNodesListGatewayStartedShards listGatewayStartedShards) { super(settings); this.clusterService = clusterService; - this.createIndexService = createIndexService; this.nodeEnv = nodeEnv; this.listGatewayMetaState = listGatewayMetaState.initGateway(this); this.listGatewayStartedShards = listGatewayStartedShards.initGateway(this); @@ -149,67 +140,11 @@ public class LocalGateway extends AbstractLifecycleComponent implements } if (electedState == null) { logger.debug("no state elected"); - listener.onSuccess(); - return; + listener.onSuccess(ClusterState.builder().build()); + } else { + logger.debug("elected state from [{}]", electedState.node()); + listener.onSuccess(ClusterState.builder().version(electedState.state().version()).metaData(electedState.state().metaData()).build()); } - - logger.debug("elected state from [{}]", electedState.node()); - final LocalGatewayMetaState state = electedState.state(); - final AtomicInteger indicesCounter = new AtomicInteger(state.metaData().indices().size()); - clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder metaDataBuilder = newMetaDataBuilder() - .metaData(currentState.metaData()); - // mark the metadata as read from gateway - metaDataBuilder.markAsRecoveredFromGateway(); - - // add the index templates - for (Map.Entry entry : state.metaData().templates().entrySet()) { - metaDataBuilder.put(entry.getValue()); - } - - return newClusterStateBuilder().state(currentState) - .version(state.version()) - .metaData(metaDataBuilder).build(); - } - - @Override public void clusterStateProcessed(ClusterState clusterState) { - if (state.metaData().indices().isEmpty()) { - listener.onSuccess(); - return; - } - // go over the meta data and create indices, we don't really need to copy over - // the meta data per index, since we create the index and it will be added automatically - for (final IndexMetaData indexMetaData : state.metaData()) { - try { - createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index()) - .settings(indexMetaData.settings()) - .mappingsMetaData(indexMetaData.mappings()) - .state(indexMetaData.state()) - .timeout(timeValueSeconds(30)), - - new MetaDataCreateIndexService.Listener() { - @Override public void onResponse(MetaDataCreateIndexService.Response response) { - if (indicesCounter.decrementAndGet() == 0) { - listener.onSuccess(); - } - } - - @Override public void onFailure(Throwable t) { - logger.error("failed to create index [{}]", t, indexMetaData.index()); - // we report success on index creation failure and do nothing - // should we disable writing the updated metadata? - if (indicesCounter.decrementAndGet() == 0) { - listener.onSuccess(); - } - } - }); - } catch (IOException e) { - logger.error("failed to create index [{}]", e, indexMetaData.index()); - } - } - } - }); } @Override public Class suggestIndexGateway() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java index 6a92d1f625b..4d17e137f91 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java @@ -20,6 +20,7 @@ package org.elasticsearch.gateway.none; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; @@ -58,7 +59,7 @@ public class NoneGateway extends AbstractLifecycleComponent implements @Override public void performStateRecovery(GatewayStateRecoveredListener listener) throws GatewayException { logger.debug("performing state recovery"); - listener.onSuccess(); + listener.onSuccess(ClusterState.builder().build()); } @Override public Class suggestIndexGateway() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java index 2771b7e3d9c..02f0c250625 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java @@ -20,27 +20,21 @@ package org.elasticsearch.gateway.shared; import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.cluster.*; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; -import java.io.IOException; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static java.util.concurrent.Executors.*; -import static org.elasticsearch.cluster.ClusterState.*; -import static org.elasticsearch.cluster.metadata.MetaData.*; -import static org.elasticsearch.common.unit.TimeValue.*; import static org.elasticsearch.common.util.concurrent.EsExecutors.*; /** @@ -50,16 +44,13 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent entry : fMetaData.templates().entrySet()) { - metaDataBuilder.put(entry.getValue()); - } - - return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build(); - } - - @Override public void clusterStateProcessed(ClusterState clusterState) { - if (fMetaData.indices().isEmpty()) { - listener.onSuccess(); - return; - } - // go over the meta data and create indices, we don't really need to copy over - // the meta data per index, since we create the index and it will be added automatically - for (final IndexMetaData indexMetaData : fMetaData) { - try { - createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index()) - .settings(indexMetaData.settings()) - .mappingsMetaData(indexMetaData.mappings()) - .state(indexMetaData.state()) - .timeout(timeValueSeconds(30)), - - new MetaDataCreateIndexService.Listener() { - @Override public void onResponse(MetaDataCreateIndexService.Response response) { - if (indicesCounter.decrementAndGet() == 0) { - listener.onSuccess(); - } - } - - @Override public void onFailure(Throwable t) { - logger.error("failed to create index [{}]", t, indexMetaData.index()); - // we report success on index creation failure and do nothing - // should we disable writing the updated metadata? - if (indicesCounter.decrementAndGet() == 0) { - listener.onSuccess(); - } - } - }); - } catch (IOException e) { - logger.error("failed to create index [{}]", e, indexMetaData.index()); - } - } - } - }); - } - protected abstract MetaData read() throws ElasticSearchException; protected abstract void write(MetaData metaData) throws ElasticSearchException; diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java index 332d5dd87c7..1734f00cd18 100644 --- a/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java @@ -25,7 +25,6 @@ import org.elasticsearch.cloud.aws.AwsS3Service; import org.elasticsearch.cloud.aws.blobstore.S3BlobStore; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; @@ -48,9 +47,9 @@ public class S3Gateway extends BlobStoreGateway { private final ExecutorService concurrentStreamPool; - @Inject public S3Gateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, + @Inject public S3Gateway(Settings settings, ClusterService clusterService, ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException { - super(settings, clusterService, createIndexService); + super(settings, clusterService); String bucket = componentSettings.get("bucket"); if (bucket == null) { diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java index 75af81c166e..16150cc6410 100644 --- a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java +++ b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java @@ -26,7 +26,6 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.blobstore.hdfs.HdfsBlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; @@ -53,9 +52,9 @@ public class HdfsGateway extends BlobStoreGateway { private final ExecutorService concurrentStreamPool; - @Inject public HdfsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, + @Inject public HdfsGateway(Settings settings, ClusterService clusterService, ClusterName clusterName, ThreadPool threadPool) throws IOException { - super(settings, clusterService, createIndexService); + super(settings, clusterService); this.closeFileSystem = componentSettings.getAsBoolean("close_fs", true); String uri = componentSettings.get("uri");