Allow _update and upsert to read from the transaction log (#29264)

We historically removed reading from the transaction log to get consistent
results from _GET calls. There was also the motivation that the read-modify-update
principle we apply should not be hidden from the user. We still agree on the fact
that we should not hide these aspects but the impact on updates is quite significant
especially if the same documents is updated before it's written to disk and made serachable.

This change adds back the ability to read from the transaction log but only for update calls.
Calls to the _GET API will always do a refresh if necessary to return consistent results ie.
if stored fields or DocValues Fields are requested.

Closes #26802
This commit is contained in:
Simon Willnauer 2018-03-28 18:03:34 +02:00 committed by GitHub
parent c3fdf8fbfb
commit 13e19e7428
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 602 additions and 45 deletions

View File

@ -112,13 +112,13 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
if (uidTerm == null) {
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
}
result = context.indexShard().get(new Engine.Get(false, request.type(), request.id(), uidTerm));
result = context.indexShard().get(new Engine.Get(false, false, request.type(), request.id(), uidTerm));
if (!result.exists()) {
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
}
context.parsedQuery(context.getQueryShardContext().toQuery(request.query()));
context.preProcess(true);
int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().context.docBase;
int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().docBase;
Explanation explanation = context.searcher().explain(context.query(), topLevelDocId);
for (RescoreContext ctx : context.rescore()) {
Rescorer rescorer = ctx.rescorer();

View File

@ -47,7 +47,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
@ -71,9 +70,8 @@ public class UpdateHelper extends AbstractComponent {
* Prepares an update request by converting it into an index or delete request or an update response (no action).
*/
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME},
true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE);
final GetResult getResult = indexShard.getService().getForUpdate(request.type(), request.id(), request.version(),
request.versionType());
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
}

View File

@ -100,7 +100,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
if (versions.advanceExact(docID) == false) {
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
}
return new DocIdAndVersion(docID, versions.longValue(), context);
return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase);
} else {
return null;
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.lucene.uid;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.Term;
@ -97,12 +98,14 @@ public final class VersionsAndSeqNoResolver {
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final LeafReaderContext context;
public final LeafReader reader;
public final int docBase;
DocIdAndVersion(int docId, long version, LeafReaderContext context) {
public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) {
this.docId = docId;
this.version = version;
this.context = context;
this.reader = reader;
this.docBase = docBase;
}
}

View File

@ -1232,14 +1232,16 @@ public abstract class Engine implements Closeable {
private final boolean realtime;
private final Term uid;
private final String type, id;
private final boolean readFromTranslog;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
public Get(boolean realtime, String type, String id, Term uid) {
public Get(boolean realtime, boolean readFromTranslog, String type, String id, Term uid) {
this.realtime = realtime;
this.type = type;
this.id = id;
this.uid = uid;
this.readFromTranslog = readFromTranslog;
}
public boolean realtime() {
@ -1275,6 +1277,10 @@ public abstract class Engine implements Closeable {
this.versionType = versionType;
return this;
}
public boolean isReadFromTranslog() {
return readFromTranslog;
}
}
public static class GetResult implements Releasable {

View File

@ -78,6 +78,7 @@ import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -145,6 +146,7 @@ public class InternalEngine extends Engine {
* being indexed/deleted.
*/
private final AtomicLong writingBytes = new AtomicLong();
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
@Nullable
private final String historyUUID;
@ -558,6 +560,27 @@ public class InternalEngine extends Engine {
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
if (get.isReadFromTranslog()) {
// this is only used for updates - API _GET calls will always read form a reader for consistency
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
.getIndexSettings().getIndexVersionCreated());
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
throw new EngineException(shardId, "failed to read operation from translog", e);
}
} else {
trackTranslogLocation.set(true);
}
}
refresh("realtime_get", SearcherScope.INTERNAL);
}
scope = SearcherScope.INTERNAL;
@ -790,6 +813,10 @@ public class InternalEngine extends Engine {
}
indexResult.setTranslogLocation(location);
}
if (plan.indexIntoLucene && indexResult.hasFailure() == false) {
versionMap.maybePutUnderLock(index.uid().bytes(),
getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation()));
}
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
}
@ -916,8 +943,6 @@ public class InternalEngine extends Engine {
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
index(index.docs(), indexWriter);
}
versionMap.maybePutUnderLock(index.uid().bytes(),
new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
@ -941,6 +966,13 @@ public class InternalEngine extends Engine {
}
}
private VersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) {
if (location != null && trackTranslogLocation.get()) {
return new TranslogVersionValue(location, version, seqNo, term);
}
return new VersionValue(version, seqNo, term);
}
/**
* returns true if the indexing operation may have already be processed by this engine.
* Note that it is OK to rarely return true even if this is not the case. However a `false`

View File

@ -0,0 +1,237 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafMetaData;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.index.fielddata.AbstractSortedDocValues;
import org.elasticsearch.index.fielddata.AbstractSortedSetDocValues;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.ParentFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
/**
* Internal class that mocks a single doc read from the transaction log as a leaf reader.
*/
final class TranslogLeafReader extends LeafReader {
private final Translog.Index operation;
private static final FieldInfo FAKE_SOURCE_FIELD
= new FieldInfo(SourceFieldMapper.NAME, 1, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(),
0,0);
private static final FieldInfo FAKE_ROUTING_FIELD
= new FieldInfo(RoutingFieldMapper.NAME, 2, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(),
0,0);
private static final FieldInfo FAKE_ID_FIELD
= new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(),
0,0);
private static final FieldInfo FAKE_UID_FIELD
= new FieldInfo(UidFieldMapper.NAME, 4, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(),
0,0);
private final Version indexVersionCreated;
TranslogLeafReader(Translog.Index operation, Version indexVersionCreated) {
this.operation = operation;
this.indexVersionCreated = indexVersionCreated;
}
@Override
public CacheHelper getCoreCacheHelper() {
throw new UnsupportedOperationException();
}
@Override
public Terms terms(String field) {
throw new UnsupportedOperationException();
}
@Override
public NumericDocValues getNumericDocValues(String field) {
throw new UnsupportedOperationException();
}
@Override
public BinaryDocValues getBinaryDocValues(String field) {
throw new UnsupportedOperationException();
}
@Override
public SortedDocValues getSortedDocValues(String field) {
// TODO this can be removed in 7.0 and upwards we don't support the parent field anymore
if (field.startsWith(ParentFieldMapper.NAME + "#") && operation.parent() != null) {
return new AbstractSortedDocValues() {
@Override
public int docID() {
return 0;
}
private final BytesRef term = new BytesRef(operation.parent());
private int ord;
@Override
public boolean advanceExact(int docID) {
if (docID != 0) {
throw new IndexOutOfBoundsException("do such doc ID: " + docID);
}
ord = 0;
return true;
}
@Override
public int ordValue() {
return ord;
}
@Override
public BytesRef lookupOrd(int ord) {
if (ord == 0) {
return term;
}
return null;
}
@Override
public int getValueCount() {
return 1;
}
};
}
if (operation.parent() == null) {
return null;
}
assert false : "unexpected field: " + field;
return null;
}
@Override
public SortedNumericDocValues getSortedNumericDocValues(String field) {
throw new UnsupportedOperationException();
}
@Override
public SortedSetDocValues getSortedSetDocValues(String field) {
throw new UnsupportedOperationException();
}
@Override
public NumericDocValues getNormValues(String field) {
throw new UnsupportedOperationException();
}
@Override
public FieldInfos getFieldInfos() {
throw new UnsupportedOperationException();
}
@Override
public Bits getLiveDocs() {
throw new UnsupportedOperationException();
}
@Override
public PointValues getPointValues(String field) {
throw new UnsupportedOperationException();
}
@Override
public void checkIntegrity() {
}
@Override
public LeafMetaData getMetaData() {
throw new UnsupportedOperationException();
}
@Override
public Fields getTermVectors(int docID) {
throw new UnsupportedOperationException();
}
@Override
public int numDocs() {
return 1;
}
@Override
public int maxDoc() {
return 1;
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
if (docID != 0) {
throw new IllegalArgumentException("no such doc ID " + docID);
}
if (visitor.needsField(FAKE_SOURCE_FIELD) == StoredFieldVisitor.Status.YES) {
assert operation.source().toBytesRef().offset == 0;
assert operation.source().toBytesRef().length == operation.source().toBytesRef().bytes.length;
visitor.binaryField(FAKE_SOURCE_FIELD, operation.source().toBytesRef().bytes);
}
if (operation.routing() != null && visitor.needsField(FAKE_ROUTING_FIELD) == StoredFieldVisitor.Status.YES) {
visitor.stringField(FAKE_ROUTING_FIELD, operation.routing().getBytes(StandardCharsets.UTF_8));
}
if (visitor.needsField(FAKE_ID_FIELD) == StoredFieldVisitor.Status.YES) {
final byte[] id;
if (indexVersionCreated.onOrAfter(Version.V_6_0_0)) {
BytesRef bytesRef = Uid.encodeId(operation.id());
id = new byte[bytesRef.length];
System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length);
} else { // TODO this can go away in 7.0 after backport
id = operation.id().getBytes(StandardCharsets.UTF_8);
}
visitor.stringField(FAKE_ID_FIELD, id);
}
if (visitor.needsField(FAKE_UID_FIELD) == StoredFieldVisitor.Status.YES) {
visitor.stringField(FAKE_UID_FIELD, Uid.createUid(operation.type(), operation.id()).getBytes(StandardCharsets.UTF_8));
}
}
@Override
protected void doClose() {
}
@Override
public CacheHelper getReaderCacheHelper() {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.index.translog.Translog;
import java.util.Objects;
final class TranslogVersionValue extends VersionValue {
private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(TranslogVersionValue.class);
private final Translog.Location translogLocation;
TranslogVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) {
super(version, seqNo, term);
this.translogLocation = translogLocation;
}
@Override
public long ramBytesUsed() {
return RAM_BYTES_USED;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
TranslogVersionValue that = (TranslogVersionValue) o;
return Objects.equals(translogLocation, that.translogLocation);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), translogLocation);
}
@Override
public String toString() {
return "TranslogVersionValue{" +
"version=" + version +
", seqNo=" + seqNo +
", term=" + term +
", location=" + translogLocation +
'}';
}
@Override
public Translog.Location getLocation() {
return translogLocation;
}
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.index.engine;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.translog.Translog;
import java.util.Collection;
import java.util.Collections;
@ -81,9 +83,16 @@ class VersionValue implements Accountable {
public String toString() {
return "VersionValue{" +
"version=" + version +
", seqNo=" + seqNo +
", term=" + term +
'}';
}
/**
* Returns the translog location for this version value or null. This is optional and might not be tracked all the time.
*/
@Nullable
public Translog.Location getLocation() {
return null;
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParentFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
@ -75,10 +76,15 @@ public final class ShardGetService extends AbstractIndexShardComponent {
}
public GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext) {
return get(type, id, gFields, realtime, version, versionType, fetchSourceContext, false);
}
private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
currentMetric.inc();
try {
long now = System.nanoTime();
GetResult getResult = innerGet(type, id, gFields, realtime, version, versionType, fetchSourceContext);
GetResult getResult = innerGet(type, id, gFields, realtime, version, versionType, fetchSourceContext, readFromTranslog);
if (getResult.isExists()) {
existsMetric.inc(System.nanoTime() - now);
@ -91,6 +97,11 @@ public final class ShardGetService extends AbstractIndexShardComponent {
}
}
public GetResult getForUpdate(String type, String id, long version, VersionType versionType) {
return get(type, id, new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME}, true, version, versionType,
FetchSourceContext.FETCH_SOURCE, true);
}
/**
* Returns {@link GetResult} based on the specified {@link org.elasticsearch.index.engine.Engine.GetResult} argument.
* This method basically loads specified fields for the associated document in the engineGetResult.
@ -137,7 +148,8 @@ public final class ShardGetService extends AbstractIndexShardComponent {
return FetchSourceContext.DO_NOT_FETCH_SOURCE;
}
private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext) {
private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
final Collection<String> types;
if (type == null || type.equals("_all")) {
@ -150,7 +162,7 @@ public final class ShardGetService extends AbstractIndexShardComponent {
for (String typeX : types) {
Term uidTerm = mapperService.createUidTerm(typeX, id);
if (uidTerm != null) {
get = indexShard.get(new Engine.Get(realtime, typeX, id, uidTerm)
get = indexShard.get(new Engine.Get(realtime, readFromTranslog, typeX, id, uidTerm)
.version(version).versionType(versionType));
if (get.exists()) {
type = typeX;
@ -180,7 +192,7 @@ public final class ShardGetService extends AbstractIndexShardComponent {
FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields, fetchSourceContext);
if (fieldVisitor != null) {
try {
docIdAndVersion.context.reader().document(docIdAndVersion.docId, fieldVisitor);
docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
} catch (IOException e) {
throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "]", e);
}
@ -197,7 +209,7 @@ public final class ShardGetService extends AbstractIndexShardComponent {
DocumentMapper docMapper = mapperService.documentMapper(type);
if (docMapper.parentFieldMapper().active()) {
String parentId = ParentFieldSubFetchPhase.getParentId(docMapper.parentFieldMapper(), docIdAndVersion.context.reader(), docIdAndVersion.docId);
String parentId = ParentFieldSubFetchPhase.getParentId(docMapper.parentFieldMapper(), docIdAndVersion.reader, docIdAndVersion.docId);
if (fields == null) {
fields = new HashMap<>(1);
}

View File

@ -85,7 +85,7 @@ public class TermVectorsService {
termVectorsResponse.setExists(false);
return termVectorsResponse;
}
Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), request.type(), request.id(), uidTerm)
Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), false, request.type(), request.id(), uidTerm)
.version(request.version()).versionType(request.versionType()));
Fields termVectorsByField = null;
@ -114,7 +114,7 @@ public class TermVectorsService {
/* or from an existing document */
else if (docIdAndVersion != null) {
// fields with stored term vectors
termVectorsByField = docIdAndVersion.context.reader().getTermVectors(docIdAndVersion.docId);
termVectorsByField = docIdAndVersion.reader.getTermVectors(docIdAndVersion.docId);
Set<String> selectedFields = request.selectedFields();
// generate tvs for fields where analyzer is overridden
if (selectedFields == null && request.perFieldAnalyzer() != null) {

View File

@ -126,4 +126,13 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
public long getLastModifiedTime() throws IOException {
return Files.getLastModifiedTime(path).toMillis();
}
/**
* Reads a single opertation from the given location.
*/
Translog.Operation read(Translog.Location location) throws IOException {
assert location.generation == this.generation : "generation mismatch expected: " + generation + " got: " + location.generation;
ByteBuffer buffer = ByteBuffer.allocate(location.size);
return read(checksummedStream(buffer, location.translogLocation, location.size, null));
}
}

View File

@ -571,6 +571,33 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
/**
* Reads and returns the operation from the given location if the generation it references is still available. Otherwise
* this method will return <code>null</code>.
*/
public Operation readOperation(Location location) throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (location.generation < getMinFileGeneration()) {
return null;
}
if (current.generation == location.generation) {
// no need to fsync here the read operation will ensure that buffers are written to disk
// if they are still in RAM and we are reading onto that position
return current.read(location);
} else {
// read backwards - it's likely we need to read on that is recent
for (int i = readers.size() - 1; i >= 0; i--) {
TranslogReader translogReader = readers.get(i);
if (translogReader.generation == location.generation) {
return translogReader.read(location);
}
}
}
}
return null;
}
public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();

View File

@ -104,5 +104,4 @@ final class TranslogSnapshot extends BaseTranslogReader {
", reusableBuffer=" + reusableBuffer +
'}';
}
}

View File

@ -1238,7 +1238,7 @@ public class InternalEngineTests extends EngineTestCase {
Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED);
Engine.IndexResult indexResult = engine.index(create);
assertThat(indexResult.getVersion(), equalTo(1L));
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) {
assertEquals(1, get.version());
}
@ -1246,7 +1246,7 @@ public class InternalEngineTests extends EngineTestCase {
Engine.IndexResult update_1_result = engine.index(update_1);
assertThat(update_1_result.getVersion(), equalTo(2L));
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) {
assertEquals(2, get.version());
}
@ -1254,7 +1254,7 @@ public class InternalEngineTests extends EngineTestCase {
Engine.IndexResult update_2_result = engine.index(update_2);
assertThat(update_2_result.getVersion(), equalTo(3L));
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) {
assertEquals(3, get.version());
}
@ -1765,7 +1765,7 @@ public class InternalEngineTests extends EngineTestCase {
assertOpsOnReplica(replicaOps, replicaEngine, true);
final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
final long currentSeqNo = getSequenceID(replicaEngine,
new Engine.Get(false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1();
new Engine.Get(false, false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1();
try (Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new MatchAllDocsQuery(), collector);
@ -1830,9 +1830,9 @@ public class InternalEngineTests extends EngineTestCase {
throw new AssertionError(e);
}
for (int op = 0; op < opsPerThread; op++) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), uidTerm), searcherFactory)) {
FieldsVisitor visitor = new FieldsVisitor(true);
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor);
List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString()));
String removed = op % 3 == 0 && values.size() > 0 ? values.remove(0) : null;
String added = "v_" + idGenerator.incrementAndGet();
@ -1872,9 +1872,9 @@ public class InternalEngineTests extends EngineTestCase {
assertTrue(op.added + " should not exist", exists);
}
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), uidTerm), searcherFactory)) {
FieldsVisitor visitor = new FieldsVisitor(true);
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor);
List<String> values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString()));
assertThat(currentValues, equalTo(new HashSet<>(values)));
}
@ -2275,7 +2275,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
// Get should not find the document (we never indexed uid=2):
getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory);
getResult = engine.get(new Engine.Get(true, false, "type", "2", newUid("2")), searcherFactory);
assertThat(getResult.exists(), equalTo(false));
// Try to index uid=1 with a too-old version, should fail:
@ -3450,7 +3450,7 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testSequenceIDs() throws Exception {
Tuple<Long, Long> seqID = getSequenceID(engine, new Engine.Get(false, "type", "2", newUid("1")));
Tuple<Long, Long> seqID = getSequenceID(engine, new Engine.Get(false, false, "type", "2", newUid("1")));
// Non-existent doc returns no seqnum and no primary term
assertThat(seqID.v1(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
assertThat(seqID.v2(), equalTo(0L));
@ -3665,7 +3665,7 @@ public class InternalEngineTests extends EngineTestCase {
}
assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), equalTo(expectedLocalCheckpoint));
try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory)) {
try (Engine.GetResult result = engine.get(new Engine.Get(true, false, "type", "2", uid), searcherFactory)) {
assertThat(result.exists(), equalTo(exists));
}
}
@ -4454,14 +4454,14 @@ public class InternalEngineTests extends EngineTestCase {
CountDownLatch awaitStarted = new CountDownLatch(1);
Thread thread = new Thread(() -> {
awaitStarted.countDown();
try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc3.type(), doc3.id(), doc3.uid()),
try (Engine.GetResult getResult = engine.get(new Engine.Get(true, false, doc3.type(), doc3.id(), doc3.uid()),
engine::acquireSearcher)) {
assertTrue(getResult.exists());
}
});
thread.start();
awaitStarted.await();
try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), doc.uid()),
try (Engine.GetResult getResult = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), doc.uid()),
engine::acquireSearcher)) {
assertFalse(getResult.exists());
}

View File

@ -1185,7 +1185,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
long refreshCount = shard.refreshStats().getTotal();
indexDoc(shard, "test", "test");
try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test",
try (Engine.GetResult ignored = shard.get(new Engine.Get(true, false, "test", "test",
new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) {
assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount+1));
}
@ -1833,7 +1833,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexDoc(shard, "test", "1", "{\"foobar\" : \"bar\"}");
shard.refresh("test");
Engine.GetResult getResult = shard.get(new Engine.Get(false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1"))));
Engine.GetResult getResult = shard.get(new Engine.Get(false, false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1"))));
assertTrue(getResult.exists());
assertNotNull(getResult.searcher());
getResult.release();
@ -1867,7 +1867,7 @@ public class IndexShardTests extends IndexShardTestCase {
search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10);
assertEquals(search.totalHits, 1);
}
getResult = newShard.get(new Engine.Get(false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1"))));
getResult = newShard.get(new Engine.Get(false, false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1"))));
assertTrue(getResult.exists());
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader);

View File

@ -323,12 +323,12 @@ public class RefreshListenersTests extends ESTestCase {
}
listener.assertNoError();
Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId));
Engine.Get get = new Engine.Get(false, false, "test", threadId, new Term(IdFieldMapper.NAME, threadId));
try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher)) {
assertTrue("document not found", getResult.exists());
assertEquals(iteration, getResult.version());
SingleFieldsVisitor visitor = new SingleFieldsVisitor("test");
getResult.docIdAndVersion().context.reader().document(getResult.docIdAndVersion().docId, visitor);
getResult.docIdAndVersion().reader.document(getResult.docIdAndVersion().docId, visitor);
assertEquals(Arrays.asList(testFieldValue), visitor.fields().get("test"));
}
} catch (Exception t) {

View File

@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.ParentFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class ShardGetServiceTests extends IndexShardTestCase {
public void testGetForUpdate() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(primary);
Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet = primary.getService().getForUpdate("test", "0", test.getVersion(), VersionType.INTERNAL);
assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}");
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(searcher.reader().maxDoc(), 1); // we refreshed
}
Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar", null);
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL);
assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");
assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertFalse(testGet1.getFields().containsKey(ParentFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue());
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(searcher.reader().maxDoc(), 1); // we read from the translog
}
primary.getEngine().refresh("test");
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(searcher.reader().maxDoc(), 2);
}
// now again from the reader
test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar", null);
assertTrue(primary.getEngine().refreshNeeded());
testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL);
assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");
assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertFalse(testGet1.getFields().containsKey(ParentFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue());
closeShards(primary);
}
public void testGetForUpdateWithParentField() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put("index.version.created", Version.V_5_6_0) // for parent field mapper
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("parent", "{ \"properties\": {}}")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}, \"_parent\": { \"type\": \"parent\"}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(primary);
Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet = primary.getService().getForUpdate("test", "0", test.getVersion(), VersionType.INTERNAL);
assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}");
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(searcher.reader().maxDoc(), 1); // we refreshed
}
Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, null, "foobar");
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL);
assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");
assertTrue(testGet1.getFields().containsKey(ParentFieldMapper.NAME));
assertFalse(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(ParentFieldMapper.NAME).getValue());
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(searcher.reader().maxDoc(), 1); // we read from the translog
}
primary.getEngine().refresh("test");
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(searcher.reader().maxDoc(), 2);
}
// now again from the reader
test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, null, "foobar");
assertTrue(primary.getEngine().refreshNeeded());
testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL);
assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");
assertTrue(testGet1.getFields().containsKey(ParentFieldMapper.NAME));
assertFalse(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(ParentFieldMapper.NAME).getValue());
closeShards(primary);
}
}

View File

@ -235,9 +235,9 @@ public class TranslogTests extends ESTestCase {
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
}
private void addToTranslogAndList(Translog translog, List<Translog.Operation> list, Translog.Operation op) throws IOException {
private Location addToTranslogAndList(Translog translog, List<Translog.Operation> list, Translog.Operation op) throws IOException {
list.add(op);
translog.add(op);
return translog.add(op);
}
public void testIdParsingFromFile() {
@ -579,6 +579,19 @@ public class TranslogTests extends ESTestCase {
}
}
public void testReadLocation() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
ArrayList<Translog.Location> locs = new ArrayList<>();
locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})));
locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, new byte[]{1})));
locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{1})));
int i = 0;
for (Translog.Operation op : ops) {
assertEquals(op, translog.readOperation(locs.get(i++)));
}
assertNull(translog.readOperation(new Location(100, 0, 0)));
}
public void testSnapshotWithNewTranslog() throws IOException {
List<Closeable> toClose = new ArrayList<>();
try {
@ -689,6 +702,9 @@ public class TranslogTests extends ESTestCase {
Translog.Operation op = snapshot.next();
assertNotNull(op);
Translog.Operation expectedOp = locationOperation.operation;
if (randomBoolean()) {
assertEquals(expectedOp, translog.readOperation(locationOperation.location));
}
assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) {
case INDEX:
@ -1643,6 +1659,9 @@ public class TranslogTests extends ESTestCase {
Translog.Location loc = add(op);
writtenOperations.add(new LocationOperation(op, loc));
if (rarely()) { // lets verify we can concurrently read this
assertEquals(op, translog.readOperation(loc));
}
afterAdd();
}
} catch (Exception t) {

View File

@ -471,7 +471,7 @@ public abstract class EngineTestCase extends ESTestCase {
}
protected Engine.Get newGet(boolean realtime, ParsedDocument doc) {
return new Engine.Get(realtime, doc.type(), doc.id(), newUid(doc));
return new Engine.Get(realtime, false, doc.type(), doc.id(), newUid(doc));
}
protected Engine.Index indexForDoc(ParsedDocument doc) {

View File

@ -548,12 +548,15 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source) throws IOException {
return indexDoc(shard, type, id, source, XContentType.JSON);
return indexDoc(shard, type, id, source, XContentType.JSON, null, null);
}
protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType)
protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType,
String routing, String parentId)
throws IOException {
SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType);
sourceToParse.routing(routing);
sourceToParse.parent(parentId);
if (shard.routingEntry().primary()) {
final Engine.IndexResult result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(shard, type));