Engine: store maxUnsafeAutoIdTimestamp in commit (#24149)

The `maxUnsafeAutoIdTimestamp` timestamp is a safety marker guaranteeing that no retried indexing operation with a higher auto-generated ID timestamp has been processed by the engine. This allows us to safely process documents without checking whether they were seen before.
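In other words, a retry raises the marker, and an original append-only operation may skip the duplicate check only while its timestamp lies strictly above it. A hypothetical sketch of that invariant (illustrative names and structure, not the engine's actual code):

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the invariant behind maxUnsafeAutoIdTimestamp;
// names are illustrative, not the engine's actual code.
class AutoIdSafetyMarker {

    // highest auto-generated-ID timestamp seen on any retried operation (-1 = none yet)
    private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);

    // a retry raises the marker so that later originals with the same (or a lower)
    // timestamp are forced through the duplicate check
    void onRetry(long autoGeneratedIdTimestamp) {
        maxUnsafeAutoIdTimestamp.updateAndGet(current -> Math.max(current, autoGeneratedIdTimestamp));
    }

    // an original append-only operation is safe to index without looking for a
    // previous copy only if its timestamp is strictly above the marker
    boolean canSkipDuplicateCheck(long autoGeneratedIdTimestamp, boolean isRetry) {
        return isRetry == false && autoGeneratedIdTimestamp > maxUnsafeAutoIdTimestamp.get();
    }
}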

Currently this property is maintained in memory and is handed off from the primary to any replica during the recovery process.

This commit takes a more natural approach and stores it in the Lucene commit, using the same semantics (no retried operation with a higher timestamp is part of this commit). This means that the knowledge is transferred during the file copy, and it also means that we don't need to worry about crazy situations where an original append-only request arrives at the engine after a retry was processed *and* the engine was restarted.
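For reference, the Lucene mechanism the patch builds on is the commit user data written through IndexWriter#setLiveCommitData and read back from the commit point. A minimal standalone sketch, where only the key name comes from the patch and everything else is illustrative:

import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

// Minimal sketch of Lucene commit user data, the mechanism the patch relies on.
// Only the key name mirrors the patch; the rest is illustrative.
public class CommitUserDataSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new RAMDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
                Map<String, String> commitData = new HashMap<>();
                commitData.put("max_unsafe_auto_id_timestamp", Long.toString(123_456_789L));
                // the key/value pairs become part of the next Lucene commit ...
                writer.setLiveCommitData(commitData.entrySet());
                writer.commit();
            }
            // ... and therefore travel with the index files: whoever copies or reopens
            // the index can read the value back from the commit point
            Map<String, String> userData = DirectoryReader.listCommits(dir).get(0).getUserData();
            System.out.println(userData.get("max_unsafe_auto_id_timestamp"));
        }
    }
}

Because the value is part of the commit itself, a file-based recovery that copies the Lucene files to a replica carries the marker along for free, which is exactly the property the change exploits.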
Boaz Leskes 2017-04-18 20:11:32 +02:00 committed by GitHub
parent ab9884b2e9
commit edff30f82a
2 changed files with 49 additions and 6 deletions

InternalEngine.java

@@ -128,6 +128,7 @@ public class InternalEngine extends Engine {
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
@@ -178,6 +179,7 @@ public class InternalEngine extends Engine {
}
logger.trace("recovered [{}]", seqNoStats);
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
@@ -226,6 +228,17 @@ public class InternalEngine extends Engine {
logger.trace("created new InternalEngine");
}
private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue());
break;
}
}
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
}
private static SequenceNumbersService sequenceNumberService(
final ShardId shardId,
final IndexSettings indexSettings,
@@ -500,7 +513,7 @@ public class InternalEngine extends Engine {
return true;
case LOCAL_TRANSLOG_RECOVERY:
assert index.isRetry();
return false; // even if retry is set we never optimize local recovery
return true; // allow to optimize in order to update the max safe time stamp
default:
throw new IllegalArgumentException("unknown origin " + index.origin());
}
@@ -1770,6 +1783,7 @@ public class InternalEngine extends Engine {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});

InternalEngineTests.java

@@ -3223,11 +3223,40 @@ public class InternalEngineTests extends ESTestCase {
}
long maxTimestamp = Math.abs(randomLong());
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
maxTimestamp, null))) {
assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
final long timestamp1 = Math.abs(randomLong());
final Path storeDir = createTempDir();
final Path translogDir = createTempDir();
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp1, null))) {
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
final long timestamp2 = randomNonNegativeLong();
final long timestamp3 = randomNonNegativeLong();
final long maxTimestamp12 = Math.max(timestamp1, timestamp2);
final long maxTimestamp123 = Math.max(maxTimestamp12, timestamp3);
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp2, null),
randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) {
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// recover from translog and commit maxTimestamp12
engine.recoverFromTranslog();
// force flush as there were no ops performed
engine.flush(true, false);
}
final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(appendOnlyPrimary(doc, true, timestamp3));
assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.recoverFromTranslog();
assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
}