Add read-only Engine (#33563)

This change adds an engine implementation that opens a reader on an
existing index but doesn't permit any refreshes or modifications
to the index.

Relates to #32867
Relates to #32844
Simon Willnauer 2018-09-11 14:05:14 +02:00 committed by GitHub
parent 2f3b542d57
commit 517cfc3cc0
5 changed files with 533 additions and 8 deletions
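
Before the individual file diffs, a minimal usage sketch of the new engine (an editor's illustration written against the API added in this commit, not part of the diff; `config` is assumed to be the EngineConfig of a shard whose index already has at least one commit):

    // open a read-only view on an existing index
    ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(
        config,
        null,                  // seqNoStats: recovered from the last commit when null
        null,                  // translogStats: defaults to empty stats when null
        true,                  // obtain the IndexWriter write lock so no other engine can open the index
        Function.identity());  // no additional reader wrapping
    try (Engine.Searcher searcher = readOnlyEngine.acquireSearcher("example")) {
        // searches are served from the last commit point
    }
    // write operations fail:
    // readOnlyEngine.index(...)  -> UnsupportedOperationException
    // readOnlyEngine.delete(...) -> UnsupportedOperationException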

server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -661,7 +661,7 @@ public abstract class Engine implements Closeable {
}
/** get commits stats for the last commit */
- public CommitStats commitStats() {
+ public final CommitStats commitStats() {
return new CommitStats(getLastCommittedSegmentInfos());
}
@@ -951,7 +951,9 @@ public abstract class Engine implements Closeable {
*
* @return the commit Id for the resulting commit
*/
- public abstract CommitId flush() throws EngineException;
+ public final CommitId flush() throws EngineException {
+     return flush(false, false);
+ }
/**

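The net effect of the hunk above: the no-argument flush() is now a final convenience method on the base class, so engine implementations override only the two-argument variant. A distilled sketch of the resulting contract (class body abbreviated for illustration):

    public abstract class Engine implements Closeable {
        public final CommitId flush() throws EngineException {
            return flush(false, false); // default: not forced, don't wait on a concurrent flush
        }
        public abstract CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException;
        // ...
    }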
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -1576,11 +1576,6 @@ public class InternalEngine extends Engine {
|| localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo();
}
- @Override
- public CommitId flush() throws EngineException {
-     return flush(false, false);
- }
@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

@@ -0,0 +1,372 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
/**
* A basic read-only engine that allows switching a shard to be truly read-only temporarily or permanently.
* Note: this engine can be opened side-by-side with a read-write engine but will not reflect any changes made to the read-write
* engine.
*
* @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
*/
public final class ReadOnlyEngine extends Engine {
private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
private final TranslogStats translogStats;
private final SearcherManager searcherManager;
private final IndexCommit indexCommit;
private final Lock indexWriterLock;
/**
* Creates a new ReadOnlyEngine. This constructor can also be used to open a read-only engine on top of an already
* opened read-write engine. It can optionally obtain the writer lock for the shard, which would time out if
* another engine is still open.
*
* @param config the engine configuration
* @param seqNoStats sequence number statistics for this engine or null if not provided
* @param translogStats translog stats for this engine or null if not provided
* @param obtainLock if <code>true</code> this engine will try to obtain the {@link IndexWriter#WRITE_LOCK_NAME} lock. Otherwise
* the lock won't be obtained
* @param readerWrapperFunction a function that wraps the index reader for this engine.
*/
public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) {
super(config);
try {
Store store = config.getStore();
store.incRef();
DirectoryReader reader = null;
Directory directory = store.directory();
Lock indexWriterLock = null;
boolean success = false;
try {
// we obtain the IW lock even though we never modify the index; this makes sure nobody else does,
// including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(directory), config.getShardId());
if (config.getIndexSettings().isSoftDeleteEnabled()) {
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}
reader = readerWrapperFunction.apply(reader);
this.indexCommit = reader.getIndexCommit();
this.searcherManager = new SearcherManager(reader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
this.indexWriterLock = indexWriterLock;
success = true;
} finally {
if (success == false) {
IOUtils.close(reader, indexWriterLock, store::decRef);
}
}
} catch (IOException e) {
throw new UncheckedIOException(e); // wrap as unchecked to avoid declaring IOException on the constructor
}
}
@Override
protected void closeNoLock(String reason, CountDownLatch closedLatch) {
if (isClosed.compareAndSet(false, true)) {
try {
IOUtils.close(searcherManager, indexWriterLock, store::decRef);
} catch (Exception ex) {
logger.warn("failed to close searcher", ex);
} finally {
closedLatch.countDown();
}
}
}
public static SeqNoStats buildSeqNoStats(SegmentInfos infos) {
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(infos.userData.entrySet());
long maxSeqNo = seqNoStats.maxSeqNo;
long localCheckpoint = seqNoStats.localCheckpoint;
return new SeqNoStats(maxSeqNo, localCheckpoint, localCheckpoint);
}
@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
}
@Override
protected ReferenceManager<IndexSearcher> getReferenceManager(SearcherScope scope) {
return searcherManager;
}
@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
}
@Override
public String getHistoryUUID() {
return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY);
}
@Override
public long getWritingBytes() {
return 0;
}
@Override
public long getIndexThrottleTimeInMillis() {
return 0;
}
@Override
public boolean isThrottled() {
return false;
}
@Override
public IndexResult index(Index index) {
assert false : "this should not be called";
throw new UnsupportedOperationException("indexing is not supported on a read-only engine");
}
@Override
public DeleteResult delete(Delete delete) {
assert false : "this should not be called";
throw new UnsupportedOperationException("deletes are not supported on a read-only engine");
}
@Override
public NoOpResult noOp(NoOp noOp) {
assert false : "this should not be called";
throw new UnsupportedOperationException("no-ops are not supported on a read-only engine");
}
@Override
public boolean isTranslogSyncNeeded() {
return false;
}
@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
return false;
}
@Override
public void syncTranslog() {
}
@Override
public Closeable acquireRetentionLockForPeerRecovery() {
return () -> {};
}
@Override
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo,
boolean requiredFullRange) throws IOException {
return readHistoryOperations(source, mapperService, fromSeqNo);
}
@Override
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
return new Translog.Snapshot() {
@Override
public void close() { }
@Override
public int totalOperations() {
return 0;
}
@Override
public Translog.Operation next() {
return null;
}
};
}
@Override
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
return 0;
}
@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
return false;
}
@Override
public TranslogStats getTranslogStats() {
return translogStats;
}
@Override
public Translog.Location getTranslogLastWriteLocation() {
return new Translog.Location(0, 0, 0);
}
@Override
public long getLocalCheckpoint() {
return seqNoStats.getLocalCheckpoint();
}
@Override
public void waitForOpsToComplete(long seqNo) {
}
@Override
public void resetLocalCheckpoint(long newCheckpoint) {
}
@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);
}
@Override
public long getLastSyncedGlobalCheckpoint() {
return seqNoStats.getGlobalCheckpoint();
}
@Override
public long getIndexBufferRAMBytesUsed() {
return 0;
}
@Override
public List<Segment> segments(boolean verbose) {
return Arrays.asList(getSegmentInfo(lastCommittedSegmentInfos, verbose));
}
@Override
public void refresh(String source) {
// we could allow refreshes if we wanted to; down the road the searcher manager would then reflect changes to a
// rw-engine opened side-by-side
}
@Override
public void writeIndexingBuffer() throws EngineException {
}
@Override
public boolean shouldPeriodicallyFlush() {
return false;
}
@Override
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) {
// we can't do synced flushes; that would require an IndexWriter, which we don't have
throw new UnsupportedOperationException("syncedFlush is not supported on a read-only engine");
}
@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
return new CommitId(lastCommittedSegmentInfos.getId());
}
@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
boolean upgrade, boolean upgradeOnlyAncientSegments) {
}
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
store.incRef();
return new IndexCommitRef(indexCommit, store::decRef);
}
@Override
public IndexCommitRef acquireSafeIndexCommit() {
return acquireLastIndexCommit(false);
}
@Override
public void activateThrottling() {
}
@Override
public void deactivateThrottling() {
}
@Override
public void trimUnreferencedTranslogFiles() {
}
@Override
public boolean shouldRollTranslogGeneration() {
return false;
}
@Override
public void rollTranslogGeneration() {
}
@Override
public void restoreLocalCheckpointFromTranslog() {
}
@Override
public int fillSeqNoGaps(long primaryTerm) {
return 0;
}
@Override
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) {
return this;
}
@Override
public void skipTranslogRecovery() {
}
@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {
}
@Override
public void maybePruneDeletes() {
}
}
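
For context, buildSeqNoStats above derives its numbers from the last commit point rather than from a live tracker. A rough illustration of the commit user data it reads (the SequenceNumbers key constants are an assumption about this version of the codebase):

    // editor's illustration of what SequenceNumbers.loadSeqNoInfoFromLuceneCommit parses
    Map<String, String> userData = lastCommittedSegmentInfos.userData;
    String maxSeqNo = userData.get(SequenceNumbers.MAX_SEQ_NO);                  // e.g. "42"
    String localCheckpoint = userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY); // e.g. "42"
    // with no live tracker, the local checkpoint also serves as the global checkpoint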

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -5033,7 +5033,7 @@ public class InternalEngineTests extends EngineTestCase {
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test"));
}
- private static void trimUnsafeCommits(EngineConfig config) throws IOException {
+ static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);

server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java

@@ -0,0 +1,156 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
public class ReadOnlyEngineTests extends EngineTestCase {
public void testReadOnlyEngine() throws Exception {
IOUtils.close(engine, store);
Engine readOnlyEngine = null;
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
int numDocs = scaledRandomIntBetween(10, 1000);
final SeqNoStats lastSeqNoStats;
final Set<String> lastDocIds;
try (InternalEngine engine = createEngine(config)) {
Engine.Get get = null;
for (int i = 0; i < numDocs; i++) {
if (rarely()) {
continue; // gap in sequence number
}
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false));
if (get == null || rarely()) {
get = newGet(randomBoolean(), doc);
}
if (rarely()) {
engine.flush();
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
}
engine.syncTranslog();
engine.flush();
readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, engine.getSeqNoStats(globalCheckpoint.get()),
engine.getTranslogStats(), false, Function.identity());
lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
lastDocIds = getDocIds(engine, true);
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo()));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
for (int i = 0; i < numDocs; i++) {
if (randomBoolean()) {
String delId = Integer.toString(i);
engine.delete(new Engine.Delete("test", delId, newUid(delId), primaryTerm.get()));
}
if (rarely()) {
engine.flush();
}
}
Engine.Searcher external = readOnlyEngine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL);
Engine.Searcher internal = readOnlyEngine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
assertSame(external.reader(), internal.reader());
IOUtils.close(external, internal);
// the locked-down engine should still point to the previous commit
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo()));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
try (Engine.GetResult getResult = readOnlyEngine.get(get, readOnlyEngine::acquireSearcher)) {
assertTrue(getResult.exists());
}
}
// Close and reopen the main engine
InternalEngineTests.trimUnsafeCommits(config);
try (InternalEngine recoveringEngine = new InternalEngine(config)) {
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
// the locked-down engine should still point to the previous commit
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo()));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
}
} finally {
IOUtils.close(readOnlyEngine);
}
}
public void testFlushes() throws IOException {
IOUtils.close(engine, store);
Engine readOnlyEngine = null;
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
int numDocs = scaledRandomIntBetween(10, 1000);
try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < numDocs; i++) {
if (rarely()) {
continue; // gap in sequence number
}
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false));
if (rarely()) {
engine.flush();
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
}
engine.syncTranslog();
engine.flushAndClose();
readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity());
Engine.CommitId flush = readOnlyEngine.flush(randomBoolean(), randomBoolean());
assertEquals(flush, readOnlyEngine.flush(randomBoolean(), randomBoolean()));
} finally {
IOUtils.close(readOnlyEngine);
}
}
}
public void testReadOnly() throws IOException {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty();
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity())) {
Class<? extends Throwable> expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class :
UnsupportedOperationException.class;
expectThrows(expectedException, () -> readOnlyEngine.index(null));
expectThrows(expectedException, () -> readOnlyEngine.delete(null));
expectThrows(expectedException, () -> readOnlyEngine.noOp(null));
expectThrows(UnsupportedOperationException.class, () -> readOnlyEngine.syncFlush(null, null));
}
}
}
}