mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Only verify global checkpoint if translog sync occurred (#45980)
We only sync translog if the given offset hasn't synced yet. We can't verify the global checkpoint from the latest translog checkpoint unless a sync has occurred. Closes #46065 Relates #45634
This commit is contained in:
parent
028e792e1d
commit
bb49124690
@ -31,7 +31,9 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalSettingsPlugin;
|
||||
@ -226,4 +228,35 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testPersistGlobalCheckpoint() throws Exception {
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
Settings.Builder indexSettings = Settings.builder()
|
||||
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), randomTimeValue(100, 1000, "ms"))
|
||||
.put("index.number_of_replicas", randomIntBetween(0, 1));
|
||||
if (randomBoolean()) {
|
||||
indexSettings.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)
|
||||
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), randomTimeValue(100, 1000, "ms"));
|
||||
}
|
||||
prepareCreate("test", indexSettings).get();
|
||||
if (randomBoolean()) {
|
||||
ensureGreen("test");
|
||||
}
|
||||
int numDocs = randomIntBetween(1, 20);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
client().prepareIndex("test", "test", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
|
||||
}
|
||||
ensureGreen("test");
|
||||
assertBusy(() -> {
|
||||
for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard shard : indexService) {
|
||||
final SeqNoStats seqNoStats = shard.seqNoStats();
|
||||
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
|
||||
assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
|
||||
assertThat(shard.getLastSyncedGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -3309,21 +3309,25 @@ public class TranslogTests extends ESTestCase {
|
||||
}
|
||||
assertNotNull(location);
|
||||
long globalCheckpoint = lastGlobalCheckpoint.get();
|
||||
final boolean synced;
|
||||
if (randomBoolean()) {
|
||||
translog.ensureSynced(location);
|
||||
synced = translog.ensureSynced(location);
|
||||
} else {
|
||||
translog.sync();
|
||||
synced = true;
|
||||
}
|
||||
for (Translog.Operation op : ops) {
|
||||
assertThat("seq# " + op.seqNo() + " was not marked as persisted", persistedSeqNos, hasItem(op.seqNo()));
|
||||
}
|
||||
Checkpoint checkpoint = translog.getLastSyncedCheckpoint();
|
||||
assertThat(checkpoint.offset, greaterThanOrEqualTo(location.translogLocation));
|
||||
assertThat(checkpoint.globalCheckpoint, greaterThanOrEqualTo(globalCheckpoint));
|
||||
for (Translog.Operation op : ops) {
|
||||
assertThat(checkpoint.minSeqNo, lessThanOrEqualTo(op.seqNo()));
|
||||
assertThat(checkpoint.maxSeqNo, greaterThanOrEqualTo(op.seqNo()));
|
||||
}
|
||||
if (synced) {
|
||||
assertThat(checkpoint.globalCheckpoint, greaterThanOrEqualTo(globalCheckpoint));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user