[ENGINE] Add engine lifecycle store reference to EngineHolder

This commit moves the engine's reference to the store out of the actual
implementation and into the holder, since the holder manages the actual lifecycle.
Engine-internal references, such as per-searcher or per-recovery references, are
kept inside the actual implementation since they have a different lifecycle.
This commit is contained in:
Simon Willnauer 2014-12-08 14:06:18 +01:00
parent 31a77185a6
commit b28fc1afa5
6 changed files with 160 additions and 77 deletions

View File

@ -24,6 +24,7 @@ import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
@ -65,6 +66,7 @@ import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
@ -122,6 +124,7 @@ public class InternalEngine implements Engine {
private volatile SearcherManager searcherManager;
private volatile boolean closed = false;
private volatile Closeable storeReference;
// flag indicating if a dirty operation has occurred since the last refresh
private volatile boolean dirty = false;
@ -198,9 +201,6 @@ public class InternalEngine implements Engine {
this.optimizeAutoGenerateId = optimizeAutoGenerateId;
this.failEngineOnCorruption = failEngineOnCorruption;
this.failedEngineListener = failedEngineListener;
// will be decremented in close()
store.incRef();
throttle = new IndexThrottle();
}
@ -236,81 +236,91 @@ public class InternalEngine implements Engine {
@Override
public void addFailedEngineListener(FailedEngineListener listener) {
throw new UnsupportedOperationException("addFailedEngineListener is not supported by InternalEngineImpl. Use InternalEngine.");
throw new UnsupportedOperationException("addFailedEngineListener is not supported by InternalEngine. Use InternalEngineHolder.");
}
@Override
public void start() throws EngineException {
store.incRef();
/*
* This might look weird but it's in-fact needed since if we close
* the engine due to a corruption on IW startup the reference is decremented in the close
* method and this must not happen more than once
*/
final Closeable storeRef = new Closeable() {
private final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
store.decRef();
}
}
};
final List<Closeable> closeOnFailure = new ArrayList<>(Arrays.asList(storeRef));
try (InternalLock _ = writeLock.acquire()) {
IndexWriter indexWriter = this.indexWriter;
if (indexWriter != null) {
throw new EngineAlreadyStartedException(shardId);
}
if (closed) {
throw new EngineClosedException(shardId);
}
storeReference = storeRef;
if (logger.isDebugEnabled()) {
logger.debug("starting engine");
}
try {
this.indexWriter = createWriter();
indexWriter = createWriter();
closeOnFailure.add(indexWriter);
} catch (IOException e) {
maybeFailEngine(e, "start");
if (this.indexWriter != null) {
try {
IndexWriter pending = indexWriter;
indexWriter = null;
pending.rollback();
} catch (IOException e1) {
e.addSuppressed(e1);
}
}
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
try {
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
translogIdGenerator.set(Math.max(0, translog.findLargestPresentTranslogId()));
translogIdGenerator.incrementAndGet();
final long translogId = Math.max(0, translog.findLargestPresentTranslogId()) + 1;
boolean mustCommitTranslogId = true;
if (Lucene.indexExists(store.directory())) {
final Map<String, String> commitUserData = Lucene.readSegmentInfos(store.directory()).getUserData();
mustCommitTranslogId = !commitUserData.containsKey(Translog.TRANSLOG_ID_KEY);
}
if (mustCommitTranslogId) { // translog id is not in the metadata - fix this inconsistency some code relies on this and old indices might not have it.
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogIdGenerator.get())));
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit();
}
translog.newTranslog(translogIdGenerator.get());
this.searcherManager = buildSearchManager(indexWriter);
translog.newTranslog(translogId);
final SearcherManager searcherManager = buildSearchManager(indexWriter);
closeOnFailure.add(searcherManager);
versionMap.setManager(searcherManager);
readLastCommittedSegmentsInfo();
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
this.searcherManager = searcherManager;
translogIdGenerator.set(translogId);
this.indexWriter = indexWriter;
closeOnFailure.clear(); // all is well
} catch (IOException e) {
maybeFailEngine(e, "start");
try {
if (indexWriter != null) {
indexWriter.rollback();
} catch (IOException e1) {
// ignore
} finally {
IOUtils.closeWhileHandlingException(indexWriter);
}
} catch (IOException e1) { // iw is closed below
e.addSuppressed(e1);
}
throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
}
} finally {
store.decRef();
if (closeOnFailure.isEmpty() == false) { // release everything we created on a failure
IOUtils.closeWhileHandlingException(closeOnFailure);
}
}
}
@Override
public void stop() throws EngineException {
throw new UnsupportedOperationException("stop() is not supported by InternalEngineImpl. Use InternalEngine.");
}
private void readLastCommittedSegmentsInfo() throws IOException {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
throw new UnsupportedOperationException("stop() is not supported by InternalEngine. Use InternalEngineHolder.");
}
@Override
@ -969,7 +979,7 @@ public class InternalEngine implements Engine {
// reread the last committed segment infos
try (InternalLock _ = readLock.acquire()) {
ensureOpen();
readLastCommittedSegmentsInfo();
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) {
if (!closed) {
logger.warn("failed to read latest segment infos on flush", e);
@ -1333,8 +1343,8 @@ public class InternalEngine implements Engine {
} catch (Throwable e) {
logger.warn("failed to rollback writer on close", e);
} finally {
store.decRef();
indexWriter = null;
IOUtils.closeWhileHandlingException(storeReference);
}
}
}

View File

@ -144,7 +144,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
this.settingsListener = new ApplySettings();
this.indexSettingsService.addListener(this.settingsListener);
store.incRef();
}
@Override
@ -193,10 +193,15 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
if (currentEngine != null) {
throw new EngineAlreadyStartedException(shardId);
}
InternalEngine newEngine = createEngineImpl();
InternalEngine newEngine = createEngine();
store.incRef();
try {
newEngine.start();
boolean success = this.currentEngine.compareAndSet(null, newEngine);
assert success : "engine changes should be done under a synchronize";
} finally {
store.decRef();
}
}
@Override
@ -209,7 +214,11 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
@Override
public synchronized void close() throws ElasticsearchException {
if (closed) {
return;
}
closed = true;
try {
InternalEngine currentEngine = this.currentEngine.getAndSet(null);
if (currentEngine != null) {
currentEngine.close();
@ -217,9 +226,12 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
mergeScheduler.removeListener(mergeSchedulerListener);
indexSettingsService.removeListener(settingsListener);
} finally {
store.decRef();
}
}
protected InternalEngine createEngineImpl() {
protected InternalEngine createEngine() {
return new InternalEngine(shardId, logger, codecService, threadPool, indexingService,
warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService,
enableGcDeletes, gcDeletesInMillis,
@ -331,6 +343,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
// called by the current engine
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
try {
for (FailedEngineListener listener : failedEngineListeners) {
try {
listener.onFailedEngine(shardId, reason, failure);
@ -338,6 +351,9 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
logger.warn("exception while notifying engine failure", e);
}
}
} finally {
close(); // we need to close ourself - we failed all bets are off
}
}
class ApplySettings implements IndexSettingsService.Listener {

View File

@ -603,6 +603,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
}
/**
 * Reports the number of references currently held, as tracked by
 * {@code refCounter}.
 *
 * @return the value reported by {@code refCounter.refCount()} at the time of the call
 */
public int refCount() {
    final int current = refCounter.refCount();
    return current;
}
private static final class StoreDirectory extends FilterDirectory {
private final ESLogger deletesLogger;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.engine.internal;
import com.carrotsearch.randomizedtesting.annotations.Seed;
import com.google.common.base.Predicate;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
@ -33,8 +34,7 @@ import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -74,7 +74,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogSizeMatcher;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.MatcherAssert;
import org.junit.After;
@ -89,15 +89,20 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy;
import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
import static org.hamcrest.Matchers.*;
/**
*
*/
public class InternalEngineTests extends ElasticsearchTestCase {
public class InternalEngineTests extends ElasticsearchLuceneTestCase {
protected final ShardId shardId = new ShardId(new Index("index"), 1);
@ -119,14 +124,14 @@ public class InternalEngineTests extends ElasticsearchTestCase {
public void setUp() throws Exception {
super.setUp();
defaultSettings = ImmutableSettings.builder()
.put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, getRandom().nextBoolean())
.put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, randomBoolean())
.put(InternalEngineHolder.INDEX_GC_DELETES, "1h") // make sure this doesn't kick in on us
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, randomBoolean())
.build(); // TODO randomize more settings
threadPool = new ThreadPool(getClass().getName());
store = createStore();
store.deleteContent();
storeReplica = createStoreReplica();
storeReplica = createStore();
storeReplica.deleteContent();
engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
engine = createEngine(engineSettingsService, store, createTranslog());
@ -181,26 +186,14 @@ public class InternalEngineTests extends ElasticsearchTestCase {
}
/**
 * Creates a store on top of a fresh directory obtained from {@code newDirectory()}.
 *
 * @return the store built by {@link #createStore(Directory)} for that directory
 * @throws IOException if building the store fails
 */
protected Store createStore() throws IOException {
    final Directory freshDirectory = newDirectory();
    return createStore(freshDirectory);
}
protected Store createStore(final Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
@Override
public Directory[] build() throws IOException {
return new Directory[]{new RAMDirectory()};
}
@Override
public long throttleTimeInNanos() {
return 0;
}
};
return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
}
protected Store createStoreReplica() throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
@Override
public Directory[] build() throws IOException {
return new Directory[]{new RAMDirectory()};
return new Directory[]{ directory };
}
@Override
@ -1423,6 +1416,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
// Get should not find the document
getResult = engine.get(new Engine.Get(true, newUid("2")));
assertThat(getResult.exists(), equalTo(false));
engine.close();
}
protected Term newUid(String id) {
@ -1435,6 +1429,61 @@ public class InternalEngineTests extends ElasticsearchTestCase {
ShardId shardId = ShardUtils.extractShardId(test.reader());
assertNotNull(shardId);
assertEquals(shardId, engine.shardId());
};
}
}
/**
 * Randomized test that injects random I/O failures while the engine is started and
 * ensures that all store references are counted down / released and that all
 * resources are closed.
 * <p>
 * Reference-count expectations checked below, relative to the count observed right
 * after the store is created:
 * <ul>
 *   <li>{@code +1} once the engine holder has been created,</li>
 *   <li>{@code +2} after a successful {@code start()},</li>
 *   <li>{@code +1} after a failed {@code start()} that did not fail the engine,</li>
 *   <li>{@code +0} after a failure caused by a {@link CorruptIndexException}, and
 *       again after the holder has been closed.</li>
 * </ul>
 *
 * @throws IOException if creating or closing the store or translog fails
 */
@Test
public void testFailStart() throws IOException {
    // this test fails if any reader, searcher or directory is not closed - MDW FTW
    final int iters = scaledRandomIntBetween(10, 100);
    for (int i = 0; i < iters; i++) {
        // randomize every failure mode the mock directory offers
        MockDirectoryWrapper wrapper = newMockDirectory();
        wrapper.setFailOnOpenInput(randomBoolean());
        wrapper.setAllowRandomFileNotFoundException(randomBoolean());
        wrapper.setRandomIOExceptionRate(randomDouble());
        wrapper.setRandomIOExceptionRateOnOpen(randomDouble());
        try (Store store = createStore(wrapper)) {
            // baseline against which all later reference counts are compared
            int refCount = store.refCount();
            assertTrue("refCount: " + store.refCount(), store.refCount() > 0);
            Translog translog = createTranslog();
            Settings build = ImmutableSettings.builder()
                    .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
            IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), build);
            Engine holder = createEngine(indexSettingsService, store, translog);
            // always fail the engine on corruption so the corrupt-index branch below is deterministic
            indexSettingsService.refreshSettings(ImmutableSettings.builder()
                    .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
                    .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true).build());
            // the holder itself keeps one reference to the store
            assertEquals(refCount + 1, store.refCount());
            final int numStarts = scaledRandomIntBetween(1, 5);
            for (int j = 0; j < numStarts; j++) {
                try {
                    holder.start();
                    // a started engine holds one additional reference
                    assertEquals(refCount + 2, store.refCount());
                    break;
                } catch (EngineCreationFailureException ex) {
                    // all is fine
                    if (ex.getCause() instanceof CorruptIndexException) {
                        // the failed engine closed the holder, so its reference is gone as well
                        assertEquals(refCount, store.refCount());
                        try {
                            holder.start();
                            fail("Engine must have failed on corrupt index");
                        } catch (EngineClosedException e) {
                            // good!
                        }
                        break; // failed engine can't start again
                    }
                    // a plain start failure must release only what start() itself acquired
                    assertEquals(refCount + 1, store.refCount());
                }
            }
            translog.close();
            holder.close();
            // closing the holder must bring the count back to the baseline
            assertEquals(refCount, store.refCount());
        }
    }
}
}

View File

@ -36,7 +36,8 @@ import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
ReproduceInfoPrinter.class
})
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class})
@ThreadLeakScope(Scope.NONE)
@ThreadLeakScope(Scope.SUITE)
@ThreadLeakLingering(linger = 5000) // 5 sec lingering
@TimeoutSuite(millis = TimeUnits.HOUR)
@SuppressCodecs("Lucene3x")
@LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose")

View File

@ -89,7 +89,7 @@ public final class MockInternalEngineHolder extends InternalEngineHolder impleme
}
@Override
protected InternalEngine createEngineImpl() {
protected InternalEngine createEngine() {
return new MockInternalEngine(mockContext, shardId, logger, codecService, threadPool, indexingService,
warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService,
enableGcDeletes, gcDeletesInMillis,