Remove isRecovering method from Engine (#47039)
We already prevent flushing in Engine if it's recovering. Hence, we can remove the protection in IndexShard.
This commit is contained in:
parent
c4a166fc9a
commit
22575bd7e6
|
@ -1874,13 +1874,6 @@ public abstract class Engine implements Closeable {
|
||||||
*/
|
*/
|
||||||
public abstract void skipTranslogRecovery();
|
public abstract void skipTranslogRecovery();
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns <code>true</code> iff this engine is currently recovering from translog.
|
|
||||||
*/
|
|
||||||
public boolean isRecovering() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tries to prune buffered deletes from the version map.
|
* Tries to prune buffered deletes from the version map.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -2461,7 +2461,7 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ensureCanFlush() {
|
final void ensureCanFlush() {
|
||||||
// translog recover happens after the engine is fully constructed
|
// translog recover happens after the engine is fully constructed
|
||||||
// if we are in this stage we have to prevent flushes from this
|
// if we are in this stage we have to prevent flushes from this
|
||||||
// engine otherwise we might loose documents if the flush succeeds
|
// engine otherwise we might loose documents if the flush succeeds
|
||||||
|
@ -2659,11 +2659,6 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRecovering() {
|
|
||||||
return pendingTranslogRecovery.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the commit data from {@link IndexWriter} as a map.
|
* Gets the commit data from {@link IndexWriter} as a map.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -1054,12 +1054,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
|
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
|
||||||
verifyNotClosed();
|
verifyNotClosed();
|
||||||
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
|
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
|
||||||
Engine engine = getEngine();
|
return getEngine().syncFlush(syncId, expectedCommitId);
|
||||||
if (engine.isRecovering()) {
|
|
||||||
throw new IllegalIndexShardStateException(shardId(), state, "syncFlush is only allowed if the engine is not recovery" +
|
|
||||||
" from translog");
|
|
||||||
}
|
|
||||||
return engine.syncFlush(syncId, expectedCommitId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1078,15 +1073,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
* since we use Engine#writeIndexingBuffer for this now.
|
* since we use Engine#writeIndexingBuffer for this now.
|
||||||
*/
|
*/
|
||||||
verifyNotClosed();
|
verifyNotClosed();
|
||||||
final Engine engine = getEngine();
|
|
||||||
if (engine.isRecovering()) {
|
|
||||||
throw new IllegalIndexShardStateException(
|
|
||||||
shardId(),
|
|
||||||
state,
|
|
||||||
"flush is only allowed if the engine is not recovery from translog");
|
|
||||||
}
|
|
||||||
final long time = System.nanoTime();
|
final long time = System.nanoTime();
|
||||||
final Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
|
final Engine.CommitId commitId = getEngine().flush(force, waitIfOngoing);
|
||||||
flushMetric.inc(System.nanoTime() - time);
|
flushMetric.inc(System.nanoTime() - time);
|
||||||
return commitId;
|
return commitId;
|
||||||
}
|
}
|
||||||
|
|
|
@ -729,16 +729,20 @@ public class InternalEngineTests extends EngineTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
|
public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
|
||||||
assertFalse(engine.isRecovering());
|
engine.ensureCanFlush(); // recovered already
|
||||||
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
|
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
|
||||||
engine.index(indexForDoc(doc));
|
engine.index(indexForDoc(doc));
|
||||||
engine.close();
|
engine.close();
|
||||||
|
|
||||||
engine = new InternalEngine(engine.config());
|
engine = new InternalEngine(engine.config());
|
||||||
|
expectThrows(IllegalStateException.class, engine::ensureCanFlush);
|
||||||
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
|
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
|
||||||
assertTrue(engine.isRecovering());
|
if (randomBoolean()) {
|
||||||
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
|
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
|
||||||
assertFalse(engine.isRecovering());
|
} else {
|
||||||
|
engine.skipTranslogRecovery();
|
||||||
|
}
|
||||||
|
engine.ensureCanFlush(); // ready
|
||||||
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
|
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
|
||||||
engine.index(indexForDoc(doc));
|
engine.index(indexForDoc(doc));
|
||||||
engine.flush();
|
engine.flush();
|
||||||
|
@ -2825,7 +2829,7 @@ public class InternalEngineTests extends EngineTestCase {
|
||||||
{
|
{
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < 2; i++) {
|
||||||
try (InternalEngine engine = new InternalEngine(config)) {
|
try (InternalEngine engine = new InternalEngine(config)) {
|
||||||
assertTrue(engine.isRecovering());
|
expectThrows(IllegalStateException.class, engine::ensureCanFlush);
|
||||||
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
|
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
|
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
|
||||||
|
|
Loading…
Reference in New Issue