Fix translog replay multiple operations same doc
Modifying the translog replay to not replay again into the translog introduced a bug for the case of multiple operations for the same doc. Namely, since we were no longer updating the version map for each operation, the second operation for a doc would be treated as a creation instead of as an update. This commit fixes this bug by placing these operations into version map. This commit includes a failing test case. Relates #18611
This commit is contained in:
parent
f0020caf7c
commit
df7b3c6e82
|
@ -395,6 +395,12 @@ public class InternalEngine extends Engine {
|
|||
final Translog.Location translogLocation = translog.add(new Translog.Index(index));
|
||||
index.setTranslogLocation(translogLocation);
|
||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, index.getTranslogLocation()));
|
||||
} else {
|
||||
// we do not replay in to the translog, so there is no
|
||||
// translog location; that is okay because real-time
|
||||
// gets are not possible during recovery and we will
|
||||
// flush when the recovery is complete
|
||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, null));
|
||||
}
|
||||
|
||||
return created;
|
||||
|
@ -498,6 +504,12 @@ public class InternalEngine extends Engine {
|
|||
final Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
|
||||
delete.setTranslogLocation(translogLocation);
|
||||
versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), delete.getTranslogLocation()));
|
||||
} else {
|
||||
// we do not replay in to the translog, so there is no
|
||||
// translog location; that is okay because real-time
|
||||
// gets are not possible during recovery and we will
|
||||
// flush when the recovery is complete
|
||||
versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), null));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,7 +54,8 @@ class VersionValue implements Accountable {
|
|||
|
||||
@Override
|
||||
public long ramBytesUsed() {
|
||||
return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + Long.BYTES + RamUsageEstimator.NUM_BYTES_OBJECT_REF + translogLocation.ramBytesUsed();
|
||||
return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + Long.BYTES + RamUsageEstimator.NUM_BYTES_OBJECT_REF +
|
||||
(translogLocation != null ? translogLocation.size : 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.lucene.search.IndexSearcher;
|
|||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.search.TotalHitCountCollector;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.MockDirectoryWrapper;
|
||||
|
@ -592,6 +593,42 @@ public class InternalEngineTests extends ESTestCase {
|
|||
engine.flush();
|
||||
}
|
||||
|
||||
public void testTranslogMultipleOperationsSameDocument() throws IOException {
|
||||
final int ops = randomIntBetween(1, 32);
|
||||
Engine initialEngine;
|
||||
final List<Engine.Operation> operations = new ArrayList<>();
|
||||
try {
|
||||
initialEngine = engine;
|
||||
for (int i = 0; i < ops; i++) {
|
||||
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
|
||||
if (randomBoolean()) {
|
||||
final Engine.Index operation = new Engine.Index(newUid("test#1"), doc, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime());
|
||||
operations.add(operation);
|
||||
initialEngine.index(operation);
|
||||
} else {
|
||||
final Engine.Delete operation = new Engine.Delete("test", "1", newUid("test#1"), i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false);
|
||||
operations.add(operation);
|
||||
initialEngine.delete(operation);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
IOUtils.close(engine);
|
||||
}
|
||||
|
||||
Engine recoveringEngine = null;
|
||||
try {
|
||||
recoveringEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
|
||||
recoveringEngine.recoverFromTranslog();
|
||||
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
|
||||
final TotalHitCountCollector collector = new TotalHitCountCollector();
|
||||
searcher.searcher().search(new MatchAllDocsQuery(), collector);
|
||||
assertThat(collector.getTotalHits(), equalTo(operations.get(operations.size() - 1) instanceof Engine.Delete ? 0 : 1));
|
||||
}
|
||||
} finally {
|
||||
IOUtils.close(recoveringEngine);
|
||||
}
|
||||
}
|
||||
|
||||
public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException {
|
||||
final int docs = randomIntBetween(1, 32);
|
||||
Engine initialEngine = null;
|
||||
|
|
Loading…
Reference in New Issue