Make EngineConfig less mutable and ensure we prevent flushes during the translog recovery phase.

Simon Willnauer 2016-03-31 12:00:51 +02:00
parent 1e06139584
commit d006200c77
6 changed files with 178 additions and 114 deletions

EngineConfig.java

@@ -64,7 +64,6 @@ public final class EngineConfig {
private final Similarity similarity;
private final CodecService codecService;
private final Engine.EventListener eventListener;
private volatile boolean forceNewTranslog = false;
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
@@ -90,17 +89,20 @@ public final class EngineConfig {
}, Property.IndexScope, Property.NodeScope);
private TranslogConfig translogConfig;
private boolean create = false;
private final OpenMode openMode;
/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(ShardId shardId, ThreadPool threadPool,
public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
@@ -122,11 +124,7 @@ public final class EngineConfig {
this.queryCachingPolicy = queryCachingPolicy;
this.translogConfig = translogConfig;
this.flushMergesAfter = flushMergesAfter;
}
/** if true the engine will start even if the translog id in the commit point can not be found */
public boolean forceNewTranslog() {
return forceNewTranslog;
this.openMode = openMode;
}
/**
@@ -277,22 +275,6 @@ public final class EngineConfig {
return translogConfig;
}
/**
* Iff set to <code>true</code> the engine will create a new lucene index when opening the engine.
* Otherwise the lucene index writer is opened in append mode. The default is <code>false</code>
*/
public void setCreate(boolean create) {
this.create = create;
}
/**
* Iff <code>true</code> the engine should create a new lucene index when opening the engine.
* Otherwise the lucene index writer should be opened in append mode. The default is <code>false</code>
*/
public boolean isCreate() {
return create;
}
/**
* Returns a {@link TimeValue} at what time interval after the last write modification to the engine finished merges
* should be automatically flushed. This is used to free up transient disk usage of potentially large segments that
@@ -300,10 +282,25 @@ public final class EngineConfig {
*/
public TimeValue getFlushMergesAfter() { return flushMergesAfter; }
/** if set to true the engine will start even if the translog id in the commit point can not be found and a new transaction log
* will be created this should be used if recovery from translog should be skipped */
public void setForceNewTranslog(boolean forceNewTranslog) {
this.forceNewTranslog = forceNewTranslog;
/**
* Returns the {@link OpenMode} for this engine config.
*/
public OpenMode getOpenMode() {
return openMode;
}
/**
* Engine open mode defines how the engine should be opened or in other words what the engine should expect
* to recover from. We either create a brand new engine with a new index and translog or we recover from an existing index.
* If the index exists we also have the ability to open only the index and create a new transaction log, which happens
* during remote recovery since we have already transferred the index files but the translog is replayed from the remote node. The last
* and safest option opens the lucene index as well as its referenced transaction log for a translog recovery.
* See also {@link Engine#recoverFromTranslog()}
*/
public enum OpenMode {
CREATE_INDEX_AND_TRANSLOG,
OPEN_INDEX_CREATE_TRANSLOG,
OPEN_INDEX_AND_TRANSLOG;
}
}
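The enum makes the open mode immutable: it replaces the create and forceNewTranslog flags that callers previously toggled on a live config. A minimal construction sketch, assuming the flag-to-mode mapping implied by the hunks in this commit (all names besides the EngineConfig API are illustrative):

// Old mutable flags -> new fixed OpenMode (inferred from this commit):
//   create == true                             -> CREATE_INDEX_AND_TRANSLOG
//   create == false, forceNewTranslog == true  -> OPEN_INDEX_CREATE_TRANSLOG
//   create == false, forceNewTranslog == false -> OPEN_INDEX_AND_TRANSLOG
EngineConfig config = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG,
        shardId, threadPool, indexSettings, warmer, store, deletionPolicy, mergePolicy,
        analyzer, similarity, codecService, eventListener, translogRecoveryPerformer,
        queryCache, queryCachingPolicy, translogConfig, flushMergesAfter);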

InternalEngine.java

@@ -113,9 +113,12 @@ public class InternalEngine extends Engine {
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
private final AtomicBoolean allowCommits = new AtomicBoolean(true);
public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
openMode = engineConfig.getOpenMode();
this.versionMap = new LiveVersionMap();
store.incRef();
IndexWriter writer = null;
@@ -133,10 +136,9 @@ public class InternalEngine extends Engine {
throttle = new IndexThrottle();
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
try {
final boolean create = engineConfig.isCreate();
writer = createWriter(create);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
indexWriter = writer;
translog = openTranslog(engineConfig, writer, create || engineConfig.forceNewTranslog());
translog = openTranslog(engineConfig, writer);
assert translog.getGeneration() != null;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -155,13 +157,15 @@ public class InternalEngine extends Engine {
this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
try {
if (engineConfig.forceNewTranslog()) {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG) {
// make sure we point at the latest translog from now on..
commitIndexWriter(writer, translog, lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID));
}
} catch (IOException | EngineException ex) {
throw new EngineCreationFailureException(shardId, "failed to recover from translog", ex);
}
// don't allow commits until we are done recovering
allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
success = true;
} finally {
if (success == false) {
@@ -178,23 +182,55 @@ public class InternalEngine extends Engine {
@Override
public InternalEngine recoverFromTranslog() throws IOException {
boolean success = false;
try {
recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
success = true;
} finally {
if (success == false) {
close();
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't recover from translog with open mode: " + openMode);
}
if (allowCommits.get()) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
} catch (Throwable t) {
allowCommits.set(false); // just play safe and never allow commits on this engine
failEngine("failed to recover from translog", t);
throw t;
}
} finally {
flushLock.unlock();
}
return this;
}
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, boolean createNew) throws IOException {
private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
try {
Translog.Snapshot snapshot = translog.newSnapshot();
opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
} catch (Throwable e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert allowCommits.get() == false : "commits are allowed but shouldn't be";
allowCommits.set(true); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
flush(true, true);
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
}
}
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException {
final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer);
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
if (createNew == false) {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
if (generation == null) {
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
@@ -230,28 +266,6 @@ public class InternalEngine extends Engine {
return translog;
}
private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
int opsRecovered = 0;
try {
Translog.Snapshot snapshot = translog.newSnapshot();
opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
} catch (Throwable e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
flush(true, true);
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
}
}
/**
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
* translog id into lucene and returns null.
@@ -568,6 +582,7 @@ public class InternalEngine extends Engine {
}
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
ensureCanFlush();
if (indexWriter.hasUncommittedChanges()) {
logger.trace("can't sync commit [{}]. have pending changes", syncId);
return SyncedFlushResult.PENDING_OPERATIONS;
@@ -591,6 +606,7 @@ public class InternalEngine extends Engine {
boolean renewed = false;
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
ensureCanFlush();
String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
if (syncId != null && translog.totalOperations() == 0 && indexWriter.hasUncommittedChanges()) {
logger.trace("start renewing sync commit [{}]", syncId);
@@ -641,6 +657,7 @@ public class InternalEngine extends Engine {
}
try {
if (indexWriter.hasUncommittedChanges() || force) {
ensureCanFlush();
try {
translog.prepareCommit();
logger.trace("starting commit for flush; commitTranslog=true");
@@ -1084,6 +1101,7 @@ public class InternalEngine extends Engine {
}
private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
logger.trace("committing writer with translog id [{}] and sync id [{}] ", translogGeneration.translogFileGeneration, syncId);
@@ -1101,6 +1119,16 @@ public class InternalEngine extends Engine {
}
}
private void ensureCanFlush() {
// translog recovery happens after the engine is fully constructed.
// If we are in this stage we have to prevent flushes from this
// engine otherwise we might lose documents if the flush succeeds
// and the translog recovery fails when we "commit" the translog on flush.
if (allowCommits.get() == false) {
throw new FlushNotAllowedEngineException(shardId, "flushes are disabled - pending translog recovery");
}
}
private void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
commitIndexWriter(writer, translog, null);
}
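Taken together, the constructor, recoverFromTranslog() and ensureCanFlush() implement a small state machine: with OPEN_INDEX_AND_TRANSLOG the engine starts with allowCommits == false, every commit path goes through ensureCanFlush(), and only a successful translog recovery re-enables commits. A lifecycle sketch, where configFor(...) is a hypothetical helper standing in for building a complete EngineConfig with the given mode (the exception and call order come from the test added further down):

InternalEngine engine = new InternalEngine(configFor(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
// before recovery, every commit path trips ensureCanFlush():
//   engine.flush(true, true) -> throws FlushNotAllowedEngineException
engine.recoverFromTranslog(); // replays the translog under the flush lock, then sets allowCommits to true
engine.flush();               // commits are allowed again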

IndexShard.java

@@ -140,9 +140,10 @@ public class IndexShard extends AbstractIndexShardComponent {
private final Engine.Warmer warmer;
private final SnapshotDeletionPolicy deletionPolicy;
private final SimilarityService similarityService;
private final EngineConfig engineConfig;
private final TranslogConfig translogConfig;
private final IndexEventListener indexEventListener;
private final QueryCachingPolicy cachingPolicy;
/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
@@ -229,7 +230,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings,
bigArrays);
final QueryCachingPolicy cachingPolicy;
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {
@@ -237,7 +237,6 @@ public class IndexShard extends AbstractIndexShardComponent {
} else {
cachingPolicy = new UsageTrackingQueryCachingPolicy();
}
engineConfig = newEngineConfig(translogConfig, cachingPolicy, new IndexShardRecoveryPerformer(shardId, mapperService, logger));
suspendableRefContainer = new SuspendableRefContainer();
searcherWrapper = indexSearcherWrapper;
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
@@ -319,7 +318,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public QueryCachingPolicy getQueryCachingPolicy() {
return this.engineConfig.getQueryCachingPolicy();
return cachingPolicy;
}
/**
@@ -850,7 +849,8 @@ public class IndexShard extends AbstractIndexShardComponent {
// we still invoke any onShardInactive listeners ... we won't do a sync'd flush in this case because we only do that
// on the primary and this is a replica
active.set(true);
return engineConfig.getTranslogRecoveryPerformer().performBatchRecovery(getEngine(), operations);
Engine engine = getEngine();
return engine.config().getTranslogRecoveryPerformer().performBatchRecovery(engine, operations);
}
/**
@@ -881,19 +881,25 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
final EngineConfig.OpenMode openMode;
if (indexExists == false) {
openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
} else if (skipTranslogRecovery) {
openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
}
final EngineConfig config = newEngineConfig(openMode, translogConfig, cachingPolicy,
new IndexShardRecoveryPerformer(shardId, mapperService, logger));
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't lose deletes until we are done recovering
engineConfig.setEnableGcDeletes(false);
engineConfig.setCreate(indexExists == false);
if (skipTranslogRecovery == false) {
config.setEnableGcDeletes(false);
Engine newEngine = createNewEngine(config);
verifyNotClosed();
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
}
engineConfig.setForceNewTranslog(skipTranslogRecovery);
Engine newEngine = createNewEngine(engineConfig);
verifyNotClosed();
if (skipTranslogRecovery == false) {
newEngine.recoverFromTranslog();
}
@@ -945,8 +951,9 @@ public class IndexShard extends AbstractIndexShardComponent {
*/
public void finalizeRecovery() {
recoveryState().setStage(RecoveryState.Stage.FINALIZE);
getEngine().refresh("recovery_finalization");
engineConfig.setEnableGcDeletes(true);
Engine engine = getEngine();
engine.refresh("recovery_finalization");
engine.config().setEnableGcDeletes(true);
}
/**
@@ -1378,8 +1385,8 @@ public class IndexShard extends AbstractIndexShardComponent {
return mapperService.documentMapperWithAutoCreate(type);
}
private final EngineConfig newEngineConfig(TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy, TranslogRecoveryPerformer translogRecoveryPerformer) {
return new EngineConfig(shardId,
private final EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy, TranslogRecoveryPerformer translogRecoveryPerformer) {
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
indexSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));

ShadowIndexShard.java

@@ -29,7 +29,6 @@ import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.SearchSlowLog;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogStats;
@@ -83,7 +82,7 @@ public final class ShadowIndexShard extends IndexShard {
@Override
protected Engine newEngine(EngineConfig config) {
assert this.shardRouting.primary() == false;
config.setCreate(false); // hardcoded - we always expect an index to be present
assert config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
return engineFactory.newReadOnlyEngine(config);
}
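The assert replaces the old config.setCreate(false) hard-coding: a shadow replica always opens an existing index and never recovers from a local translog, so OPEN_INDEX_CREATE_TRANSLOG is the only open mode it should ever be handed.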

InternalEngineTests.java

@@ -195,6 +195,13 @@ public class InternalEngineTests extends ESTestCase {
}
}
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) {
return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter());
}
@Override
@After
public void tearDown() throws Exception {
@@ -263,14 +270,27 @@ public class InternalEngineTests extends ESTestCase {
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy);
return new InternalEngine(config).recoverFromTranslog();
InternalEngine internalEngine = new InternalEngine(config);
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
internalEngine.recoverFromTranslog();
}
return internalEngine;
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings
final EngineConfig.OpenMode openMode;
try {
if (Lucene.indexExists(store.directory()) == false) {
openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
}
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy,
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), new Engine.EventListener() {
@Override
@@ -278,11 +298,7 @@ public class InternalEngineTests extends ESTestCase {
// we don't need to notify anybody in this test
}
}, new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
try {
config.setCreate(Lucene.indexExists(store.directory()) == false);
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
return config;
}
@@ -500,7 +516,7 @@ public class InternalEngineTests extends ESTestCase {
SegmentsStats stats = engine.segmentsStats(true);
assertThat(stats.getFileSizes().size(), greaterThan(0));
assertThat((Iterable<Long>) () -> stats.getFileSizes().valuesIt(), everyItem(greaterThan(0L)));
assertThat(() -> stats.getFileSizes().valuesIt(), everyItem(greaterThan(0L)));
ObjectObjectCursor<String, Long> firstEntry = stats.getFileSizes().iterator().next();
@@ -555,13 +571,27 @@ public class InternalEngineTests extends ESTestCase {
InternalEngine engine = createEngine(store, translog);
engine.close();
engine = new InternalEngine(engine.config()).recoverFromTranslog();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
engine.recoverFromTranslog();
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
searcher.close();
IOUtils.close(store, engine);
}
public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
expectThrows(FlushNotAllowedEngineException.class, () -> engine.flush(true, true));
engine.recoverFromTranslog();
doc = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
}
public void testConcurrentGetAndFlush() throws Exception {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
@@ -793,7 +823,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSyncedFlush() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy())).recoverFromTranslog()) {
new LogByteSizeMergePolicy()))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
@@ -820,7 +850,7 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < iters; i++) {
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogDocMergePolicy())).recoverFromTranslog()) {
new LogDocMergePolicy()))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
Engine.Index doc1 = new Engine.Index(newUid("1"), doc);
@@ -888,7 +918,8 @@ public class InternalEngineTests extends ESTestCase {
} else {
engine.flushAndClose();
}
engine = new InternalEngine(config);
engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
if (randomBoolean()) {
engine.recoverFromTranslog();
}
@@ -914,8 +945,8 @@ public class InternalEngineTests extends ESTestCase {
// this so we have to disable the check explicitly
directory.setPreventDoubleWrite(false);
}
config.setCreate(false);
engine = new InternalEngine(config).recoverFromTranslog();
engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
engine.recoverFromTranslog();
assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
}
@@ -1053,7 +1084,7 @@ public class InternalEngineTests extends ESTestCase {
public void testForceMerge() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy())).recoverFromTranslog()) { // use log MP here we test some behavior in ESMP
new LogByteSizeMergePolicy()))) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null);
@@ -1491,7 +1522,7 @@ public class InternalEngineTests extends ESTestCase {
public void testEnableGcDeletes() throws Exception {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy())).recoverFromTranslog()) {
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy()))) {
engine.config().setEnableGcDeletes(false);
// Add document
@@ -1627,8 +1658,7 @@ public class InternalEngineTests extends ESTestCase {
// expected
}
// now it should be OK.
EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy());
config.setForceNewTranslog(true);
EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy()), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
engine = new InternalEngine(config);
}
@@ -1832,8 +1862,8 @@ public class InternalEngineTests extends ESTestCase {
parser.mappingUpdate = dynamicUpdate();
engine.close();
engine.config().setCreate(false);
engine = new InternalEngine(engine.config()).recoverFromTranslog(); // we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); // we need to reuse the engine config so the mapping update recorded by the parser is picked up
engine.recoverFromTranslog();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
@@ -1962,13 +1992,13 @@ public class InternalEngineTests extends ESTestCase {
/* create a TranslogConfig that has been created with a different UUID */
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexSettings()
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool, config.getIndexSettings()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(),
config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
try {
new InternalEngine(brokenConfig).recoverFromTranslog();
InternalEngine internalEngine = new InternalEngine(brokenConfig);
fail("translog belongs to a different engine");
} catch (EngineCreationFailureException ex) {
}

ShadowEngineTests.java

@@ -45,7 +45,6 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.Mapping;
@@ -53,7 +52,6 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.DirectoryService;
@@ -218,25 +216,30 @@ public class ShadowEngineTests extends ESTestCase {
protected InternalEngine createInternalEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy);
config.setForceNewTranslog(true);
return new InternalEngine(config);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig();
final EngineConfig.OpenMode openMode;
try {
if (Lucene.indexExists(store.directory()) == false) {
openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
}
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy,
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), new Engine.EventListener() {
@Override
public void onFailedEngine(String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
try {
config.setCreate(Lucene.indexExists(store.directory()) == false);
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
return config;
}