Use explict flag if index should be created on engine creation

Today we try to detect if there is an index existing in the directory
and if not we create one. This can be tricky and errof prone since we
rely on the filesystem without taking the context into account when the
engine gets created. We know in all situations if the index should be created
so we can just use this infromation and rely on the lucene index writer to barf
if we hit a situations where we can't append to an index while we should.
This commit is contained in:
Simon Willnauer 2015-08-05 16:01:07 +02:00
parent abf763c1c5
commit 0b9729af5b
9 changed files with 43 additions and 18 deletions

View File

@ -135,7 +135,7 @@ public final class EngineConfig {
private static final String DEFAULT_CODEC_NAME = "default";
private TranslogConfig translogConfig;
private boolean create = false;
/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
@ -433,4 +433,20 @@ public final class EngineConfig {
public TranslogConfig getTranslogConfig() {
return translogConfig;
}
/**
* Iff set to <code>true</code> the engine will create a new lucene index when opening the engine.
* Otherwise the lucene index writer is opened in append mode. The default is <code>false</code>
*/
public void setCreate(boolean create) {
this.create = create;
}
/**
* Iff <code>true</code> the engine should create a new lucene index when opening the engine.
* Otherwise the lucene index writer should be opened in append mode. The default is <code>false</code>
*/
public boolean isCreate() {
return create;
}
}

View File

@ -118,14 +118,11 @@ public class InternalEngine extends Engine {
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
}
throttle = new IndexThrottle();
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
final Translog.TranslogGeneration translogGeneration;
try {
// TODO: would be better if ES could tell us "from above" whether this shard was already here, instead of using Lucene's API
// (which relies on IO ops, directory listing, and has had scary bugs in the past):
boolean create = !Lucene.indexExists(store.directory());
final boolean create = engineConfig.isCreate();
writer = createWriter(create);
indexWriter = writer;
translog = openTranslog(engineConfig, writer, create || skipInitialTranslogRecovery || engineConfig.forceNewTranslog());

View File

@ -828,14 +828,13 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* After the store has been recovered, we need to start the engine in order to apply operations
*/
public Map<String, Mapping> performTranslogRecovery() {
final Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(false);
public Map<String, Mapping> performTranslogRecovery(boolean indexExists) {
final Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(false, indexExists);
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
return recoveredTypes;
}
private Map<String, Mapping> internalPerformTranslogRecovery(boolean skipTranslogRecovery) {
private Map<String, Mapping> internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
@ -852,6 +851,7 @@ public class IndexShard extends AbstractIndexShardComponent {
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
engineConfig.setEnableGcDeletes(false);
engineConfig.setCreate(indexExists == false);
createNewEngine(skipTranslogRecovery, engineConfig);
return engineConfig.getTranslogRecoveryPerformer().getRecoveredTypes();
}
@ -860,12 +860,10 @@ public class IndexShard extends AbstractIndexShardComponent {
* After the store has been recovered, we need to start the engine. This method starts a new engine but skips
* the replay of the transaction log which is required in cases where we restore a previous index or recover from
* a remote peer.
*
* @param wipeTranslogs if set to <code>true</code> all skipped / uncommitted translogs are removed.
*/
public void skipTranslogRecovery(boolean wipeTranslogs) throws IOException {
public void skipTranslogRecovery() throws IOException {
assert engineUnsafe() == null : "engine was already created";
Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(true);
Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(true, true);
assert recoveredTypes.isEmpty();
assert recoveryState.getTranslog().recoveredOperations() == 0;
}

View File

@ -104,6 +104,7 @@ public final class ShadowIndexShard extends IndexShard {
protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig config) {
assert this.shardRouting.primary() == false;
assert skipInitialTranslogRecovery : "can not recover from gateway";
config.setCreate(false); // hardcoded - we always expect an index to be present
return engineFactory.newReadOnlyEngine(config);
}

View File

@ -246,7 +246,7 @@ public class StoreRecoveryService extends AbstractIndexShardComponent implements
recoveryState.getTranslog().totalOperations(0);
recoveryState.getTranslog().totalOperationsOnStart(0);
}
typesToUpdate = indexShard.performTranslogRecovery();
typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();
String indexName = indexShard.shardId().index().name();
@ -318,7 +318,7 @@ public class StoreRecoveryService extends AbstractIndexShardComponent implements
snapshotShardId = new ShardId(restoreSource.index(), shardId.id());
}
indexShardRepository.restore(restoreSource.snapshotId(), restoreSource.version(), shardId, snapshotShardId, recoveryState);
indexShard.skipTranslogRecovery(true);
indexShard.skipTranslogRecovery();
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId);

View File

@ -204,7 +204,7 @@ public class TranslogRecoveryPerformer {
query = queryParserService.parseQuery(source).query();
} catch (QueryParsingException ex) {
// for BWC we try to parse directly the query since pre 1.0.0.Beta2 we didn't require a top level query field
if ( queryParserService.getIndexCreatedVersion().onOrBefore(Version.V_1_0_0_Beta2)) {
if (queryParserService.getIndexCreatedVersion().onOrBefore(Version.V_1_0_0_Beta2)) {
try {
XContentParser parser = XContentHelper.createParser(source);
ParsedQuery parse = queryParserService.parse(parser);

View File

@ -274,7 +274,7 @@ public class RecoveryTarget extends AbstractComponent {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps());
recoveryStatus.indexShard().skipTranslogRecovery(false);
recoveryStatus.indexShard().skipTranslogRecovery();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -39,6 +39,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -256,7 +257,11 @@ public class InternalEngineTests extends ESTestCase {
// we don't need to notify anybody in this test
}
}, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig);
try {
config.setCreate(Lucene.indexExists(store.directory()) == false);
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
return config;
}
@ -775,6 +780,7 @@ public class InternalEngineTests extends ESTestCase {
// this so we have to disable the check explicitly
directory.setPreventDoubleWrite(false);
}
config.setCreate(false);
engine = new InternalEngine(config, false);
assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
}
@ -1869,6 +1875,7 @@ public class InternalEngineTests extends ESTestCase {
parser.mappingUpdate = dynamicUpdate();
engine.close();
engine.config().setCreate(false);
engine = new InternalEngine(engine.config(), false); // we need to reuse the engine config unless the parser.mappingModified won't work
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {

View File

@ -29,6 +29,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
@ -226,6 +227,11 @@ public class ShadowEngineTests extends ESTestCase {
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig);
try {
config.setCreate(Lucene.indexExists(store.directory()) == false);
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
return config;
}