diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index fe27aea805e..ea8161c1589 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -661,7 +661,7 @@ public abstract class Engine implements Closeable { } /** get commits stats for the last commit */ - public CommitStats commitStats() { + public final CommitStats commitStats() { return new CommitStats(getLastCommittedSegmentInfos()); } @@ -951,7 +951,9 @@ public abstract class Engine implements Closeable { * * @return the commit Id for the resulting commit */ - public abstract CommitId flush() throws EngineException; + public final CommitId flush() throws EngineException { + return flush(false, false); + } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d9b03777f1b..b2ab0d71c32 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1576,11 +1576,6 @@ public class InternalEngine extends Engine { || localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo(); } - @Override - public CommitId flush() throws EngineException { - return flush(false, false); - } - @Override public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { ensureOpen(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java new file mode 100644 index 00000000000..a55987d0a00 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -0,0 +1,372 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.Lock; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStats; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * A basic read-only engine that allows switching a shard to be true read-only temporarily or permanently. + * Note: this engine can be opened side-by-side with a read-write engine but will not reflect any changes made to the read-write + * engine. + * + * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function) + */ +public final class ReadOnlyEngine extends Engine { + + private final SegmentInfos lastCommittedSegmentInfos; + private final SeqNoStats seqNoStats; + private final TranslogStats translogStats; + private final SearcherManager searcherManager; + private final IndexCommit indexCommit; + private final Lock indexWriterLock; + + /** + * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened + * read-write engine. It allows to optionally obtain the writer locks for the shard which would time-out if another + * engine is still open. + * + * @param config the engine configuration + * @param seqNoStats sequence number statistics for this engine or null if not provided + * @param translogStats translog stats for this engine or null if not provided + * @param obtainLock if true this engine will try to obtain the {@link IndexWriter#WRITE_LOCK_NAME} lock. Otherwise + * the lock won't be obtained + * @param readerWrapperFunction allows to wrap the index-reader for this engine. + */ + public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock, + Function readerWrapperFunction) { + super(config); + try { + Store store = config.getStore(); + store.incRef(); + DirectoryReader reader = null; + Directory directory = store.directory(); + Lock indexWriterLock = null; + boolean success = false; + try { + // we obtain the IW lock even though we never modify the index. + // yet this makes sure nobody else does. including some testing tools that try to be messy + indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null; + this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); + this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats; + this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats; + reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(directory), config.getShardId()); + if (config.getIndexSettings().isSoftDeleteEnabled()) { + reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); + } + reader = readerWrapperFunction.apply(reader); + this.indexCommit = reader.getIndexCommit(); + this.searcherManager = new SearcherManager(reader, + new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService())); + this.indexWriterLock = indexWriterLock; + success = true; + } finally { + if (success == false) { + IOUtils.close(reader, indexWriterLock, store::decRef); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); // this is stupid + } + } + + @Override + protected void closeNoLock(String reason, CountDownLatch closedLatch) { + if (isClosed.compareAndSet(false, true)) { + try { + IOUtils.close(searcherManager, indexWriterLock, store::decRef); + } catch (Exception ex) { + logger.warn("failed to close searcher", ex); + } finally { + closedLatch.countDown(); + } + } + } + + public static SeqNoStats buildSeqNoStats(SegmentInfos infos) { + final SequenceNumbers.CommitInfo seqNoStats = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(infos.userData.entrySet()); + long maxSeqNo = seqNoStats.maxSeqNo; + long localCheckpoint = seqNoStats.localCheckpoint; + return new SeqNoStats(maxSeqNo, localCheckpoint, localCheckpoint); + } + + @Override + public GetResult get(Get get, BiFunction searcherFactory) throws EngineException { + return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL); + } + + @Override + protected ReferenceManager getReferenceManager(SearcherScope scope) { + return searcherManager; + } + + @Override + protected SegmentInfos getLastCommittedSegmentInfos() { + return lastCommittedSegmentInfos; + } + + @Override + public String getHistoryUUID() { + return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY); + } + + @Override + public long getWritingBytes() { + return 0; + } + + @Override + public long getIndexThrottleTimeInMillis() { + return 0; + } + + @Override + public boolean isThrottled() { + return false; + } + + @Override + public IndexResult index(Index index) { + assert false : "this should not be called"; + throw new UnsupportedOperationException("indexing is not supported on a read-only engine"); + } + + @Override + public DeleteResult delete(Delete delete) { + assert false : "this should not be called"; + throw new UnsupportedOperationException("deletes are not supported on a read-only engine"); + } + + @Override + public NoOpResult noOp(NoOp noOp) { + assert false : "this should not be called"; + throw new UnsupportedOperationException("no-ops are not supported on a read-only engine"); + } + + @Override + public boolean isTranslogSyncNeeded() { + return false; + } + + @Override + public boolean ensureTranslogSynced(Stream locations) { + return false; + } + + @Override + public void syncTranslog() { + } + + @Override + public Closeable acquireRetentionLockForPeerRecovery() { + return () -> {}; + } + + @Override + public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo, + boolean requiredFullRange) throws IOException { + return readHistoryOperations(source, mapperService, fromSeqNo); + } + + @Override + public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + return new Translog.Snapshot() { + @Override + public void close() { } + @Override + public int totalOperations() { + return 0; + } + @Override + public Translog.Operation next() { + return null; + } + }; + } + + @Override + public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + return 0; + } + + @Override + public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { + return false; + } + + @Override + public TranslogStats getTranslogStats() { + return translogStats; + } + + @Override + public Translog.Location getTranslogLastWriteLocation() { + return new Translog.Location(0,0,0); + } + + @Override + public long getLocalCheckpoint() { + return seqNoStats.getLocalCheckpoint(); + } + + @Override + public void waitForOpsToComplete(long seqNo) { + } + + @Override + public void resetLocalCheckpoint(long newCheckpoint) { + } + + @Override + public SeqNoStats getSeqNoStats(long globalCheckpoint) { + return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint); + } + + @Override + public long getLastSyncedGlobalCheckpoint() { + return seqNoStats.getGlobalCheckpoint(); + } + + @Override + public long getIndexBufferRAMBytesUsed() { + return 0; + } + + @Override + public List segments(boolean verbose) { + return Arrays.asList(getSegmentInfo(lastCommittedSegmentInfos, verbose)); + } + + @Override + public void refresh(String source) { + // we could allow refreshes if we want down the road the searcher manager will then reflect changes to a rw-engine + // opened side-by-side + } + + @Override + public void writeIndexingBuffer() throws EngineException { + } + + @Override + public boolean shouldPeriodicallyFlush() { + return false; + } + + @Override + public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) { + // we can't do synced flushes this would require an indexWriter which we don't have + throw new UnsupportedOperationException("syncedFlush is not supported on a read-only engine"); + } + + @Override + public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { + return new CommitId(lastCommittedSegmentInfos.getId()); + } + + @Override + public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, + boolean upgrade, boolean upgradeOnlyAncientSegments) { + } + + @Override + public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { + store.incRef(); + return new IndexCommitRef(indexCommit, store::decRef); + } + + @Override + public IndexCommitRef acquireSafeIndexCommit() { + return acquireLastIndexCommit(false); + } + + @Override + public void activateThrottling() { + } + + @Override + public void deactivateThrottling() { + } + + @Override + public void trimUnreferencedTranslogFiles() { + } + + @Override + public boolean shouldRollTranslogGeneration() { + return false; + } + + @Override + public void rollTranslogGeneration() { + } + + @Override + public void restoreLocalCheckpointFromTranslog() { + } + + @Override + public int fillSeqNoGaps(long primaryTerm) { + return 0; + } + + @Override + public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) { + return this; + } + + @Override + public void skipTranslogRecovery() { + } + + @Override + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) { + } + + @Override + public void maybePruneDeletes() { + } +} diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a26fd72468b..39132d805b2 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5033,7 +5033,7 @@ public class InternalEngineTests extends EngineTestCase { expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test")); } - private static void trimUnsafeCommits(EngineConfig config) throws IOException { + static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java new file mode 100644 index 00000000000..4a5b89351bd --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -0,0 +1,156 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.engine; + +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.store.Store; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; + +public class ReadOnlyEngineTests extends EngineTestCase { + + public void testReadOnlyEngine() throws Exception { + IOUtils.close(engine, store); + Engine readOnlyEngine = null; + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + final SeqNoStats lastSeqNoStats; + final Set lastDocIds; + try (InternalEngine engine = createEngine(config)) { + Engine.Get get = null; + for (int i = 0; i < numDocs; i++) { + if (rarely()) { + continue; // gap in sequence number + } + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false)); + if (get == null || rarely()) { + get = newGet(randomBoolean(), doc); + } + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + } + engine.syncTranslog(); + engine.flush(); + readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, engine.getSeqNoStats(globalCheckpoint.get()), + engine.getTranslogStats(), false, Function.identity()); + lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); + lastDocIds = getDocIds(engine, true); + assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); + assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); + for (int i = 0; i < numDocs; i++) { + if (randomBoolean()) { + String delId = Integer.toString(i); + engine.delete(new Engine.Delete("test", delId, newUid(delId), primaryTerm.get())); + } + if (rarely()) { + engine.flush(); + } + } + Engine.Searcher external = readOnlyEngine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL); + Engine.Searcher internal = readOnlyEngine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + assertSame(external.reader(), internal.reader()); + IOUtils.close(external, internal); + // the locked down engine should still point to the previous commit + assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); + assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); + try (Engine.GetResult getResult = readOnlyEngine.get(get, readOnlyEngine::acquireSearcher)) { + assertTrue(getResult.exists()); + } + + } + // Close and reopen the main engine + InternalEngineTests.trimUnsafeCommits(config); + try (InternalEngine recoveringEngine = new InternalEngine(config)) { + recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + // the locked down engine should still point to the previous commit + assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); + assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); + } + } finally { + IOUtils.close(readOnlyEngine); + } + } + + public void testFlushes() throws IOException { + IOUtils.close(engine, store); + Engine readOnlyEngine = null; + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + try (InternalEngine engine = createEngine(config)) { + for (int i = 0; i < numDocs; i++) { + if (rarely()) { + continue; // gap in sequence number + } + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false)); + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + } + engine.syncTranslog(); + engine.flushAndClose(); + readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null , null, true, Function.identity()); + Engine.CommitId flush = readOnlyEngine.flush(randomBoolean(), randomBoolean()); + assertEquals(flush, readOnlyEngine.flush(randomBoolean(), randomBoolean())); + } finally { + IOUtils.close(readOnlyEngine); + } + } + } + + public void testReadOnly() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + store.createEmpty(); + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { + Class expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class : + UnsupportedOperationException.class; + expectThrows(expectedException, () -> readOnlyEngine.index(null)); + expectThrows(expectedException, () -> readOnlyEngine.delete(null)); + expectThrows(expectedException, () -> readOnlyEngine.noOp(null)); + expectThrows(UnsupportedOperationException.class, () -> readOnlyEngine.syncFlush(null, null)); + } + } + } +}