From 0c1c2dc671b7a6e8a65f0ac910db2bf9c34230fa Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 19 Dec 2013 21:32:04 +0100 Subject: [PATCH] Allow to enable / disable bloom filter loading on an index Allow to have a new index level setting index.codec.bloom.load (default to true), that can control if the boom filters will be loaded or not. This is an updateable setting, that can be updated on a live index using the update settings API. Note though, when this setting is updated, a fresh Lucene index will be reopened, causing associate caches to be dropped potentially. closes #4525 Note, this change also disables the returning lucene ram usage stats, due to a bug in Lucene, relates to #4512 --- .../index/codec/CodecService.java | 13 +++ .../BloomFilterPostingsFormat.java | 13 ++- .../index/engine/robin/RobinEngine.java | 33 ++++++- .../settings/IndexDynamicSettingsModule.java | 2 + .../org/elasticsearch/index/store/Store.java | 15 +++- .../robin/RobinEngineIntegrationTest.java | 86 +++++++++++++++++-- .../index/engine/robin/RobinEngineTests.java | 4 +- .../merge/policy/MergePolicySettingsTest.java | 13 ++- 8 files changed, 158 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/codec/CodecService.java b/src/main/java/org/elasticsearch/index/codec/CodecService.java index 5876e9bd61b..835dcfb22ac 100644 --- a/src/main/java/org/elasticsearch/index/codec/CodecService.java +++ b/src/main/java/org/elasticsearch/index/codec/CodecService.java @@ -44,11 +44,16 @@ import org.elasticsearch.index.settings.IndexSettings; */ public class CodecService extends AbstractIndexComponent { + public static final String INDEX_CODEC_BLOOM_LOAD = "index.codec.bloom.load"; + public static final boolean INDEX_CODEC_BLOOM_LOAD_DEFAULT = true; + private final PostingsFormatService postingsFormatService; private final DocValuesFormatService docValuesFormatService; private final MapperService mapperService; private final ImmutableMap codecs; + private volatile boolean loadBloomFilter = true; + public final static String DEFAULT_CODEC = "default"; public CodecService(Index index) { @@ -78,6 +83,7 @@ public class CodecService extends AbstractIndexComponent { codecs.put(codec, Codec.forName(codec)); } this.codecs = codecs.immutableMap(); + this.loadBloomFilter = indexSettings.getAsBoolean(INDEX_CODEC_BLOOM_LOAD, INDEX_CODEC_BLOOM_LOAD_DEFAULT); } public PostingsFormatService postingsFormatService() { @@ -100,4 +106,11 @@ public class CodecService extends AbstractIndexComponent { return codec; } + public boolean isLoadBloomFilter() { + return this.loadBloomFilter; + } + + public void setLoadBloomFilter(boolean loadBloomFilter) { + this.loadBloomFilter = loadBloomFilter; + } } diff --git a/src/main/java/org/elasticsearch/index/codec/postingsformat/BloomFilterPostingsFormat.java b/src/main/java/org/elasticsearch/index/codec/postingsformat/BloomFilterPostingsFormat.java index 5753c26a950..4f699387f9b 100644 --- a/src/main/java/org/elasticsearch/index/codec/postingsformat/BloomFilterPostingsFormat.java +++ b/src/main/java/org/elasticsearch/index/codec/postingsformat/BloomFilterPostingsFormat.java @@ -28,6 +28,8 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.elasticsearch.common.util.BloomFilter; +import org.elasticsearch.index.store.DirectoryUtils; +import org.elasticsearch.index.store.Store; import java.io.IOException; import java.util.*; @@ -129,7 +131,14 @@ public final class BloomFilterPostingsFormat extends PostingsFormat { this.delegateFieldsProducer = delegatePostingsFormat .fieldsProducer(state); int numBlooms = bloomIn.readInt(); - if (state.context.context != IOContext.Context.MERGE) { + + boolean load = true; + Store.StoreDirectory storeDir = DirectoryUtils.getStoreDirectory(state.directory); + if (storeDir != null && storeDir.codecService() != null) { + load = storeDir.codecService().isLoadBloomFilter(); + } + + if (load && state.context.context != IOContext.Context.MERGE) { // if we merge we don't need to load the bloom filters for (int i = 0; i < numBlooms; i++) { int fieldNum = bloomIn.readInt(); @@ -189,7 +198,7 @@ public final class BloomFilterPostingsFormat extends PostingsFormat { return size; } } - + public static final class BloomFilteredTerms extends FilterAtomicReader.FilterTerms { private BloomFilter filter; diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 0d21ab7d19e..5735370f018 100644 --- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -32,6 +32,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Preconditions; @@ -1117,6 +1118,21 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } + static final boolean allowRamBytesUsed; + + static { + assert Version.CURRENT.luceneVersion == org.apache.lucene.util.Version.LUCENE_46 : + "when upgrading to a new lucene version, check if ramBytes is fixed, see https://issues.apache.org/jira/browse/LUCENE-5373"; + boolean xAllowRamBytesUsed = false; + assert xAllowRamBytesUsed = true; + allowRamBytesUsed = xAllowRamBytesUsed; + } + + private long getReaderRamBytesUsed(AtomicReaderContext reader) { + assert reader.reader() instanceof SegmentReader; + return allowRamBytesUsed ? ((SegmentReader) reader.reader()).ramBytesUsed() : 0; + } + @Override public SegmentsStats segmentsStats() { rwl.readLock().lock(); @@ -1126,8 +1142,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { try { SegmentsStats stats = new SegmentsStats(); for (AtomicReaderContext reader : searcher.reader().leaves()) { - assert reader.reader() instanceof SegmentReader; - stats.add(1, ((SegmentReader) reader.reader()).ramBytesUsed()); + stats.add(1, getReaderRamBytesUsed(reader)); } return stats; } finally { @@ -1163,7 +1178,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } catch (IOException e) { logger.trace("failed to get size for [{}]", e, info.info.name); } - segment.memoryInBytes = ((SegmentReader) reader.reader()).ramBytesUsed(); + segment.memoryInBytes = getReaderRamBytesUsed(reader); segments.put(info.info.name, segment); } } finally { @@ -1413,8 +1428,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, RobinEngine.this.indexConcurrency); boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, RobinEngine.this.failOnMergeFailure); String codecName = settings.get(INDEX_CODEC, RobinEngine.this.codecName); + final boolean codecBloomLoad = settings.getAsBoolean(CodecService.INDEX_CODEC_BLOOM_LOAD, codecService.isLoadBloomFilter()); boolean requiresFlushing = false; - if (indexConcurrency != RobinEngine.this.indexConcurrency || !codecName.equals(RobinEngine.this.codecName) || failOnMergeFailure != RobinEngine.this.failOnMergeFailure) { + if (indexConcurrency != RobinEngine.this.indexConcurrency || + !codecName.equals(RobinEngine.this.codecName) || + failOnMergeFailure != RobinEngine.this.failOnMergeFailure || + codecBloomLoad != codecService.isLoadBloomFilter()) { rwl.readLock().lock(); try { if (indexConcurrency != RobinEngine.this.indexConcurrency) { @@ -1433,6 +1452,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { logger.info("updating {} from [{}] to [{}]", RobinEngine.INDEX_FAIL_ON_MERGE_FAILURE, RobinEngine.this.failOnMergeFailure, failOnMergeFailure); RobinEngine.this.failOnMergeFailure = failOnMergeFailure; } + if (codecBloomLoad != codecService.isLoadBloomFilter()) { + logger.info("updating {} from [{}] to [{}]", CodecService.INDEX_CODEC_BLOOM_LOAD, codecService.isLoadBloomFilter(), codecBloomLoad); + codecService.setLoadBloomFilter(codecBloomLoad); + // we need to flush in this case, to load/unload the bloom filters + requiresFlushing = true; + } } finally { rwl.readLock().unlock(); } diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index d6205b4ca24..effdc6e134f 100644 --- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.cluster.settings.Validator; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.gateway.local.LocalGatewayAllocator; +import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.robin.RobinEngine; import org.elasticsearch.index.gateway.IndexShardGatewayService; import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService; @@ -79,6 +80,7 @@ public class IndexDynamicSettingsModule extends AbstractModule { indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT); indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_INDEX_CONCURRENCY, Validator.NON_NEGATIVE_INTEGER); indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN); + indexDynamicSettings.addDynamicSetting(CodecService.INDEX_CODEC_BLOOM_LOAD, Validator.BOOLEAN); indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_GC_DELETES, Validator.TIME); indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_CODEC); indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_FAIL_ON_MERGE_FAILURE); diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index d6817f81086..9c3c57ba520 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import org.apache.lucene.store.*; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.compress.Compressor; @@ -35,6 +36,7 @@ import org.elasticsearch.common.lucene.store.ChecksumIndexOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.CloseableIndexComponent; +import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; @@ -61,6 +63,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex } private final IndexStore indexStore; + final CodecService codecService; private final DirectoryService directoryService; private final StoreDirectory directory; @@ -71,9 +74,10 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex private final boolean sync; @Inject - public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService, Distributor distributor) throws IOException { + public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, CodecService codecService, DirectoryService directoryService, Distributor distributor) throws IOException { super(shardId, indexSettings); this.indexStore = indexStore; + this.codecService = codecService; this.directoryService = directoryService; this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway... this.directory = new StoreDirectory(distributor); @@ -324,6 +328,15 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex return Store.this.shardId(); } + public Settings settings() { + return Store.this.indexSettings(); + } + + @Nullable + public CodecService codecService() { + return Store.this.codecService; + } + public Directory[] delegates() { return distributor.all(); } diff --git a/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineIntegrationTest.java b/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineIntegrationTest.java index 2e010083e30..e13005d2395 100644 --- a/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineIntegrationTest.java +++ b/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineIntegrationTest.java @@ -19,15 +19,19 @@ package org.elasticsearch.index.engine.robin; +import com.google.common.base.Predicate; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BloomFilter; +import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ElasticsearchIntegrationTest; @@ -41,6 +45,78 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC public class RobinEngineIntegrationTest extends ElasticsearchIntegrationTest { + @Test + public void testSettingLoadBloomFilterDefaultTrue() throws Exception { + client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("number_of_replicas", 0).put("number_of_shards", 1)).get(); + client().prepareIndex("test", "foo").setSource("field", "foo").get(); + ensureGreen(); + refresh(); + + IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get(); + final long segmentsMemoryWithBloom = stats.getTotal().getSegments().getMemoryInBytes(); + logger.info("segments with bloom: {}", segmentsMemoryWithBloom); + + logger.info("updating the setting to unload bloom filters"); + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(CodecService.INDEX_CODEC_BLOOM_LOAD, false)).get(); + logger.info("waiting for memory to match without blooms"); + awaitBusy(new Predicate() { + public boolean apply(Object o) { + IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get(); + long segmentsMemoryWithoutBloom = stats.getTotal().getSegments().getMemoryInBytes(); + logger.info("trying segments without bloom: {}", segmentsMemoryWithoutBloom); + return segmentsMemoryWithoutBloom == (segmentsMemoryWithBloom - BloomFilter.Factory.DEFAULT.createFilter(1).getSizeInBytes()); + } + }); + + logger.info("updating the setting to load bloom filters"); + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(CodecService.INDEX_CODEC_BLOOM_LOAD, true)).get(); + logger.info("waiting for memory to match with blooms"); + awaitBusy(new Predicate() { + public boolean apply(Object o) { + IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get(); + long newSegmentsMemoryWithBloom = stats.getTotal().getSegments().getMemoryInBytes(); + logger.info("trying segments with bloom: {}", newSegmentsMemoryWithBloom); + return newSegmentsMemoryWithBloom == segmentsMemoryWithBloom; + } + }); + } + + @Test + public void testSettingLoadBloomFilterDefaultFalse() throws Exception { + client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("number_of_replicas", 0).put("number_of_shards", 1).put(CodecService.INDEX_CODEC_BLOOM_LOAD, false)).get(); + client().prepareIndex("test", "foo").setSource("field", "foo").get(); + ensureGreen(); + refresh(); + + IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get(); + final long segmentsMemoryWithoutBloom = stats.getTotal().getSegments().getMemoryInBytes(); + logger.info("segments without bloom: {}", segmentsMemoryWithoutBloom); + + logger.info("updating the setting to load bloom filters"); + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(CodecService.INDEX_CODEC_BLOOM_LOAD, true)).get(); + logger.info("waiting for memory to match with blooms"); + awaitBusy(new Predicate() { + public boolean apply(Object o) { + IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get(); + long segmentsMemoryWithBloom = stats.getTotal().getSegments().getMemoryInBytes(); + logger.info("trying segments with bloom: {}", segmentsMemoryWithoutBloom); + return segmentsMemoryWithoutBloom == (segmentsMemoryWithBloom - BloomFilter.Factory.DEFAULT.createFilter(1).getSizeInBytes()); + } + }); + + logger.info("updating the setting to unload bloom filters"); + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(CodecService.INDEX_CODEC_BLOOM_LOAD, false)).get(); + logger.info("waiting for memory to match without blooms"); + awaitBusy(new Predicate() { + public boolean apply(Object o) { + IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get(); + long newSegmentsMemoryWithoutBloom = stats.getTotal().getSegments().getMemoryInBytes(); + logger.info("trying segments without bloom: {}", newSegmentsMemoryWithoutBloom); + return newSegmentsMemoryWithoutBloom == segmentsMemoryWithoutBloom; + } + }); + } + @Test public void testSetIndexCompoundOnFlush() { client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("number_of_replicas", 0).put("number_of_shards", 1)).get(); @@ -52,13 +128,12 @@ public class RobinEngineIntegrationTest extends ElasticsearchIntegrationTest { client().prepareIndex("test", "foo").setSource("field", "foo").get(); refresh(); assertTotalCompoundSegments(1, 2, "test"); - + client().admin().indices().prepareUpdateSettings("test") - .setSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true)).get(); + .setSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true)).get(); client().prepareIndex("test", "foo").setSource("field", "foo").get(); refresh(); assertTotalCompoundSegments(2, 3, "test"); - } private void assertTotalCompoundSegments(int i, int t, String index) { @@ -83,6 +158,7 @@ public class RobinEngineIntegrationTest extends ElasticsearchIntegrationTest { assertThat(total, Matchers.equalTo(t)); } + @Test public void test4093() { assertAcked(prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder() @@ -103,8 +179,8 @@ public class RobinEngineIntegrationTest extends ElasticsearchIntegrationTest { final int numDocs = between(30, 100); // 30 docs are enough to fail without the fix for #4093 logger.debug(" --> Indexing [{}] documents", numDocs); for (int i = 0; i < numDocs; i++) { - if ((i+1) % 10 == 0) { - logger.debug(" --> Indexed [{}] documents", i+1); + if ((i + 1) % 10 == 0) { + logger.debug(" --> Indexed [{}] documents", i + 1); } client().prepareIndex("test", "type1") .setSource("a", "" + i) diff --git a/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineTests.java b/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineTests.java index 4ce99e600cf..f0598dd632c 100644 --- a/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineTests.java @@ -157,12 +157,12 @@ public class RobinEngineTests extends ElasticsearchTestCase { protected Store createStore() throws IOException { DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); - return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService)); + return new Store(shardId, EMPTY_SETTINGS, null, null, directoryService, new LeastUsedDistributor(directoryService)); } protected Store createStoreReplica() throws IOException { DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); - return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService)); + return new Store(shardId, EMPTY_SETTINGS, null, null, directoryService, new LeastUsedDistributor(directoryService)); } protected Translog createTranslog() { diff --git a/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java b/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java index 0896a090041..2e5581208b0 100644 --- a/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java +++ b/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java @@ -19,10 +19,6 @@ package org.elasticsearch.index.merge.policy; -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; - -import java.io.IOException; - import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; @@ -35,8 +31,11 @@ import org.elasticsearch.index.store.distributor.LeastUsedDistributor; import org.elasticsearch.index.store.ram.RamDirectoryService; import org.junit.Test; +import java.io.IOException; + +import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; public class MergePolicySettingsTest { @@ -83,7 +82,7 @@ public class MergePolicySettingsTest { assertThat(new LogDocMergePolicyProvider(createStore(build(0.0)), service).newMergePolicy().getNoCFSRatio(), equalTo(0.0)); } - + @Test public void testInvalidValue() throws IOException { IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS); @@ -174,7 +173,7 @@ public class MergePolicySettingsTest { protected Store createStore(Settings settings) throws IOException { DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); - return new Store(shardId, settings, null, directoryService, new LeastUsedDistributor(directoryService)); + return new Store(shardId, settings, null, null, directoryService, new LeastUsedDistributor(directoryService)); } }