mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-26 01:48:45 +00:00
Use reader attributes to control term dict memory useage (#42838)
This change makes use of the reader attributes added in LUCENE-8671 to ensure that `_id` fields are always on-heap for best update performance and term dicts are generally off-heap on Read-Only engines. Closes #38390
This commit is contained in:
parent
955aee8a07
commit
41a9f3ae3b
@ -20,6 +20,8 @@
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
|
||||
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
@ -42,7 +44,9 @@ import org.apache.lucene.search.SearcherManager;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FilterDirectory;
|
||||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.apache.lucene.store.MMapDirectory;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.InfoStream;
|
||||
import org.elasticsearch.Assertions;
|
||||
@ -77,6 +81,7 @@ import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.store.FsDirectoryFactory;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.TranslogConfig;
|
||||
@ -2143,10 +2148,21 @@ public class InternalEngine extends Engine {
|
||||
}
|
||||
}
|
||||
|
||||
static Map<String, String> getReaderAttributes(Directory directory) {
|
||||
Directory unwrap = FilterDirectory.unwrap(directory);
|
||||
boolean defaultOffHeap = FsDirectoryFactory.isHybridFs(unwrap) || unwrap instanceof MMapDirectory;
|
||||
return Map.of(
|
||||
BlockTreeTermsReader.FST_MODE_KEY, // if we are using MMAP for term dics we force all off heap unless it's the ID field
|
||||
defaultOffHeap ? FSTLoadMode.OFF_HEAP.name() : FSTLoadMode.ON_HEAP.name()
|
||||
, BlockTreeTermsReader.FST_MODE_KEY + "." + IdFieldMapper.NAME, // always force ID field on-heap for fast updates
|
||||
FSTLoadMode.ON_HEAP.name());
|
||||
}
|
||||
|
||||
private IndexWriterConfig getIndexWriterConfig() {
|
||||
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
|
||||
iwc.setCommitOnClose(false); // we by default don't commit on close
|
||||
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
|
||||
iwc.setReaderAttributes(getReaderAttributes(store.directory()));
|
||||
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
|
||||
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
|
||||
boolean verbose = false;
|
||||
|
@ -46,7 +46,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
|
||||
super(config, null, null, true, Function.identity());
|
||||
this.stats = new SegmentsStats();
|
||||
Directory directory = store.directory();
|
||||
try (DirectoryReader reader = DirectoryReader.open(directory)) {
|
||||
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
|
||||
for (LeafReaderContext ctx : reader.getContext().leaves()) {
|
||||
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
|
||||
fillSegmentStats(segmentReader, true, stats);
|
||||
|
@ -18,6 +18,7 @@
|
||||
*/
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
@ -47,7 +48,9 @@ import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
@ -62,6 +65,12 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
public class ReadOnlyEngine extends Engine {
|
||||
|
||||
/**
|
||||
* Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an
|
||||
* ID field.
|
||||
*/
|
||||
public static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
|
||||
BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name());
|
||||
private final SegmentInfos lastCommittedSegmentInfos;
|
||||
private final SeqNoStats seqNoStats;
|
||||
private final TranslogStats translogStats;
|
||||
@ -165,7 +174,7 @@ public class ReadOnlyEngine extends Engine {
|
||||
}
|
||||
|
||||
protected DirectoryReader open(IndexCommit commit) throws IOException {
|
||||
return DirectoryReader.open(commit);
|
||||
return DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES);
|
||||
}
|
||||
|
||||
private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.index.store;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.apache.lucene.store.FileSwitchDirectory;
|
||||
import org.apache.lucene.store.FilterDirectory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.LockFactory;
|
||||
@ -121,6 +122,14 @@ public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
|
||||
return directory;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true iff the directory is a hybrid fs directory
|
||||
*/
|
||||
public static boolean isHybridFs(Directory directory) {
|
||||
Directory unwrap = FilterDirectory.unwrap(directory);
|
||||
return unwrap instanceof HybridDirectory;
|
||||
}
|
||||
|
||||
static final class HybridDirectory extends NIOFSDirectory {
|
||||
private final FSDirectory randomAccessDirectory;
|
||||
|
||||
|
@ -38,7 +38,6 @@ import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.BufferedChecksum;
|
||||
import org.apache.lucene.store.ByteArrayDataInput;
|
||||
import org.apache.lucene.store.ByteBufferIndexInput;
|
||||
import org.apache.lucene.store.ChecksumIndexInput;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FilterDirectory;
|
||||
@ -46,7 +45,6 @@ import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.apache.lucene.store.RandomAccessInput;
|
||||
import org.apache.lucene.store.SimpleFSDirectory;
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
@ -98,7 +96,6 @@ import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
@ -137,7 +134,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||
* this by exploiting lucene internals and wrapping the IndexInput in a simple delegate.
|
||||
*/
|
||||
public static final Setting<Boolean> FORCE_RAM_TERM_DICT = Setting.boolSetting("index.force_memory_term_dictionary", false,
|
||||
Property.IndexScope);
|
||||
Property.IndexScope, Property.Deprecated);
|
||||
static final String CODEC = "store";
|
||||
static final int VERSION_WRITE_THROWABLE= 2; // we write throwable since 2.0
|
||||
static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
|
||||
@ -172,8 +169,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
|
||||
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
|
||||
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
|
||||
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId),
|
||||
indexSettings.getValue(FORCE_RAM_TERM_DICT));
|
||||
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
|
||||
this.shardLock = shardLock;
|
||||
this.onClose = onClose;
|
||||
|
||||
@ -712,12 +708,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||
static final class StoreDirectory extends FilterDirectory {
|
||||
|
||||
private final Logger deletesLogger;
|
||||
private final boolean forceRamTermDict;
|
||||
|
||||
StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger, boolean forceRamTermDict) {
|
||||
StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger) {
|
||||
super(delegateDirectory);
|
||||
this.deletesLogger = deletesLogger;
|
||||
this.forceRamTermDict = forceRamTermDict;
|
||||
}
|
||||
|
||||
/** Estimate the cumulative size of all files in this directory in bytes. */
|
||||
@ -744,18 +738,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||
super.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput openInput(String name, IOContext context) throws IOException {
|
||||
IndexInput input = super.openInput(name, context);
|
||||
if (name.endsWith(".tip") || name.endsWith(".cfs")) {
|
||||
// only do this if we are reading cfs or tip file - all other files don't need this.
|
||||
if (forceRamTermDict && input instanceof ByteBufferIndexInput) {
|
||||
return new DeoptimizingIndexInput(input.toString(), input);
|
||||
}
|
||||
}
|
||||
return input;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "store(" + in.toString() + ")";
|
||||
@ -1636,127 +1618,4 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||
// we also don't specify a codec here and merges should use the engines for this index
|
||||
.setMergePolicy(NoMergePolicy.INSTANCE);
|
||||
}
|
||||
|
||||
/**
|
||||
* see {@link #FORCE_RAM_TERM_DICT} for details
|
||||
*/
|
||||
private static final class DeoptimizingIndexInput extends IndexInput {
|
||||
|
||||
private final IndexInput in;
|
||||
|
||||
private DeoptimizingIndexInput(String resourceDescription, IndexInput in) {
|
||||
super(resourceDescription);
|
||||
this.in = in;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput clone() {
|
||||
return new DeoptimizingIndexInput(toString(), in.clone());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
in.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilePointer() {
|
||||
return in.getFilePointer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seek(long pos) throws IOException {
|
||||
in.seek(pos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long length() {
|
||||
return in.length();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return in.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
|
||||
return new DeoptimizingIndexInput(sliceDescription, in.slice(sliceDescription, offset, length));
|
||||
}
|
||||
|
||||
@Override
|
||||
public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
|
||||
return in.randomAccessSlice(offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() throws IOException {
|
||||
return in.readByte();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readBytes(byte[] b, int offset, int len) throws IOException {
|
||||
in.readBytes(b, offset, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
|
||||
in.readBytes(b, offset, len, useBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public short readShort() throws IOException {
|
||||
return in.readShort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readInt() throws IOException {
|
||||
return in.readInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readVInt() throws IOException {
|
||||
return in.readVInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readZInt() throws IOException {
|
||||
return in.readZInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long readLong() throws IOException {
|
||||
return in.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long readVLong() throws IOException {
|
||||
return in.readVLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long readZLong() throws IOException {
|
||||
return in.readZLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readString() throws IOException {
|
||||
return in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> readMapOfStrings() throws IOException {
|
||||
return in.readMapOfStrings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> readSetOfStrings() throws IOException {
|
||||
return in.readSetOfStrings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skipBytes(long numBytes) throws IOException {
|
||||
in.skipBytes(numBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -60,8 +60,11 @@ import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.search.TotalHitCountCollector;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.BaseDirectoryWrapper;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FilterDirectory;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.apache.lucene.store.MMapDirectory;
|
||||
import org.apache.lucene.store.MockDirectoryWrapper;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
@ -97,6 +100,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
@ -122,7 +126,9 @@ import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexSearcherWrapper;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardPath;
|
||||
import org.elasticsearch.index.shard.ShardUtils;
|
||||
import org.elasticsearch.index.store.FsDirectoryFactory;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.SnapshotMatchers;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
@ -5677,4 +5683,59 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
}
|
||||
assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
|
||||
}
|
||||
|
||||
public void testGetReaderAttributes() throws IOException {
|
||||
try(BaseDirectoryWrapper dir = newFSDirectory(createTempDir())) {
|
||||
Directory unwrap = FilterDirectory.unwrap(dir);
|
||||
boolean isMMap = unwrap instanceof MMapDirectory;
|
||||
Map<String, String> readerAttributes = InternalEngine.getReaderAttributes(dir);
|
||||
assertEquals(2, readerAttributes.size());
|
||||
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id"));
|
||||
if (isMMap) {
|
||||
assertEquals("OFF_HEAP", readerAttributes.get("blocktree.terms.fst"));
|
||||
} else {
|
||||
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst"));
|
||||
}
|
||||
}
|
||||
|
||||
try(MMapDirectory dir = new MMapDirectory(createTempDir())) {
|
||||
Map<String, String> readerAttributes =
|
||||
InternalEngine.getReaderAttributes(randomBoolean() ? dir :
|
||||
new MockDirectoryWrapper(random(), dir));
|
||||
assertEquals(2, readerAttributes.size());
|
||||
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id"));
|
||||
assertEquals("OFF_HEAP", readerAttributes.get("blocktree.terms.fst"));
|
||||
}
|
||||
|
||||
Settings.Builder settingsBuilder = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT);
|
||||
Settings settings = settingsBuilder.build();
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
|
||||
FsDirectoryFactory service = new FsDirectoryFactory();
|
||||
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
|
||||
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
|
||||
try (Directory directory = service.newDirectory(indexSettings, path)) {
|
||||
|
||||
Map<String, String> readerAttributes =
|
||||
InternalEngine.getReaderAttributes(randomBoolean() ? directory :
|
||||
new MockDirectoryWrapper(random(), directory));
|
||||
assertEquals(2, readerAttributes.size());
|
||||
|
||||
switch (IndexModule.defaultStoreType(true)) {
|
||||
case HYBRIDFS:
|
||||
case MMAPFS:
|
||||
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id"));
|
||||
assertEquals("OFF_HEAP", readerAttributes.get("blocktree.terms.fst"));
|
||||
break;
|
||||
case NIOFS:
|
||||
case SIMPLEFS:
|
||||
case FS:
|
||||
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id"));
|
||||
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst"));
|
||||
break;
|
||||
default:
|
||||
fail("unknownw type");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -144,7 +144,7 @@ public class NoOpEngineTests extends EngineTestCase {
|
||||
assertEquals(expectedDocStats.getTotalSizeInBytes(), noOpEngine.docStats().getTotalSizeInBytes());
|
||||
assertEquals(expectedDocStats.getAverageSizeInBytes(), noOpEngine.docStats().getAverageSizeInBytes());
|
||||
assertEquals(expectedSegmentStats.getCount(), noOpEngine.segmentsStats(includeFileSize, true).getCount());
|
||||
assertEquals(expectedSegmentStats.getMemoryInBytes(), noOpEngine.segmentsStats(includeFileSize, true).getMemoryInBytes());
|
||||
// don't compare memory in bytes since we load the index with term-dict off-heap
|
||||
assertEquals(expectedSegmentStats.getFileSizes().size(),
|
||||
noOpEngine.segmentsStats(includeFileSize, true).getFileSizes().size());
|
||||
|
||||
|
@ -40,14 +40,12 @@ import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.index.SnapshotDeletionPolicy;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.store.BaseDirectoryWrapper;
|
||||
import org.apache.lucene.store.ByteBufferIndexInput;
|
||||
import org.apache.lucene.store.ChecksumIndexInput;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FilterDirectory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.store.MMapDirectory;
|
||||
import org.apache.lucene.store.NIOFSDirectory;
|
||||
import org.apache.lucene.store.RAMDirectory;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
@ -1084,48 +1082,6 @@ public class StoreTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testDeoptimizeMMap() throws IOException {
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index",
|
||||
Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
|
||||
.put(Store.FORCE_RAM_TERM_DICT.getKey(), true).build());
|
||||
final ShardId shardId = new ShardId("index", "_na_", 1);
|
||||
String file = "test." + (randomBoolean() ? "tip" : "cfs");
|
||||
try (Store store = new Store(shardId, indexSettings, new MMapDirectory(createTempDir()), new DummyShardLock(shardId))) {
|
||||
try (IndexOutput output = store.directory().createOutput(file, IOContext.DEFAULT)) {
|
||||
output.writeInt(0);
|
||||
}
|
||||
try (IndexOutput output = store.directory().createOutput("someOtherFile.txt", IOContext.DEFAULT)) {
|
||||
output.writeInt(0);
|
||||
}
|
||||
try (IndexInput input = store.directory().openInput(file, IOContext.DEFAULT)) {
|
||||
assertFalse(input instanceof ByteBufferIndexInput);
|
||||
assertFalse(input.clone() instanceof ByteBufferIndexInput);
|
||||
assertFalse(input.slice("foo", 1, 1) instanceof ByteBufferIndexInput);
|
||||
}
|
||||
|
||||
try (IndexInput input = store.directory().openInput("someOtherFile.txt", IOContext.DEFAULT)) {
|
||||
assertTrue(input instanceof ByteBufferIndexInput);
|
||||
assertTrue(input.clone() instanceof ByteBufferIndexInput);
|
||||
assertTrue(input.slice("foo", 1, 1) instanceof ByteBufferIndexInput);
|
||||
}
|
||||
}
|
||||
|
||||
indexSettings = IndexSettingsModule.newIndexSettings("index",
|
||||
Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
|
||||
.put(Store.FORCE_RAM_TERM_DICT.getKey(), false).build());
|
||||
|
||||
try (Store store = new Store(shardId, indexSettings, new MMapDirectory(createTempDir()), new DummyShardLock(shardId))) {
|
||||
try (IndexOutput output = store.directory().createOutput(file, IOContext.DEFAULT)) {
|
||||
output.writeInt(0);
|
||||
}
|
||||
try (IndexInput input = store.directory().openInput(file, IOContext.DEFAULT)) {
|
||||
assertTrue(input instanceof ByteBufferIndexInput);
|
||||
assertTrue(input.clone() instanceof ByteBufferIndexInput);
|
||||
assertTrue(input.slice("foo", 1, 1) instanceof ByteBufferIndexInput);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testGetPendingFiles() throws IOException {
|
||||
final ShardId shardId = new ShardId("index", "_na_", 1);
|
||||
final String testfile = "testfile";
|
||||
|
@ -78,7 +78,7 @@ public final class FrozenEngine extends ReadOnlyEngine {
|
||||
|
||||
boolean success = false;
|
||||
Directory directory = store.directory();
|
||||
try (DirectoryReader reader = DirectoryReader.open(directory)) {
|
||||
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
|
||||
canMatchReader = ElasticsearchDirectoryReader.wrap(new RewriteCachingDirectoryReader(directory, reader.leaves()),
|
||||
config.getShardId());
|
||||
// we record the segment stats here - that's what the reader needs when it's open and it give the user
|
||||
@ -168,7 +168,7 @@ public final class FrozenEngine extends ReadOnlyEngine {
|
||||
for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) {
|
||||
listeners.beforeRefresh();
|
||||
}
|
||||
reader = DirectoryReader.open(engineConfig.getStore().directory());
|
||||
reader = DirectoryReader.open(engineConfig.getStore().directory(), OFF_HEAP_READER_ATTRIBUTES);
|
||||
processReader(reader);
|
||||
reader = lastOpenedReader = wrapReader(reader, Function.identity());
|
||||
reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
|
||||
|
@ -38,6 +38,7 @@ import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.FixedBitSet;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.engine.ReadOnlyEngine;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
@ -80,7 +81,8 @@ public class SourceOnlySnapshot {
|
||||
List<String> createdFiles = new ArrayList<>();
|
||||
String segmentFileName;
|
||||
try (Lock writeLock = targetDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
|
||||
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) {
|
||||
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit,
|
||||
ReadOnlyEngine.OFF_HEAP_READER_ATTRIBUTES)) {
|
||||
SegmentInfos segmentInfos = reader.getSegmentInfos().clone();
|
||||
DirectoryReader wrappedReader = wrapReader(reader);
|
||||
List<SegmentCommitInfo> newInfos = new ArrayList<>();
|
||||
|
@ -135,7 +135,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
|
||||
final long maxDoc = segmentInfos.totalMaxDoc();
|
||||
tempStore.bootstrapNewHistory(maxDoc, maxDoc);
|
||||
store.incRef();
|
||||
try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) {
|
||||
try (DirectoryReader reader = DirectoryReader.open(tempStore.directory(), ReadOnlyEngine.OFF_HEAP_READER_ATTRIBUTES)) {
|
||||
IndexCommit indexCommit = reader.getIndexCommit();
|
||||
super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus);
|
||||
} finally {
|
||||
|
@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.DocsStats;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
@ -146,12 +147,17 @@ public class FrozenEngineTests extends EngineTestCase {
|
||||
null, listener, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(),
|
||||
new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING);
|
||||
long expectedUse;
|
||||
final int docs;
|
||||
try (InternalEngine engine = createEngine(config)) {
|
||||
addDocuments(globalCheckpoint, engine);
|
||||
docs = addDocuments(globalCheckpoint, engine);
|
||||
engine.flush(false, true); // first flush to make sure we have a commit that we open in the frozen engine blow.
|
||||
engine.refresh("test"); // pull the reader to account for RAM in the breaker.
|
||||
}
|
||||
final long expectedUse;
|
||||
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, i -> i)) {
|
||||
expectedUse = breaker.getUsed();
|
||||
DocsStats docsStats = readOnlyEngine.docStats();
|
||||
assertEquals(docs, docsStats.getCount());
|
||||
}
|
||||
assertTrue(expectedUse > 0);
|
||||
assertEquals(0, breaker.getUsed());
|
||||
|
@ -46,7 +46,6 @@ import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
@ -64,7 +63,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
return pluginList(XPackPlugin.class);
|
||||
}
|
||||
|
||||
public void testCloseFreezeAndOpen() throws ExecutionException, InterruptedException {
|
||||
public void testCloseFreezeAndOpen() {
|
||||
createIndex("index", Settings.builder().put("index.number_of_shards", 2).build());
|
||||
client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
@ -106,7 +105,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
} while (searchResponse.getHits().getHits().length > 0);
|
||||
}
|
||||
|
||||
public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOException, ExecutionException {
|
||||
public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOException {
|
||||
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("_doc")
|
||||
.startObject("properties").startObject("field").field("type", "text").field("term_vector", "with_positions_offsets_payloads")
|
||||
.endObject().endObject()
|
||||
@ -150,7 +149,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
assertEquals(numRefreshes, index.getTotal().refresh.getTotal());
|
||||
}
|
||||
|
||||
public void testFreezeAndUnfreeze() throws InterruptedException, ExecutionException {
|
||||
public void testFreezeAndUnfreeze() {
|
||||
createIndex("index", Settings.builder().put("index.number_of_shards", 2).build());
|
||||
client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
@ -190,7 +189,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
assertTrue(FrozenEngine.INDEX_FROZEN.get(indexService.getIndexSettings().getSettings()));
|
||||
}
|
||||
|
||||
public void testDoubleFreeze() throws ExecutionException, InterruptedException {
|
||||
public void testDoubleFreeze() {
|
||||
createIndex("test-idx", Settings.builder().put("index.number_of_shards", 2).build());
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("test-idx")));
|
||||
@ -201,7 +200,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
assertEquals("no index found to freeze", executionException.getCause().getMessage());
|
||||
}
|
||||
|
||||
public void testUnfreezeClosedIndices() throws ExecutionException, InterruptedException {
|
||||
public void testUnfreezeClosedIndices() {
|
||||
createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
client().prepareIndex("idx", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
createIndex("idx-closed", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
@ -217,7 +216,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
assertHitCount(client().prepareSearch().get(), 1L);
|
||||
}
|
||||
|
||||
public void testFreezePattern() throws ExecutionException, InterruptedException {
|
||||
public void testFreezePattern() {
|
||||
createIndex("test-idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
client().prepareIndex("test-idx", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
createIndex("test-idx-1", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
@ -241,7 +240,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
assertEquals(0, index.getTotal().refresh.getTotal());
|
||||
}
|
||||
|
||||
public void testCanMatch() throws ExecutionException, InterruptedException, IOException {
|
||||
public void testCanMatch() throws IOException {
|
||||
createIndex("index");
|
||||
client().prepareIndex("index", "_doc", "1").setSource("field", "2010-01-05T02:00").setRefreshPolicy(IMMEDIATE).execute()
|
||||
.actionGet();
|
||||
@ -298,7 +297,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testWriteToFrozenIndex() throws ExecutionException, InterruptedException {
|
||||
public void testWriteToFrozenIndex() {
|
||||
createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
client().prepareIndex("idx", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
@ -308,7 +307,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
client().prepareIndex("idx", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get());
|
||||
}
|
||||
|
||||
public void testIgnoreUnavailable() throws ExecutionException, InterruptedException {
|
||||
public void testIgnoreUnavailable() {
|
||||
createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
createIndex("idx-close", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
assertAcked(client().admin().indices().prepareClose("idx-close"));
|
||||
@ -320,7 +319,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
client().admin().cluster().prepareState().get().getState().metaData().index("idx-close").getState());
|
||||
}
|
||||
|
||||
public void testUnfreezeClosedIndex() throws ExecutionException, InterruptedException {
|
||||
public void testUnfreezeClosedIndex() {
|
||||
createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build());
|
||||
XPackClient xPackClient = new XPackClient(client());
|
||||
assertAcked(xPackClient.freeze(new TransportFreezeIndexAction.FreezeRequest("idx")));
|
||||
@ -337,7 +336,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
client().admin().cluster().prepareState().get().getState().metaData().index("idx").getState());
|
||||
}
|
||||
|
||||
public void testFreezeIndexIncreasesIndexSettingsVersion() throws ExecutionException, InterruptedException {
|
||||
public void testFreezeIndexIncreasesIndexSettingsVersion() {
|
||||
final String index = "test";
|
||||
createIndex(index, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
|
||||
client().prepareIndex(index, "_doc").setSource("field", "value").execute().actionGet();
|
||||
@ -378,7 +377,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
assertIndexFrozen(indexName);
|
||||
}
|
||||
|
||||
public void testRecoveryState() throws ExecutionException, InterruptedException {
|
||||
public void testRecoveryState() {
|
||||
final String indexName = "index_recovery_state";
|
||||
createIndex(indexName, Settings.builder()
|
||||
.put("index.number_of_replicas", 0)
|
||||
|
Loading…
x
Reference in New Issue
Block a user