Some Cleanup in o.e.i.engine (#42278) (#42566)

* Some Cleanup in o.e.i.engine

* Remove dead code and parameters
* Reduce visibility in some obvious spots
* Add missing `assert`s (not that important here since the methods
themselves will probably be dead-code eliminated) but still
This commit is contained in:
Armin Braun 2019-05-27 11:04:54 +02:00 committed by GitHub
parent d2cd36bd9f
commit a5ca20a250
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 50 additions and 146 deletions

View File

@ -911,7 +911,7 @@ public abstract class Engine implements Closeable {
map.put(extension, length);
}
if (useCompoundFile && directory != null) {
if (useCompoundFile) {
try {
directory.close();
} catch (IOException e) {
@ -954,8 +954,7 @@ public abstract class Engine implements Closeable {
// now, correlate or add the committed ones...
if (lastCommittedSegmentInfos != null) {
SegmentInfos infos = lastCommittedSegmentInfos;
for (SegmentCommitInfo info : infos) {
for (SegmentCommitInfo info : lastCommittedSegmentInfos) {
Segment segment = segments.get(info.info.name);
if (segment == null) {
segment = new Segment(info.info.name);
@ -1783,11 +1782,8 @@ public abstract class Engine implements Closeable {
CommitId commitId = (CommitId) o;
if (!Arrays.equals(id, commitId.id)) {
return false;
}
return Arrays.equals(id, commitId.id);
return true;
}
@Override

View File

@ -563,7 +563,7 @@ public class InternalEngine extends Engine {
/**
* Reads the current stored history ID from the IW commit data.
*/
private String loadHistoryUUID(final IndexWriter writer) throws IOException {
private String loadHistoryUUID(final IndexWriter writer) {
final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
@ -635,9 +635,8 @@ public class InternalEngine extends Engine {
if (operation != null) {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
final Translog.Index index = (Translog.Index) operation;
TranslogLeafReader reader = new TranslogLeafReader(index, engineConfig
.getIndexSettings().getIndexVersionCreated());
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader::close),
TranslogLeafReader reader = new TranslogLeafReader(index);
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(),
reader, 0));
}
@ -756,7 +755,7 @@ public class InternalEngine extends Engine {
+ index.getAutoGeneratedIdTimestamp();
switch (index.origin()) {
case PRIMARY:
assertPrimaryCanOptimizeAddDocument(index);
assert assertPrimaryCanOptimizeAddDocument(index);
return true;
case PEER_RECOVERY:
case REPLICA:
@ -782,7 +781,7 @@ public class InternalEngine extends Engine {
private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
if (origin == Operation.Origin.PRIMARY) {
assertPrimaryIncomingSequenceNumber(origin, seqNo);
assert assertPrimaryIncomingSequenceNumber(origin, seqNo);
} else {
// sequence number should be set when operation origin is not primary
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin;
@ -923,7 +922,7 @@ public class InternalEngine extends Engine {
}
protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
assertNonPrimaryOrigin(index);
assert assertNonPrimaryOrigin(index);
final IndexingStrategy plan;
final boolean appendOnlyRequest = canOptimizeAddDocument(index);
if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
@ -978,13 +977,13 @@ public class InternalEngine extends Engine {
}
}
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
if (mayHaveBeenIndexedBefore(index)) {
plan = IndexingStrategy.overrideExistingAsIfNotThere(1L);
plan = IndexingStrategy.overrideExistingAsIfNotThere();
versionMap.enforceSafeAccess();
} else {
plan = IndexingStrategy.optimizedAppendOnly(1L);
@ -1006,7 +1005,7 @@ public class InternalEngine extends Engine {
if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.id(),
index.getIfSeqNo(), index.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion, getPrimaryTerm());
} else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm()
)) {
@ -1164,9 +1163,9 @@ public class InternalEngine extends Engine {
true, false, versionForIndexing, null);
}
static IndexingStrategy overrideExistingAsIfNotThere(long versionForIndexing) {
static IndexingStrategy overrideExistingAsIfNotThere() {
return new IndexingStrategy(true, true, true,
false, versionForIndexing, null);
false, 1L, null);
}
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
@ -1285,7 +1284,7 @@ public class InternalEngine extends Engine {
}
protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assertNonPrimaryOrigin(delete);
assert assertNonPrimaryOrigin(delete);
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
@ -1305,7 +1304,7 @@ public class InternalEngine extends Engine {
} else {
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.version());
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, delete.version());
} else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version());
}
@ -1318,7 +1317,7 @@ public class InternalEngine extends Engine {
return true;
}
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
@ -1336,7 +1335,7 @@ public class InternalEngine extends Engine {
if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.id(),
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), true);
} else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm()
)) {
@ -1428,8 +1427,8 @@ public class InternalEngine extends Engine {
return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, null);
}
static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, long versionOfDeletion) {
return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, versionOfDeletion, null);
static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionOfDeletion) {
return new DeletionStrategy(false, addStaleOpToLucene, false, versionOfDeletion, null);
}
}

View File

@ -234,7 +234,7 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
/**
* Tracks bytes used by tombstones (deletes)
*/
final AtomicLong ramBytesUsedTombstones = new AtomicLong();
private final AtomicLong ramBytesUsedTombstones = new AtomicLong();
@Override
public void beforeRefresh() throws IOException {

View File

@ -188,8 +188,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
int readerIndex = 0;
CombinedDocValues combinedDocValues = null;
LeafReaderContext leaf = null;
for (int i = 0; i < scoreDocs.length; i++) {
ScoreDoc scoreDoc = scoreDocs[i];
for (ScoreDoc scoreDoc : scoreDocs) {
if (scoreDoc.doc >= docBase + maxDoc) {
do {
leaf = leaves.get(readerIndex++);

View File

@ -457,8 +457,8 @@ public class ReadOnlyEngine extends Engine {
}
protected void processReaders(IndexReader reader, IndexReader previousReader) {
searcherFactory.processReaders(reader, previousReader);
protected void processReader(IndexReader reader) {
searcherFactory.processReaders(reader, null);
}
@Override

View File

@ -1,65 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.store.Store;
import java.util.concurrent.atomic.AtomicInteger;
/**
* RecoveryCounter keeps tracks of the number of ongoing recoveries for a
* particular {@link Store}
*/
public class RecoveryCounter implements Releasable {
private final Store store;
RecoveryCounter(Store store) {
this.store = store;
}
private final AtomicInteger onGoingRecoveries = new AtomicInteger();
void startRecovery() {
store.incRef();
onGoingRecoveries.incrementAndGet();
}
public int get() {
return onGoingRecoveries.get();
}
/**
* End the recovery counter by decrementing the store's ref and the ongoing recovery counter
* @return number of ongoing recoveries remaining
*/
int endRecovery() {
store.decRef();
int left = onGoingRecoveries.decrementAndGet();
assert onGoingRecoveries.get() >= 0 : "ongoingRecoveries must be >= 0 but was: " + onGoingRecoveries.get();
return left;
}
@Override
public void close() {
endRecovery();
}
}

View File

@ -58,8 +58,7 @@ final class RecoverySourcePruneMergePolicy extends OneMergeWrappingMergePolicy {
});
}
// pkg private for testing
static CodecReader wrapReader(String recoverySourceField, CodecReader reader, Supplier<Query> retainSourceQuerySupplier)
private static CodecReader wrapReader(String recoverySourceField, CodecReader reader, Supplier<Query> retainSourceQuerySupplier)
throws IOException {
NumericDocValues recoverySource = reader.getNumericDocValues(recoverySourceField);
if (recoverySource == null || recoverySource.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {

View File

@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class Segment implements Streamable {
@ -94,10 +95,6 @@ public class Segment implements Streamable {
return new ByteSizeValue(sizeInBytes);
}
public long getSizeInBytes() {
return this.sizeInBytes;
}
public org.apache.lucene.util.Version getVersion() {
return version;
}
@ -145,9 +142,8 @@ public class Segment implements Streamable {
Segment segment = (Segment) o;
if (name != null ? !name.equals(segment.name) : segment.name != null) return false;
return Objects.equals(name, segment.name);
return true;
}
@Override
@ -220,7 +216,7 @@ public class Segment implements Streamable {
}
}
Sort readSegmentSort(StreamInput in) throws IOException {
private Sort readSegmentSort(StreamInput in) throws IOException {
int size = in.readVInt();
if (size == 0) {
return null;
@ -271,7 +267,7 @@ public class Segment implements Streamable {
return new Sort(fields);
}
void writeSegmentSort(StreamOutput out, Sort sort) throws IOException {
private void writeSegmentSort(StreamOutput out, Sort sort) throws IOException {
if (sort == null) {
out.writeVInt(0);
return;
@ -311,14 +307,14 @@ public class Segment implements Streamable {
}
}
Accountable readRamTree(StreamInput in) throws IOException {
private Accountable readRamTree(StreamInput in) throws IOException {
final String name = in.readString();
final long bytes = in.readVLong();
int numChildren = in.readVInt();
if (numChildren == 0) {
return Accountables.namedAccountable(name, bytes);
}
List<Accountable> children = new ArrayList(numChildren);
List<Accountable> children = new ArrayList<>(numChildren);
while (numChildren-- > 0) {
children.add(readRamTree(in));
}
@ -326,7 +322,7 @@ public class Segment implements Streamable {
}
// the ram tree is written recursively since the depth is fairly low (5 or 6)
void writeRamTree(StreamOutput out, Accountable tree) throws IOException {
private void writeRamTree(StreamOutput out, Accountable tree) throws IOException {
out.writeString(tree.toString());
out.writeVLong(tree.ramBytesUsed());
Collection<Accountable> children = tree.getChildResources();

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Iterator;
public class SegmentsStats implements Streamable, Writeable, ToXContentFragment {
@ -54,7 +53,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
* Ideally this should be in sync to what the current version of Lucene is using, but it's harmless to leave extensions out,
* they'll just miss a proper description in the stats
*/
private static ImmutableOpenMap<String, String> fileDescriptions = ImmutableOpenMap.<String, String>builder()
private static final ImmutableOpenMap<String, String> FILE_DESCRIPTIONS = ImmutableOpenMap.<String, String>builder()
.fPut("si", "Segment Info")
.fPut("fnm", "Fields")
.fPut("fdx", "Field Index")
@ -150,8 +149,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
public void addFileSizes(ImmutableOpenMap<String, Long> fileSizes) {
ImmutableOpenMap.Builder<String, Long> map = ImmutableOpenMap.builder(this.fileSizes);
for (Iterator<ObjectObjectCursor<String, Long>> it = fileSizes.iterator(); it.hasNext();) {
ObjectObjectCursor<String, Long> entry = it.next();
for (ObjectObjectCursor<String, Long> entry : fileSizes) {
if (map.containsKey(entry.key)) {
Long oldValue = map.get(entry.key);
map.put(entry.key, oldValue + entry.value);
@ -206,7 +204,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.termsMemoryInBytes;
}
public ByteSizeValue getTermsMemory() {
private ByteSizeValue getTermsMemory() {
return new ByteSizeValue(termsMemoryInBytes);
}
@ -217,7 +215,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.storedFieldsMemoryInBytes;
}
public ByteSizeValue getStoredFieldsMemory() {
private ByteSizeValue getStoredFieldsMemory() {
return new ByteSizeValue(storedFieldsMemoryInBytes);
}
@ -228,7 +226,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.termVectorsMemoryInBytes;
}
public ByteSizeValue getTermVectorsMemory() {
private ByteSizeValue getTermVectorsMemory() {
return new ByteSizeValue(termVectorsMemoryInBytes);
}
@ -239,7 +237,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.normsMemoryInBytes;
}
public ByteSizeValue getNormsMemory() {
private ByteSizeValue getNormsMemory() {
return new ByteSizeValue(normsMemoryInBytes);
}
@ -250,7 +248,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.pointsMemoryInBytes;
}
public ByteSizeValue getPointsMemory() {
private ByteSizeValue getPointsMemory() {
return new ByteSizeValue(pointsMemoryInBytes);
}
@ -261,7 +259,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.docValuesMemoryInBytes;
}
public ByteSizeValue getDocValuesMemory() {
private ByteSizeValue getDocValuesMemory() {
return new ByteSizeValue(docValuesMemoryInBytes);
}
@ -326,11 +324,10 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
builder.humanReadableField(Fields.FIXED_BIT_SET_MEMORY_IN_BYTES, Fields.FIXED_BIT_SET, getBitsetMemory());
builder.field(Fields.MAX_UNSAFE_AUTO_ID_TIMESTAMP, maxUnsafeAutoIdTimestamp);
builder.startObject(Fields.FILE_SIZES);
for (Iterator<ObjectObjectCursor<String, Long>> it = fileSizes.iterator(); it.hasNext();) {
ObjectObjectCursor<String, Long> entry = it.next();
for (ObjectObjectCursor<String, Long> entry : fileSizes) {
builder.startObject(entry.key);
builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, new ByteSizeValue(entry.value));
builder.field(Fields.DESCRIPTION, fileDescriptions.getOrDefault(entry.key, "Others"));
builder.field(Fields.DESCRIPTION, FILE_DESCRIPTIONS.getOrDefault(entry.key, "Others"));
builder.endObject();
}
builder.endObject();
@ -391,7 +388,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
out.writeVInt(fileSizes.size());
for (ObjectObjectCursor<String, Long> entry : fileSizes) {
out.writeString(entry.key);
out.writeLong(entry.value.longValue());
out.writeLong(entry.value);
}
}

View File

@ -20,16 +20,11 @@
package org.elasticsearch.index.engine;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
public class SnapshotFailedEngineException extends EngineException {
public SnapshotFailedEngineException(ShardId shardId, Throwable cause) {
super(shardId, "Snapshot failed", cause);
}
public SnapshotFailedEngineException(StreamInput in) throws IOException{
super(in);
}

View File

@ -35,7 +35,6 @@ import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
@ -61,11 +60,9 @@ final class TranslogLeafReader extends LeafReader {
private static final FieldInfo FAKE_ID_FIELD
= new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(),
0, 0, 0, false);
private final Version indexVersionCreated;
TranslogLeafReader(Translog.Index operation, Version indexVersionCreated) {
TranslogLeafReader(Translog.Index operation) {
this.operation = operation;
this.indexVersionCreated = indexVersionCreated;
}
@Override
public CacheHelper getCoreCacheHelper() {
@ -161,14 +158,9 @@ final class TranslogLeafReader extends LeafReader {
visitor.stringField(FAKE_ROUTING_FIELD, operation.routing().getBytes(StandardCharsets.UTF_8));
}
if (visitor.needsField(FAKE_ID_FIELD) == StoredFieldVisitor.Status.YES) {
final byte[] id;
if (indexVersionCreated.onOrAfter(Version.V_6_0_0)) {
BytesRef bytesRef = Uid.encodeId(operation.id());
id = new byte[bytesRef.length];
final byte[] id = new byte[bytesRef.length];
System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length);
} else { // TODO this can go away in 7.0 after backport
id = operation.id().getBytes(StandardCharsets.UTF_8);
}
visitor.stringField(FAKE_ID_FIELD, id);
}
}

View File

@ -42,11 +42,7 @@ public class VersionConflictEngineException extends EngineException {
}
public VersionConflictEngineException(ShardId shardId, String id, String explanation) {
this(shardId, null, id, explanation);
}
public VersionConflictEngineException(ShardId shardId, Throwable cause, String id, String explanation) {
this(shardId, "[{}]: version conflict, {}", cause, id, explanation);
this(shardId, "[{}]: version conflict, {}", null, id, explanation);
}
public VersionConflictEngineException(ShardId shardId, String msg, Throwable cause, Object... params) {

View File

@ -169,7 +169,7 @@ public final class FrozenEngine extends ReadOnlyEngine {
listeners.beforeRefresh();
}
reader = DirectoryReader.open(engineConfig.getStore().directory());
processReaders(reader, null);
processReader(reader);
reader = lastOpenedReader = wrapReader(reader, Function.identity());
reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) {