mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 12:56:53 +00:00
`ReadOnlyEngine` never recovers operations from translog and never updates translog information in the index shard's recovery state, even though the recovery state goes through the `TRANSLOG` stage during the recovery. It means that recovery information for frozen shards indicates an unkown number of recovered translog ops in the Recovery APIs (translog_ops: `-1` and translog_ops_percent: `-1.0%`) and this is confusing. This commit changes the `recoverFromTranslog()` method in `ReadOnlyEngine` so that it always recover from an empty translog snapshot, allowing the recovery state translog information to be correctly updated. Related to #33888
This commit is contained in:
parent
0f68814e04
commit
fc896e452c
@ -34,6 +34,7 @@ import org.elasticsearch.Assertions;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
@ -287,18 +288,7 @@ public class ReadOnlyEngine extends Engine {
|
||||
|
||||
@Override
|
||||
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
|
||||
return new Translog.Snapshot() {
|
||||
@Override
|
||||
public void close() { }
|
||||
@Override
|
||||
public int totalOperations() {
|
||||
return 0;
|
||||
}
|
||||
@Override
|
||||
public Translog.Operation next() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
return newEmptySnapshot();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -429,7 +419,15 @@ public class ReadOnlyEngine extends Engine {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) {
|
||||
public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryRunner, final long recoverUpToSeqNo) {
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
ensureOpen();
|
||||
try (Translog.Snapshot snapshot = newEmptySnapshot()) {
|
||||
translogRecoveryRunner.run(this, snapshot);
|
||||
} catch (final Exception e) {
|
||||
throw new EngineException(shardId, "failed to recover from empty translog snapshot", e);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -468,4 +466,22 @@ public class ReadOnlyEngine extends Engine {
|
||||
public boolean refreshNeeded() {
|
||||
return false;
|
||||
}
|
||||
|
||||
private Translog.Snapshot newEmptySnapshot() {
|
||||
return new Translog.Snapshot() {
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int totalOperations() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Translog.Operation next() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -210,4 +210,36 @@ public class ReadOnlyEngineTests extends EngineTestCase {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
|
||||
IOUtils.close(engine, store);
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
try (Store store = createStore()) {
|
||||
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
|
||||
int numDocs = scaledRandomIntBetween(10, 1000);
|
||||
try (InternalEngine engine = createEngine(config)) {
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
if (rarely()) {
|
||||
continue; // gap in sequence number
|
||||
}
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
|
||||
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
if (rarely()) {
|
||||
engine.flush();
|
||||
}
|
||||
globalCheckpoint.set(i);
|
||||
}
|
||||
engine.syncTranslog();
|
||||
engine.flushAndClose();
|
||||
}
|
||||
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
|
||||
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
|
||||
readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
|
||||
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());
|
||||
|
||||
assertThat(translogHandler.appliedOperations(), equalTo(0L));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -9,12 +9,14 @@ import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
@ -27,6 +29,7 @@ import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
@ -49,8 +52,10 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDI
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
|
||||
@ -372,4 +377,36 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
|
||||
assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
|
||||
assertIndexFrozen(indexName);
|
||||
}
|
||||
|
||||
public void testRecoveryState() throws ExecutionException, InterruptedException {
|
||||
final String indexName = "index_recovery_state";
|
||||
createIndex(indexName, Settings.builder()
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build());
|
||||
|
||||
final long nbDocs = randomIntBetween(0, 50);
|
||||
for (long i = 0; i < nbDocs; i++) {
|
||||
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
|
||||
assertThat(indexResponse.status(), is(RestStatus.CREATED));
|
||||
}
|
||||
|
||||
assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
|
||||
assertIndexFrozen(indexName);
|
||||
|
||||
final IndexMetaData indexMetaData = client().admin().cluster().prepareState().get().getState().metaData().index(indexName);
|
||||
final IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(indexMetaData.getIndex());
|
||||
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
|
||||
final IndexShard indexShard = indexService.getShardOrNull(i);
|
||||
assertThat("Shard [" + i + "] is missing for index " + indexMetaData.getIndex(), indexShard, notNullValue());
|
||||
final RecoveryState recoveryState = indexShard.recoveryState();
|
||||
assertThat(recoveryState.getRecoverySource(), is(RecoverySource.ExistingStoreRecoverySource.INSTANCE));
|
||||
assertThat(recoveryState.getStage(), is(RecoveryState.Stage.DONE));
|
||||
assertThat(recoveryState.getTargetNode(), notNullValue());
|
||||
assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0));
|
||||
assertThat(recoveryState.getIndex().reusedFileCount(), greaterThan(0));
|
||||
assertThat(recoveryState.getTranslog().recoveredOperations(), equalTo(0));
|
||||
assertThat(recoveryState.getTranslog().totalOperations(), equalTo(0));
|
||||
assertThat(recoveryState.getTranslog().recoveredPercent(), equalTo(100.0f));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user