Fill missing sequence IDs up to max sequence ID when recovering from store (#24238)
Today we might promote a shard to primary and recover it from store, and after translog recovery the local checkpoint can still be behind the maximum sequence ID seen. To fill the holes in the sequence ID history, this PR adds a utility method that fills all missing sequence IDs up to the maximum seen sequence ID with no-ops. Relates to #10708
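To make the mechanism concrete outside the engine internals, here is a minimal, self-contained Java sketch of the gap-filling loop. SeqNoTracker, markCompleted, getLocalCheckpoint and getMaxSeqNo are hypothetical stand-ins for the engine's sequence number service, not the Elasticsearch API; the actual change is InternalEngine#fillSequenceNumberHistory in the diff below, which writes real no-op operations under the engine's write lock.

// Standalone sketch of the gap-filling idea (not the Elasticsearch API): a primary that has
// seen operations up to maxSeqNo but whose local checkpoint lags behind fills every missing
// sequence number with a no-op so the checkpoint can advance to maxSeqNo.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class FillSeqNoGapsSketch {

    // Hypothetical stand-in for the engine's sequence number service.
    static class SeqNoTracker {
        private final Set<Long> completed = new TreeSet<>();
        private long localCheckpoint = -1;   // highest seqNo such that all lower seqNos are completed
        private long maxSeqNo = -1;          // highest seqNo ever seen

        void markCompleted(long seqNo) {
            maxSeqNo = Math.max(maxSeqNo, seqNo);
            completed.add(seqNo);
            while (completed.contains(localCheckpoint + 1)) {
                localCheckpoint++;
            }
        }

        long getLocalCheckpoint() { return localCheckpoint; }
        long getMaxSeqNo() { return maxSeqNo; }
    }

    /** Fills every seqNo in (localCheckpoint, maxSeqNo] that has no operation with a no-op. */
    static int fillSequenceNumberHistory(SeqNoTracker tracker, List<String> translog, long primaryTerm) {
        int numNoOpsAdded = 0;
        // leap-frog to the next missing seqNo; the checkpoint may advance past already-filled slots
        for (long seqNo = tracker.getLocalCheckpoint() + 1; seqNo <= tracker.getMaxSeqNo();
             seqNo = tracker.getLocalCheckpoint() + 1) {
            translog.add("no-op seqNo=" + seqNo + " term=" + primaryTerm);
            tracker.markCompleted(seqNo);
            numNoOpsAdded++;
        }
        return numNoOpsAdded;
    }

    public static void main(String[] args) {
        SeqNoTracker tracker = new SeqNoTracker();
        List<String> translog = new ArrayList<>();
        // operations 0, 1 and 4 were recovered from the translog; 2 and 3 are missing
        for (long seqNo : new long[] {0, 1, 4}) {
            tracker.markCompleted(seqNo);
        }
        int added = fillSequenceNumberHistory(tracker, translog, 2);
        System.out.println("no-ops added: " + added);                            // 2
        System.out.println("local checkpoint: " + tracker.getLocalCheckpoint()); // 4 == maxSeqNo
    }
}

Note the leap-frog step: after each no-op the loop recomputes the next sequence number from the (possibly advanced) local checkpoint, so slots that already contain an operation are skipped rather than filled again.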
parent 2dd924bc15
commit 2ca7072b24
@@ -565,7 +565,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         final long version = primaryResponse.getVersion();
         final long seqNo = primaryResponse.getSeqNo();
         final SourceToParse sourceToParse =
-            SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(),
+            SourceToParse.source(shardId.getIndexName(),
                 request.type(), request.id(), request.source(), request.getContentType())
                 .routing(request.routing()).parent(request.parent());
         final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
@@ -578,7 +578,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     /** Utility method to prepare an index operation on primary shards */
     private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
         final SourceToParse sourceToParse =
-            SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(),
+            SourceToParse.source(request.index(), request.type(),
                 request.id(), request.source(), request.getContentType())
                 .routing(request.routing()).parent(request.parent());
         return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(),
@@ -1415,6 +1415,14 @@ public abstract class Engine implements Closeable {
      */
     public abstract void deactivateThrottling();
 
+    /**
+     * Fills up the local checkpoints history with no-ops until the local checkpoint
+     * and the max seen sequence ID are identical.
+     * @param primaryTerm the shards primary term this engine was created for
+     * @return the number of no-ops added
+     */
+    public abstract int fillSequenceNumberHistory(long primaryTerm) throws IOException;
+
     /**
      * Performs recovery from the transaction log.
      * This operation will close the engine if the recovery fails.
@@ -225,6 +225,28 @@ public class InternalEngine extends Engine {
         logger.trace("created new InternalEngine");
     }
 
+    @Override
+    public int fillSequenceNumberHistory(long primaryTerm) throws IOException {
+        try (ReleasableLock lock = writeLock.acquire()) {
+            ensureOpen();
+            final long localCheckpoint = seqNoService.getLocalCheckpoint();
+            final long maxSeqId = seqNoService.getMaxSeqNo();
+            int numNoOpsAdded = 0;
+            for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId;
+                // the local checkpoint might have been advanced so we are leap-frogging
+                // to the next seq ID we need to process and create a noop for
+                seqNo = seqNoService.getLocalCheckpoint()+1) {
+                final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling up seqNo history");
+                innerNoOp(noOp);
+                numNoOpsAdded++;
+                assert seqNo <= seqNoService.getLocalCheckpoint() : "localCheckpoint didn't advanced used to be " + seqNo + " now it's on:"
+                    + seqNoService.getLocalCheckpoint();
+
+            }
+            return numNoOpsAdded;
+        }
+    }
+
     private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
         long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
         for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
@@ -1071,6 +1093,7 @@ public class InternalEngine extends Engine {
     }
 
     private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
+        assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
         assert noOp.seqNo() > SequenceNumbersService.NO_OPS_PERFORMED;
         final long seqNo = noOp.seqNo();
         try {
@@ -23,22 +23,15 @@ import java.util.Objects;
 
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 
 public class SourceToParse {
 
-    public static SourceToParse source(String index, String type, String id, BytesReference source, XContentType contentType) {
-        return source(Origin.PRIMARY, index, type, id, source, contentType);
-    }
-
-    public static SourceToParse source(Origin origin, String index, String type, String id, BytesReference source,
+    public static SourceToParse source(String index, String type, String id, BytesReference source,
                                        XContentType contentType) {
-        return new SourceToParse(origin, index, type, id, source, contentType);
+        return new SourceToParse(index, type, id, source, contentType);
     }
 
-    private final Origin origin;
-
     private final BytesReference source;
 
     private final String index;
@@ -53,8 +46,7 @@ public class SourceToParse {
 
     private XContentType xContentType;
 
-    private SourceToParse(Origin origin, String index, String type, String id, BytesReference source, XContentType xContentType) {
-        this.origin = Objects.requireNonNull(origin);
+    private SourceToParse(String index, String type, String id, BytesReference source, XContentType xContentType) {
         this.index = Objects.requireNonNull(index);
         this.type = Objects.requireNonNull(type);
         this.id = Objects.requireNonNull(id);
@@ -64,10 +56,6 @@ public class SourceToParse {
         this.xContentType = Objects.requireNonNull(xContentType);
     }
 
-    public Origin origin() {
-        return origin;
-    }
-
     public BytesReference source() {
         return this.source;
     }
@@ -364,6 +364,8 @@ final class StoreRecovery {
                 logger.debug("failed to list file details", e);
             }
             indexShard.performTranslogRecovery(indexShouldExists);
+            assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
+            indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm());
         }
         indexShard.finalizeRecovery();
         indexShard.postRecovery("post recovery from shard_store");
@@ -114,6 +114,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.RootObjectMapper;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.UidFieldMapper;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -191,7 +192,7 @@ import static org.hamcrest.Matchers.nullValue;
 
 public class InternalEngineTests extends ESTestCase {
 
-    protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 1);
+    protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0);
     private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
 
     protected ThreadPool threadPool;
@@ -1956,7 +1957,7 @@ public class InternalEngineTests extends ESTestCase {
         @Override
         public void append(LogEvent event) {
             final String formattedMessage = event.getMessage().getFormattedMessage();
-            if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][1] ")) {
+            if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][0] ")) {
                 if (event.getLoggerName().endsWith(".IW") &&
                     formattedMessage.contains("IW: apply all deletes during flush")) {
                     sawIndexWriterMessage = true;
@@ -3836,4 +3837,79 @@ public class InternalEngineTests extends ESTestCase {
         }
     }
 
+    public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
+        final int docs = randomIntBetween(1, 32);
+        int numDocsOnReplica = 0;
+        long maxSeqIDOnReplica = -1;
+        long checkpointOnReplica;
+        try {
+            for (int i = 0; i < docs; i++) {
+                final String docId = Integer.toString(i);
+                final ParsedDocument doc =
+                    testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
+                Engine.Index primaryResponse = indexForDoc(doc);
+                Engine.IndexResult indexResult = engine.index(primaryResponse);
+                if (randomBoolean()) {
+                    numDocsOnReplica++;
+                    maxSeqIDOnReplica = indexResult.getSeqNo();
+                    replicaEngine.index(replicaIndexForDoc(doc, 1, indexResult.getSeqNo(), false));
+                }
+            }
+            checkpointOnReplica = replicaEngine.seqNoService().getLocalCheckpoint();
+        } finally {
+            IOUtils.close(replicaEngine);
+        }
+
+
+        boolean flushed = false;
+        Engine recoveringEngine = null;
+        try {
+            assertEquals(docs-1, engine.seqNoService().getMaxSeqNo());
+            assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint());
+            assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo());
+            assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint());
+            recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+            assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().totalOperations());
+            recoveringEngine.recoverFromTranslog();
+            assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
+            assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
+            assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2));
+
+            // now snapshot the tlog and ensure the primary term is updated
+            Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot();
+            assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations());
+            Translog.Operation operation;
+            while((operation = snapshot.next()) != null) {
+                if (operation.opType() == Translog.Operation.Type.NO_OP) {
+                    assertEquals(2, operation.primaryTerm());
+                } else {
+                    assertEquals(1, operation.primaryTerm());
+                }
+
+            }
+            assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
+            assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
+            if ((flushed = randomBoolean())) {
+                recoveringEngine.flush(true, true);
+            }
+        } finally {
+            IOUtils.close(recoveringEngine);
+        }
+
+        // now do it again to make sure we preserve values etc.
+        try {
+            recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+            if (flushed) {
+                assertEquals(0, recoveringEngine.getTranslog().totalOperations());
+            }
+            recoveringEngine.recoverFromTranslog();
+            assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
+            assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
+            assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3));
+            assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
+            assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
+        } finally {
+            IOUtils.close(recoveringEngine);
+        }
+    }
 }
@@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -77,6 +78,7 @@ import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.UidFieldMapper;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
@@ -896,6 +898,46 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(newShard);
     }
 
+    /* This test just verifies that we fill up local checkpoint up to max seen seqID on primary recovery */
+    public void testRecoverFromStoreWithNoOps() throws IOException {
+        final IndexShard shard = newStartedShard(true);
+        indexDoc(shard, "test", "0");
+        Engine.Index test = indexDoc(shard, "test", "1");
+        // start a replica shard and index the second doc
+        final IndexShard otherShard = newStartedShard(false);
+        test = otherShard.prepareIndexOnReplica(
+            SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(),
+                XContentType.JSON),
+            1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+        otherShard.index(test);
+
+        final ShardRouting primaryShardRouting = shard.routingEntry();
+        IndexShard newShard = reinitShard(otherShard, ShardRoutingHelper.initWithSameId(primaryShardRouting,
+            RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
+        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
+        assertTrue(newShard.recoverFromStore());
+        assertEquals(1, newShard.recoveryState().getTranslog().recoveredOperations());
+        assertEquals(1, newShard.recoveryState().getTranslog().totalOperations());
+        assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart());
+        assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
+        Translog.Snapshot snapshot = newShard.getTranslog().newSnapshot();
+        Translog.Operation operation;
+        int numNoops = 0;
+        while((operation = snapshot.next()) != null) {
+            if (operation.opType() == Translog.Operation.Type.NO_OP) {
+                numNoops++;
+                assertEquals(1, operation.primaryTerm());
+                assertEquals(0, operation.seqNo());
+            }
+        }
+        assertEquals(1, numNoops);
+        newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
+        assertDocCount(newShard, 1);
+        assertDocCount(shard, 2);
+        closeShards(newShard, shard);
+    }
+
     public void testRecoverFromCleanStore() throws IOException {
         final IndexShard shard = newStartedShard(true);
         indexDoc(shard, "test", "0");
@@ -44,7 +44,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         long seqNo = 0;
         for (int i = 0; i < docs; i++) {
             Engine.Index indexOp = replica.prepareIndexOnReplica(
-                SourceToParse.source(SourceToParse.Origin.REPLICA, index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON),
+                SourceToParse.source(index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON),
                 seqNo++, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
             replica.index(indexOp);
             if (rarely()) {
@@ -484,7 +484,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         final Engine.Index index;
         if (shard.routingEntry().primary()) {
             index = shard.prepareIndexOnPrimary(
-                SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source),
+                SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source),
                     xContentType),
                 Versions.MATCH_ANY,
                 VersionType.INTERNAL,
@@ -492,7 +492,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
                 false);
         } else {
             index = shard.prepareIndexOnReplica(
-                SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source),
+                SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source),
                     xContentType),
                 randomInt(1 << 10), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
         }