upgrade to Lucene 8.6.0 snapshot (#56661)

Ignacio Vera 2020-05-13 14:25:16 +02:00 committed by GitHub
parent cbbbd499bf
commit b4521d5183
66 changed files with 331 additions and 430 deletions

View File

@ -1,5 +1,5 @@
elasticsearch = 7.9.0
lucene = 8.5.1
lucene = 8.6.0-snapshot-6c9024f7735
bundled_jdk_vendor = adoptopenjdk
bundled_jdk = 14+36

View File

@ -1,8 +1,8 @@
include::{asciidoc-dir}/../../shared/versions/stack/{source_branch}.asciidoc[]
:lucene_version: 8.5.1
:lucene_version_path: 8_5_1
:lucene_version: 8.6.0
:lucene_version_path: 8_6_0
:jdk: 1.8.0_131
:jdk_major: 8
:build_flavor: default

View File

@ -1 +0,0 @@
666436c6624adac8af49623e7ac58d565bd88902

View File

@ -0,0 +1 @@
f7e94697a2f2e65aa19056118ddaa1a00df7ebbc

View File

@ -19,6 +19,7 @@
package org.elasticsearch.percolator;
import org.apache.lucene.document.BinaryRange;
import org.apache.lucene.index.PrefixCodedTerms;
import org.apache.lucene.index.Term;
import org.apache.lucene.queries.BlendedTermQuery;
import org.apache.lucene.search.BooleanClause.Occur;
@ -38,6 +39,7 @@ import org.apache.lucene.search.spans.SpanOrQuery;
import org.apache.lucene.search.spans.SpanTermQuery;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.automaton.ByteRunAutomaton;
import org.elasticsearch.Version;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.index.query.DateRangeIncludingNowQuery;
@ -51,6 +53,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
final class QueryAnalyzer {
@ -218,6 +221,22 @@ final class QueryAnalyzer {
}
}
@Override
public void consumeTermsMatching(Query query, String field, Supplier<ByteRunAutomaton> automaton) {
if (query instanceof TermInSetQuery) {
TermInSetQuery q = (TermInSetQuery) query;
PrefixCodedTerms.TermIterator ti = q.getTermData().iterator();
BytesRef term;
Set<QueryExtraction> qe = new HashSet<>();
while ((term = ti.next()) != null) {
qe.add(new QueryExtraction(new Term(field, term)));
}
this.terms.add(new Result(true, qe, 1));
} else {
super.consumeTermsMatching(query, field, automaton);
}
}
}
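Editor's note: the consumeTermsMatching override above walks a TermInSetQuery's terms directly through its PrefixCodedTerms iterator rather than relying on the automaton callback. A minimal standalone sketch of that iteration pattern follows; the field name and term values are made up for illustration and are not part of the commit.

```java
import org.apache.lucene.index.PrefixCodedTerms;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.util.BytesRef;

public class TermInSetQueryTerms {
    public static void main(String[] args) {
        // hypothetical field and values, purely for illustration
        TermInSetQuery query = new TermInSetQuery("tags", new BytesRef("a"), new BytesRef("b"));
        PrefixCodedTerms.TermIterator it = query.getTermData().iterator();
        for (BytesRef term = it.next(); term != null; term = it.next()) {
            // the iterator reuses its BytesRef, so copy before holding on to it
            System.out.println(new Term(it.field(), BytesRef.deepCopyOf(term)));
        }
    }
}
```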
private static Result pointRangeQuery(PointRangeQuery query) {

View File

@ -1 +0,0 @@
0a8422b9b8a1b936ff354add5fa70e8e74497b30

View File

@ -0,0 +1 @@
891b25ddde3edffe59f9f25345762078203ddb29

View File

@ -1 +0,0 @@
87015734d14c46347fac8b6e5f52ea972082a34e

View File

@ -0,0 +1 @@
56f0262ae595875e6e163b8e31d8fcc464c208fe

View File

@ -1 +0,0 @@
bb3a59f0e68d659d677a9534282b94a3caaf20be

View File

@ -0,0 +1 @@
0b97440d8349b6e19059ef1f8566ea8753166e81

View File

@ -1 +0,0 @@
f281e8f6446250e0b2ef93768b9f822f4a2dc7b5

View File

@ -0,0 +1 @@
2b65d6825a96eb2bea79d76606fdd76a789a3cd4

View File

@ -1 +0,0 @@
d6f919075b16eb42461500838367227c467b633c

View File

@ -0,0 +1 @@
f88549ade9a0f77856d3b7808920e105e9d61bc4

View File

@ -1 +0,0 @@
6af456327323cf6897a5fe64ba9628556665094b

View File

@ -0,0 +1 @@
ea5671e66acb6f70a6c7cd16276b24ed0751dbf5

View File

@ -1 +0,0 @@
1994c5719e4a6e39aaffdb2b5832511d87fbc675

View File

@ -0,0 +1 @@
bac12d02041e93e9c73c99ac0e7798a0382453c7

View File

@ -19,14 +19,20 @@
package org.elasticsearch.index.engine;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.mapper.ParsedDocument;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -54,31 +60,72 @@ public class EvilInternalEngineTests extends EngineTestCase {
});
final AtomicReference<List<SegmentCommitInfo>> segmentsReference = new AtomicReference<>();
final FilterMergePolicy mergePolicy = new FilterMergePolicy(newMergePolicy()) {
@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount,
Map<SegmentCommitInfo, Boolean> segmentsToMerge,
MergeContext mergeContext) throws IOException {
final List<SegmentCommitInfo> segments = segmentsReference.get();
if (segments != null) {
final MergeSpecification spec = new MergeSpecification();
spec.add(new OneMerge(segments));
return spec;
}
return super.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext);
}
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos,
MergeContext mergeContext) throws IOException {
final List<SegmentCommitInfo> segments = segmentsReference.get();
if (segments != null) {
final MergeSpecification spec = new MergeSpecification();
spec.add(new OneMerge(segments));
return spec;
}
return super.findMerges(mergeTrigger, segmentInfos, mergeContext);
}
};
try (Engine e = createEngine(
defaultSettings,
store,
primaryTranslogDir,
newMergePolicy(),
(directory, iwc) -> new IndexWriter(directory, iwc) {
@Override
public void merge(final MergePolicy.OneMerge merge) throws IOException {
throw new OutOfMemoryError("640K ought to be enough for anybody");
}
mergePolicy,
(directory, iwc) -> {
final MergeScheduler mergeScheduler = iwc.getMergeScheduler();
assertNotNull(mergeScheduler);
iwc.setMergeScheduler(new FilterMergeScheduler(mergeScheduler) {
@Override
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
final FilterMergeSource wrappedMergeSource = new FilterMergeSource(mergeSource) {
@Override
public MergePolicy.OneMerge getNextMerge() {
synchronized (mergeSource) {
/*
* This will be called when we flush, at which point we will not yet be ready to return the segments.
* After the segments are on disk, we can only return them from here once or the merge
* scheduler will be stuck in a loop repeatedly peeling off the same segments to schedule
* for merging.
*/
if (segmentsReference.get() == null) {
return super.getNextMerge();
} else {
final List<SegmentCommitInfo> segments = segmentsReference.getAndSet(null);
return new MergePolicy.OneMerge(segments);
}
}
}
@Override
public synchronized MergePolicy.OneMerge getNextMerge() {
/*
* This will be called when we flush, at which point we will not yet be ready to return the segments. After the segments are on
* disk, we can only return them from here once or the merge scheduler will be stuck in a loop repeatedly
* peeling off the same segments to schedule for merging.
*/
if (segmentsReference.get() == null) {
return super.getNextMerge();
} else {
final List<SegmentCommitInfo> segments = segmentsReference.getAndSet(null);
return new MergePolicy.OneMerge(segments);
@Override
public void merge(MergePolicy.OneMerge merge) {
throw new OutOfMemoryError("640K ought to be enough for anybody");
}
};
super.merge(wrappedMergeSource, trigger);
}
}
});
return new IndexWriter(directory, iwc);
},
null,
null)) {
@ -105,5 +152,54 @@ public class EvilInternalEngineTests extends EngineTestCase {
}
}
static class FilterMergeScheduler extends MergeScheduler {
private final MergeScheduler delegate;
FilterMergeScheduler(MergeScheduler delegate) {
this.delegate = delegate;
}
@Override
public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in) {
return delegate.wrapForMerge(merge, in);
}
@Override
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
delegate.merge(mergeSource, trigger);
}
@Override
public void close() throws IOException {
delegate.close();
}
}
static class FilterMergeSource implements MergeScheduler.MergeSource {
private final MergeScheduler.MergeSource delegate;
FilterMergeSource(MergeScheduler.MergeSource delegate) {
this.delegate = delegate;
}
@Override
public MergePolicy.OneMerge getNextMerge() {
return delegate.getNextMerge();
}
@Override
public void onMergeFinished(MergePolicy.OneMerge merge) {
delegate.onMergeFinished(merge);
}
@Override
public boolean hasPendingMerges() {
return delegate.hasPendingMerges();
}
@Override
public void merge(MergePolicy.OneMerge merge) throws IOException {
delegate.merge(merge);
}
}
}
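Editor's note: the wrappers above exist because, in Lucene 8.6, a MergeScheduler no longer receives the IndexWriter; it pulls pending merges from a MergeScheduler.MergeSource. A hedged, minimal sketch of a scheduler written directly against that interface is below; the class name and the synchronous, same-thread execution are illustrative only.

```java
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.MergeTrigger;

import java.io.IOException;

// runs every pending merge on the calling thread; a production scheduler would hand them to worker threads
class SameThreadMergeScheduler extends MergeScheduler {
    @Override
    public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
        MergePolicy.OneMerge merge;
        while ((merge = mergeSource.getNextMerge()) != null) {
            mergeSource.merge(merge);
        }
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}
```

This mirrors what FilterMergeScheduler and FilterMergeSource do in the test, except the test intercepts getNextMerge() to hand the scheduler a handcrafted OneMerge built from the captured segments.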

View File

@ -1 +0,0 @@
704685ddf536e1af4da025b6e6f4e50b9846ef18

View File

@ -0,0 +1 @@
e7fb1f82ecc255e015ae6fe8bb77c8ec8b8748ce

View File

@ -1 +0,0 @@
0ab12c24a7c33ef5dfe8b57f17f67fec4a3fee1c

View File

@ -0,0 +1 @@
637bd4a785df39cf7720160e3d7ef40eabad13fc

View File

@ -1 +0,0 @@
24212de43c19269f5211f3e79eb2f414c4a0254b

View File

@ -0,0 +1 @@
ec8401dfd1b41113eba06f3d626a1f2fdf589335

View File

@ -1 +0,0 @@
4404f3ff6341b7518843d09141df743bf91a8284

View File

@ -0,0 +1 @@
138abf463c27088a50a2b7c4a267694386c3a0cb

View File

@ -1 +0,0 @@
142f5f249aa0803f8283a3d08615e37a56f40e8a

View File

@ -0,0 +1 @@
d8d618145b7eff4ea1a4081ef1918e8967b5ec8e

View File

@ -1 +0,0 @@
b0a48846662fc504bd7796b5506dad94981fca08

View File

@ -0,0 +1 @@
4b09c5c1a69ce6fe22328006d9129585009eb41a

View File

@ -1 +0,0 @@
ba9e24b90323aacc98a4ac661ac34bfbf0ed66d8

View File

@ -0,0 +1 @@
1491780984dc014d34d3d1d0c6656630ba67ca98

View File

@ -1 +0,0 @@
a0418e9bc16fc876448accb828a6ca38ed63d4a8

View File

@ -0,0 +1 @@
08a78a91082bd6ae7e4e5535060a1e59a51d8983

View File

@ -1 +0,0 @@
269c67a4ee9b806cfdacddc211744243cbcbd127

View File

@ -0,0 +1 @@
a58e4fa4d7390d0c2dfaa7697702e0c4ab5add48

View File

@ -1 +0,0 @@
ee5ba0e07a178a32987b0a92da149f2104e26dd9

View File

@ -0,0 +1 @@
745ed85193fa82ef75ac92be524e90d89ead7345

View File

@ -1 +0,0 @@
f1461680109e499d8c58dcaf5d314aeeef41d99a

View File

@ -0,0 +1 @@
e46dc362bfbac1609f4ecbd7838acd5dae1aa960

View File

@ -1 +0,0 @@
eece1ef3f919634c79b9ae9d99264ac9efa4276c

View File

@ -0,0 +1 @@
167ea704134a3e5bd6dd93283c030c21d3360c63

View File

@ -1 +0,0 @@
a8fb2771ac562d60a3c945a4cef0e3742c390329

View File

@ -0,0 +1 @@
f20507834f2b8c6103ffdc08ac687bdf73d45a09

View File

@ -1 +0,0 @@
b5613f4995836fd9edae5925ed38559460721492

View File

@ -0,0 +1 @@
ac6c604ce977f2e44a13159021ba4133594ccc40

View File

@ -136,7 +136,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_7_6_2 = new Version(7060299, org.apache.lucene.util.Version.LUCENE_8_4_0);
public static final Version V_7_7_0 = new Version(7070099, org.apache.lucene.util.Version.LUCENE_8_5_1);
public static final Version V_7_8_0 = new Version(7080099, org.apache.lucene.util.Version.LUCENE_8_5_1);
public static final Version V_7_9_0 = new Version(7090099, org.apache.lucene.util.Version.LUCENE_8_5_1);
public static final Version V_7_9_0 = new Version(7090099, org.apache.lucene.util.Version.LUCENE_8_6_0);
public static final Version CURRENT = V_7_9_0;
private static final ImmutableOpenIntMap<Version> idToVersion;
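Editor's note: each org.elasticsearch.Version constant carries the Lucene version it ships with, which is what the bump of V_7_9_0 to LUCENE_8_6_0 changes. A small illustrative check follows; the assertion is a sketch for this note, not code from the commit.

```java
import org.elasticsearch.Version;

public class LuceneVersionCheck {
    public static void main(String[] args) {
        org.apache.lucene.util.Version lucene = Version.V_7_9_0.luceneVersion;
        // after this change, 7.9.0 indices are written with Lucene 8.6.0 or later
        System.out.println(lucene.onOrAfter(org.apache.lucene.util.Version.LUCENE_8_6_0)); // true
    }
}
```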

View File

@ -171,7 +171,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.DEFAULT_PIPELINE,
IndexSettings.FINAL_PIPELINE,
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
IndexSettings.ON_HEAP_ID_TERMS_INDEX,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
// validate that built-in similarities don't get redefined
@ -180,7 +179,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
for (String key : SimilarityService.BUILT_IN.keySet()) {
if (groups.containsKey(key)) {
throw new IllegalArgumentException("illegal value for [index.similarity." + key +
"] cannot redefine built-in similarity");
"] cannot redefine built-in similarity");
}
}
}, Property.IndexScope), // this allows similarity settings to be passed

View File

@ -84,9 +84,6 @@ public final class IndexSettings {
"[true, false, checksum] but was: " + s);
}
}, Property.IndexScope);
// This setting is undocumented as it is considered as an escape hatch.
public static final Setting<Boolean> ON_HEAP_ID_TERMS_INDEX =
Setting.boolSetting("index.force_memory_id_terms_dictionary", false, Property.IndexScope);
/**
* Index setting describing the maximum value of from + size on a query.

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.OneMergeHelper;
@ -80,7 +79,7 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
}
@Override
protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
int totalNumDocs = merge.totalNumDocs();
long totalSizeInBytes = merge.totalBytesSize();
long timeNS = System.nanoTime();
@ -98,7 +97,7 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
}
try {
beforeMerge(onGoingMerge);
super.doMerge(writer, merge);
super.doMerge(mergeSource, merge);
} finally {
long tookMS = TimeValue.nsecToMSec(System.nanoTime() - timeNS);
@ -162,14 +161,14 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
}
@Override
protected boolean maybeStall(IndexWriter writer) {
protected boolean maybeStall(MergeSource mergeSource) {
// Don't stall here, because we do our own index throttling (in InternalEngine.IndexThrottle) when merges can't keep up
return true;
}
@Override
protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = super.getMergeThread(writer, merge);
protected MergeThread getMergeThread(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = super.getMergeThread(mergeSource, merge);
thread.setName(EsExecutors.threadName(indexSettings, "[" + shardId.getIndexName() + "][" + shardId.id() + "]: " +
thread.getName()));
return thread;
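Editor's note: the signature changes above track ConcurrentMergeScheduler in Lucene 8.6, where doMerge, maybeStall and getMergeThread take a MergeSource instead of an IndexWriter. A minimal sketch of a subclass using the new hooks follows; the class name and the timing log are illustrative, not the production code.

```java
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.MergePolicy;

import java.io.IOException;

class TimingMergeScheduler extends ConcurrentMergeScheduler {
    @Override
    protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
        final int totalNumDocs = merge.totalNumDocs();
        final long start = System.nanoTime();
        try {
            super.doMerge(mergeSource, merge);
        } finally {
            final long tookMillis = (System.nanoTime() - start) / 1_000_000;
            System.out.println("merged " + totalNumDocs + " docs in " + tookMillis + "ms");
        }
    }

    @Override
    protected boolean maybeStall(MergeSource mergeSource) {
        // never stall indexing threads here; throttling is handled elsewhere, as in the class above
        return true;
    }
}
```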

View File

@ -20,9 +20,6 @@
package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
@ -30,7 +27,6 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
@ -52,9 +48,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
@ -94,7 +88,6 @@ import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@ -108,7 +101,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@ -201,16 +193,16 @@ public class InternalEngine extends Engine {
}
InternalEngine(
final EngineConfig engineConfig,
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
final EngineConfig engineConfig,
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
);
store.incRef();
IndexWriter writer = null;
@ -319,10 +311,10 @@ public class InternalEngine extends Engine {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
}
return new SoftDeletesPolicy(
translog::getLastSyncedGlobalCheckpoint,
lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(),
engineConfig.retentionLeasesSupplier());
translog::getLastSyncedGlobalCheckpoint,
lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(),
engineConfig.retentionLeasesSupplier());
}
@Override
@ -432,9 +424,9 @@ public class InternalEngine extends Engine {
final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
int numNoOpsAdded = 0;
for (
long seqNo = localCheckpoint + 1;
seqNo <= maxSeqNo;
seqNo = localCheckpointTracker.getProcessedCheckpoint() + 1 /* leap-frog the local checkpoint */) {
long seqNo = localCheckpoint + 1;
seqNo <= maxSeqNo;
seqNo = localCheckpointTracker.getProcessedCheckpoint() + 1 /* leap-frog the local checkpoint */) {
innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
numNoOpsAdded++;
assert seqNo <= localCheckpointTracker.getProcessedCheckpoint() :
@ -643,7 +635,7 @@ public class InternalEngine extends Engine {
final ElasticsearchDirectoryReader directoryReader =
ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
internalReaderManager = new ElasticsearchReaderManager(directoryReader,
new RamAccountingRefreshListener(engineConfig.getCircuitBreakerService()));
new RamAccountingRefreshListener(engineConfig.getCircuitBreakerService()));
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
ExternalReaderManager externalReaderManager = new ExternalReaderManager(internalReaderManager, externalRefreshListener);
success = true;
@ -686,7 +678,7 @@ public class InternalEngine extends Engine {
}
if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term
)) {
)) {
throw new VersionConflictEngineException(shardId, get.id(),
get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
}
@ -789,7 +781,7 @@ public class InternalEngine extends Engine {
assert incrementIndexVersionLookup(); // used for asserting in tests
final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion;
try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), op.uid(), loadSeqNo);
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), op.uid(), loadSeqNo);
}
if (docIdAndVersion != null) {
versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
@ -860,7 +852,7 @@ public class InternalEngine extends Engine {
protected boolean assertPrimaryIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
return true;
}
@ -893,7 +885,7 @@ public class InternalEngine extends Engine {
ensureOpen();
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
lastWriteNanos = index.startTime();
/* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS:
* if we have an autoGeneratedID that comes into the engine we can potentially optimize
@ -1074,7 +1066,7 @@ public class InternalEngine extends Engine {
} else if (index.versionType().isVersionConflictForWrites(
currentVersion, index.version(), currentNotFoundOrDeleted)) {
final VersionConflictEngineException e =
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
@ -1217,10 +1209,10 @@ public class InternalEngine extends Engine {
}
public static IndexingStrategy skipDueToVersionConflict(
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
final IndexResult result = new IndexResult(e, currentVersion);
return new IndexingStrategy(
currentNotFoundOrDeleted, false, false, false,
currentNotFoundOrDeleted, false, false, false,
Versions.NOT_FOUND, result);
}
@ -1483,7 +1475,7 @@ public class InternalEngine extends Engine {
}
public static DeletionStrategy skipDueToVersionConflict(
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
SequenceNumbers.UNASSIGNED_SEQ_NO, currentlyDeleted == false);
return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, deleteResult);
@ -1508,7 +1500,7 @@ public class InternalEngine extends Engine {
// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
// every 1/4 of gcDeletesInMillis:
if (engineConfig.isEnableGcDeletes() &&
engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
pruneDeletedTombstones();
}
}
@ -1833,12 +1825,12 @@ public class InternalEngine extends Engine {
}
private void refreshLastCommittedSegmentInfos() {
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
* we don't acquire the write lock and wait until we have exclusive access. This might also
* dec the store reference which can essentially close the store and unless we can inc the reference
* we can't use it.
*/
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
* we don't acquire the write lock and wait until we have exclusive access. This might also
* dec the store reference which can essentially close the store and unless we can inc the reference
* we can't use it.
*/
store.incRef();
try {
// reread the last committed segment infos
@ -2104,8 +2096,8 @@ public class InternalEngine extends Engine {
if (e instanceof AlreadyClosedException) {
return failOnTragicEvent((AlreadyClosedException)e);
} else if (e != null &&
((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
|| (translog.isOpen() == false && translog.getTragicException() == e))) {
((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
|| (translog.isOpen() == false && translog.getTragicException() == e))) {
// this is spot on - we are handling the tragic event exception here so we have to fail the engine
// right away
failEngine(source, e);
@ -2231,24 +2223,10 @@ public class InternalEngine extends Engine {
}
}
static Map<String, String> getReaderAttributes(Directory directory, IndexSettings indexSettings) {
Directory unwrap = FilterDirectory.unwrap(directory);
boolean defaultOffHeap = FsDirectoryFactory.isHybridFs(unwrap) || unwrap instanceof MMapDirectory;
Map<String, String> attributes = new HashMap<>();
attributes.put(BlockTreeTermsReader.FST_MODE_KEY, defaultOffHeap ? FSTLoadMode.OFF_HEAP.name() : FSTLoadMode.ON_HEAP.name());
if (IndexSettings.ON_HEAP_ID_TERMS_INDEX.exists(indexSettings.getSettings())) {
final boolean idOffHeap = IndexSettings.ON_HEAP_ID_TERMS_INDEX.get(indexSettings.getSettings()) == false;
attributes.put(BlockTreeTermsReader.FST_MODE_KEY + "." + IdFieldMapper.NAME,
idOffHeap ? FSTLoadMode.OFF_HEAP.name() : FSTLoadMode.ON_HEAP.name());
}
return Collections.unmodifiableMap(attributes);
}
private IndexWriterConfig getIndexWriterConfig() {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
iwc.setReaderAttributes(getReaderAttributes(store.directory(), engineConfig.getIndexSettings()));
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
@ -2378,7 +2356,7 @@ public class InternalEngine extends Engine {
}
}
if (indexWriter.hasPendingMerges() == false &&
System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
// NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the writer
// we deadlock on engine#close for instance.
engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@ -2411,7 +2389,7 @@ public class InternalEngine extends Engine {
}
@Override
protected void handleMergeException(final Directory dir, final Throwable exc) {
protected void handleMergeException(final Throwable exc) {
engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
@ -2425,7 +2403,7 @@ public class InternalEngine extends Engine {
* confidence that the call stack does not contain catch statements that would cause the error that might be thrown
* here from being caught and never reaching the uncaught exception handler.
*/
failEngine("merge failed", new MergePolicy.MergeException(exc, dir));
failEngine("merge failed", new MergePolicy.MergeException(exc));
}
});
}
@ -2722,32 +2700,11 @@ public class InternalEngine extends Engine {
super(d, conf);
}
@Override
public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
assert softDeleteEnabled == false : "Call #updateDocument but soft-deletes is enabled";
return super.updateDocument(term, doc);
}
@Override
public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
assert softDeleteEnabled == false : "Call #updateDocuments but soft-deletes is enabled";
return super.updateDocuments(delTerm, docs);
}
@Override
public long deleteDocuments(Term... terms) throws IOException {
assert softDeleteEnabled == false : "Call #deleteDocuments but soft-deletes is enabled";
return super.deleteDocuments(terms);
}
@Override
public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field... softDeletes) throws IOException {
assert softDeleteEnabled : "Call #softUpdateDocument but soft-deletes is disabled";
return super.softUpdateDocument(term, doc, softDeletes);
}
@Override
public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends IndexableField>> docs,
Field... softDeletes) throws IOException {
assert softDeleteEnabled : "Call #softUpdateDocuments but soft-deletes is disabled";
return super.softUpdateDocuments(term, docs, softDeletes);
}
@Override
public long tryDeleteDocument(IndexReader readerIn, int docID) {
assert false : "#tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs";
throw new UnsupportedOperationException();
@ -2875,7 +2832,7 @@ public class InternalEngine extends Engine {
searcher.setQueryCache(null);
final Query query = new BooleanQuery.Builder()
.add(LongPoint.newRangeQuery(
SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), BooleanClause.Occur.MUST)
SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), BooleanClause.Occur.MUST)
// exclude non-root nested documents
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
.build();

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
@ -47,9 +46,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;
@ -68,8 +65,6 @@ public class ReadOnlyEngine extends Engine {
* Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an
* ID field.
*/
private static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name());
private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
private final ElasticsearchReaderManager readerManager;
@ -186,7 +181,7 @@ public class ReadOnlyEngine extends Engine {
protected DirectoryReader open(IndexCommit commit) throws IOException {
assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
return DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES);
return DirectoryReader.open(commit);
}
@Override
@ -224,7 +219,7 @@ public class ReadOnlyEngine extends Engine {
final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
config.getPrimaryTermSupplier(), seqNo -> {})
) {
return translog.stats();
}
@ -523,7 +518,7 @@ public class ReadOnlyEngine extends Engine {
protected static DirectoryReader openDirectory(Directory directory, boolean wrapSoftDeletes) throws IOException {
assert Transports.assertNotTransportThread("opening directory reader of a read-only engine");
final DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES);
final DirectoryReader reader = DirectoryReader.open(directory);
if (wrapSoftDeletes) {
return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
} else {

View File

@ -62,11 +62,8 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
@ -105,7 +102,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
@ -131,9 +127,7 @@ import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.TestTranslog;
@ -5907,79 +5901,6 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
}
public void testGetReaderAttributes() throws IOException {
Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT);
Settings settings = settingsBuilder.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
try(BaseDirectoryWrapper dir = newFSDirectory(createTempDir())) {
Directory unwrap = FilterDirectory.unwrap(dir);
boolean isMMap = unwrap instanceof MMapDirectory;
Map<String, String> readerAttributes = InternalEngine.getReaderAttributes(dir, indexSettings);
assertEquals(Collections.singletonMap("blocktree.terms.fst", isMMap ? "OFF_HEAP" : "ON_HEAP"), readerAttributes);
}
try(MMapDirectory dir = new MMapDirectory(createTempDir())) {
Map<String, String> readerAttributes =
InternalEngine.getReaderAttributes(randomBoolean() ? dir :
new MockDirectoryWrapper(random(), dir), indexSettings);
assertEquals(Collections.singletonMap("blocktree.terms.fst", "OFF_HEAP"), readerAttributes);
}
FsDirectoryFactory service = new FsDirectoryFactory();
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
try (Directory directory = service.newDirectory(indexSettings, path)) {
Map<String, String> readerAttributes =
InternalEngine.getReaderAttributes(randomBoolean() ? directory :
new MockDirectoryWrapper(random(), directory), indexSettings);
assertEquals(1, readerAttributes.size());
switch (IndexModule.defaultStoreType(true)) {
case HYBRIDFS:
case MMAPFS:
assertEquals("OFF_HEAP", readerAttributes.get("blocktree.terms.fst"));
break;
case NIOFS:
case SIMPLEFS:
case FS:
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst"));
break;
default:
fail("unknownw type");
}
}
settingsBuilder.put(IndexSettings.ON_HEAP_ID_TERMS_INDEX.getKey(), true);
settings = settingsBuilder.build();
indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
try (Directory directory = service.newDirectory(indexSettings, path)) {
Map<String, String> readerAttributes =
InternalEngine.getReaderAttributes(randomBoolean() ? directory :
new MockDirectoryWrapper(random(), directory), indexSettings);
assertEquals(2, readerAttributes.size());
switch (IndexModule.defaultStoreType(true)) {
case HYBRIDFS:
case MMAPFS:
assertEquals("OFF_HEAP", readerAttributes.get("blocktree.terms.fst"));
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id"));
break;
case NIOFS:
case SIMPLEFS:
case FS:
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst"));
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id"));
break;
default:
fail("unknownw type");
}
}
}
public void testPruneAwayDeletedButRetainedIds() throws Exception {
IOUtils.close(engine, store);
Settings settings = Settings.builder()

View File

@ -106,6 +106,7 @@ import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
@ -654,8 +655,8 @@ public class QueryPhaseTests extends IndexShardTestCase {
MapperService mapperService = mock(MapperService.class);
when(mapperService.fieldType(fieldNameLong)).thenReturn(fieldTypeLong);
when(mapperService.fieldType(fieldNameDate)).thenReturn(fieldTypeDate);
final int numDocs = 7000;
// enough docs to have a tree with several leaf nodes
final int numDocs = 3500 * 20;
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(null));
for (int i = 1; i <= numDocs; ++i) {
@ -820,7 +821,11 @@ public class QueryPhaseTests extends IndexShardTestCase {
// assert score docs are in order and their number is as expected
private void assertSortResults(TopDocs topDocs, long expectedNumDocs, boolean isDoubleSort) {
assertEquals(topDocs.totalHits.value, expectedNumDocs);
if (topDocs.totalHits.relation == TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO) {
assertThat(topDocs.totalHits.value, lessThanOrEqualTo(expectedNumDocs));
} else {
assertEquals(topDocs.totalHits.value, expectedNumDocs);
}
long cur1, cur2;
long prev1 = Long.MIN_VALUE;
long prev2 = Long.MIN_VALUE;
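Editor's note: the relaxed assertion above exists because, since Lucene 8, TopDocs.totalHits is a TotalHits carrying a relation: when hit counting stops early the value is only a lower bound. A hedged sketch of reading it outside the test follows; the searcher, query and expectedNumDocs parameters are placeholders.

```java
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;

import java.io.IOException;

class TotalHitsExample {
    static void checkCount(IndexSearcher searcher, Query query, long expectedNumDocs) throws IOException {
        TopDocs topDocs = searcher.search(query, 10);
        if (topDocs.totalHits.relation == TotalHits.Relation.EQUAL_TO) {
            assert topDocs.totalHits.value == expectedNumDocs;  // exact count
        } else {
            assert topDocs.totalHits.value <= expectedNumDocs;  // GREATER_THAN_OR_EQUAL_TO: lower bound only
        }
    }
}
```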

View File

@ -79,14 +79,6 @@ public final class CorruptionUtils {
}
try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
long maxPosition = raf.size();
if (fileToCorrupt.getFileName().toString().endsWith(".cfs") && maxPosition > 4) {
// TODO: it is known that Lucene does not check the checksum of CFS file (CompoundFileS, like an archive)
// see note at https://github.com/elastic/elasticsearch/pull/33911
// so far, don't corrupt crc32 part of checksum (last 4 bytes) of cfs file
// checksum is 8 bytes: first 4 bytes have to be zeros, while crc32 value is not verified
maxPosition -= 4;
}
final int position = random.nextInt((int) Math.min(Integer.MAX_VALUE, maxPosition));
corruptAt(fileToCorrupt, raf, position);
}
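Editor's note: the removed block worked around Lucene not verifying the CRC32 of .cfs (compound) files; the commit drops it because that verification is now covered. An illustrative way to verify an index directory with CheckIndex follows; the command-line path handling is a placeholder, not from the commit.

```java
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Paths;

public class VerifyIndex {
    public static void main(String[] args) throws Exception {
        // args[0]: path to a Lucene index directory
        try (FSDirectory dir = FSDirectory.open(Paths.get(args[0]));
             CheckIndex checkIndex = new CheckIndex(dir)) {
            CheckIndex.Status status = checkIndex.checkIndex();
            System.out.println(status.clean ? "index is clean" : "index is corrupt");
        }
    }
}
```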

View File

@ -461,17 +461,13 @@ public abstract class ESIntegTestCase extends ESTestCase {
if (random.nextBoolean()) {
// keep this low so we don't stall tests
builder.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(),
RandomNumbers.randomIntBetween(random, 1, 15) + "ms");
RandomNumbers.randomIntBetween(random, 1, 15) + "ms");
}
if (random.nextBoolean()) {
builder.put(Store.FORCE_RAM_TERM_DICT.getKey(), true);
}
if (random.nextBoolean()) {
builder.put(IndexSettings.ON_HEAP_ID_TERMS_INDEX.getKey(), random.nextBoolean());
}
return builder;
}
@ -495,20 +491,20 @@ public abstract class ESIntegTestCase extends ESTestCase {
private static Settings.Builder setRandomIndexTranslogSettings(Random random, Settings.Builder builder) {
if (random.nextBoolean()) {
builder.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(RandomNumbers.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
new ByteSizeValue(RandomNumbers.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
}
if (random.nextBoolean()) {
builder.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
}
if (random.nextBoolean()) {
builder.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(),
RandomPicks.randomFrom(random, Translog.Durability.values()));
RandomPicks.randomFrom(random, Translog.Durability.values()));
}
if (random.nextBoolean()) {
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(),
RandomNumbers.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
RandomNumbers.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
}
return builder;
@ -734,7 +730,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
if (randomBoolean()) {
builder.put(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), timeValueMillis(randomLongBetween(0, randomBoolean()
? 1000 : INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(Settings.EMPTY).millis())).getStringRep());
? 1000 : INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(Settings.EMPTY).millis())).getStringRep());
}
return builder.build();
}
@ -794,15 +790,15 @@ public abstract class ESIntegTestCase extends ESTestCase {
public CreateIndexRequestBuilder prepareCreate(String index, Settings.Builder settingsBuilder) {
return prepareCreate(index, -1, settingsBuilder);
}
/**
* Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}.
* The index that is created with this builder will only be allowed to allocate on the number of nodes passed to this
* method.
* <p>
* This method uses allocation deciders to filter out certain nodes to allocate the created index on. It defines allocation
* rules based on <code>index.routing.allocation.exclude._name</code>.
* </p>
*/
/**
* Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}.
* The index that is created with this builder will only be allowed to allocate on the number of nodes passed to this
* method.
* <p>
* This method uses allocation deciders to filter out certain nodes to allocate the created index on. It defines allocation
* rules based on <code>index.routing.allocation.exclude._name</code>.
* </p>
*/
public CreateIndexRequestBuilder prepareCreate(String index, int numNodes, Settings.Builder settingsBuilder) {
Settings.Builder builder = Settings.builder().put(indexSettings()).put(settingsBuilder.build());
@ -1064,7 +1060,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
if (masterClusterState.version() == localClusterState.version()
&& masterId.equals(localClusterState.nodes().getMasterNodeId())) {
&& masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals("cluster state UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
/*
@ -1079,21 +1075,21 @@ public abstract class ESIntegTestCase extends ESTestCase {
assertEquals("cluster state size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull(
"cluster state JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
"cluster state JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
} else {
// remove non-core customs and compare the cluster states
assertNull(
"cluster state JSON serialization does not match (after removing some customs)",
differenceBetweenMapsIgnoringArrayOrder(
convertToMap(removePluginCustoms(masterClusterState)),
convertToMap(removePluginCustoms(localClusterState))));
"cluster state JSON serialization does not match (after removing some customs)",
differenceBetweenMapsIgnoringArrayOrder(
convertToMap(removePluginCustoms(masterClusterState)),
convertToMap(removePluginCustoms(localClusterState))));
}
} catch (final AssertionError error) {
logger.error(
"Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(),
localClusterState.toString());
"Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(),
localClusterState.toString());
throw error;
}
}
@ -1204,12 +1200,12 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
private static final Set<String> SAFE_METADATA_CUSTOMS =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(IndexGraveyard.TYPE, IngestMetadata.TYPE, RepositoriesMetadata.TYPE, ScriptMetadata.TYPE)));
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(IndexGraveyard.TYPE, IngestMetadata.TYPE, RepositoriesMetadata.TYPE, ScriptMetadata.TYPE)));
private static final Set<String> SAFE_CUSTOMS =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(RestoreInProgress.TYPE, SnapshotDeletionsInProgress.TYPE, SnapshotsInProgress.TYPE)));
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(RestoreInProgress.TYPE, SnapshotDeletionsInProgress.TYPE, SnapshotsInProgress.TYPE)));
/**
* Remove any customs except for customs that we know all clients understand.
@ -1425,7 +1421,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, IndexRequestBuilder... builders)
throws InterruptedException {
throws InterruptedException {
indexRandom(forceRefresh, dummyDocuments, Arrays.asList(builders));
}
@ -1461,7 +1457,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
* @param builders the documents to index.
*/
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders)
throws InterruptedException {
throws InterruptedException {
indexRandom(forceRefresh, dummyDocuments, true, builders);
}
@ -1479,7 +1475,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
* @param builders the documents to index.
*/
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean maybeFlush, List<IndexRequestBuilder> builders)
throws InterruptedException {
throws InterruptedException {
Random random = random();
Map<String, Set<String>> indicesAndTypes = new HashMap<>();
for (IndexRequestBuilder builder : builders) {
@ -1494,8 +1490,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
final int unicodeLen = between(1, 10);
for (int i = 0; i < numBogusDocs; i++) {
String id = "bogus_doc_"
+ randomRealisticUnicodeOfLength(unicodeLen)
+ Integer.toString(dummmyDocIdGenerator.incrementAndGet());
+ randomRealisticUnicodeOfLength(unicodeLen)
+ Integer.toString(dummmyDocIdGenerator.incrementAndGet());
Map.Entry<String, Set<String>> indexAndTypes = RandomPicks.randomFrom(random, indicesAndTypes.entrySet());
String index = indexAndTypes.getKey();
String type = RandomPicks.randomFrom(random, indexAndTypes.getValue());
@ -1514,7 +1510,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), true, false);
for (IndexRequestBuilder indexRequestBuilder : builders) {
indexRequestBuilder.execute(
new PayloadLatchedActionListener<>(indexRequestBuilder, newLatch(inFlightAsyncOperations), errors));
new PayloadLatchedActionListener<>(indexRequestBuilder, newLatch(inFlightAsyncOperations), errors));
postIndexAsyncActions(indices, inFlightAsyncOperations, maybeFlush);
}
} else {
@ -1561,8 +1557,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
if (forceRefresh) {
assertNoFailures(client().admin().indices().prepareRefresh(indices)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.get());
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.get());
}
}
@ -1597,7 +1593,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
* Maybe refresh, force merge, or flush then always make sure there aren't too many in flight async operations.
*/
private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlightAsyncOperations, boolean maybeFlush)
throws InterruptedException {
throws InterruptedException {
if (rarely()) {
if (rarely()) {
client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute(
@ -1612,10 +1608,10 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
} else if (rarely()) {
client().admin().indices().prepareForceMerge(indices)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setMaxNumSegments(between(1, 10))
.setFlush(maybeFlush && randomBoolean())
.execute(new LatchedActionListener<>(newLatch(inFlightAsyncOperations)));
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setMaxNumSegments(between(1, 10))
.setFlush(maybeFlush && randomBoolean())
.execute(new LatchedActionListener<>(newLatch(inFlightAsyncOperations)));
}
}
while (inFlightAsyncOperations.size() > MAX_IN_FLIGHT_ASYNC_INDEXES) {
@ -1790,13 +1786,13 @@ public abstract class ESIntegTestCase extends ESTestCase {
private int getMinNumDataNodes() {
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null || annotation.minNumDataNodes() == -1
? InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES : annotation.minNumDataNodes();
? InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES : annotation.minNumDataNodes();
}
private int getMaxNumDataNodes() {
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null || annotation.maxNumDataNodes() == -1
? InternalTestCluster.DEFAULT_MAX_NUM_DATA_NODES : annotation.maxNumDataNodes();
? InternalTestCluster.DEFAULT_MAX_NUM_DATA_NODES : annotation.maxNumDataNodes();
}
private int getNumClientNodes() {

View File

@ -1,81 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardPath;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.stream.Stream;
import static org.elasticsearch.test.CorruptionUtils.corruptAt;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
public class CorruptionUtilsTests extends IndexShardTestCase {
/**
* There is a dependency on Lucene bug fix
* https://github.com/elastic/elasticsearch/pull/33911
*/
public void testLuceneCheckIndexIgnoresLast4Bytes() throws Exception {
final IndexShard indexShard = newStartedShard(true);
final long numDocs = between(10, 100);
for (long i = 0; i < numDocs; i++) {
indexDoc(indexShard, "_doc", Long.toString(i), "{}");
}
indexShard.flush(new FlushRequest());
closeShards(indexShard);
final ShardPath shardPath = indexShard.shardPath();
final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
final Path cfsFile;
try (Stream<Path> paths = Files.walk(indexPath)) {
cfsFile = paths.filter(p -> p.getFileName().toString().endsWith(".cfs")).findFirst()
.orElseThrow(() -> new IllegalStateException("CFS file has to be there"));
}
try (FileChannel raf = FileChannel.open(cfsFile, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
assertThat(raf.size(), lessThan(Integer.MAX_VALUE * 1L));
final int maxPosition = (int) raf.size();
// corrupt only last 4 bytes!
final int position = randomIntBetween(maxPosition - 4, maxPosition - 1);
corruptAt(cfsFile, raf, position);
}
final CheckIndex.Status status;
try (CheckIndex checkIndex = new CheckIndex(new SimpleFSDirectory(indexPath))) {
status = checkIndex.checkIndex();
}
assertThat("That's a good news! "
+ "Lucene now validates CRC32 of CFS file: time to drop workaround at CorruptionUtils (and this test)",
status.clean, equalTo(true));
}
}

View File

@ -80,15 +80,15 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
@Inject
public TransportResumeFollowAction(
final ThreadPool threadPool,
final TransportService transportService,
final ActionFilters actionFilters,
final Client client,
final ClusterService clusterService,
final IndexNameExpressionResolver indexNameExpressionResolver,
final PersistentTasksService persistentTasksService,
final IndicesService indicesService,
final CcrLicenseChecker ccrLicenseChecker) {
final ThreadPool threadPool,
final TransportService transportService,
final ActionFilters actionFilters,
final Client client,
final ClusterService clusterService,
final IndexNameExpressionResolver indexNameExpressionResolver,
final PersistentTasksService persistentTasksService,
final IndicesService indicesService,
final CcrLicenseChecker ccrLicenseChecker) {
super(ResumeFollowAction.NAME, true, transportService, clusterService, threadPool, actionFilters,
ResumeFollowAction.Request::new, indexNameExpressionResolver);
this.client = client;
@ -161,20 +161,20 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
* </ul>
*/
void start(
ResumeFollowAction.Request request,
String clusterNameAlias,
IndexMetadata leaderIndexMetadata,
IndexMetadata followIndexMetadata,
String[] leaderIndexHistoryUUIDs,
ActionListener<AcknowledgedResponse> listener) throws IOException {
ResumeFollowAction.Request request,
String clusterNameAlias,
IndexMetadata leaderIndexMetadata,
IndexMetadata followIndexMetadata,
String[] leaderIndexHistoryUUIDs,
ActionListener<AcknowledgedResponse> listener) throws IOException {
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
validate(request, leaderIndexMetadata, followIndexMetadata, leaderIndexHistoryUUIDs, mapperService);
final int numShards = followIndexMetadata.getNumberOfShards();
final ResponseHandler handler = new ResponseHandler(numShards, listener);
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (int shardId = 0; shardId < numShards; shardId++) {
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
@ -185,11 +185,11 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
}
static void validate(
final ResumeFollowAction.Request request,
final IndexMetadata leaderIndex,
final IndexMetadata followIndex,
final String[] leaderIndexHistoryUUID,
final MapperService followerMapperService) {
final ResumeFollowAction.Request request,
final IndexMetadata leaderIndex,
final IndexMetadata followIndex,
final String[] leaderIndexHistoryUUID,
final MapperService followerMapperService) {
FollowParameters parameters = request.getParameters();
Map<String, String> ccrIndexMetadata = followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
@ -224,18 +224,18 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
}
if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
}
if (leaderIndex.getRoutingNumShards() != followIndex.getRoutingNumShards()) {
throw new IllegalArgumentException("leader index number_of_routing_shards [" + leaderIndex.getRoutingNumShards() +
"] does not match with the number_of_routing_shards of the follow index [" + followIndex.getRoutingNumShards() + "]");
"] does not match with the number_of_routing_shards of the follow index [" + followIndex.getRoutingNumShards() + "]");
}
if (leaderIndex.getState() != IndexMetadata.State.OPEN || followIndex.getState() != IndexMetadata.State.OPEN) {
throw new IllegalArgumentException("leader and follow index must be open");
}
if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(followIndex.getSettings()) == false) {
throw new IllegalArgumentException("the following index [" + request.getFollowerIndex() + "] is not ready " +
"to follow; the setting [" + CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey() + "] must be enabled.");
"to follow; the setting [" + CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey() + "] must be enabled.");
}
// Make a copy, remove settings that are allowed to be different and then compare if the settings are equal.
Settings leaderSettings = filter(leaderIndex.getSettings());
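The comment above states the check in one line: copy the settings, drop the ones that are allowed to differ between leader and follower, then require the remainder to be identical. A minimal sketch of that idea, assuming Settings#filter(Predicate) behaves as in recent Elasticsearch versions; checkSettingsMatch and allowedToDiffer are illustrative names, not the class's actual filter(Settings) helper:

import java.util.Set;
import org.elasticsearch.common.settings.Settings;

class ReplicatedSettingsCheck {
    // Hypothetical stand-in for the validate(...) settings comparison: remove the
    // settings that may legitimately differ, then compare what is left.
    static void checkSettingsMatch(Settings leader, Settings follower, Set<String> allowedToDiffer) {
        Settings leaderFiltered = leader.filter(key -> allowedToDiffer.contains(key) == false);
        Settings followerFiltered = follower.filter(key -> allowedToDiffer.contains(key) == false);
        if (leaderFiltered.equals(followerFiltered) == false) {
            throw new IllegalArgumentException("the leader and follower index settings must be identical");
        }
    }
}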
@ -409,7 +409,6 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_GC_DELETES_SETTING);
nonReplicatedSettings.add(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD);
nonReplicatedSettings.add(IndexSettings.ON_HEAP_ID_TERMS_INDEX);
nonReplicatedSettings.add(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING);
nonReplicatedSettings.add(BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);


@ -6,7 +6,6 @@
package org.elasticsearch.snapshots;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValuesType;
@ -39,6 +38,7 @@ import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.StringHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.internal.io.IOUtils;
@ -96,8 +96,7 @@ public class SourceOnlySnapshot {
List<String> createdFiles = new ArrayList<>();
String segmentFileName;
try (Lock writeLock = targetDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit,
Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name()))) {
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) {
SegmentInfos segmentInfos = reader.getSegmentInfos().clone();
DirectoryReader wrappedReader = wrapReader(reader);
List<SegmentCommitInfo> newInfos = new ArrayList<>();
@ -116,7 +115,7 @@ public class SourceOnlySnapshot {
String pendingSegmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS,
"", segmentInfos.getGeneration());
try (IndexOutput segnOutput = targetDirectory.createOutput(pendingSegmentFileName, IOContext.DEFAULT)) {
segmentInfos.write(targetDirectory, segnOutput);
segmentInfos.write(segnOutput);
}
targetDirectory.sync(Collections.singleton(pendingSegmentFileName));
targetDirectory.sync(createdFiles);
@ -219,7 +218,7 @@ public class SourceOnlySnapshot {
SegmentInfo newSegmentInfo = new SegmentInfo(targetDirectory, si.getVersion(), si.getMinVersion(), si.name, si.maxDoc(),
false, si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null);
// we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1);
newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1, StringHelper.randomId());
List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
for (FieldInfo fieldInfo : fieldInfos) {
fieldInfoCopy.add(new FieldInfo(fieldInfo.name, fieldInfo.number,
@ -254,7 +253,8 @@ public class SourceOnlySnapshot {
assert newInfo.getDelCount() == 0 || assertLiveDocs(liveDocs.bits, liveDocs.numDeletes);
codec.liveDocsFormat().writeLiveDocs(liveDocs.bits, trackingDir, newInfo, liveDocs.numDeletes - newInfo.getDelCount(),
IOContext.DEFAULT);
SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(), -1, -1);
SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(),
-1, -1, StringHelper.randomId());
info.setFieldInfosFiles(newInfo.getFieldInfosFiles());
info.info.setFiles(trackingDir.getCreatedFiles());
newInfo = info;
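Both SegmentCommitInfo constructions in this hunk pick up a new trailing argument in Lucene 8.6: a per-commit id, supplied here via StringHelper.randomId(). A minimal sketch of rebuilding a commit info for an existing segment under that signature; copySegment is an illustrative helper, not part of SourceOnlySnapshot:

import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.StringHelper;

class SegmentCommitInfoSketch {
    // Illustrative helper: clone the segment metadata into a target directory and
    // wrap it in a fresh SegmentCommitInfo with no deletes and a new random id.
    static SegmentCommitInfo copySegment(Directory targetDirectory, SegmentInfo si) {
        SegmentInfo newSegmentInfo = new SegmentInfo(targetDirectory, si.getVersion(), si.getMinVersion(), si.name,
            si.maxDoc(), false, si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null);
        // delCount = 0, softDelCount = 0, delGen/fieldInfosGen/docValuesGen = -1 (none yet),
        // plus the id argument required since Lucene 8.6
        return new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1, StringHelper.randomId());
    }
}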


@ -6,7 +6,6 @@
package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
@ -48,7 +47,6 @@ import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -107,7 +105,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
try {
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metadata), userMetadata, repositoryMetaVersion,
stateTransformer, listener);
stateTransformer, listener);
} catch (IOException ex) {
listener.onFailure(ex);
}
@ -179,8 +177,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
tempStore.bootstrapNewHistory(maxDoc, maxDoc);
store.incRef();
toClose.add(store::decRef);
DirectoryReader reader = DirectoryReader.open(tempStore.directory(),
Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name()));
DirectoryReader reader = DirectoryReader.open(tempStore.directory());
toClose.add(reader);
IndexCommit indexCommit = reader.getIndexCommit();
super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, shardStateIdentifier, snapshotStatus,


@ -128,16 +128,15 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
}
@Override
protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException {
protected void readInternal(ByteBuffer b) throws IOException {
ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT);
final long position = getFilePointer() + this.offset;
final int length = b.remaining();
int totalBytesRead = 0;
while (totalBytesRead < length) {
final long pos = position + totalBytesRead;
final int off = offset + totalBytesRead;
final int len = length - totalBytesRead;
int bytesRead = 0;
try {
final CacheFile cacheFile = getCacheFileSafe();
@ -146,7 +145,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
bytesRead = cacheFile.fetchRange(
range.v1(),
range.v2(),
(start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len),
(start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, b, len),
(start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)
).get();
}
@ -154,7 +153,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) {
try {
// cache file was evicted during the range fetching, read bytes directly from source
bytesRead = readDirectly(pos, pos + len, buffer, off);
bytesRead = readDirectly(pos, pos + len, b);
continue;
} catch (Exception inner) {
e.addSuppressed(inner);
@ -319,9 +318,20 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
return true;
}
private int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
private int readCacheFile(FileChannel fc, long end, long position, ByteBuffer b, long length) throws IOException {
assert assertFileChannelOpen(fc);
int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
final int bytesRead;
assert b.remaining() == length;
if (end - position < b.remaining()) {
final ByteBuffer duplicate = b.duplicate();
duplicate.limit(b.position() + Math.toIntExact(end - position));
bytesRead = Channels.readFromFileChannel(fc, position, duplicate);
assert duplicate.position() < b.limit();
b.position(duplicate.position());
} else {
bytesRead = Channels.readFromFileChannel(fc, position, b);
}
if (bytesRead == -1) {
throw new EOFException(
String.format(Locale.ROOT, "unexpected EOF reading [%d-%d] from %s", position, position + length, cacheFileReference)
@ -416,7 +426,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
+ '}';
}
private int readDirectly(long start, long end, byte[] buffer, int offset) throws IOException {
private int readDirectly(long start, long end, ByteBuffer b) throws IOException {
final long length = end - start;
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
logger.trace(() -> new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference));
@ -440,7 +450,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
)
);
}
System.arraycopy(copyBuffer, 0, buffer, offset + bytesCopied, bytesRead);
b.put(copyBuffer, 0, bytesRead);
bytesCopied += bytesRead;
remaining -= bytesRead;
}
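The new ByteBuffer-based readCacheFile above must stop at end even when the caller's buffer wants more; it does this by reading through a duplicate() of the buffer with a tightened limit and then advancing the original buffer's position. A standalone sketch of that pattern using plain FileChannel.read rather than Elasticsearch's Channels utility; readBounded is an illustrative name:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

class BoundedBufferRead {
    // Read into b from fc at position, but never past end: when fewer bytes are
    // available than b.remaining(), read through a duplicate with a reduced limit
    // so the caller's buffer keeps its original limit, then sync its position.
    static int readBounded(FileChannel fc, long position, long end, ByteBuffer b) throws IOException {
        final long available = end - position;
        if (available < b.remaining()) {
            final ByteBuffer duplicate = b.duplicate();
            duplicate.limit(b.position() + Math.toIntExact(available));
            final int read = fc.read(duplicate, position);
            if (read > 0) {
                b.position(duplicate.position());
            }
            return read;
        }
        return fc.read(b, position);
    }
}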


@ -21,6 +21,7 @@ import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
@ -39,7 +40,7 @@ import java.util.concurrent.atomic.LongAdder;
*
* {@link DirectBlobContainerIndexInput} maintains a global position that indicates the current position in the Lucene file where the
* next read will occur. In the case of a Lucene file snapshotted into multiple parts, this position is used to identify which part must
* be read at which position (see {@link #readInternal(byte[], int, int)}). This position is also passed over to cloned and sliced input
* be read at which position (see {@link #readInternal(ByteBuffer)}). This position is also passed over to cloned and sliced input
* along with the {@link FileInfo} so that they can also track their reading position.
*
* The {@code sequentialReadSize} constructor parameter configures the {@link DirectBlobContainerIndexInput} to perform a larger read on the
@ -56,6 +57,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
private StreamForSequentialReads streamForSequentialReads;
private long sequentialReadSize;
private static final long NO_SEQUENTIAL_READ_OPTIMIZATION = 0L;
private static final int COPY_BUFFER_SIZE = 8192;
public DirectBlobContainerIndexInput(
BlobContainer blobContainer,
@ -99,14 +101,12 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
}
@Override
protected void readInternal(byte[] b, int offset, int length) throws IOException {
protected void readInternal(ByteBuffer b) throws IOException {
ensureOpen();
if (fileInfo.numberOfParts() == 1L) {
readInternalBytes(0, position, b, offset, length);
readInternalBytes(0, position, b, b.remaining());
} else {
int len = length;
int off = offset;
while (len > 0) {
while (b.hasRemaining()) {
int currentPart = Math.toIntExact(position / fileInfo.partSize().getBytes());
int remainingBytesInPart;
if (currentPart < (fileInfo.numberOfParts() - 1)) {
@ -114,16 +114,14 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
} else {
remainingBytesInPart = Math.toIntExact(fileInfo.length() - position);
}
final int read = Math.min(len, remainingBytesInPart);
readInternalBytes(currentPart, position % fileInfo.partSize().getBytes(), b, off, read);
len -= read;
off += read;
final int read = Math.min(b.remaining(), remainingBytesInPart);
readInternalBytes(currentPart, position % fileInfo.partSize().getBytes(), b, read);
}
}
}
private void readInternalBytes(final int part, long pos, final byte[] b, int offset, int length) throws IOException {
int optimizedReadSize = readOptimized(part, pos, b, offset, length);
private void readInternalBytes(final int part, long pos, final ByteBuffer b, int length) throws IOException {
int optimizedReadSize = readOptimized(part, pos, b, length);
assert optimizedReadSize <= length;
position += optimizedReadSize;
@ -134,7 +132,6 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
final int directReadSize = readFully(
inputStream,
b,
offset + optimizedReadSize,
length - optimizedReadSize,
() -> { throw new EOFException("Read past EOF at [" + position + "] with length [" + fileInfo.partBytes(part) + "]"); }
);
@ -150,7 +147,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
* Attempt to satisfy this read in an optimized fashion using {@code streamForSequentialReadsRef}.
* @return the number of bytes read
*/
private int readOptimized(int part, long pos, byte[] b, int offset, int length) throws IOException {
private int readOptimized(int part, long pos, ByteBuffer b, int length) throws IOException {
if (sequentialReadSize == NO_SEQUENTIAL_READ_OPTIMIZATION) {
return 0;
}
@ -158,10 +155,10 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
int read = 0;
if (streamForSequentialReads == null) {
// starting a new sequential read
read = readFromNewSequentialStream(part, pos, b, offset, length);
read = readFromNewSequentialStream(part, pos, b, length);
} else if (streamForSequentialReads.canContinueSequentialRead(part, pos)) {
// continuing a sequential read that we started previously
read = streamForSequentialReads.read(b, offset, length);
read = streamForSequentialReads.read(b, length);
if (streamForSequentialReads.isFullyRead()) {
// the current stream was exhausted by this read, so it should be closed
streamForSequentialReads.close();
@ -173,7 +170,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
if (read < length) {
// the current stream didn't contain enough data for this read, so we must read more
read += readFromNewSequentialStream(part, pos + read, b, offset + read, length - read);
read += readFromNewSequentialStream(part, pos + read, b, length - read);
}
} else {
// not a sequential read, so stop optimizing for this usage pattern and fall through to the unoptimized behaviour
@ -196,7 +193,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
* If appropriate, open a new stream for sequential reading and satisfy the given read using it.
* @return the number of bytes read; if a new stream wasn't opened then nothing was read so the caller should perform the read directly.
*/
private int readFromNewSequentialStream(int part, long pos, byte[] b, int offset, int length) throws IOException {
private int readFromNewSequentialStream(int part, long pos, ByteBuffer b, int length) throws IOException {
assert streamForSequentialReads == null : "should only be called when a new stream is needed";
assert sequentialReadSize > 0L : "should only be called if optimizing sequential reads";
@ -243,7 +240,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
}
}, part, pos, streamLength);
final int read = streamForSequentialReads.read(b, offset, length);
final int read = streamForSequentialReads.read(b, length);
assert read == length : read + " vs " + length;
assert streamForSequentialReads.isFullyRead() == false;
return read;
@ -347,15 +344,18 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
/**
* Fully read up to {@code length} bytes from the given {@link InputStream}
*/
private static int readFully(InputStream inputStream, byte[] b, int offset, int length, CheckedRunnable<IOException> onEOF)
private static int readFully(InputStream inputStream, final ByteBuffer b, int length, CheckedRunnable<IOException> onEOF)
throws IOException {
int totalRead = 0;
final byte[] buffer = new byte[Math.min(length, COPY_BUFFER_SIZE)];
while (totalRead < length) {
final int read = inputStream.read(b, offset + totalRead, length - totalRead);
final int len = Math.min(length - totalRead, COPY_BUFFER_SIZE);
final int read = inputStream.read(buffer, 0, len);
if (read == -1) {
onEOF.run();
break;
}
b.put(buffer, 0, read);
totalRead += read;
}
return totalRead > 0 ? totalRead : -1;
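InputStream has no ByteBuffer overload, which is why readFully above now stages bytes in a small byte[] before copying them into the destination buffer. A self-contained sketch of the same loop; the names are illustrative and the EOF handling is simplified to a plain break:

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

class StreamToBufferCopy {
    private static final int COPY_CHUNK = 8192;

    // Copy up to length bytes from in into dest via a staging array; returns the
    // number of bytes copied, or -1 if the stream was already exhausted.
    static int readIntoBuffer(InputStream in, ByteBuffer dest, int length) throws IOException {
        final byte[] staging = new byte[Math.min(length, COPY_CHUNK)];
        int total = 0;
        while (total < length) {
            final int len = Math.min(length - total, staging.length);
            final int read = in.read(staging, 0, len);
            if (read == -1) {
                break; // premature EOF; the caller decides whether that is an error
            }
            dest.put(staging, 0, read);
            total += read;
        }
        return total > 0 ? total : -1;
    }
}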
@ -378,9 +378,9 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
return this.part == part && this.pos == pos;
}
int read(byte[] b, int offset, int length) throws IOException {
int read(ByteBuffer b, int length) throws IOException {
assert this.pos < maxPos : "should not try and read from a fully-read stream";
final int read = readFully(inputStream, b, offset, length, () -> {});
final int read = readFully(inputStream, b, length, () -> {});
assert read <= length : read + " vs " + length;
pos += read;
return read;
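The readInternal(ByteBuffer) loop earlier in this file maps the global position onto a part number and an offset within that part with plain division and modulo, as the class javadoc describes. A tiny sketch with made-up numbers; partAndOffset is an illustrative helper, not part of the class:

class PartPositionMath {
    // For a file snapshotted into fixed-size parts, the part index and the offset
    // inside that part are derived from the global position.
    static long[] partAndOffset(long position, long partSizeBytes) {
        return new long[] { position / partSizeBytes, position % partSizeBytes };
    }

    public static void main(String[] args) {
        // e.g. 1 MiB parts and a global position of 2_700_000 bytes:
        long[] result = partAndOffset(2_700_000L, 1_048_576L);
        System.out.println("part=" + result[0] + " offsetInPart=" + result[1]); // part=2 offsetInPart=602848
    }
}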


@ -1 +0,0 @@
24212de43c19269f5211f3e79eb2f414c4a0254b


@ -0,0 +1 @@
ec8401dfd1b41113eba06f3d626a1f2fdf589335