Rewrite SourceToParse with resolved docType (#36921)

We introduce a typeless API in #35790 where we translate the default
docType "_doc" to the user-defined docType. However, we do not rewrite
the SourceToParse with the resolved docType. This leads to a situation
where we have two translog operations for the same document with
different types:

- prvOp [Index{id='9LCpwGcBkJN7eZxaB54L', type='_doc', seqNo=1,
  primaryTerm=1, version=1, autoGeneratedIdTimestamp=1545125562123}]

- newOp [Index{id='9LCpwGcBkJN7eZxaB54L', type='not_doc', seqNo=1,
  primaryTerm=1, version=1, autoGeneratedIdTimestamp=-1}]

Closes #36769
This commit is contained in:
Nhat Nguyen 2018-12-23 15:14:21 -05:00 committed by GitHub
parent 9137d92ca6
commit 40c7ae6181
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 3 deletions

View File

@ -710,9 +710,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
ensureWriteAllowed(origin);
Engine.Index operation;
try {
operation = prepareIndex(docMapper(sourceToParse.type()), indexSettings.getIndexVersionCreated(), sourceToParse, seqNo,
opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry,
ifSeqNo, ifPrimaryTerm);
final String resolvedType = resolveType(sourceToParse.type());
final SourceToParse sourceWithResolvedType;
if (resolvedType.equals(sourceToParse.type())) {
sourceWithResolvedType = sourceToParse;
} else {
sourceWithResolvedType = SourceToParse.source(sourceToParse.index(), resolvedType, sourceToParse.id(),
sourceToParse.source(), sourceToParse.getXContentType());
sourceWithResolvedType.routing(sourceToParse.routing());
}
operation = prepareIndex(docMapper(resolvedType), indexSettings.getIndexVersionCreated(), sourceWithResolvedType,
seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, ifSeqNo, ifPrimaryTerm);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
return new Engine.IndexResult(update);

View File

@ -64,6 +64,7 @@ import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -862,4 +863,28 @@ public class IndexShardIT extends ESSingleNodeTestCase {
client().search(countRequest).actionGet().getHits().getTotalHits().value, equalTo(numDocs + moreDocs));
}
public void testShardChangesWithDefaultDocType() throws Exception {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.translog.flush_threshold_size", "512mb") // do not flush
.put("index.soft_deletes.enabled", true).build();
IndexService indexService = createIndex("index", settings, "user_doc", "title", "type=keyword");
int numOps = between(1, 10);
for (int i = 0; i < numOps; i++) {
if (randomBoolean()) {
client().prepareIndex("index", randomFrom("_doc", "user_doc"), randomFrom("1", "2"))
.setSource("{}", XContentType.JSON).get();
} else {
client().prepareDelete("index", randomFrom("_doc", "user_doc"), randomFrom("1", "2")).get();
}
}
IndexShard shard = indexService.getShard(0);
try (Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true);
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()) {
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);
List<Translog.Operation> opsFromTranslog = TestTranslog.drainSnapshot(translogSnapshot, true);
assertThat(opsFromLucene, equalTo(opsFromTranslog));
}
}
}

View File

@ -34,7 +34,9 @@ import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.Set;
@ -128,4 +130,16 @@ public class TestTranslog {
public static long getCurrentTerm(Translog translog) {
return translog.getCurrent().getPrimaryTerm();
}
public static List<Translog.Operation> drainSnapshot(Translog.Snapshot snapshot, boolean sortBySeqNo) throws IOException {
final List<Translog.Operation> ops = new ArrayList<>(snapshot.totalOperations());
Translog.Operation op;
while ((op = snapshot.next()) != null) {
ops.add(op);
}
if (sortBySeqNo) {
ops.sort(Comparator.comparing(Translog.Operation::seqNo));
}
return ops;
}
}