From f6aeb35ce8244f4e60cb827cccb42a359f3a2862 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Wed, 3 Aug 2016 08:34:09 +0200
Subject: [PATCH] Tighten up concurrent store metadata listing and engine writes (#19684)

In several places in our code we need to get a consistent list of files + metadata of the current index. We currently have a couple of ways to do this in the `Store` class, which also do the right thing and try to verify the integrity of the smaller files. Sadly, those methods can run into trouble if anyone writes into the folder while they are busy. Most notably, the index shard's engine may decide to commit halfway through and remove a `segments_N` file before the store got to checksum it (though it had already been listed). This race condition typically doesn't happen, as almost all of the places where we list files also happen to be places where the relevant shard doesn't yet have an engine. There is, however, an exception (of course :)): the API to list shard stores, used by the master when it is looking for shard copies to assign.

I already took one shot at fixing this in #19416, but it turned out not to be enough - see for example https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+multijob-os-compatibility/os=sles/822.

The first inclination was to add more locking to the various `Store` methods: acquire the `IndexWriter` lock, thus preventing any engine from accessing the directory while the shard is offline, and rely on the index commit snapshotting logic already existing in `IndexShard` once the engine is started. That turned out to be a bad idea, as it creates new subtleties where, for example, a store listing can prevent a shard from starting up (the writer lock doesn't wait if it can't get access, but fails immediately - normally a good thing). Another example is running on a shared directory, where some other engine may actually hold the lock.

Instead I decided to take another approach:

1) Remove the various metadata methods on `Store` and keep a single one, which accepts an index commit (which can be null) and clearly communicates that the *caller* is responsible for concurrent access. This also tightens up the API, which is a plus.

2) Add a `snapshotStoreMetadata` method to `IndexShard` that takes care of all the concurrency aspects with the engine, which is now possible because it's all in the same place. It's still a bit ugly, but at least it's all in one place and we can evaluate how to improve on it later. I also renamed the `snapshotIndex` method to `acquireIndexCommit` to avoid confusion; I think it communicates better what it does.
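To make the new contract concrete, here is a minimal caller-side sketch of the acquire/release pattern. It assumes a started `IndexShard` named `shard`; the wrapper class and method names are hypothetical, while `acquireIndexCommit`, `releaseIndexCommit`, and `Store#getMetadata(IndexCommit)` are the APIs introduced or kept by this change:

    import org.apache.lucene.index.IndexCommit;
    import org.elasticsearch.index.shard.IndexShard;
    import org.elasticsearch.index.store.Store;

    import java.io.IOException;

    class AcquireCommitSketch {
        // Lists the files of a point-in-time commit. Every file referenced by the
        // commit is retained by the deletion policy until the commit is released.
        static Store.MetadataSnapshot listCommittedFiles(IndexShard shard) throws IOException {
            IndexCommit commit = shard.acquireIndexCommit(true); // flush first to capture the latest writes
            try {
                return shard.store().getMetadata(commit); // safe: these files can't be deleted concurrently
            } finally {
                shard.releaseIndexCommit(commit); // let the deletion policy prune the commit again
            }
        }
    }

When no engine is running and thus no commit can be pinned, the new `IndexShard#snapshotStoreMetadata` below does the equivalent work by holding the `IndexWriter` write lock for the duration of the listing.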
---
 .../elasticsearch/index/engine/Engine.java    |  2 +-
 .../index/engine/InternalEngine.java          |  2 +-
 .../index/engine/ShadowEngine.java            |  2 +-
 .../elasticsearch/index/shard/IndexShard.java | 56 ++++++++++++--
 .../index/shard/LocalShardSnapshot.java       |  6 +-
 .../index/shard/ShadowIndexShard.java         |  5 ++
 .../org/elasticsearch/index/store/Store.java  | 45 +++--------
 .../recovery/RecoverySourceHandler.java       |  4 +-
 .../recovery/RecoveryTargetService.java       |  9 ++-
 .../TransportNodesListShardStoreMetaData.java | 10 +--
 .../repositories/Repository.java              | 10 +--
 .../blobstore/BlobStoreRepository.java        | 76 ++++++++++---------
 .../snapshots/SnapshotShardsService.java      |  4 +-
 .../ESIndexLevelReplicationTestCase.java      | 17 ++++-
 .../index/shard/IndexShardTests.java          | 72 ++++++++++++++++++
 .../elasticsearch/index/store/StoreTests.java | 36 ++++-----
 .../recovery/RecoverySourceHandlerTests.java  | 12 +--
 17 files changed, 237 insertions(+), 131 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index f9186185e61..d5dc64e3a56 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -654,7 +654,7 @@ public abstract class Engine implements Closeable {
     *
     * @param flushFirst indicates whether the engine should flush before returning the snapshot
     */
-    public abstract IndexCommit snapshotIndex(boolean flushFirst) throws EngineException;
+    public abstract IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException;

    /**
     * fail engine due to some error. the engine will also be closed.
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 1775fea6703..eba6fa10802 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -852,7 +852,7 @@ public class InternalEngine extends Engine {
    }

    @Override
-    public IndexCommit snapshotIndex(final boolean flushFirst) throws EngineException {
+    public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineException {
        // we have to flush outside of the readlock otherwise we might have a problem upgrading
        // to a write lock when we fail the engine in this operation
        if (flushFirst) {
diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
index 83f9d466f0e..2d5a134493a 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
@@ -205,7 +205,7 @@ public class ShadowEngine extends Engine {
    }

    @Override
-    public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
+    public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
        throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
    }

diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index a97aad7abfa..1dff3ad8b9b 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -21,7 +21,11 @@ package org.elasticsearch.index.shard;

 import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.index.CheckIndex;
+import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexFormatTooNewException;
+import org.apache.lucene.index.IndexFormatTooOldException;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.Term;
@@ -29,6 +33,7 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Lock;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
 import org.elasticsearch.ElasticsearchException;
@@ -116,10 +121,12 @@ import org.elasticsearch.search.suggest.completion.CompletionStats;
 import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat;
 import org.elasticsearch.threadpool.ThreadPool;

+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -789,15 +796,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

    /**
     * Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
-     * commit won't be freed until the commit / snapshot is released via {@link #releaseSnapshot(IndexCommit)}.
+     * commit won't be freed until the commit / snapshot is released via {@link #releaseIndexCommit(IndexCommit)}.
     *
     * @param flushFirst true if the index should first be flushed to disk / a low level lucene commit should be executed
     */
-    public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
+    public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
        IndexShardState state = this.state; // one time volatile read
        // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
        if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
-            return getEngine().snapshotIndex(flushFirst);
+            return getEngine().acquireIndexCommit(flushFirst);
        } else {
            throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
        }
@@ -805,13 +812,50 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

    /**
-     * Releases a snapshot taken from {@link #snapshotIndex(boolean)} this must be called to release the resources
+     * Releases a snapshot taken from {@link #acquireIndexCommit(boolean)}; this must be called to release the resources
     * referenced by the given snapshot {@link IndexCommit}.
     */
-    public void releaseSnapshot(IndexCommit snapshot) throws IOException {
+    public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
        deletionPolicy.release(snapshot);
    }

+    /**
+     * Gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call at any point in the
+     * index shard's lifecycle, without having to worry about the current state of the engine and concurrent flushes.
+     *
+     * @throws org.apache.lucene.index.IndexNotFoundException if no index is found in the current directory
+     * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
+     *                               unexpected exception when opening the index or reading the segments file.
+     * @throws IndexFormatTooOldException if the lucene index is too old to be opened.
+     * @throws IndexFormatTooNewException if the lucene index is too new to be opened.
+     * @throws FileNotFoundException if one or more files referenced by a commit are not present.
+     * @throws NoSuchFileException if one or more files referenced by a commit are not present.
+     */
+    public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
+        IndexCommit indexCommit = null;
+        store.incRef();
+        try {
+            synchronized (mutex) {
+                // if the engine is not running, we can access the store directly, but we need to make sure no one starts
+                // the engine on us. If the engine is running, we can get a snapshot via the deletion policy, which is always
+                // initialized. That can be done outside the mutex, since the snapshot stays valid even if the engine is
+                // closed halfway through.
+                Engine engine = getEngineOrNull();
+                if (engine == null) {
+                    try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+                        return store.getMetadata(null);
+                    }
+                }
+            }
+            indexCommit = deletionPolicy.snapshot();
+            return store.getMetadata(indexCommit);
+        } finally {
+            store.decRef();
+            if (indexCommit != null) {
+                deletionPolicy.release(indexCommit);
+            }
+        }
+    }
+
    /**
     * Fails the shard and marks the shard store as corrupted if
     * e is caused by index corruption
@@ -1310,7 +1354,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
            if ("checksum".equals(checkIndexOnStartup)) {
                // physical verification only: verify all checksums for the latest commit
                IOException corrupt = null;
-                MetadataSnapshot metadata = store.getMetadata();
+                MetadataSnapshot metadata = snapshotStoreMetadata();
                for (Map.Entry entry : metadata.asMap().entrySet()) {
                    try {
                        Store.checkIntegrity(entry.getValue(), store.directory());
diff --git a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
index 25c59caef99..0d53163f15e 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
@@ -23,7 +23,6 @@ import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.NoLockFactory;
@@ -31,7 +30,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.store.Store;
-import org.elasticsearch.indices.recovery.RecoveryState;

 import java.io.Closeable;
 import java.io.IOException;
@@ -52,7 +50,7 @@ final class LocalShardSnapshot implements Closeable {
        store.incRef();
        boolean success = false;
        try {
-            indexCommit = shard.snapshotIndex(true);
+            indexCommit = shard.acquireIndexCommit(true);
            success = true;
        } finally {
            if (success == false) {
@@ -120,7 +118,7 @@ final class LocalShardSnapshot implements Closeable {
    public void close() throws IOException {
        if (closed.compareAndSet(false, true))
{ try { - shard.releaseSnapshot(indexCommit); + shard.releaseIndexCommit(indexCommit); } finally { store.decRef(); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index e35c95ae1f0..45a471e1aa9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -109,4 +109,9 @@ public final class ShadowIndexShard extends IndexShard { public void addRefreshListener(Translog.Location location, Consumer listener) { throw new UnsupportedOperationException("Can't listen for a refresh on a shadow engine because it doesn't have a translog"); } + + @Override + public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { + throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us"); + } } diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 61d970c5594..659b230edab 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -73,6 +73,7 @@ import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import java.io.Closeable; @@ -208,45 +209,17 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } } - /** - * Returns a new MetadataSnapshot for the latest commit in this store or - * an empty snapshot if no index exists or can not be opened. - * - * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an - * unexpected exception when opening the index reading the segments file. - * @throws IndexFormatTooOldException if the lucene index is too old to be opened. - * @throws IndexFormatTooNewException if the lucene index is too new to be opened. - */ - public MetadataSnapshot getMetadataOrEmpty() throws IOException { - try { - return getMetadata(null); - } catch (IndexNotFoundException ex) { - // that's fine - happens all the time no need to log - } catch (FileNotFoundException | NoSuchFileException ex) { - logger.info("Failed to open / find files while reading metadata snapshot"); - } - return MetadataSnapshot.EMPTY; - } - - /** - * Returns a new MetadataSnapshot for the latest commit in this store. - * - * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an - * unexpected exception when opening the index reading the segments file. - * @throws IndexFormatTooOldException if the lucene index is too old to be opened. - * @throws IndexFormatTooNewException if the lucene index is too new to be opened. - * @throws FileNotFoundException if one or more files referenced by a commit are not present. - * @throws NoSuchFileException if one or more files referenced by a commit are not present. - * @throws IndexNotFoundException if no index / valid commit-point can be found in this store - */ - public MetadataSnapshot getMetadata() throws IOException { - return getMetadata(null); - } - /** * Returns a new MetadataSnapshot for the given commit. If the given commit is null * the latest commit point is used. 
     *
+     * Note that this method requires that the caller verifies it has the right to access the store and that no
+     * concurrent file changes are happening. If in doubt, you probably want to use one of the following:
+     *
+     * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, ESLogger)} to read a metadata snapshot while locking the shard
+     * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
+     * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
+     *
     * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
     *                               unexpected exception when opening the index or reading the segments file.
     * @throws IndexFormatTooOldException if the lucene index is too old to be opened.
@@ -634,7 +607,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
                    // ignore, we don't really care, will get deleted later on
                }
            }
-            final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
+            final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
            verifyAfterCleanup(sourceMetaData, metadataOrEmpty);
        } finally {
            metadataLock.writeLock().unlock();
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 7d201a3ca78..b226af7858e 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -127,7 +127,7 @@ public class RecoverySourceHandler {
        logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
        final IndexCommit phase1Snapshot;
        try {
-            phase1Snapshot = shard.snapshotIndex(false);
+            phase1Snapshot = shard.acquireIndexCommit(false);
        } catch (Exception e) {
            IOUtils.closeWhileHandlingException(translogView);
            throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
@@ -139,7 +139,7 @@ public class RecoverySourceHandler {
            throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
        } finally {
            try {
-                shard.releaseSnapshot(phase1Snapshot);
+                shard.releaseIndexCommit(phase1Snapshot);
            } catch (IOException ex) {
                logger.warn("releasing snapshot caused exception", ex);
            }
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java
index a23b8060b17..1a13504fb1a 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java
@@ -167,7 +167,13 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
        logger.trace("collecting local files for {}", recoveryTarget);
        Store.MetadataSnapshot metadataSnapshot = null;
        try {
-            metadataSnapshot = recoveryTarget.store().getMetadataOrEmpty();
+            if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
+                // we are not going to copy any files, so don't bother listing them and potentially running
+                // into concurrency issues with the primary changing files underneath us.
+ metadataSnapshot = Store.MetadataSnapshot.EMPTY; + } else { + metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); + } } catch (IOException e) { logger.warn("error while listing local files, recover as if there are none", e); metadataSnapshot = Store.MetadataSnapshot.EMPTY; @@ -178,6 +184,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true); return; } + logger.trace("{} local file count: [{}]", recoveryTarget, metadataSnapshot.size()); final StartRecoveryRequest request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(), clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getType(), recoveryTarget.recoveryId()); diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index cdc95e1895a..341b0e57858 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -123,14 +123,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction - * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method. + * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method. * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index efc3c2cfe21..2bb92dd0c23 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; @@ -40,14 +41,38 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Numbers; +import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.compress.NotXContentException; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotException; import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; @@ -61,37 +86,13 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.repositories.RepositoryData; -import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.common.ParseFieldMatcher; -import 
org.elasticsearch.common.Strings;
-import org.elasticsearch.common.UUIDs;
-import org.elasticsearch.common.blobstore.BlobContainer;
-import org.elasticsearch.common.blobstore.BlobMetaData;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.compress.NotXContentException;
-import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.metrics.CounterMetric;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryVerificationException;
 import org.elasticsearch.snapshots.SnapshotCreationException;
 import org.elasticsearch.snapshots.SnapshotException;
+import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotMissingException;
 import org.elasticsearch.snapshots.SnapshotShardFailure;
@@ -1444,7 +1445,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     */
    private class RestoreContext extends Context {

-        private final Store store;
+        private final IndexShard targetShard;

        private final RecoveryState recoveryState;

@@ -1460,13 +1461,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
        public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
            super(snapshotId, version, indexId, shard.shardId(), snapshotShardId);
            this.recoveryState = recoveryState;
-            store = shard.store();
+            this.targetShard = shard;
        }

        /**
         * Performs restore operation
         */
        public void restore() throws IOException {
+            final Store store = targetShard.store();
            store.incRef();
            try {
                logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, metadata.name(), shardId);
@@ -1491,12 +1493,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                }

                SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
-                final Store.MetadataSnapshot recoveryTargetMetadata;
+                Store.MetadataSnapshot recoveryTargetMetadata;
                try {
-                    recoveryTargetMetadata = store.getMetadataOrEmpty();
-                } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
-                    logger.warn("{} Can't read metadata from store", e, shardId);
-                    throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e);
+                    recoveryTargetMetadata = targetShard.snapshotStoreMetadata();
+                } catch (IndexNotFoundException e) {
+                    // happens when restoring to an empty shard, not a big deal
+                    logger.trace("[{}] [{}] restoring to an empty shard", shardId, snapshotId);
+                    recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
+                } catch (IOException e) {
+                    logger.warn("{} Can't read metadata from store, will not reuse any local file while restoring", e, shardId);
+                    recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
                }

                final List filesToRecover = new ArrayList<>();
@@ -1550,7 +1556,7 @@
                try {
                    for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
                        logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
-                        restoreFile(fileToRecover);
+                        restoreFile(fileToRecover, store);
                    }
                } catch (IOException ex) {
                    throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
@@ -1597,7 +1603,7 @@
         *
         * @param fileInfo file to be restored
         */
-        private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException {
+        private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException {
            boolean success = false;
            try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index 136f37eee71..e957d2deb6c 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -348,7 +348,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
            try {
                // we flush first to make sure we get the latest writes snapshotted
-                IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
+                IndexCommit snapshotIndexCommit = indexShard.acquireIndexCommit(true);
                try {
                    repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus);
                    if (logger.isDebugEnabled()) {
@@ -358,7 +358,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
                            TimeValue.timeValueMillis(snapshotStatus.time()), sb);
                    }
                } finally {
-                    indexShard.releaseSnapshot(snapshotIndexCommit);
+                    indexShard.releaseIndexCommit(snapshotIndexCommit);
                }
            } catch (SnapshotFailedEngineException e) {
                throw e;
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index 2bd2fe6e50d..b52c8fe9bdb 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -19,6 +19,7 @@
 package org.elasticsearch.index.replication;

 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexNotFoundException;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -299,7 +300,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
        replica.prepareForIndexRecovery();
        RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
        StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode,
-            replica.store().getMetadataOrEmpty(), RecoveryState.Type.REPLICA, 0);
+            getMetadataSnapshotOrEmpty(replica), RecoveryState.Type.REPLICA, 0);
        RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> {},
            (int) ByteSizeUnit.MB.toKB(1), logger);
        recovery.recoverToTarget();
@@ -307,6 +308,20 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
        replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));
    }

+    private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException {
+        Store.MetadataSnapshot result;
+        try {
+            result = replica.snapshotStoreMetadata();
+        } catch (IndexNotFoundException e) {
+            // OK!
+            result = Store.MetadataSnapshot.EMPTY;
+        } catch (IOException e) {
+            logger.warn("{} failed to read store, treating as empty", e);
+            result = Store.MetadataSnapshot.EMPTY;
+        }
+        return result;
+    }
+
    public synchronized DiscoveryNode getPrimaryNode() {
        return getDiscoveryNode(primary.routingEntry().currentNodeId());
    }
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index a412d37c111..01eeb1edfe0 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -23,6 +23,7 @@ import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
@@ -156,6 +157,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSear
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;

 /**
 * Simple unit-test IndexShard related operations.
@@ -476,6 +478,76 @@ public class IndexShardTests extends ESSingleNodeTestCase {
        ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths);
    }

+    public void testAcquireIndexCommit() throws IOException {
+        createIndex("test");
+        ensureGreen();
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexService test = indicesService.indexService(resolveIndex("test"));
+        final IndexShard shard = test.getShardOrNull(0);
+        int numDocs = randomInt(20);
+        for (int i = 0; i < numDocs; i++) {
+            client().prepareIndex("test", "type", "id_" + i).setSource("{}").get();
+        }
+        final boolean flushFirst = randomBoolean();
+        IndexCommit commit = shard.acquireIndexCommit(flushFirst);
+        int moreDocs = randomInt(20);
+        for (int i = 0; i < moreDocs; i++) {
+            client().prepareIndex("test", "type", "id_" + numDocs + i).setSource("{}").get();
+        }
+        shard.flush(new FlushRequest("index"));
+        // check that we can still read the commit that we captured
+        try (IndexReader reader = DirectoryReader.open(commit)) {
+            assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0));
+        }
+        shard.releaseIndexCommit(commit);
+        shard.flush(new FlushRequest("index").force(true));
+        // check it's cleaned up
+        assertThat(DirectoryReader.listCommits(shard.store().directory()), hasSize(1));
+    }
+
+    /**
+     * Tests that one can snapshot the store at various stages of its lifecycle.
+     */
+    public void testSnapshotStore() throws IOException {
+        createIndex("test");
+        ensureGreen();
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexService test = indicesService.indexService(resolveIndex("test"));
+        final IndexShard shard = test.getShardOrNull(0);
+        client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
+        client().admin().indices().prepareFlush().get();
+        ShardRouting routing = shard.routingEntry();
+        test.removeShard(0, "b/c simon says so");
+        routing = ShardRoutingHelper.reinit(routing);
+        IndexShard newShard = test.createShard(routing);
+        newShard.updateRoutingEntry(routing);
+        DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
+
+        Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata();
+        assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
+
+        newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode,
+            localNode));
+
+        snapshot = newShard.snapshotStoreMetadata();
+        assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
+
+        assertTrue(newShard.recoverFromStore());
+
+        snapshot = newShard.snapshotStoreMetadata();
+        assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
+
+        newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted());
+
+        snapshot = newShard.snapshotStoreMetadata();
+        assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
+
+        newShard.close("test", false);
+
+        snapshot = newShard.snapshotStoreMetadata();
+        assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
+    }
+
    public void testDurableFlagHasEffect() {
        createIndex("test");
        ensureGreen();
diff --git a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java
index e1636b713a1..25199caff91 100644
--- a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java
+++ b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java
@@ -328,15 +328,14 @@ public class StoreTests extends ESTestCase {
        Store.MetadataSnapshot metadata;
        // check before we committed
        try {
-            store.getMetadata();
+            store.getMetadata(null);
            fail("no index present - expected exception");
        } catch (IndexNotFoundException ex) {
            // expected
        }
-        assertThat(store.getMetadataOrEmpty(), is(Store.MetadataSnapshot.EMPTY)); // nothing committed
        writer.commit();
        writer.close();
-        metadata = store.getMetadata();
+        metadata = store.getMetadata(null);
        assertThat(metadata.asMap().isEmpty(), is(false));
        for (StoreFileMetaData meta : metadata) {
            try (IndexInput input = store.directory().openInput(meta.name(), IOContext.DEFAULT)) {
@@ -579,7 +578,7 @@ public class StoreTests extends ESTestCase {
        }
        writer.commit();
        writer.close();
-        first = store.getMetadata();
+        first = store.getMetadata(null);
        assertDeleteContent(store, directoryService);
        store.close();
    }
@@ -609,7 +608,7 @@ public class StoreTests extends ESTestCase {
            }
            writer.commit();
            writer.close();
-            second = store.getMetadata();
+            second = store.getMetadata(null);
        }
Store.RecoveryDiff diff = first.recoveryDiff(second); assertThat(first.size(), equalTo(second.size())); @@ -639,7 +638,7 @@ public class StoreTests extends ESTestCase { writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(numDocs)))); writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(); + Store.MetadataSnapshot metadata = store.getMetadata(null); StoreFileMetaData delFile = null; for (StoreFileMetaData md : metadata) { if (md.name().endsWith(".liv")) { @@ -674,7 +673,7 @@ public class StoreTests extends ESTestCase { writer.addDocument(docs.get(0)); writer.close(); - Store.MetadataSnapshot newCommitMetaData = store.getMetadata(); + Store.MetadataSnapshot newCommitMetaData = store.getMetadata(null); Store.RecoveryDiff newCommitDiff = newCommitMetaData.recoveryDiff(metadata); if (delFile != null) { assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetaData.size() - 5)); // segments_N, del file, cfs, cfe, si for the new segment @@ -723,7 +722,7 @@ public class StoreTests extends ESTestCase { writer.addDocument(doc); } - Store.MetadataSnapshot firstMeta = store.getMetadata(); + Store.MetadataSnapshot firstMeta = store.getMetadata(null); if (random().nextBoolean()) { for (int i = 0; i < docs; i++) { @@ -738,7 +737,7 @@ public class StoreTests extends ESTestCase { writer.commit(); writer.close(); - Store.MetadataSnapshot secondMeta = store.getMetadata(); + Store.MetadataSnapshot secondMeta = store.getMetadata(null); if (randomBoolean()) { @@ -785,13 +784,10 @@ public class StoreTests extends ESTestCase { final AtomicInteger count = new AtomicInteger(0); final ShardLock lock = new DummyShardLock(shardId); - Store store = new Store(shardId, INDEX_SETTINGS, directoryService, lock, new Store.OnClose() { - @Override - public void handle(ShardLock theLock) { - assertEquals(shardId, theLock.getShardId()); - assertEquals(lock, theLock); - count.incrementAndGet(); - } + Store store = new Store(shardId, INDEX_SETTINGS, directoryService, lock, theLock -> { + assertEquals(shardId, theLock.getShardId()); + assertEquals(lock, theLock); + count.incrementAndGet(); }); assertEquals(count.get(), 0); @@ -917,11 +913,7 @@ public class StoreTests extends ESTestCase { writer.commit(); writer.close(); Store.MetadataSnapshot metadata; - if (randomBoolean()) { - metadata = store.getMetadata(); - } else { - metadata = store.getMetadata(deletionPolicy.snapshot()); - } + metadata = store.getMetadata(randomBoolean() ? 
null : deletionPolicy.snapshot()); assertFalse(metadata.asMap().isEmpty()); // do not check for correct files, we have enough tests for that above assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId)); @@ -982,7 +974,7 @@ public class StoreTests extends ESTestCase { try { if (randomBoolean()) { - store.getMetadata(); + store.getMetadata(null); } else { store.readLastCommittedSegmentsInfo(); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index cfff28121ba..e0bb251f475 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -97,7 +97,9 @@ public class RecoverySourceHandlerTests extends ESTestCase { writer.addDocument(document); } writer.commit(); - Store.MetadataSnapshot metadata = store.getMetadata(); + writer.close(); + + Store.MetadataSnapshot metadata = store.getMetadata(null); List metas = new ArrayList<>(); for (StoreFileMetaData md : metadata) { metas.add(md); @@ -116,14 +118,14 @@ public class RecoverySourceHandlerTests extends ESTestCase { throw new RuntimeException(e); } }); - Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(); + Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null); Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata); assertEquals(metas.size(), recoveryDiff.identical.size()); assertEquals(0, recoveryDiff.different.size()); assertEquals(0, recoveryDiff.missing.size()); IndexReader reader = DirectoryReader.open(targetStore.directory()); assertEquals(numDocs, reader.maxDoc()); - IOUtils.close(reader, writer, store, targetStore); + IOUtils.close(reader, store, targetStore); } public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { @@ -157,7 +159,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(); + Store.MetadataSnapshot metadata = store.getMetadata(null); List metas = new ArrayList<>(); for (StoreFileMetaData md : metadata) { metas.add(md); @@ -221,7 +223,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(); + Store.MetadataSnapshot metadata = store.getMetadata(null); List metas = new ArrayList<>(); for (StoreFileMetaData md : metadata) { metas.add(md);
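With `getMetadataOrEmpty()` gone, call sites that tolerate a missing index now make that decision explicitly, as the `getMetadataSnapshotOrEmpty` test helper above does. A minimal sketch of that pattern (the class name is hypothetical; the exception handling mirrors the helper in this patch, and lucene's `IndexNotFoundException` extends `IOException`, so it must be caught first):

    import org.apache.lucene.index.IndexNotFoundException;
    import org.elasticsearch.index.shard.IndexShard;
    import org.elasticsearch.index.store.Store;

    import java.io.IOException;

    class SnapshotOrEmptySketch {
        // An absent index is expected (e.g. a brand new shard copy); any other
        // I/O problem downgrades the listing to empty instead of failing the caller.
        static Store.MetadataSnapshot snapshotOrEmpty(IndexShard shard) {
            try {
                return shard.snapshotStoreMetadata();
            } catch (IndexNotFoundException e) {
                return Store.MetadataSnapshot.EMPTY; // nothing committed yet, that's fine
            } catch (IOException e) {
                return Store.MetadataSnapshot.EMPTY; // treat an unreadable store as empty
            }
        }
    }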