Merge pull request #17422 from s1monw/recovery_mem_buffer_access

Move translog recovery outside of the engine

We changed the way we manage engine memory buffers to an
open model where each shard can essentially have infinite memory.
The indexing memory controller is responsible for moving memory to disk
when it's needed. Yet, this doesn't work today when we recover from store/translog,
since the engine is not fully initialized: the IMC has no access to the engine,
neither to its memory buffer, nor can it move data to disk.

The biggest issue here is that translog recovery happens inside the Engine constructor,
which is problematic by itself since it might take minutes and performs write
operations on a not yet fully initialized engine.

This change detaches translog recovery from engine construction and makes it the
responsibility of the caller to run it once the engine is fully constructed, or to
skip it if not necessary.
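
For illustration, a minimal sketch of the resulting two-step lifecycle. The helper method and its surroundings are hypothetical; EngineConfig.OpenMode, InternalEngine and recoverFromTranslog() are the names this diff introduces:

    import java.io.IOException;

    import org.elasticsearch.index.engine.Engine;
    import org.elasticsearch.index.engine.EngineConfig;
    import org.elasticsearch.index.engine.InternalEngine;

    final class EngineLifecycleSketch {
        // Hypothetical helper: the caller builds 'config' with the desired OpenMode elsewhere.
        static Engine openAndMaybeRecover(EngineConfig config) throws IOException {
            InternalEngine engine = new InternalEngine(config); // the constructor no longer replays the translog
            if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
                engine.recoverFromTranslog(); // replay is now an explicit step owned by the caller
            }
            return engine;
        }
    }

This mirrors what IndexShard#internalPerformTranslogRecovery does further down in this diff.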
Simon Willnauer committed baa2d51e59 on 2016-03-31 21:03:00 +02:00
19 changed files with 502 additions and 206 deletions

View File

@@ -65,6 +65,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
@@ -1149,7 +1150,7 @@ public abstract class Engine implements Closeable {
     }
     /**
-     * Request that this engine throttle incoming indexing requests to one thread. Must be matched by a later call to {@link deactivateThrottling}.
+     * Request that this engine throttle incoming indexing requests to one thread. Must be matched by a later call to {@link #deactivateThrottling()}.
      */
     public abstract void activateThrottling();
@@ -1157,4 +1158,10 @@ public abstract class Engine implements Closeable {
      * Reverses a previous {@link #activateThrottling} call.
      */
     public abstract void deactivateThrottling();
+
+    /**
+     * Performs recovery from the transaction log.
+     * This operation will close the engine if the recovery fails.
+     */
+    public abstract Engine recoverFromTranslog() throws IOException;
 }

View File

@@ -64,7 +64,6 @@ public final class EngineConfig {
     private final Similarity similarity;
     private final CodecService codecService;
     private final Engine.EventListener eventListener;
-    private final boolean forceNewTranslog;
     private final QueryCache queryCache;
     private final QueryCachingPolicy queryCachingPolicy;
@@ -89,23 +88,22 @@ public final class EngineConfig {
         }
     }, Property.IndexScope, Property.NodeScope);
-    /** if set to true the engine will start even if the translog id in the commit point can not be found */
-    public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
     private TranslogConfig translogConfig;
-    private boolean create = false;
+    private final OpenMode openMode;
     /**
      * Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
      */
-    public EngineConfig(ShardId shardId, ThreadPool threadPool,
+    public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
                         IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
                         MergePolicy mergePolicy,Analyzer analyzer,
                         Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
                         TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
                         TranslogConfig translogConfig, TimeValue flushMergesAfter) {
+        if (openMode == null) {
+            throw new IllegalArgumentException("openMode must not be null");
+        }
         this.shardId = shardId;
-        final Settings settings = indexSettings.getSettings();
         this.indexSettings = indexSettings;
         this.threadPool = threadPool;
         this.warmer = warmer == null ? (a) -> {} : warmer;
@@ -122,16 +120,11 @@ public final class EngineConfig {
         // and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high:
         indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
         this.translogRecoveryPerformer = translogRecoveryPerformer;
-        this.forceNewTranslog = settings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false);
         this.queryCache = queryCache;
         this.queryCachingPolicy = queryCachingPolicy;
         this.translogConfig = translogConfig;
         this.flushMergesAfter = flushMergesAfter;
-    }
-
-    /** if true the engine will start even if the translog id in the commit point can not be found */
-    public boolean forceNewTranslog() {
-        return forceNewTranslog;
+        this.openMode = openMode;
     }
@@ -282,22 +275,6 @@ public final class EngineConfig {
         return translogConfig;
     }
-    /**
-     * Iff set to <code>true</code> the engine will create a new lucene index when opening the engine.
-     * Otherwise the lucene index writer is opened in append mode. The default is <code>false</code>
-     */
-    public void setCreate(boolean create) {
-        this.create = create;
-    }
-
-    /**
-     * Iff <code>true</code> the engine should create a new lucene index when opening the engine.
-     * Otherwise the lucene index writer should be opened in append mode. The default is <code>false</code>
-     */
-    public boolean isCreate() {
-        return create;
-    }
     /**
      * Returns a {@link TimeValue} at what time interval after the last write modification to the engine finished merges
      * should be automatically flushed. This is used to free up transient disk usage of potentially large segments that
@@ -305,4 +282,25 @@ public final class EngineConfig {
      */
     public TimeValue getFlushMergesAfter() { return flushMergesAfter; }
+
+    /**
+     * Returns the {@link OpenMode} for this engine config.
+     */
+    public OpenMode getOpenMode() {
+        return openMode;
+    }
+
+    /**
+     * Engine open mode defines how the engine should be opened or in other words what the engine should expect
+     * to recover from. We either create a brand new engine with a new index and translog or we recover from an existing index.
+     * If the index exists we also have the ability open only the index and create a new transaction log which happens
+     * during remote recovery since we have already transferred the index files but the translog is replayed from remote. The last
+     * and safest option opens the lucene index as well as it's referenced transaction log for a translog recovery.
+     * See also {@link Engine#recoverFromTranslog()}
+     */
+    public enum OpenMode {
+        CREATE_INDEX_AND_TRANSLOG,
+        OPEN_INDEX_CREATE_TRANSLOG,
+        OPEN_INDEX_AND_TRANSLOG;
+    }
 }
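
As a rough guide to picking a mode, here is a hedged sketch of the decision the shard-level code later in this diff makes (the method name and boolean parameters are illustrative only):

    import org.elasticsearch.index.engine.EngineConfig;

    final class OpenModeChoiceSketch {
        // Illustrative mapping from recovery scenario to OpenMode, mirroring IndexShard below.
        static EngineConfig.OpenMode choose(boolean indexExists, boolean translogReplayedRemotely) {
            if (indexExists == false) {
                return EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;  // brand new shard: fresh index and fresh translog
            } else if (translogReplayedRemotely) {
                return EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG; // e.g. remote recovery: index files copied, ops come from the remote host
            } else {
                return EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;    // local restart: open the index and replay its own translog
            }
        }
    }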

View File

@@ -23,7 +23,7 @@ package org.elasticsearch.index.engine;
  */
 public interface EngineFactory {
-    public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery);
-    public Engine newReadOnlyEngine(EngineConfig config);
+    Engine newReadWriteEngine(EngineConfig config);
+    Engine newReadOnlyEngine(EngineConfig config);
 }

View File

@@ -113,9 +113,12 @@ public class InternalEngine extends Engine {
     // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
     // incoming indexing ops to a single thread:
     private final AtomicInteger throttleRequestCount = new AtomicInteger();
+    private final EngineConfig.OpenMode openMode;
+    private final AtomicBoolean allowCommits = new AtomicBoolean(true);

-    public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
+    public InternalEngine(EngineConfig engineConfig) throws EngineException {
         super(engineConfig);
+        openMode = engineConfig.getOpenMode();
         this.versionMap = new LiveVersionMap();
         store.incRef();
         IndexWriter writer = null;
@@ -132,14 +135,11 @@ public class InternalEngine extends Engine {
             }
             throttle = new IndexThrottle();
             this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
-            final Translog.TranslogGeneration translogGeneration;
             try {
-                final boolean create = engineConfig.isCreate();
-                writer = createWriter(create);
+                writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
                 indexWriter = writer;
-                translog = openTranslog(engineConfig, writer, create || skipInitialTranslogRecovery || engineConfig.forceNewTranslog());
-                translogGeneration = translog.getGeneration();
-                assert translogGeneration != null;
+                translog = openTranslog(engineConfig, writer);
+                assert translog.getGeneration() != null;
             } catch (IOException | TranslogCorruptedException e) {
                 throw new EngineCreationFailureException(shardId, "failed to create engine", e);
             } catch (AssertionError e) {
@@ -151,20 +151,13 @@ public class InternalEngine extends Engine {
                     throw e;
                 }
             }
             this.translog = translog;
             manager = createSearcherManager();
             this.searcherManager = manager;
             this.versionMap.setManager(searcherManager);
-            try {
-                if (skipInitialTranslogRecovery) {
-                    // make sure we point at the latest translog from now on..
-                    commitIndexWriter(writer, translog, lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID));
-                } else {
-                    recoverFromTranslog(engineConfig, translogGeneration);
-                }
-            } catch (IOException | EngineException ex) {
-                throw new EngineCreationFailureException(shardId, "failed to recover from translog", ex);
-            }
+            // don't allow commits until we are done with recovering
+            allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
             success = true;
         } finally {
             if (success == false) {
@@ -179,11 +172,57 @@ public class InternalEngine extends Engine {
         logger.trace("created new InternalEngine");
     }

-    private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, boolean createNew) throws IOException {
-        final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer);
-        final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
-
-        if (createNew == false) {
+    @Override
+    public InternalEngine recoverFromTranslog() throws IOException {
+        flushLock.lock();
+        try (ReleasableLock lock = readLock.acquire()) {
+            ensureOpen();
+            if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
+                throw new IllegalStateException("Can't recover from translog with open mode: " + openMode);
+            }
+            if (allowCommits.get()) {
+                throw new IllegalStateException("Engine has already been recovered");
+            }
+            try {
+                recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
+            } catch (Throwable t) {
+                allowCommits.set(false); // just play safe and never allow commits on this
+                failEngine("failed to recover from translog", t);
+                throw t;
+            }
+        } finally {
+            flushLock.unlock();
+        }
+        return this;
+    }
+
+    private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException {
+        Translog.TranslogGeneration translogGeneration = translog.getGeneration();
+        final int opsRecovered;
+        try {
+            Translog.Snapshot snapshot = translog.newSnapshot();
+            opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
+        } catch (Throwable e) {
+            throw new EngineException(shardId, "failed to recover from translog", e);
+        }
+        // flush if we recovered something or if we have references to older translogs
+        // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
+        assert allowCommits.get() == false : "commits are allowed but shouldn't";
+        allowCommits.set(true); // we are good - now we can commit
+        if (opsRecovered > 0) {
+            logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
+                opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
+            flush(true, true);
+        } else if (translog.isCurrent(translogGeneration) == false) {
+            commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
+        }
+    }
+
+    private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException {
+        final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
+        translogConfig.setTranslogGeneration(null);
+        if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
+            final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer);
             // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
             if (generation == null) {
                 throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
@@ -194,7 +233,10 @@ public class InternalEngine extends Engine {
             }
         }
         final Translog translog = new Translog(translogConfig);
+        final Translog.TranslogGeneration generation = translogConfig.getTranslogGeneration();
         if (generation == null || generation.translogUUID == null) {
+            assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
+                + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
             if (generation == null) {
                 logger.debug("no translog ID present in the current generation - creating one");
             } else if (generation.translogUUID == null) {
@@ -202,7 +244,8 @@ public class InternalEngine extends Engine {
             }
             boolean success = false;
             try {
-                commitIndexWriter(writer, translog);
+                commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
+                    ? writer.getCommitData().get(SYNC_COMMIT_ID) : null);
                 success = true;
             } finally {
                 if (success == false) {
@@ -219,27 +262,6 @@ public class InternalEngine extends Engine {
         return translog;
     }

-    protected void recoverFromTranslog(EngineConfig engineConfig, Translog.TranslogGeneration translogGeneration) throws IOException {
-        int opsRecovered = 0;
-        final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer();
-        try {
-            Translog.Snapshot snapshot = translog.newSnapshot();
-            opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
-        } catch (Throwable e) {
-            throw new EngineException(shardId, "failed to recover from translog", e);
-        }
-        // flush if we recovered something or if we have references to older translogs
-        // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
-        if (opsRecovered > 0) {
-            logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
-                opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
-            flush(true, true);
-        } else if (translog.isCurrent(translogGeneration) == false) {
-            commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
-        }
-    }
-
     /**
      * Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
      * translog id into lucene and returns null.
@@ -556,6 +578,7 @@ public class InternalEngine extends Engine {
         }
         try (ReleasableLock lock = writeLock.acquire()) {
             ensureOpen();
+            ensureCanFlush();
             if (indexWriter.hasUncommittedChanges()) {
                 logger.trace("can't sync commit [{}]. have pending changes", syncId);
                 return SyncedFlushResult.PENDING_OPERATIONS;
@@ -579,6 +602,7 @@ public class InternalEngine extends Engine {
         boolean renewed = false;
         try (ReleasableLock lock = writeLock.acquire()) {
             ensureOpen();
+            ensureCanFlush();
             String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
             if (syncId != null && translog.totalOperations() == 0 && indexWriter.hasUncommittedChanges()) {
                 logger.trace("start renewing sync commit [{}]", syncId);
@@ -629,10 +653,11 @@ public class InternalEngine extends Engine {
         }
         try {
             if (indexWriter.hasUncommittedChanges() || force) {
+                ensureCanFlush();
                 try {
                     translog.prepareCommit();
                     logger.trace("starting commit for flush; commitTranslog=true");
-                    commitIndexWriter(indexWriter, translog);
+                    commitIndexWriter(indexWriter, translog, null);
                     logger.trace("finished commit for flush");
                     // we need to refresh in order to clear older version values
                     refresh("version_table_flush");
@@ -1072,6 +1097,7 @@ public class InternalEngine extends Engine {
     }

     private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
+        ensureCanFlush();
         try {
             Translog.TranslogGeneration translogGeneration = translog.getGeneration();
             logger.trace("committing writer with translog id [{}] and sync id [{}] ", translogGeneration.translogFileGeneration, syncId);
@@ -1089,8 +1115,14 @@ public class InternalEngine extends Engine {
         }
     }

-    private void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
-        commitIndexWriter(writer, translog, null);
+    private void ensureCanFlush() {
+        // translog recover happens after the engine is fully constructed
+        // if we are in this stage we have to prevent flushes from this
+        // engine otherwise we might loose documents if the flush succeeds
+        // and the translog recover fails we we "commit" the translog on flush.
+        if (allowCommits.get() == false) {
+            throw new FlushNotAllowedEngineException(shardId, "flushes are disabled - pending translog recovery");
+        }
     }

     public void onSettingsChanged() {
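
The allowCommits flag above is the heart of the safety argument: while a translog replay is still pending, any commit would advance the Lucene commit point past operations that might never be replayed. A minimal standalone sketch of that gating pattern (class and exception names here are illustrative, not the engine's real ones):

    import java.util.concurrent.atomic.AtomicBoolean;

    final class CommitGateSketch {
        private final AtomicBoolean allowCommits;

        CommitGateSketch(boolean needsTranslogReplay) {
            // mirrors: allowCommits.compareAndSet(true, openMode != OPEN_INDEX_AND_TRANSLOG)
            this.allowCommits = new AtomicBoolean(needsTranslogReplay == false);
        }

        void ensureCanFlush() {
            if (allowCommits.get() == false) {
                throw new IllegalStateException("flushes are disabled - pending translog recovery");
            }
        }

        void recoveryDone() {
            allowCommits.set(true); // from here on, flushes may commit the translog
        }
    }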

View File

@@ -21,8 +21,8 @@ package org.elasticsearch.index.engine;
 public class InternalEngineFactory implements EngineFactory {
     @Override
-    public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) {
-        return new InternalEngine(config, skipTranslogRecovery);
+    public Engine newReadWriteEngine(EngineConfig config) {
+        return new InternalEngine(config);
     }

     @Override

View File

@@ -30,6 +30,7 @@ import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
+import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
 import org.elasticsearch.index.translog.Translog;

 import java.io.IOException;
@@ -247,4 +248,9 @@ public class ShadowEngine extends Engine {
     public void deactivateThrottling() {
         throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
     }
+
+    @Override
+    public Engine recoverFromTranslog() throws IOException {
+        throw new UnsupportedOperationException("can't recover on a shadow engine");
+    }
 }

View File

@@ -41,6 +41,7 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.metrics.MeanMetric;
@@ -53,7 +54,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.SuspendableRefContainer;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.SearchSlowLog;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.cache.IndexCache;
 import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
@@ -110,7 +110,6 @@ import java.io.PrintStream;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -141,9 +140,10 @@ public class IndexShard extends AbstractIndexShardComponent {
     private final Engine.Warmer warmer;
     private final SnapshotDeletionPolicy deletionPolicy;
     private final SimilarityService similarityService;
-    private final EngineConfig engineConfig;
     private final TranslogConfig translogConfig;
     private final IndexEventListener indexEventListener;
+    private final QueryCachingPolicy cachingPolicy;

     /**
      * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
@@ -185,7 +185,6 @@ public class IndexShard extends AbstractIndexShardComponent {
     private static final EnumSet<IndexShardState> writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);

     private final IndexSearcherWrapper searcherWrapper;
-
     /**
      * True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
      * IndexingMemoryController}).
@@ -231,7 +230,6 @@ public class IndexShard extends AbstractIndexShardComponent {
         this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
         this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings,
             bigArrays);
-        final QueryCachingPolicy cachingPolicy;
         // the query cache is a node-level thing, however we want the most popular filters
         // to be computed on a per-shard basis
         if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {
@@ -239,11 +237,9 @@ public class IndexShard extends AbstractIndexShardComponent {
         } else {
             cachingPolicy = new UsageTrackingQueryCachingPolicy();
         }
-        this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
-        this.suspendableRefContainer = new SuspendableRefContainer();
-        this.searcherWrapper = indexSearcherWrapper;
-        this.primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
+        suspendableRefContainer = new SuspendableRefContainer();
+        searcherWrapper = indexSearcherWrapper;
+        primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
     }

     public Store store() {
@@ -322,7 +318,7 @@ public class IndexShard extends AbstractIndexShardComponent {
     }

     public QueryCachingPolicy getQueryCachingPolicy() {
-        return this.engineConfig.getQueryCachingPolicy();
+        return cachingPolicy;
     }

     /**
@@ -853,13 +849,14 @@ public class IndexShard extends AbstractIndexShardComponent {
         // we still invoke any onShardInactive listeners ... we won't sync'd flush in this case because we only do that on primary and this
         // is a replica
         active.set(true);
-        return engineConfig.getTranslogRecoveryPerformer().performBatchRecovery(getEngine(), operations);
+        Engine engine = getEngine();
+        return engine.config().getTranslogRecoveryPerformer().performBatchRecovery(engine, operations);
     }

     /**
      * After the store has been recovered, we need to start the engine in order to apply operations
      */
-    public void performTranslogRecovery(boolean indexExists) {
+    public void performTranslogRecovery(boolean indexExists) throws IOException {
         if (indexExists == false) {
             // note: these are set when recovering from the translog
             final RecoveryState.Translog translogStats = recoveryState().getTranslog();
@@ -870,7 +867,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
     }

-    private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) {
+    private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException {
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
@@ -884,16 +881,34 @@ public class IndexShard extends AbstractIndexShardComponent {
             }
         }
         recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
+        final EngineConfig.OpenMode openMode;
+        /* by default we recover and index and replay the translog but if the index
+         * doesn't exist we create everything from the scratch. Yet, if the index
+         * doesn't exist we don't need to worry about the skipTranslogRecovery since
+         * there is no translog on a non-existing index.
+         * The skipTranslogRecovery invariant is used if we do remote recovery since
+         * there the translog isn't local but on the remote host, hence we can skip it.
+         */
+        if (indexExists == false) {
+            openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
+        } else if (skipTranslogRecovery) {
+            openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
+        } else {
+            openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
+        }
+        final EngineConfig config = newEngineConfig(openMode, translogConfig, cachingPolicy,
+            new IndexShardRecoveryPerformer(shardId, mapperService, logger));
         // we disable deletes since we allow for operations to be executed against the shard while recovering
         // but we need to make sure we don't loose deletes until we are done recovering
-        engineConfig.setEnableGcDeletes(false);
-        engineConfig.setCreate(indexExists == false);
-        if (skipTranslogRecovery == false) {
+        config.setEnableGcDeletes(false);
+        Engine newEngine = createNewEngine(config);
+        verifyNotClosed();
+        if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
             // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
             // we still give sync'd flush a chance to run:
             active.set(true);
+            newEngine.recoverFromTranslog();
         }
-        createNewEngine(skipTranslogRecovery, engineConfig);
     }
@@ -943,8 +958,9 @@ public class IndexShard extends AbstractIndexShardComponent {
      */
     public void finalizeRecovery() {
         recoveryState().setStage(RecoveryState.Stage.FINALIZE);
-        getEngine().refresh("recovery_finalization");
-        engineConfig.setEnableGcDeletes(true);
+        Engine engine = getEngine();
+        engine.refresh("recovery_finalization");
+        engine.config().setEnableGcDeletes(true);
     }

     /**
@@ -1313,13 +1329,14 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }

-    private void createNewEngine(boolean skipTranslogRecovery, EngineConfig config) {
+    private Engine createNewEngine(EngineConfig config) {
         synchronized (mutex) {
             if (state == IndexShardState.CLOSED) {
                 throw new EngineClosedException(shardId);
             }
             assert this.currentEngineReference.get() == null;
-            this.currentEngineReference.set(newEngine(skipTranslogRecovery, config));
+            this.currentEngineReference.set(newEngine(config));
         }

         // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which
@@ -1330,10 +1347,11 @@ public class IndexShard extends AbstractIndexShardComponent {
         if (engine != null) {
             engine.onSettingsChanged();
         }
+        return engine;
     }

-    protected Engine newEngine(boolean skipTranslogRecovery, EngineConfig config) {
-        return engineFactory.newReadWriteEngine(config, skipTranslogRecovery);
+    protected Engine newEngine(EngineConfig config) {
+        return engineFactory.newReadWriteEngine(config);
     }

     /**
@@ -1374,34 +1392,8 @@ public class IndexShard extends AbstractIndexShardComponent {
         return mapperService.documentMapperWithAutoCreate(type);
     }

-    private final EngineConfig newEngineConfig(TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy) {
-        final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(shardId, mapperService, logger) {
-            @Override
-            protected void operationProcessed() {
-                assert recoveryState != null;
-                recoveryState.getTranslog().incrementRecoveredOperations();
-            }
-
-            @Override
-            public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException {
-                assert recoveryState != null;
-                RecoveryState.Translog translogStats = recoveryState.getTranslog();
-                translogStats.totalOperations(snapshot.totalOperations());
-                translogStats.totalOperationsOnStart(snapshot.totalOperations());
-                return super.recoveryFromSnapshot(engine, snapshot);
-            }
-
-            @Override
-            protected void index(Engine engine, Engine.Index engineIndex) {
-                IndexShard.this.index(engine, engineIndex);
-            }
-
-            @Override
-            protected void delete(Engine engine, Engine.Delete engineDelete) {
-                IndexShard.this.delete(engine, engineDelete);
-            }
-        };
-        return new EngineConfig(shardId,
+    private final EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy, TranslogRecoveryPerformer translogRecoveryPerformer) {
+        return new EngineConfig(openMode, shardId,
             threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
             mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
             indexSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
@@ -1535,4 +1527,36 @@ public class IndexShard extends AbstractIndexShardComponent {
         return getEngine().refreshNeeded();
     }
+
+    private class IndexShardRecoveryPerformer extends TranslogRecoveryPerformer {
+
+        protected IndexShardRecoveryPerformer(ShardId shardId, MapperService mapperService, ESLogger logger) {
+            super(shardId, mapperService, logger);
+        }
+
+        @Override
+        protected void operationProcessed() {
+            assert recoveryState != null;
+            recoveryState.getTranslog().incrementRecoveredOperations();
+        }
+
+        @Override
+        public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException {
+            assert recoveryState != null;
+            RecoveryState.Translog translogStats = recoveryState.getTranslog();
+            translogStats.totalOperations(snapshot.totalOperations());
+            translogStats.totalOperationsOnStart(snapshot.totalOperations());
+            return super.recoveryFromSnapshot(engine, snapshot);
+        }
+
+        @Override
+        protected void index(Engine engine, Engine.Index engineIndex) {
+            IndexShard.this.index(engine, engineIndex);
+        }
+
+        @Override
+        protected void delete(Engine engine, Engine.Delete engineDelete) {
+            IndexShard.this.delete(engine, engineDelete);
+        }
+    }
 }

View File

@@ -29,7 +29,6 @@ import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.merge.MergeStats;
-import org.elasticsearch.index.SearchSlowLog;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.TranslogStats;
@@ -81,10 +80,9 @@ public final class ShadowIndexShard extends IndexShard {
     }

     @Override
-    protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig config) {
+    protected Engine newEngine(EngineConfig config) {
         assert this.shardRouting.primary() == false;
-        assert skipInitialTranslogRecovery : "can not recover from gateway";
-        config.setCreate(false); // hardcoded - we always expect an index to be present
+        assert config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
         return engineFactory.newReadOnlyEngine(config);
     }

View File

@@ -225,7 +225,7 @@ final class StoreRecovery {
             indexShard.performTranslogRecovery(indexShouldExists);
             indexShard.finalizeRecovery();
             indexShard.postRecovery("post recovery from shard_store");
-        } catch (EngineException e) {
+        } catch (EngineException | IOException e) {
             throw new IndexShardRecoveryException(shardId, "failed to recovery from gateway", e);
         } finally {
             store.decRef();

View File

@@ -1343,4 +1343,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
     }
+
+    /**
+     * Returns the translog uuid used to associate a lucene index with a translog.
+     */
+    public String getTranslogUUID() {
+        return translogUUID;
+    }
 }

View File

@@ -100,7 +100,7 @@ public final class TranslogConfig {
      * file referenced by this generation. The translog creation will fail if this generation can't be opened.
      */
     public TranslogGeneration getTranslogGeneration() {
-        return translogGeneration;
+        return translogGeneration; // TODO make this a ctor argument on the Translog - this mutable state is aweful
     }

     /**

View File

@@ -26,7 +26,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
-import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineClosedException;
 import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
@@ -43,8 +42,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
@@ -71,7 +68,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
     private final ThreadPool threadPool;

-    private final IndicesService indicesService;
+    private final Iterable<IndexShard> indexShards;

     private final ByteSizeValue indexingBuffer;
@@ -88,14 +85,14 @@ public class IndexingMemoryController extends AbstractComponent implements Index
     private final ShardsIndicesStatusChecker statusChecker;

-    IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
-        this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
+    IndexingMemoryController(Settings settings, ThreadPool threadPool, Iterable<IndexShard>indexServices) {
+        this(settings, threadPool, indexServices, JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
     }

     // for testing
-    IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) {
+    IndexingMemoryController(Settings settings, ThreadPool threadPool, Iterable<IndexShard> indexServices, long jvmMemoryInBytes) {
         super(settings);
-        this.indicesService = indicesService;
+        this.indexShards = indexServices;
         ByteSizeValue indexingBuffer;
         String indexingBufferSetting = this.settings.get(INDEX_BUFFER_SIZE_SETTING, "10%");
@@ -152,15 +149,12 @@ public class IndexingMemoryController extends AbstractComponent implements Index
     protected List<IndexShard> availableShards() {
         List<IndexShard> availableShards = new ArrayList<>();
-        for (IndexService indexService : indicesService) {
-            for (IndexShard shard : indexService) {
-                // shadow replica doesn't have an indexing buffer
-                if (shard.canIndex() && CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
-                    availableShards.add(shard);
-                }
+        for (IndexShard shard : indexShards) {
+            // shadow replica doesn't have an indexing buffer
+            if (shard.canIndex() && CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
+                availableShards.add(shard);
             }
         }
         return availableShards;
     }

View File

@@ -53,6 +53,7 @@ import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLock;
 import org.elasticsearch.gateway.MetaDataStateFormat;
@@ -171,7 +172,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
         this.mapperRegistry = mapperRegistry;
         clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING, indexStoreConfig::setRateLimitingType);
         clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle);
-        indexingMemoryController = new IndexingMemoryController(settings, threadPool, this);
+        indexingMemoryController = new IndexingMemoryController(settings, threadPool, Iterables.flatten(this));
         this.indexScopeSetting = indexScopedSettings;
         this.circuitBreakerService = circuitBreakerService;
         this.indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
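
Iterables.flatten turns the IndicesService (an Iterable of IndexServices, each itself an Iterable of shards) into the single Iterable<IndexShard> the memory controller now consumes. A self-contained sketch of the idea, not the real org.elasticsearch.common.util.iterable.Iterables implementation:

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    final class FlattenSketch {
        static <T> Iterable<T> flatten(Iterable<? extends Iterable<T>> outer) {
            return () -> new Iterator<T>() {
                private final Iterator<? extends Iterable<T>> outerIt = outer.iterator();
                private Iterator<T> innerIt = Collections.emptyIterator();

                @Override
                public boolean hasNext() {
                    // advance to the next non-empty inner iterable
                    while (innerIt.hasNext() == false && outerIt.hasNext()) {
                        innerIt = outerIt.next().iterator();
                    }
                    return innerIt.hasNext();
                }

                @Override
                public T next() {
                    if (hasNext() == false) {
                        throw new NoSuchElementException();
                    }
                    return innerIt.next();
                }
            };
        }

        public static void main(String[] args) {
            // indices -> shards, flattened to a single sequence: prints 1 2 3
            flatten(List.of(List.of(1, 2), List.of(3))).forEach(System.out::println);
        }
    }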

View File

@ -79,13 +79,11 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.MetadataFieldMapper; import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.core.StringFieldMapper;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.object.RootObjectMapper; import org.elasticsearch.index.mapper.object.RootObjectMapper;
import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
@ -197,6 +195,13 @@ public class InternalEngineTests extends ESTestCase {
} }
} }
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) {
return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter());
}
@Override @Override
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
@ -259,19 +264,33 @@ public class InternalEngineTests extends ESTestCase {
return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
} }
protected InternalEngine createEngine(Store store, Path translogPath) { protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
return createEngine(defaultSettings, store, translogPath, newMergePolicy()); return createEngine(defaultSettings, store, translogPath, newMergePolicy());
} }
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), false); EngineConfig config = config(indexSettings, store, translogPath, mergePolicy);
InternalEngine internalEngine = new InternalEngine(config);
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
internalEngine.recoverFromTranslog();
}
return internalEngine;
} }
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig(); IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
final EngineConfig.OpenMode openMode;
EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings try {
if (Lucene.indexExists(store.directory()) == false) {
openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
}
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, , null, store, createSnapshotDeletionPolicy(), mergePolicy,
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), new Engine.EventListener() { iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), new Engine.EventListener() {
@Override @Override
@ -279,11 +298,7 @@ public class InternalEngineTests extends ESTestCase {
// we don't need to notify anybody in this test // we don't need to notify anybody in this test
} }
}, new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5)); }, new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
try {
config.setCreate(Lucene.indexExists(store.directory()) == false);
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
return config; return config;
} }
@ -501,7 +516,7 @@ public class InternalEngineTests extends ESTestCase {
SegmentsStats stats = engine.segmentsStats(true); SegmentsStats stats = engine.segmentsStats(true);
assertThat(stats.getFileSizes().size(), greaterThan(0)); assertThat(stats.getFileSizes().size(), greaterThan(0));
assertThat((Iterable<Long>) () -> stats.getFileSizes().valuesIt(), everyItem(greaterThan(0L))); assertThat(() -> stats.getFileSizes().valuesIt(), everyItem(greaterThan(0L)));
ObjectObjectCursor<String, Long> firstEntry = stats.getFileSizes().iterator().next(); ObjectObjectCursor<String, Long> firstEntry = stats.getFileSizes().iterator().next();
@ -556,13 +571,27 @@ public class InternalEngineTests extends ESTestCase {
InternalEngine engine = createEngine(store, translog); InternalEngine engine = createEngine(store, translog);
engine.close(); engine.close();
engine = new InternalEngine(engine.config(), false); engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
engine.recoverFromTranslog();
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2)); assertThat(counter.get(), equalTo(2));
searcher.close(); searcher.close();
IOUtils.close(store, engine); IOUtils.close(store, engine);
} }
public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
expectThrows(FlushNotAllowedEngineException.class, () -> engine.flush(true, true));
engine.recoverFromTranslog();
doc = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
}
public void testConcurrentGetAndFlush() throws Exception { public void testConcurrentGetAndFlush() throws Exception {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc)); engine.index(new Engine.Index(newUid("1"), doc));
@ -794,7 +823,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSyncedFlush() throws IOException { public void testSyncedFlush() throws IOException {
try (Store store = createStore(); try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy()), false)) { new LogByteSizeMergePolicy()))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc)); engine.index(new Engine.Index(newUid("1"), doc));
@ -821,7 +850,7 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < iters; i++) { for (int i = 0; i < iters; i++) {
try (Store store = createStore(); try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogDocMergePolicy()), false)) { new LogDocMergePolicy()))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
Engine.Index doc1 = new Engine.Index(newUid("1"), doc); Engine.Index doc1 = new Engine.Index(newUid("1"), doc);
@ -889,8 +918,12 @@ public class InternalEngineTests extends ESTestCase {
} else { } else {
engine.flushAndClose(); engine.flushAndClose();
} }
engine = new InternalEngine(config, randomBoolean()); engine = new InternalEngine(copy(config, randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)));
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG && randomBoolean()) {
engine.recoverFromTranslog();
}
assertEquals(engine.config().getOpenMode().toString(), engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
} }
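Which branch runs recovery is the point: recoverFromTranslog() is only legal for an engine opened with OPEN_INDEX_AND_TRANSLOG. A small self-contained sketch of that contract (plain Java, independent of the engine classes; the modes and the IllegalStateException behavior are taken from the tests in this diff):

    enum OpenMode { CREATE_INDEX_AND_TRANSLOG, OPEN_INDEX_CREATE_TRANSLOG, OPEN_INDEX_AND_TRANSLOG }

    // Only an engine that opened an existing index *and* an existing translog may replay
    // operations; the other two modes reject recoverFromTranslog() with IllegalStateException.
    static boolean mayRecoverFromTranslog(OpenMode mode) {
        return mode == OpenMode.OPEN_INDEX_AND_TRANSLOG;
    }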
public void testSycnedFlushVanishesOnReplay() throws IOException { public void testSycnedFlushVanishesOnReplay() throws IOException {
@ -912,8 +945,8 @@ public class InternalEngineTests extends ESTestCase {
// this so we have to disable the check explicitly // this so we have to disable the check explicitly
directory.setPreventDoubleWrite(false); directory.setPreventDoubleWrite(false);
} }
config.setCreate(false); engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
engine = new InternalEngine(config, false); engine.recoverFromTranslog();
assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
} }
@ -1051,7 +1084,7 @@ public class InternalEngineTests extends ESTestCase {
public void testForceMerge() throws IOException { public void testForceMerge() throws IOException {
try (Store store = createStore(); try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP new LogByteSizeMergePolicy()))) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100); int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null); ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null);
@ -1489,7 +1522,7 @@ public class InternalEngineTests extends ESTestCase {
public void testEnableGcDeletes() throws Exception { public void testEnableGcDeletes() throws Exception {
try (Store store = createStore(); try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy()), false)) { Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy()))) {
engine.config().setEnableGcDeletes(false); engine.config().setEnableGcDeletes(false);
// Add document // Add document
@ -1625,9 +1658,8 @@ public class InternalEngineTests extends ESTestCase {
// expected // expected
} }
// now it should be OK. // now it should be OK.
IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy()), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
Settings.builder().put(defaultSettings.getSettings()).put(EngineConfig.INDEX_FORCE_NEW_TRANSLOG, true).build()); engine = new InternalEngine(config);
engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy());
} }
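The deleted INDEX_FORCE_NEW_TRANSLOG setting is subsumed by the open mode: what used to be an index setting is now stated once, at construction time. A hedged before/after sketch based on the hunk above:

    // Before: a setting flipped the behavior.
    //   index.engine.force_new_translog: true
    // After: the open mode states it directly -- keep the index, discard the old translog.
    EngineConfig conf = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy()),
            EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
    engine = new InternalEngine(conf);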
public void testTranslogReplayWithFailure() throws IOException { public void testTranslogReplayWithFailure() throws IOException {
@ -1644,7 +1676,6 @@ public class InternalEngineTests extends ESTestCase {
assertThat(topDocs.totalHits, equalTo(numDocs)); assertThat(topDocs.totalHits, equalTo(numDocs));
} }
engine.close(); engine.close();
boolean recoveredButFailed = false;
final MockDirectoryWrapper directory = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class); final MockDirectoryWrapper directory = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class);
if (directory != null) { if (directory != null) {
// since we rollback the IW we are writing the same segment files again after starting IW but MDW prevents // since we rollback the IW we are writing the same segment files again after starting IW but MDW prevents
@ -1661,7 +1692,7 @@ public class InternalEngineTests extends ESTestCase {
engine = createEngine(store, primaryTranslogDir); engine = createEngine(store, primaryTranslogDir);
started = true; started = true;
break; break;
} catch (EngineCreationFailureException ex) { } catch (EngineException | IOException e) {
} }
} }
@ -1702,7 +1733,7 @@ public class InternalEngineTests extends ESTestCase {
directory.setPreventDoubleWrite(false); directory.setPreventDoubleWrite(false);
} }
engine.close(); engine.close();
engine = new InternalEngine(engine.config(), true); engine = new InternalEngine(engine.config());
try (Engine.Searcher searcher = engine.acquireSearcher("test")) { try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
@ -1830,8 +1861,8 @@ public class InternalEngineTests extends ESTestCase {
parser.mappingUpdate = dynamicUpdate(); parser.mappingUpdate = dynamicUpdate();
engine.close(); engine.close();
engine.config().setCreate(false); engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); // we need to reuse the engine config, otherwise the parser.mappingModified won't work
engine = new InternalEngine(engine.config(), false); // we need to reuse the engine config, otherwise the parser.mappingModified won't work engine.recoverFromTranslog();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) { try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
@ -1960,13 +1991,13 @@ public class InternalEngineTests extends ESTestCase {
/* create a TranslogConfig that has been created with a different UUID */ /* create a TranslogConfig that has been created with a different UUID */
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE); TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexSettings() EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool, config.getIndexSettings()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), , null, store, createSnapshotDeletionPolicy(), newMergePolicy(),
config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener() config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5)); , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
try { try {
new InternalEngine(brokenConfig, false); InternalEngine internalEngine = new InternalEngine(brokenConfig);
fail("translog belongs to a different engine"); fail("translog belongs to a different engine");
} catch (EngineCreationFailureException ex) { } catch (EngineCreationFailureException ex) {
} }
@ -2015,4 +2046,67 @@ public class InternalEngineTests extends ESTestCase {
logger.info("exception caught: ", throwable.get()); logger.info("exception caught: ", throwable.get());
assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(throwable.get())); assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(throwable.get()));
} }
public void testCurrentTranslogIDisCommitted() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy());
// create
{
ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
engine.index(firstIndexRequest);
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
// open and recover tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
} else {
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
// open index with new tlog
{
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
}
}
// open and recover tlog with empty tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
}
}
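The generation numbers above are the invariant worth remembering: a freshly created engine commits generation 1, and a replayed engine commits whatever generation it rolled forward to. A hedged recap of the user-data keys the test inspects (key names from the diff, values illustrative):

    Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
    String gen  = userData.get(Translog.TRANSLOG_GENERATION_KEY); // "1" after create, "3" after replay + commit
    String uuid = userData.get(Translog.TRANSLOG_UUID_KEY);       // always equals engine.getTranslog().getTranslogUUID()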
} }

View File

@ -45,7 +45,6 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.Mapping;
@ -53,7 +52,6 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
@ -217,24 +215,31 @@ public class ShadowEngineTests extends ESTestCase {
} }
protected InternalEngine createInternalEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { protected InternalEngine createInternalEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), true); EngineConfig config = config(indexSettings, store, translogPath, mergePolicy);
return new InternalEngine(config);
} }
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig(); IndexWriterConfig iwc = newIndexWriterConfig();
final EngineConfig.OpenMode openMode;
try {
if (Lucene.indexExists(store.directory()) == false) {
openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
}
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, , null, store, createSnapshotDeletionPolicy(), mergePolicy,
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(null, logger), new Engine.EventListener() { iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(null, logger), new Engine.EventListener() {
@Override @Override
public void onFailedEngine(String reason, @Nullable Throwable t) { public void onFailedEngine(String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test // we don't need to notify anybody in this test
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5)); }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
try {
config.setCreate(Lucene.indexExists(store.directory()) == false);
} catch (IOException e) {
throw new ElasticsearchException("can't find index?", e);
}
return config; return config;
} }
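Instead of mutating the config after the fact (the removed setCreate call), the helper now derives the open mode from the on-disk state before building the config. The decision, isolated from the hunk above:

    // Shadow engines never replay a translog, so an existing index is opened with a fresh one.
    EngineConfig.OpenMode openMode = Lucene.indexExists(store.directory()) == false
            ? EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
            : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;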

View File

@ -31,6 +31,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStats;
@ -117,6 +118,8 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
@ -1317,6 +1320,60 @@ public class IndexShardTests extends ESSingleNodeTestCase {
} }
} }
public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService(resolveIndex("test"));
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").get();
client().prepareDelete("test", "test", "0").get();
client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
shard.close("simon says", false);
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
List<Throwable> failures = new ArrayList<>();
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public void postIndex(Engine.Index index, boolean created) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh
assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0);
shardRef.get().refresh("test");
} catch (Throwable t) {
failures.add(t);
throw t;
}
}
@Override
public void postDelete(Engine.Delete delete) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh
assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0);
shardRef.get().refresh("test");
} catch (Throwable t) {
failures.add(t);
throw t;
}
}
};
final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener);
shardRef.set(newShard);
recoverShard(newShard, shard.routingEntry());
try {
ExceptionsHelper.rethrowAndSuppress(failures);
} finally {
newShard.close("just do it", randomBoolean());
}
}
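The listener is the proof of the PR's premise: because the shard reference is published before recovery starts, an IMC-style observer can measure and move memory while replay is still running. Trimmed to its essence (a condensed restatement of the listener above, not new API):

    // All the IndexingMemoryController needs to work during replay:
    long used = shardRef.get().getIndexBufferRAMBytesUsed(); // inspect the live buffer
    shardRef.get().refresh("test");                          // and force data out of it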
public void testSearchIsReleaseIfWrapperFails() throws IOException { public void testSearchIsReleaseIfWrapperFails() throws IOException {
createIndex("test"); createIndex("test");
ensureGreen(); ensureGreen();
@ -1347,13 +1404,13 @@ public class IndexShardTests extends ESSingleNodeTestCase {
} }
} }
private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException { public static final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
ShardRouting routing = new ShardRouting(shard.routingEntry()); IndexShard newShard = newIndexShard(indexService, shard, wrapper, listeners);
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), return recoverShard(newShard, shard.routingEntry());
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), }
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners) public static final IndexShard recoverShard(IndexShard newShard, ShardRouting oldRouting) throws IOException {
); ShardRouting routing = new ShardRouting(oldRouting);
ShardRoutingHelper.reinit(routing); ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false); newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
@ -1365,6 +1422,15 @@ public class IndexShardTests extends ESSingleNodeTestCase {
return newShard; return newShard;
} }
public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)
);
return newShard;
}
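Splitting the old reinitWithWrapper into newIndexShard plus recoverShard is what makes the previous test possible: construction and recovery gain a seam between them. A sketch of the intended call order, as used above:

    IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener); // build, attach listeners
    shardRef.set(newShard);                        // publish the shard to observers first
    recoverShard(newShard, shard.routingEntry());  // only then replay the translog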
public void testTranslogRecoverySyncsTranslog() throws IOException { public void testTranslogRecoverySyncsTranslog() throws IOException {
createIndex("testindexfortranslogsync"); createIndex("testindexfortranslogsync");
client().admin().indices().preparePutMapping("testindexfortranslogsync").setType("testtype").setSource(jsonBuilder().startObject() client().admin().indices().preparePutMapping("testindexfortranslogsync").setType("testtype").setSource(jsonBuilder().startObject()

View File

@ -19,26 +19,43 @@
package org.elasticsearch.indices; package org.elasticsearch.indices;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTests;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
@ -377,4 +394,51 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
} }
}); });
} }
public void testTranslogRecoveryWorksWithIMC() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService(resolveIndex("test"));
IndexShard shard = indexService.getShardOrNull(0);
for (int i = 0; i < 100; i++) {
client().prepareIndex("test", "test", Integer.toString(i)).setSource("{\"foo\" : \"bar\"}").get();
}
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
shard.close("simon says", false);
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50kb").build();
Iterable<IndexShard> iterable = () -> (shardRef.get() == null) ? Collections.<IndexShard>emptyList().iterator()
: Collections.singleton(shardRef.get()).iterator();
AtomicInteger flushes = new AtomicInteger();
IndexingMemoryController imc = new IndexingMemoryController(settings, client().threadPool(), iterable) {
@Override
protected void writeIndexingBufferAsync(IndexShard shard) {
assertEquals(shard, shardRef.get());
flushes.incrementAndGet();
shard.writeIndexingBuffer();
}
};
final IndexShard newShard = IndexShardTests.newIndexShard(indexService, shard, wrapper, imc);
shardRef.set(newShard);
try {
assertEquals(0, imc.availableShards().size());
ShardRouting routing = new ShardRouting(shard.routingEntry());
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertEquals(1, imc.availableShards().size());
assertTrue(newShard.recoverFromStore(localNode));
assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1);
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
} finally {
newShard.close("simon says", false);
}
}
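The supplier-backed Iterable is the small trick that lets the controller see the shard the instant it is published, without re-registering anything. In isolation:

    AtomicReference<IndexShard> shardRef = new AtomicReference<>();
    // Each iteration re-reads the reference, so the shard becomes visible as soon as it is set.
    Iterable<IndexShard> iterable = () -> shardRef.get() == null
            ? Collections.<IndexShard>emptyList().iterator()
            : Collections.singleton(shardRef.get()).iterator();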
} }

View File

@ -32,8 +32,8 @@ public final class MockEngineFactory implements EngineFactory {
} }
@Override @Override
public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) { public Engine newReadWriteEngine(EngineConfig config) {
return new MockInternalEngine(config, skipTranslogRecovery, wrapper); return new MockInternalEngine(config, wrapper);
} }
@Override @Override

View File

@ -32,8 +32,8 @@ final class MockInternalEngine extends InternalEngine {
private final boolean randomizeFlushOnClose; private final boolean randomizeFlushOnClose;
private Class<? extends FilterDirectoryReader> wrapperClass; private Class<? extends FilterDirectoryReader> wrapperClass;
MockInternalEngine(EngineConfig config, boolean skipInitialTranslogRecovery, Class<? extends FilterDirectoryReader> wrapper) throws EngineException { MockInternalEngine(EngineConfig config, Class<? extends FilterDirectoryReader> wrapper) throws EngineException {
super(config, skipInitialTranslogRecovery); super(config);
randomizeFlushOnClose = config.getIndexSettings().isOnSharedFilesystem() == false; randomizeFlushOnClose = config.getIndexSettings().isOnSharedFilesystem() == false;
wrapperClass = wrapper; wrapperClass = wrapper;
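With replay out of the constructor, the boolean threaded through the factory and the mock engine disappears. A hedged sketch of how a caller now drives the simplified factory end to end (getOpenMode appears elsewhere in this diff; the surrounding control flow is an assumption):

    Engine engine = factory.newReadWriteEngine(config);  // no skipTranslogRecovery flag anymore
    if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
        engine.recoverFromTranslog();                    // replay is now the caller's responsibility
    }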