Allow to enable / disable bloom filter loading on an index

Add a new index-level setting, index.codec.bloom.load (defaults to true), that controls whether the bloom filters will be loaded or not. This is an updatable setting that can be changed on a live index using the update settings API.

Note, though, that when this setting is updated, a fresh Lucene index will be reopened, potentially causing associated caches to be dropped.

closes #4525

Note: this change also disables returning Lucene RAM usage stats, due to a bug in Lucene; relates to #4512
This commit is contained in:
Shay Banon 2013-12-19 21:32:04 +01:00
parent 80ed3d05bc
commit 0c1c2dc671
8 changed files with 158 additions and 21 deletions

View File

@ -44,11 +44,16 @@ import org.elasticsearch.index.settings.IndexSettings;
*/
public class CodecService extends AbstractIndexComponent {
public static final String INDEX_CODEC_BLOOM_LOAD = "index.codec.bloom.load";
public static final boolean INDEX_CODEC_BLOOM_LOAD_DEFAULT = true;
private final PostingsFormatService postingsFormatService;
private final DocValuesFormatService docValuesFormatService;
private final MapperService mapperService;
private final ImmutableMap<String, Codec> codecs;
private volatile boolean loadBloomFilter = true;
public final static String DEFAULT_CODEC = "default";
public CodecService(Index index) {
@ -78,6 +83,7 @@ public class CodecService extends AbstractIndexComponent {
codecs.put(codec, Codec.forName(codec));
}
this.codecs = codecs.immutableMap();
this.loadBloomFilter = indexSettings.getAsBoolean(INDEX_CODEC_BLOOM_LOAD, INDEX_CODEC_BLOOM_LOAD_DEFAULT);
}
public PostingsFormatService postingsFormatService() {
@ -100,4 +106,11 @@ public class CodecService extends AbstractIndexComponent {
return codec;
}
/**
 * Returns the current value of the bloom filter loading flag. The flag is
 * initialised from the {@code index.codec.bloom.load} index setting and can
 * be changed afterwards through {@link #setLoadBloomFilter(boolean)}.
 *
 * @return {@code true} if bloom filters should be loaded, {@code false} otherwise
 */
public boolean isLoadBloomFilter() {
    final boolean currentValue = loadBloomFilter;
    return currentValue;
}
/**
 * Updates the bloom filter loading flag. The backing field is volatile, so
 * the new value is visible to other threads reading it via
 * {@link #isLoadBloomFilter()}.
 *
 * @param loadBloomFilter {@code true} to load bloom filters, {@code false} to skip loading them
 */
public void setLoadBloomFilter(final boolean loadBloomFilter) {
    this.loadBloomFilter = loadBloomFilter;
}
}

View File

@ -28,6 +28,8 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.util.BloomFilter;
import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store;
import java.io.IOException;
import java.util.*;
@ -129,7 +131,14 @@ public final class BloomFilterPostingsFormat extends PostingsFormat {
this.delegateFieldsProducer = delegatePostingsFormat
.fieldsProducer(state);
int numBlooms = bloomIn.readInt();
if (state.context.context != IOContext.Context.MERGE) {
boolean load = true;
Store.StoreDirectory storeDir = DirectoryUtils.getStoreDirectory(state.directory);
if (storeDir != null && storeDir.codecService() != null) {
load = storeDir.codecService().isLoadBloomFilter();
}
if (load && state.context.context != IOContext.Context.MERGE) {
// if we merge we don't need to load the bloom filters
for (int i = 0; i < numBlooms; i++) {
int fieldNum = bloomIn.readInt();
@ -189,7 +198,7 @@ public final class BloomFilterPostingsFormat extends PostingsFormat {
return size;
}
}
public static final class BloomFilteredTerms extends FilterAtomicReader.FilterTerms {
private BloomFilter filter;

View File

@ -32,6 +32,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
@ -1117,6 +1118,21 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
// Gate for reporting per-segment RAM usage: getReaderRamBytesUsed() returns 0 when this is false.
// The value ends up true only when JVM assertions are enabled (see the static block below).
static final boolean allowRamBytesUsed;
static {
// Reminder that trips (under -ea) as soon as the bundled Lucene version is no longer 4.6,
// so that whoever upgrades re-checks whether LUCENE-5373 has been fixed.
assert Version.CURRENT.luceneVersion == org.apache.lucene.util.Version.LUCENE_46 :
"when upgrading to a new lucene version, check if ramBytes is fixed, see https://issues.apache.org/jira/browse/LUCENE-5373";
boolean xAllowRamBytesUsed = false;
// Intentional assignment inside an assert: it is only evaluated when assertions are enabled,
// which flips the flag to true in that case and leaves it false otherwise.
assert xAllowRamBytesUsed = true;
allowRamBytesUsed = xAllowRamBytesUsed;
}
/**
 * Returns the RAM usage reported by the segment reader behind the given leaf
 * context, or {@code 0} when {@code allowRamBytesUsed} is {@code false}.
 *
 * @param reader the leaf reader context; its reader is expected to be a {@link SegmentReader}
 * @return the value of {@code SegmentReader#ramBytesUsed()}, or {@code 0} when reporting is disabled
 */
private long getReaderRamBytesUsed(AtomicReaderContext reader) {
    assert reader.reader() instanceof SegmentReader;
    if (!allowRamBytesUsed) {
        // reporting is switched off, do not touch the reader at all
        return 0;
    }
    final SegmentReader segmentReader = (SegmentReader) reader.reader();
    return segmentReader.ramBytesUsed();
}
@Override
public SegmentsStats segmentsStats() {
rwl.readLock().lock();
@ -1126,8 +1142,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
SegmentsStats stats = new SegmentsStats();
for (AtomicReaderContext reader : searcher.reader().leaves()) {
assert reader.reader() instanceof SegmentReader;
stats.add(1, ((SegmentReader) reader.reader()).ramBytesUsed());
stats.add(1, getReaderRamBytesUsed(reader));
}
return stats;
} finally {
@ -1163,7 +1178,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} catch (IOException e) {
logger.trace("failed to get size for [{}]", e, info.info.name);
}
segment.memoryInBytes = ((SegmentReader) reader.reader()).ramBytesUsed();
segment.memoryInBytes = getReaderRamBytesUsed(reader);
segments.put(info.info.name, segment);
}
} finally {
@ -1413,8 +1428,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, RobinEngine.this.indexConcurrency);
boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, RobinEngine.this.failOnMergeFailure);
String codecName = settings.get(INDEX_CODEC, RobinEngine.this.codecName);
final boolean codecBloomLoad = settings.getAsBoolean(CodecService.INDEX_CODEC_BLOOM_LOAD, codecService.isLoadBloomFilter());
boolean requiresFlushing = false;
if (indexConcurrency != RobinEngine.this.indexConcurrency || !codecName.equals(RobinEngine.this.codecName) || failOnMergeFailure != RobinEngine.this.failOnMergeFailure) {
if (indexConcurrency != RobinEngine.this.indexConcurrency ||
!codecName.equals(RobinEngine.this.codecName) ||
failOnMergeFailure != RobinEngine.this.failOnMergeFailure ||
codecBloomLoad != codecService.isLoadBloomFilter()) {
rwl.readLock().lock();
try {
if (indexConcurrency != RobinEngine.this.indexConcurrency) {
@ -1433,6 +1452,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
logger.info("updating {} from [{}] to [{}]", RobinEngine.INDEX_FAIL_ON_MERGE_FAILURE, RobinEngine.this.failOnMergeFailure, failOnMergeFailure);
RobinEngine.this.failOnMergeFailure = failOnMergeFailure;
}
if (codecBloomLoad != codecService.isLoadBloomFilter()) {
logger.info("updating {} from [{}] to [{}]", CodecService.INDEX_CODEC_BLOOM_LOAD, codecService.isLoadBloomFilter(), codecBloomLoad);
codecService.setLoadBloomFilter(codecBloomLoad);
// we need to flush in this case, to load/unload the bloom filters
requiresFlushing = true;
}
} finally {
rwl.readLock().unlock();
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.gateway.local.LocalGatewayAllocator;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.robin.RobinEngine;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
@ -79,6 +80,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT);
indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_INDEX_CONCURRENCY, Validator.NON_NEGATIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(CodecService.INDEX_CODEC_BLOOM_LOAD, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_GC_DELETES, Validator.TIME);
indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_CODEC);
indexDynamicSettings.addDynamicSetting(RobinEngine.INDEX_FAIL_ON_MERGE_FAILURE);

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.Compressor;
@ -35,6 +36,7 @@ import org.elasticsearch.common.lucene.store.ChecksumIndexOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
@ -61,6 +63,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
private final IndexStore indexStore;
final CodecService codecService;
private final DirectoryService directoryService;
private final StoreDirectory directory;
@ -71,9 +74,10 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
private final boolean sync;
@Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService, Distributor distributor) throws IOException {
public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, CodecService codecService, DirectoryService directoryService, Distributor distributor) throws IOException {
super(shardId, indexSettings);
this.indexStore = indexStore;
this.codecService = codecService;
this.directoryService = directoryService;
this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway...
this.directory = new StoreDirectory(distributor);
@ -324,6 +328,15 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
return Store.this.shardId();
}
/**
 * Exposes the index settings of the enclosing {@link Store}.
 *
 * @return the value of {@code Store.this.indexSettings()}
 */
public Settings settings() {
    final Settings storeIndexSettings = Store.this.indexSettings();
    return storeIndexSettings;
}
/**
 * Exposes the {@link CodecService} held by the enclosing {@link Store}.
 *
 * @return the codec service the store was constructed with; may be {@code null}
 *         when the store was created without one
 */
@Nullable
public CodecService codecService() {
    final CodecService storeCodecService = Store.this.codecService;
    return storeCodecService;
}
/**
 * Returns the directories this store directory delegates to.
 *
 * @return the array returned by {@code distributor.all()}
 */
public Directory[] delegates() {
    final Directory[] allDirectories = distributor.all();
    return allDirectories;
}

View File

@ -19,15 +19,19 @@
package org.elasticsearch.index.engine.robin;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BloomFilter;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -41,6 +45,78 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC
public class RobinEngineIntegrationTest extends ElasticsearchIntegrationTest {
/**
 * Creates an index with the default bloom filter setting (loaded), then toggles
 * {@code index.codec.bloom.load} off and on again, verifying through the segments
 * memory stats that the bloom filter is unloaded and reloaded.
 * <p>
 * The result of each {@code awaitBusy} call is asserted; otherwise the test could
 * never fail, even if the reported memory never changed.
 */
@Test
public void testSettingLoadBloomFilterDefaultTrue() throws Exception {
    client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("number_of_replicas", 0).put("number_of_shards", 1)).get();
    client().prepareIndex("test", "foo").setSource("field", "foo").get();
    ensureGreen();
    refresh();

    // baseline: bloom filters are loaded by default
    IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get();
    final long segmentsMemoryWithBloom = stats.getTotal().getSegments().getMemoryInBytes();
    logger.info("segments with bloom: {}", segmentsMemoryWithBloom);

    logger.info("updating the setting to unload bloom filters");
    client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(CodecService.INDEX_CODEC_BLOOM_LOAD, false)).get();
    logger.info("waiting for memory to match without blooms");
    boolean unloaded = awaitBusy(new Predicate<Object>() {
        @Override
        public boolean apply(Object o) {
            IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get();
            long segmentsMemoryWithoutBloom = stats.getTotal().getSegments().getMemoryInBytes();
            logger.info("trying segments without bloom: {}", segmentsMemoryWithoutBloom);
            // a single indexed document is expected to account for exactly one single-entry bloom filter
            return segmentsMemoryWithoutBloom == (segmentsMemoryWithBloom - BloomFilter.Factory.DEFAULT.createFilter(1).getSizeInBytes());
        }
    });
    assertThat("segments memory did not drop by the bloom filter size after disabling bloom loading", unloaded, Matchers.equalTo(true));

    logger.info("updating the setting to load bloom filters");
    client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(CodecService.INDEX_CODEC_BLOOM_LOAD, true)).get();
    logger.info("waiting for memory to match with blooms");
    boolean reloaded = awaitBusy(new Predicate<Object>() {
        @Override
        public boolean apply(Object o) {
            IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get();
            long newSegmentsMemoryWithBloom = stats.getTotal().getSegments().getMemoryInBytes();
            logger.info("trying segments with bloom: {}", newSegmentsMemoryWithBloom);
            return newSegmentsMemoryWithBloom == segmentsMemoryWithBloom;
        }
    });
    assertThat("segments memory did not return to the original value after re-enabling bloom loading", reloaded, Matchers.equalTo(true));
}
/**
 * Creates an index with {@code index.codec.bloom.load} set to {@code false}, then
 * toggles the setting on and off again, verifying through the segments memory stats
 * that the bloom filter is loaded and unloaded.
 * <p>
 * The result of each {@code awaitBusy} call is asserted; otherwise the test could
 * never fail, even if the reported memory never changed.
 */
@Test
public void testSettingLoadBloomFilterDefaultFalse() throws Exception {
    client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("number_of_replicas", 0).put("number_of_shards", 1).put(CodecService.INDEX_CODEC_BLOOM_LOAD, false)).get();
    client().prepareIndex("test", "foo").setSource("field", "foo").get();
    ensureGreen();
    refresh();

    // baseline: bloom filters are not loaded
    IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get();
    final long segmentsMemoryWithoutBloom = stats.getTotal().getSegments().getMemoryInBytes();
    logger.info("segments without bloom: {}", segmentsMemoryWithoutBloom);

    logger.info("updating the setting to load bloom filters");
    client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(CodecService.INDEX_CODEC_BLOOM_LOAD, true)).get();
    logger.info("waiting for memory to match with blooms");
    boolean loaded = awaitBusy(new Predicate<Object>() {
        @Override
        public boolean apply(Object o) {
            IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get();
            long segmentsMemoryWithBloom = stats.getTotal().getSegments().getMemoryInBytes();
            // log the freshly sampled value, not the fixed baseline
            logger.info("trying segments with bloom: {}", segmentsMemoryWithBloom);
            // a single indexed document is expected to account for exactly one single-entry bloom filter
            return segmentsMemoryWithoutBloom == (segmentsMemoryWithBloom - BloomFilter.Factory.DEFAULT.createFilter(1).getSizeInBytes());
        }
    });
    assertThat("segments memory did not grow by the bloom filter size after enabling bloom loading", loaded, Matchers.equalTo(true));

    logger.info("updating the setting to unload bloom filters");
    client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(CodecService.INDEX_CODEC_BLOOM_LOAD, false)).get();
    logger.info("waiting for memory to match without blooms");
    boolean unloaded = awaitBusy(new Predicate<Object>() {
        @Override
        public boolean apply(Object o) {
            IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get();
            long newSegmentsMemoryWithoutBloom = stats.getTotal().getSegments().getMemoryInBytes();
            logger.info("trying segments without bloom: {}", newSegmentsMemoryWithoutBloom);
            return newSegmentsMemoryWithoutBloom == segmentsMemoryWithoutBloom;
        }
    });
    assertThat("segments memory did not return to the original value after disabling bloom loading", unloaded, Matchers.equalTo(true));
}
@Test
public void testSetIndexCompoundOnFlush() {
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("number_of_replicas", 0).put("number_of_shards", 1)).get();
@ -52,13 +128,12 @@ public class RobinEngineIntegrationTest extends ElasticsearchIntegrationTest {
client().prepareIndex("test", "foo").setSource("field", "foo").get();
refresh();
assertTotalCompoundSegments(1, 2, "test");
client().admin().indices().prepareUpdateSettings("test")
.setSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true)).get();
.setSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true)).get();
client().prepareIndex("test", "foo").setSource("field", "foo").get();
refresh();
assertTotalCompoundSegments(2, 3, "test");
}
private void assertTotalCompoundSegments(int i, int t, String index) {
@ -83,6 +158,7 @@ public class RobinEngineIntegrationTest extends ElasticsearchIntegrationTest {
assertThat(total, Matchers.equalTo(t));
}
@Test
public void test4093() {
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder()
@ -103,8 +179,8 @@ public class RobinEngineIntegrationTest extends ElasticsearchIntegrationTest {
final int numDocs = between(30, 100); // 30 docs are enough to fail without the fix for #4093
logger.debug(" --> Indexing [{}] documents", numDocs);
for (int i = 0; i < numDocs; i++) {
if ((i+1) % 10 == 0) {
logger.debug(" --> Indexed [{}] documents", i+1);
if ((i + 1) % 10 == 0) {
logger.debug(" --> Indexed [{}] documents", i + 1);
}
client().prepareIndex("test", "type1")
.setSource("a", "" + i)

View File

@ -157,12 +157,12 @@ public class RobinEngineTests extends ElasticsearchTestCase {
protected Store createStore() throws IOException {
DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService));
return new Store(shardId, EMPTY_SETTINGS, null, null, directoryService, new LeastUsedDistributor(directoryService));
}
protected Store createStoreReplica() throws IOException {
DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService));
return new Store(shardId, EMPTY_SETTINGS, null, null, directoryService, new LeastUsedDistributor(directoryService));
}
protected Translog createTranslog() {

View File

@ -19,10 +19,6 @@
package org.elasticsearch.index.merge.policy;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import java.io.IOException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -35,8 +31,11 @@ import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.ram.RamDirectoryService;
import org.junit.Test;
import java.io.IOException;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
public class MergePolicySettingsTest {
@ -83,7 +82,7 @@ public class MergePolicySettingsTest {
assertThat(new LogDocMergePolicyProvider(createStore(build(0.0)), service).newMergePolicy().getNoCFSRatio(), equalTo(0.0));
}
@Test
public void testInvalidValue() throws IOException {
IndexSettingsService service = new IndexSettingsService(new Index("test"), EMPTY_SETTINGS);
@ -174,7 +173,7 @@ public class MergePolicySettingsTest {
protected Store createStore(Settings settings) throws IOException {
DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
return new Store(shardId, settings, null, directoryService, new LeastUsedDistributor(directoryService));
return new Store(shardId, settings, null, null, directoryService, new LeastUsedDistributor(directoryService));
}
}