mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 22:36:20 +00:00
Merge pull request #11193 from s1monw/remove_commit_without_translog
[ENGINE] Remove the ability to flush without flushing the translog
This commit is contained in:
commit
75b62e12a6
@ -682,15 +682,11 @@ public class InternalEngine extends Engine {
|
||||
|
||||
@Override
|
||||
public void flush() throws EngineException {
|
||||
flush(true, false, false);
|
||||
flush(false, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
|
||||
flush(true, force, waitIfOngoing);
|
||||
}
|
||||
|
||||
private void flush(boolean commitTranslog, boolean force, boolean waitIfOngoing) throws EngineException {
|
||||
ensureOpen();
|
||||
/*
|
||||
* Unfortunately the lock order is important here. We have to acquire the readlock first otherwise
|
||||
@ -714,37 +710,21 @@ public class InternalEngine extends Engine {
|
||||
logger.trace("acquired flush lock immediately");
|
||||
}
|
||||
try {
|
||||
if (commitTranslog) {
|
||||
if (flushNeeded || force) {
|
||||
flushNeeded = false;
|
||||
try {
|
||||
translog.prepareCommit();
|
||||
logger.trace("starting commit for flush; commitTranslog=true");
|
||||
commitIndexWriter(indexWriter, translog);
|
||||
logger.trace("finished commit for flush");
|
||||
translog.commit();
|
||||
// we need to refresh in order to clear older version values
|
||||
refresh("version_table_flush");
|
||||
|
||||
} catch (Throwable e) {
|
||||
throw new FlushFailedEngineException(shardId, e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
|
||||
// translog on an index that was opened on a committed point in time that is "in the future"
|
||||
// of that translog
|
||||
// we allow to *just* commit if there is an ongoing recovery happening...
|
||||
// its ok to use this, only a flush will cause a new translogFileGeneration, and we are locked here from
|
||||
// other flushes use flushLock
|
||||
if (flushNeeded || force) {
|
||||
flushNeeded = false;
|
||||
final long translogId;
|
||||
try {
|
||||
logger.trace("starting commit for flush; commitTranslog=false");
|
||||
translog.prepareCommit();
|
||||
logger.trace("starting commit for flush; commitTranslog=true");
|
||||
commitIndexWriter(indexWriter, translog);
|
||||
logger.trace("finished commit for flush");
|
||||
translog.commit();
|
||||
// we need to refresh in order to clear older version values
|
||||
refresh("version_table_flush");
|
||||
|
||||
} catch (Throwable e) {
|
||||
throw new FlushFailedEngineException(shardId, e);
|
||||
}
|
||||
|
||||
}
|
||||
/*
|
||||
* we have to inc-ref the store here since if the engine is closed by a tragic event
|
||||
@ -838,7 +818,7 @@ public class InternalEngine extends Engine {
|
||||
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
|
||||
}
|
||||
if (flush) {
|
||||
flush(true, true, true);
|
||||
flush(true, true);
|
||||
}
|
||||
if (upgrade) {
|
||||
logger.info("finished segment upgrade");
|
||||
@ -865,7 +845,7 @@ public class InternalEngine extends Engine {
|
||||
// the to a write lock when we fail the engine in this operation
|
||||
if (flushFirst) {
|
||||
logger.trace("start flush for snapshot");
|
||||
flush(false, false, true);
|
||||
flush(false, true);
|
||||
logger.trace("finish flush for snapshot");
|
||||
}
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
|
@ -32,7 +32,7 @@ public class IndexStoreModule extends AbstractModule implements SpawnModules {
|
||||
|
||||
private final Settings settings;
|
||||
|
||||
public static enum Type {
|
||||
public enum Type {
|
||||
NIOFS {
|
||||
@Override
|
||||
public boolean match(String setting) {
|
||||
|
@ -40,6 +40,7 @@ import org.elasticsearch.monitor.fs.FsStats;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.engine.MockEngineSupport;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.store.MockFSIndexStoreModule;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportModule;
|
||||
import org.junit.Test;
|
||||
|
@ -1556,7 +1556,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
|
||||
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0);
|
||||
List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
|
||||
for (SnapshotIndexShardStatus status : shards) {
|
||||
assertThat(status.getStats().getProcessedFiles(), equalTo(1)); // we flush before the snapshot such that we have to process the segments_N files
|
||||
assertThat(status.getStats().getProcessedFiles(), equalTo(0));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -365,7 +365,7 @@ public final class InternalTestCluster extends TestCluster {
|
||||
.put("cluster.routing.schedule", (30 + random.nextInt(50)) + "ms")
|
||||
.put(SETTING_CLUSTER_NODE_SEED, seed);
|
||||
if (ENABLE_MOCK_MODULES && usually(random)) {
|
||||
builder.put(IndexStoreModule.STORE_TYPE, MockFSIndexStoreModule.class.getName()); // no RAM dir for now!
|
||||
builder.put(IndexStoreModule.STORE_TYPE, MockFSIndexStoreModule.class.getName());
|
||||
builder.put(IndexShardModule.ENGINE_FACTORY, MockEngineFactory.class);
|
||||
builder.put(PageCacheRecyclerModule.CACHE_IMPL, MockPageCacheRecyclerModule.class.getName());
|
||||
builder.put(BigArraysModule.IMPL, MockBigArraysModule.class.getName());
|
||||
|
@ -107,16 +107,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
|
||||
public void beforeIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
if (indexShard != null && shardId.equals(sid)) {
|
||||
logger.info("{} shard state before potentially flushing is {}", indexShard.shardId(), indexShard.state());
|
||||
if (validCheckIndexStates.contains(indexShard.state()) && IndexMetaData.isOnSharedFilesystem(indexSettings) == false) {
|
||||
// When the the internal engine closes we do a rollback, which removes uncommitted segments
|
||||
// By doing a commit flush we perform a Lucene commit, but don't clear the translog,
|
||||
// so that even in tests where don't flush we can check the integrity of the Lucene index
|
||||
if (indexShard.engine().hasUncommittedChanges()) { // only if we have any changes
|
||||
logger.info("{} flushing in order to run checkindex", indexShard.shardId());
|
||||
Releasables.close(indexShard.engine().snapshotIndex(true)); // Keep translog for tests that rely on replaying it
|
||||
}
|
||||
logger.info("{} flush finished in beforeIndexShardClosed", indexShard.shardId());
|
||||
canRun = true;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user