Return consistent source in updates (#48707)
parent 5bea3898a9
commit fe8901b00b
@@ -628,7 +628,7 @@ public abstract class Engine implements Closeable {
         if (docIdAndVersion != null) {
             // don't release the searcher on this path, it is the
             // responsibility of the caller to call GetResult.release
-            return new GetResult(searcher, docIdAndVersion);
+            return new GetResult(searcher, docIdAndVersion, false);
         } else {
             Releasables.close(searcher);
             return GetResult.NOT_EXISTS;
@@ -1650,21 +1650,20 @@ public abstract class Engine implements Closeable {
         private final long version;
         private final DocIdAndVersion docIdAndVersion;
         private final Engine.Searcher searcher;
+        private final boolean fromTranslog;

-        public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null);
+        public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null, false);

-        private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher) {
+        private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher, boolean fromTranslog) {
             this.exists = exists;
             this.version = version;
             this.docIdAndVersion = docIdAndVersion;
             this.searcher = searcher;
+            this.fromTranslog = fromTranslog;
         }

-        /**
-         * Build a non-realtime get result from the searcher.
-         */
-        public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion) {
-            this(true, docIdAndVersion.version, docIdAndVersion, searcher);
+        public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) {
+            this(true, docIdAndVersion.version, docIdAndVersion, searcher, fromTranslog);
         }

         public boolean exists() {
@@ -1675,6 +1674,10 @@ public abstract class Engine implements Closeable {
             return this.version;
         }

+        public boolean isFromTranslog() {
+            return fromTranslog;
+        }
+
         public Engine.Searcher searcher() {
             return this.searcher;
         }
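Note on the hunks above: Engine.GetResult now records whether a get was answered from the translog; NOT_EXISTS and the searcher-backed constructor default the flag to false. As a minimal, self-contained sketch of the pattern (simplified stand-in type, not the actual Elasticsearch class), an immutable result object can carry such a provenance flag and expose it read-only:

    // Minimal sketch (hypothetical ResultWithProvenance type, not Elasticsearch code):
    // an immutable result that remembers whether it came from the translog.
    final class ResultWithProvenance {
        private final boolean exists;
        private final long version;
        private final boolean fromTranslog;

        // sentinel result, never served from the translog
        static final ResultWithProvenance NOT_EXISTS = new ResultWithProvenance(false, -1L, false);

        ResultWithProvenance(boolean exists, long version, boolean fromTranslog) {
            this.exists = exists;
            this.version = version;
            this.fromTranslog = fromTranslog;
        }

        boolean exists() { return exists; }
        long version() { return version; }
        boolean isFromTranslog() { return fromTranslog; }
    }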
@@ -680,7 +680,7 @@ public class InternalEngine extends Engine {
                         return new GetResult(new Engine.Searcher("realtime_get", reader,
                             IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), reader),
                             new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(),
-                                reader, 0));
+                                reader, 0), true);
                     }
                 } catch (IOException e) {
                     maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
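Only this translog branch of the realtime get passes true; the searcher-backed path in the first hunk passes false. A condensed sketch of that branching, reusing the hypothetical ResultWithProvenance type from the earlier sketch (illustrative only, not the InternalEngine logic):

    // Hypothetical helper: the translog hit is the single place that sets the flag to true.
    final class RealtimeGetSketch {
        static ResultWithProvenance resolveGet(boolean foundInTranslog, long version) {
            if (foundInTranslog) {
                return new ResultWithProvenance(true, version, true);  // served from the translog
            }
            return new ResultWithProvenance(true, version, false);     // served from a Lucene searcher
        }
    }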
@@ -168,6 +168,7 @@ public final class ShardGetService extends AbstractIndexShardComponent {
         Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
         get = indexShard.get(new Engine.Get(realtime, readFromTranslog, type, id, uidTerm)
                 .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
+        assert get.isFromTranslog() == false || readFromTranslog : "should only read from translog if explicitly enabled";
         if (get.exists() == false) {
             get.close();
         }
@@ -206,7 +207,7 @@ public final class ShardGetService extends AbstractIndexShardComponent {
            metaDataFields = new HashMap<>();
            for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {
                if (MapperService.isMetadataField(entry.getKey())) {
-                    metaDataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
+                    metaDataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                } else {
                    documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                }
@@ -230,12 +231,22 @@ public final class ShardGetService extends AbstractIndexShardComponent {

        if (!fetchSourceContext.fetchSource()) {
            source = null;
-        } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
+        }
+
+        if (source != null && get.isFromTranslog()) {
+            // reapply source filters from mapping (possibly also nulling the source)
+            try {
+                source = docMapper.sourceMapper().applyFilters(source, null);
+            } catch (IOException e) {
+                throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
+            }
+        }
+
+        if (source != null && (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0)) {
            Map<String, Object> sourceAsMap;
-            XContentType sourceContentType = null;
            // TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
            Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
-            sourceContentType = typeMapTuple.v1();
+            XContentType sourceContentType = typeMapTuple.v1();
            sourceAsMap = typeMapTuple.v2();
            sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
            try {
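The new block in the last hunk exists because source read from the translog is the raw document exactly as the client sent it, while source read from stored fields already had the mapping-level _source filters applied at index time. A small, self-contained illustration of that difference, using plain Java maps instead of the real XContentMapValues.filter (which also supports wildcards and includes):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    // Illustration only: why a translog-served source needs mapping filters reapplied.
    public class ReapplyFiltersSketch {

        // Stand-in for mapping-level excludes: drop the listed top-level keys.
        static Map<String, Object> exclude(Map<String, Object> source, Set<String> excludes) {
            Map<String, Object> filtered = new LinkedHashMap<>(source);
            filtered.keySet().removeAll(excludes);
            return filtered;
        }

        public static void main(String[] args) {
            // raw client document, as the translog stores it
            Map<String, Object> rawFromTranslog = new LinkedHashMap<>();
            rawFromTranslog.put("foo", "foo");
            rawFromTranslog.put("bar", "bar");

            boolean fromTranslog = true;
            Map<String, Object> source = rawFromTranslog;
            if (fromTranslog) {
                // mapping configured with "_source": { "excludes": ["foo"] }
                source = exclude(source, Set.of("foo"));
            }
            // prints {bar=bar}: the same result a stored-fields read would have returned
            System.out.println(source);
        }
    }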
@@ -25,6 +25,7 @@ import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
@@ -227,33 +228,43 @@ public class SourceFieldMapper extends MetadataFieldMapper {
     @Override
     protected void parseCreateField(ParseContext context, List<IndexableField> fields) throws IOException {
         BytesReference originalSource = context.sourceToParse().source();
-        BytesReference source = originalSource;
-        if (enabled && fieldType().stored() && source != null) {
-            // Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
-            if (filter != null) {
-                // we don't update the context source if we filter, we want to keep it as is...
-                Tuple<XContentType, Map<String, Object>> mapTuple =
-                    XContentHelper.convertToMap(source, true, context.sourceToParse().getXContentType());
-                Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
-                BytesStreamOutput bStream = new BytesStreamOutput();
-                XContentType contentType = mapTuple.v1();
-                XContentBuilder builder = XContentFactory.contentBuilder(contentType, bStream).map(filteredSource);
-                builder.close();
-                source = bStream.bytes();
-            }
-            BytesRef ref = source.toBytesRef();
+        XContentType contentType = context.sourceToParse().getXContentType();
+        final BytesReference adaptedSource = applyFilters(originalSource, contentType);
+
+        if (adaptedSource != null) {
+            final BytesRef ref = adaptedSource.toBytesRef();
             fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
-        } else {
-            source = null;
         }

-        if (originalSource != null && source != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
+        if (originalSource != null && adaptedSource != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
             // if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
             BytesRef ref = originalSource.toBytesRef();
             fields.add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
             fields.add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
         }
     }

+    @Nullable
+    public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable XContentType contentType) throws IOException {
+        if (enabled && fieldType().stored() && originalSource != null) {
+            // Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
+            if (filter != null) {
+                // we don't update the context source if we filter, we want to keep it as is...
+                Tuple<XContentType, Map<String, Object>> mapTuple =
+                    XContentHelper.convertToMap(originalSource, true, contentType);
+                Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
+                BytesStreamOutput bStream = new BytesStreamOutput();
+                XContentType actualContentType = mapTuple.v1();
+                XContentBuilder builder = XContentFactory.contentBuilder(actualContentType, bStream).map(filteredSource);
+                builder.close();
+                return bStream.bytes();
+            } else {
+                return originalSource;
+            }
+        } else {
+            return null;
+        }
+    }
+
     @Override
     protected String contentType() {
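The refactoring above extracts the filtering logic out of parseCreateField into a public applyFilters method so the same code runs at index time and, via ShardGetService, when a translog-served source is returned. A simplified sketch of the resulting contract (hypothetical names, plain maps instead of BytesReference/XContent):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.UnaryOperator;

    // Sketch of the extracted method's three outcomes: null when _source is disabled,
    // the original when no includes/excludes are configured, a filtered copy otherwise.
    public class ApplyFiltersSketch {
        private final boolean enabled;                           // "_source": { "enabled": ... }
        private final UnaryOperator<Map<String, Object>> filter; // null when no includes/excludes

        ApplyFiltersSketch(boolean enabled, UnaryOperator<Map<String, Object>> filter) {
            this.enabled = enabled;
            this.filter = filter;
        }

        Map<String, Object> applyFilters(Map<String, Object> originalSource) {
            if (enabled == false || originalSource == null) {
                return null;                  // nothing is stored, nothing is returned
            }
            if (filter == null) {
                return originalSource;        // unmodified source, kept as-is
            }
            return filter.apply(new LinkedHashMap<>(originalSource)); // filtered copy
        }
    }

Returning the original instance unchanged is what lets parseCreateField decide, via adaptedSource != originalSource, whether a separate _recovery_source field is needed.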
@@ -89,6 +89,34 @@ public class ShardGetServiceTests extends IndexShardTestCase {
         closeShards(primary);
     }

+    public void testGetFromTranslogWithSourceMappingOptions() throws IOException {
+        Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .build();
+        String docToIndex = "{\"foo\" : \"foo\", \"bar\" : \"bar\"}";
+        boolean noSource = randomBoolean();
+        String sourceOptions = noSource ? "\"enabled\": false" : randomBoolean() ? "\"excludes\": [\"fo*\"]" : "\"includes\": [\"ba*\"]";
+        String expectedResult = noSource ? "" : "{\"bar\":\"bar\"}";
+        IndexMetaData metaData = IndexMetaData.builder("test")
+            .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}, \"bar\": { \"type\": \"text\"}}, \"_source\": { "
+                + sourceOptions + "}}}")
+            .settings(settings)
+            .primaryTerm(0, 1).build();
+        IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
+        recoverShardFromStore(primary);
+        Engine.IndexResult test = indexDoc(primary, "test", "0", docToIndex);
+        assertTrue(primary.getEngine().refreshNeeded());
+        GetResult testGet = primary.getService().getForUpdate("test", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
+        assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
+        assertEquals(new String(testGet.source() == null ? new byte[0] : testGet.source(), StandardCharsets.UTF_8), expectedResult);
+        try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
+            assertEquals(searcher.getIndexReader().maxDoc(), 1); // we refreshed
+        }
+
+        closeShards(primary);
+    }
+
     public void testTypelessGetForUpdate() throws IOException {
         Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
             .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
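The new test indexes {"foo" : "foo", "bar" : "bar"}, randomly picks one of three _source mapping options, and runs getForUpdate, a realtime get that reads from the translog. Its expectation, restated as a tiny standalone helper (illustrative only, not part of the test):

    // Whichever _source option the test randomly chooses, a translog-served get must
    // return the already filtered source, never the raw {"foo","bar"} document:
    //   "enabled": false     -> ""            (no source returned at all)
    //   "excludes": ["fo*"]  -> {"bar":"bar"}
    //   "includes": ["ba*"]  -> {"bar":"bar"}
    final class ExpectedSourceSketch {
        static String expectedSource(boolean noSource) {
            return noSource ? "" : "{\"bar\":\"bar\"}";
        }
    }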