From c783488e971d3099a2fc03446c978fae0286cba8 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Wed, 12 Sep 2018 17:47:10 +0200
Subject: [PATCH] Add `_source`-only snapshot repository (#32844)

This change adds a `_source`-only snapshot repository that allows wrapping any existing
repository as a _backend_ to snapshot only the `_source` part, including live-docs markers.
Snapshots taken with the `source` repository won't include any indices, doc values, or points.
The snapshot is reduced in size and functionality such that it requires a full reindex after it
is successfully restored. The restore process copies the `_source` data locally and starts a
special shard and engine that allow `match_all` scrolls and searches. Any other query or get
call will fail with an unsupported-operation exception. The restored index is also marked as
read-only.

This feature is aimed mainly at disaster-recovery use cases where snapshot size is a concern
or where time to restore is less of an issue.

**NOTE**: The snapshot produced by this repository is still a valid Lucene index. Longer
retention policies are not added by this change and are out of scope for it.
---
 docs/reference/modules/snapshots.asciidoc     |  45 +++
 .../core/internal/io/IOUtils.java             |   9 +
 .../elasticsearch/index/engine/Engine.java    |   2 +-
 .../index/engine/EngineFactory.java           |   1 +
 .../elasticsearch/index/seqno/SeqNoStats.java |   1 -
 .../shard/AbstractIndexShardComponent.java    |   2 -
 .../org/elasticsearch/index/store/Store.java  |  21 +-
 .../elasticsearch/indices/IndicesService.java |   1 -
 .../repositories/FilterRepository.java        | 167 ++++++++
 .../repositories/RepositoriesService.java     |   2 +-
 .../repositories/Repository.java              |  13 +-
 .../blobstore/BlobStoreRepository.java        |  15 +-
 .../snapshots/SnapshotShardsService.java      |   3 +-
 .../index/shard/IndexShardTests.java          |   3 +-
 .../index/engine/EngineTestCase.java          |   3 +-
 .../index/shard/IndexShardTestCase.java       |   5 +-
 .../test/InternalTestCluster.java             |   5 +-
 .../SeqIdGeneratingFilterReader.java          | 162 ++++++++
 .../snapshots/SourceOnlySnapshot.java         | 261 +++++++++++++
 .../SourceOnlySnapshotRepository.java         | 181 +++++++++
 .../elasticsearch/xpack/core/XPackPlugin.java |  30 +-
 .../snapshots/SourceOnlySnapshotIT.java       | 291 ++++++++++++++
 .../SourceOnlySnapshotShardTests.java         | 358 ++++++++++++++++++
 .../snapshots/SourceOnlySnapshotTests.java    | 245 ++++++++++++
 .../rest-api-spec/test/snapshot/10_basic.yml  |  84 ++++
 25 files changed, 1885 insertions(+), 25 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
 create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SeqIdGeneratingFilterReader.java
 create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java
 create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
 create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java
 create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
 create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotTests.java
 create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/snapshot/10_basic.yml

diff --git a/docs/reference/modules/snapshots.asciidoc b/docs/reference/modules/snapshots.asciidoc
index 0562a677a8d..ba6adf1d35f 100644
--- a/docs/reference/modules/snapshots.asciidoc
+++ b/docs/reference/modules/snapshots.asciidoc
@@ -207,6 +207,51 @@ repositories.url.allowed_urls: ["http://www.example.org/root/*", "https://*.mydo
 URL repositories with `file:` URLs can only point to locations registered in the `path.repo` setting similar to
 shared file system repository.

+[float]
+[role="xpack"]
+[testenv="basic"]
+===== Source Only Repository
+
+A source repository enables you to create minimal, source-only snapshots that take up to 50% less space on disk.
+Source-only snapshots contain stored fields and index metadata. They do not include index structures or doc values
+and are not searchable when restored. After restoring a source-only snapshot, you must <<docs-reindex,reindex>>
+the data into a new index.
+
+Source repositories delegate to another snapshot repository for storage.
+
+[IMPORTANT]
+==================================================
+
+Source-only snapshots are only supported if the `_source` field is enabled and no source-filtering is applied.
+When you restore a source-only snapshot:
+
+ * The restored index is read-only and can only serve `match_all` search or scroll requests to enable reindexing.
+
+ * Queries other than `match_all` and `_get` requests are not supported.
+
+ * The mapping of the restored index is empty, but the original mapping is available from the type's top-level
+   `_meta` element.
+
+==================================================
+
+When you create a source repository, you must specify the type and name of the delegate repository
+where the snapshots will be stored:
+
+[source,js]
+-----------------------------------
+PUT _snapshot/my_src_only_repository
+{
+  "type": "source",
+  "settings": {
+    "delegate_type": "fs",
+    "location": "my_backup_location"
+  }
+}
+-----------------------------------
+// CONSOLE
+// TEST[continued]
+
 [float]
 ===== Repository plugins

diff --git a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java
index 67663516167..493d809f9dc 100644
--- a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java
+++ b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java
@@ -20,6 +20,7 @@ package org.elasticsearch.core.internal.io;
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.FileVisitResult;
 import java.nio.file.FileVisitor;
 import java.nio.file.Files;
@@ -36,6 +37,14 @@ import java.util.Map;
  */
 public final class IOUtils {

+    /**
+     * UTF-8 charset string.
+     *

Where possible, use {@link StandardCharsets#UTF_8} instead, + * as using the String constant may slow things down. + * @see StandardCharsets#UTF_8 + */ + public static final String UTF_8 = StandardCharsets.UTF_8.name(); + private IOUtils() { // Static utils methods } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 5ebe13577f4..fc693113fee 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1594,7 +1594,7 @@ public abstract class Engine implements Closeable { private final CheckedRunnable onClose; private final IndexCommit indexCommit; - IndexCommitRef(IndexCommit indexCommit, CheckedRunnable onClose) { + public IndexCommitRef(IndexCommit indexCommit, CheckedRunnable onClose) { this.indexCommit = indexCommit; this.onClose = onClose; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineFactory.java b/server/src/main/java/org/elasticsearch/index/engine/EngineFactory.java index b477e27b6e1..e50bdd86e75 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineFactory.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineFactory.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.engine; /** * Simple Engine Factory */ +@FunctionalInterface public interface EngineFactory { Engine newReadWriteEngine(EngineConfig config); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java b/server/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java index 9c1795d654c..c711fb42936 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java @@ -91,5 +91,4 @@ public class SeqNoStats implements ToXContentFragment, Writeable { ", globalCheckpoint=" + globalCheckpoint + '}'; } - } diff --git a/server/src/main/java/org/elasticsearch/index/shard/AbstractIndexShardComponent.java b/server/src/main/java/org/elasticsearch/index/shard/AbstractIndexShardComponent.java index c56b0d740e7..c967e94f7da 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/AbstractIndexShardComponent.java +++ b/server/src/main/java/org/elasticsearch/index/shard/AbstractIndexShardComponent.java @@ -51,6 +51,4 @@ public abstract class AbstractIndexShardComponent implements IndexShardComponent public String nodeName() { return indexSettings.getNodeName(); } - - } diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index b892c5c01fe..8e57caad3b4 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1439,11 +1439,28 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref */ public void bootstrapNewHistory() throws IOException { metadataLock.writeLock().lock(); - try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) { - final Map userData = getUserData(writer); + try { + Map userData = readLastCommittedSegmentsInfo().getUserData(); final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); + bootstrapNewHistory(maxSeqNo); + } finally { + metadataLock.writeLock().unlock(); + } + } + + /** + * Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint + * as well 
as the maximum sequence number. + * This is used to make sure no existing shard will recover from this index using ops-based recovery. + * @see SequenceNumbers#LOCAL_CHECKPOINT_KEY + * @see SequenceNumbers#MAX_SEQ_NO + */ + public void bootstrapNewHistory(long maxSeqNo) throws IOException { + metadataLock.writeLock().lock(); + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) { final Map map = new HashMap<>(); map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); updateCommitData(writer, map); } finally { diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 1c83a880511..e9f674e14a5 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -396,7 +396,6 @@ public class IndicesService extends AbstractLifecycleComponent public IndexService indexService(Index index) { return indices.get(index.getUUID()); } - /** * Returns an IndexService for the specified index if exists otherwise a {@link IndexNotFoundException} is thrown. */ diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java new file mode 100644 index 00000000000..4e8e9b6c7f5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -0,0 +1,167 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.elasticsearch.repositories; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; + +import java.io.IOException; +import java.util.List; + +public class FilterRepository implements Repository { + + private final Repository in; + + public FilterRepository(Repository in) { + this.in = in; + } + + @Override + public RepositoryMetaData getMetadata() { + return in.getMetadata(); + } + + @Override + public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { + return in.getSnapshotInfo(snapshotId); + } + + @Override + public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { + return in.getSnapshotGlobalMetaData(snapshotId); + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + return in.getSnapshotIndexMetaData(snapshotId, index); + } + + @Override + public RepositoryData getRepositoryData() { + return in.getRepositoryData(); + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + in.initializeSnapshot(snapshotId, indices, metaData); + } + + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState) { + return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState); + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + in.deleteSnapshot(snapshotId, repositoryStateId); + } + + @Override + public long getSnapshotThrottleTimeInNanos() { + return in.getSnapshotThrottleTimeInNanos(); + } + + @Override + public long getRestoreThrottleTimeInNanos() { + return in.getRestoreThrottleTimeInNanos(); + } + + @Override + public String startVerification() { + return in.startVerification(); + } + + @Override + public void endVerification(String verificationToken) { + in.endVerification(verificationToken); + } + + @Override + public void verify(String verificationToken, DiscoveryNode localNode) { + in.verify(verificationToken, localNode); + } + + @Override + public boolean isReadOnly() { + return in.isReadOnly(); + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus); + } + + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { + in.restoreShard(shard, snapshotId, version, 
indexId, snapshotShardId, recoveryState); + } + + @Override + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + return in.getShardSnapshotStatus(snapshotId, version, indexId, shardId); + } + + @Override + public Lifecycle.State lifecycleState() { + return in.lifecycleState(); + } + + @Override + public void addLifecycleListener(LifecycleListener listener) { + in.addLifecycleListener(listener); + } + + @Override + public void removeLifecycleListener(LifecycleListener listener) { + in.removeLifecycleListener(listener); + } + + @Override + public void start() { + in.start(); + } + + @Override + public void stop() { + in.stop(); + } + + @Override + public void close() { + in.close(); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index c6cbaa50cdf..aef4381cd8b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -398,7 +398,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta "repository type [" + repositoryMetaData.type() + "] does not exist"); } try { - Repository repository = factory.create(repositoryMetaData); + Repository repository = factory.create(repositoryMetaData, typesRegistry::get); repository.start(); return repository; } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index c0b45259f99..9f16d26ac75 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -35,6 +36,7 @@ import org.elasticsearch.snapshots.SnapshotShardFailure; import java.io.IOException; import java.util.List; +import java.util.function.Function; /** * An interface for interacting with a repository in snapshot and restore. @@ -46,7 +48,7 @@ import java.util.List; *

@@ -63,6 +65,10 @@ public interface Repository extends LifecycleComponent { * @param metadata metadata for the repository including name and settings */ Repository create(RepositoryMetaData metadata) throws Exception; + + default Repository create(RepositoryMetaData metaData, Function typeLookup) throws Exception { + return create(metaData); + } } /** @@ -188,14 +194,15 @@ public interface Repository extends LifecycleComponent { *

* As the snapshot process progresses, the implementation of this method should update the {@link IndexShardSnapshotStatus} object and check * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. - * * @param shard shard to be snapshotted + * @param store store to be snapshotted * @param snapshotId snapshot id * @param indexId id for the index being snapshotted * @param snapshotIndexCommit commit point * @param snapshotStatus snapshot status */ - void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus); + void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus); /** * Restores a snapshot of the shard. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 4c36cc5eed8..df80dd473f1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -845,8 +845,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } @Override - public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { - SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus, System.currentTimeMillis()); + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis()); try { snapshotContext.snapshot(snapshotIndexCommit); } catch (Exception e) { @@ -854,7 +855,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp if (e instanceof IndexShardSnapshotFailedException) { throw (IndexShardSnapshotFailedException) e; } else { - throw new IndexShardSnapshotFailedException(shard.shardId(), e); + throw new IndexShardSnapshotFailedException(store.shardId(), e); } } } @@ -1157,15 +1158,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp /** * Constructs a new context * - * @param shard shard to be snapshotted + * @param store store to be snapshotted * @param snapshotId snapshot id * @param indexId the id of the index being snapshotted * @param snapshotStatus snapshot status to report progress */ - SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) { - super(snapshotId, Version.CURRENT, indexId, shard.shardId()); + SnapshotContext(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) { + super(snapshotId, Version.CURRENT, indexId, store.shardId()); this.snapshotStatus = snapshotStatus; - this.store = shard.store(); + this.store = store; this.startTime = startTime; } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 33b4d852987..88612dbcc50 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -389,7 +389,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements try { // we flush first to make sure we get the latest writes snapshotted try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) { - repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus); + repository.snapshotShard(indexShard, indexShard.store(), snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), + snapshotStatus); if (logger.isDebugEnabled()) { final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0c5d9b1613f..9a5df39a970 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2969,7 +2969,8 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f9377afe6ed..86f7bd903cc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -833,7 +833,8 @@ public abstract class EngineTestCase extends ESTestCase { * Asserts the provided engine has a consistent document history between translog and Lucene index. 
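 * Engines other than {@link InternalEngine} (for example the read-only engine behind restored source-only snapshots) are skipped, since only the internal engine exposes the local checkpoint tracker this check relies on.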
*/ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException { - if (mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false) { + if (mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false + || (engine instanceof InternalEngine) == false) { return; } final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index a9e715a1129..78ce5bc500c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -126,7 +126,7 @@ public abstract class IndexShardTestCase extends ESTestCase { }; protected ThreadPool threadPool; - private long primaryTerm; + protected long primaryTerm; @Override public void setUp() throws Exception { @@ -753,7 +753,8 @@ public abstract class IndexShardTestCase extends ESTestCase { Index index = shard.shardId().getIndex(); IndexId indexId = new IndexId(index.getName(), index.getUUID()); - repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus); + repository.snapshotShard(shard, shard.store(), snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), + snapshotStatus); } final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 3c46acd0fbe..08aafaea399 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -75,6 +75,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; @@ -1199,7 +1200,9 @@ public final class InternalTestCluster extends TestCluster { for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { try { - IndexShardTestCase.getTranslog(indexShard).getDeletionPolicy().assertNoOpenTranslogRefs(); + if (IndexShardTestCase.getEngine(indexShard) instanceof InternalEngine) { + IndexShardTestCase.getTranslog(indexShard).getDeletionPolicy().assertNoOpenTranslogRefs(); + } } catch (AlreadyClosedException ok) { // all good } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SeqIdGeneratingFilterReader.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SeqIdGeneratingFilterReader.java new file mode 100644 index 00000000000..8dd5d9d98ca --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SeqIdGeneratingFilterReader.java @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.snapshots; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.Terms; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.VersionFieldMapper; + +import java.io.IOException; +import java.util.IdentityHashMap; +import java.util.Map; + +/** + * This filter reader fakes sequence ID, primary term and version + * for a source only index. + */ +final class SeqIdGeneratingFilterReader extends FilterDirectoryReader { + private final long primaryTerm; + + private SeqIdGeneratingFilterReader(DirectoryReader in, SeqIdGeneratingSubReaderWrapper wrapper) throws IOException { + super(in, wrapper); + primaryTerm = wrapper.primaryTerm; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return wrap(in, primaryTerm); + } + + static DirectoryReader wrap(DirectoryReader in, long primaryTerm) throws IOException { + Map ctxMap = new IdentityHashMap<>(); + for (LeafReaderContext leave : in.leaves()) { + ctxMap.put(leave.reader(), leave); + } + return new SeqIdGeneratingFilterReader(in, new SeqIdGeneratingSubReaderWrapper(ctxMap, primaryTerm)); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + + private abstract static class FakeNumericDocValues extends NumericDocValues { + private final int maxDoc; + int docID = -1; + + FakeNumericDocValues(int maxDoc) { + this.maxDoc = maxDoc; + } + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() { + if (docID+1 < maxDoc) { + docID++; + } else { + docID = NO_MORE_DOCS; + } + return docID; + } + + @Override + public int advance(int target) { + if (target >= maxDoc) { + docID = NO_MORE_DOCS; + } else { + docID = target; + } + return docID; + } + + @Override + public long cost() { + return maxDoc; + } + + @Override + public boolean advanceExact(int target) { + advance(target); + return docID != NO_MORE_DOCS; + } + } + + private static class SeqIdGeneratingSubReaderWrapper extends SubReaderWrapper { + private final Map ctxMap; + private final long primaryTerm; + + SeqIdGeneratingSubReaderWrapper(Map ctxMap, long primaryTerm) { + this.ctxMap = ctxMap; + this.primaryTerm = primaryTerm; + } + + @Override + public LeafReader wrap(LeafReader reader) { + LeafReaderContext leafReaderContext = ctxMap.get(reader); + final int docBase = leafReaderContext.docBase; + return new FilterLeafReader(reader) { + + @Override + public NumericDocValues getNumericDocValues(String field) throws IOException { + if (SeqNoFieldMapper.NAME.equals(field)) { + return new FakeNumericDocValues(maxDoc()) { + @Override + public long longValue() { + return docBase + docID; + } + }; + } else if (SeqNoFieldMapper.PRIMARY_TERM_NAME.equals(field)) { + return new FakeNumericDocValues(maxDoc()) { + @Override + public long longValue() { + return primaryTerm; + } + }; + } else if (VersionFieldMapper.NAME.equals(field)) { + return new FakeNumericDocValues(maxDoc()) { + @Override + public long longValue() { + return 1; + } + }; + } + return 
super.getNumericDocValues(field); + } + + @Override + public CacheHelper getCoreCacheHelper() { + return reader.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return reader.getReaderCacheHelper(); + } + + @Override + public Terms terms(String field) { + throw new UnsupportedOperationException("_source only indices can't be searched or filtered"); + } + + @Override + public PointValues getPointValues(String field) { + throw new UnsupportedOperationException("_source only indices can't be searched or filtered"); + } + }; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java new file mode 100644 index 00000000000..b7d6a51f45a --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java @@ -0,0 +1,261 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.snapshots; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.CheckIndex; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; +import org.apache.lucene.index.StandardDirectoryReader; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.TrackingDirectoryWrapper; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.FIELDS_EXTENSION; +import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.FIELDS_INDEX_EXTENSION; + +public class SourceOnlySnapshot { + private final Directory targetDirectory; + private final Supplier deleteByQuerySupplier; + + public SourceOnlySnapshot(Directory targetDirectory, Supplier deleteByQuerySupplier) { + this.targetDirectory = targetDirectory; + this.deleteByQuerySupplier = deleteByQuerySupplier; + } + + 
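+    /** Creates a source-only snapshot writer that keeps all live docs, without any additional delete-by-query filtering. */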
public SourceOnlySnapshot(Directory targetDirectory) { + this(targetDirectory, null); + } + + public synchronized List syncSnapshot(IndexCommit commit) throws IOException { + long generation; + Map existingSegments = new HashMap<>(); + if (Lucene.indexExists(targetDirectory)) { + SegmentInfos existingsSegmentInfos = Lucene.readSegmentInfos(targetDirectory); + for (SegmentCommitInfo info : existingsSegmentInfos) { + existingSegments.put(new BytesRef(info.info.getId()), info); + } + generation = existingsSegmentInfos.getGeneration(); + } else { + generation = 1; + } + List createdFiles = new ArrayList<>(); + String segmentFileName; + try (Lock writeLock = targetDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME); + StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) { + SegmentInfos segmentInfos = reader.getSegmentInfos(); + DirectoryReader wrapper = wrapReader(reader); + List newInfos = new ArrayList<>(); + for (LeafReaderContext ctx : wrapper.leaves()) { + SegmentCommitInfo info = segmentInfos.info(ctx.ord); + LeafReader leafReader = ctx.reader(); + LiveDocs liveDocs = getLiveDocs(leafReader); + if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed + SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles); + newInfos.add(newInfo); + } + } + segmentInfos.clear(); + segmentInfos.addAll(newInfos); + segmentInfos.setNextWriteGeneration(Math.max(segmentInfos.getGeneration(), generation)+1); + String pendingSegmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, + "", segmentInfos.getGeneration()); + try (IndexOutput segnOutput = targetDirectory.createOutput(pendingSegmentFileName, IOContext.DEFAULT)) { + segmentInfos.write(targetDirectory, segnOutput); + } + targetDirectory.sync(Collections.singleton(pendingSegmentFileName)); + targetDirectory.sync(createdFiles); + segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", segmentInfos.getGeneration()); + targetDirectory.rename(pendingSegmentFileName, segmentFileName); + } + Lucene.pruneUnreferencedFiles(segmentFileName, targetDirectory); + assert assertCheckIndex(); + return Collections.unmodifiableList(createdFiles); + } + + private LiveDocs getLiveDocs(LeafReader reader) throws IOException { + if (deleteByQuerySupplier != null) { + // we have this additional delete by query functionality to filter out documents before we snapshot them + // we can't filter after the fact since we don't have an index anymore. 
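+            // rewrite and execute the filter query against this reader; every matching doc is cleared from a
+            // mutable copy of the live docs below, so it is treated as deleted in the snapshot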
+ Query query = deleteByQuerySupplier.get(); + IndexSearcher s = new IndexSearcher(reader); + s.setQueryCache(null); + Query rewrite = s.rewrite(query); + Weight weight = s.createWeight(rewrite, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + Scorer scorer = weight.scorer(reader.getContext()); + if (scorer != null) { + DocIdSetIterator iterator = scorer.iterator(); + if (iterator != null) { + Bits liveDocs = reader.getLiveDocs(); + final FixedBitSet bits; + if (liveDocs != null) { + bits = FixedBitSet.copyOf(liveDocs); + } else { + bits = new FixedBitSet(reader.maxDoc()); + bits.set(0, reader.maxDoc()); + } + int newDeletes = apply(iterator, bits); + if (newDeletes != 0) { + int numDeletes = reader.numDeletedDocs() + newDeletes; + return new LiveDocs(numDeletes, bits); + } + } + } + } + return new LiveDocs(reader.numDeletedDocs(), reader.getLiveDocs()); + } + + private int apply(DocIdSetIterator iterator, FixedBitSet bits) throws IOException { + int docID = -1; + int newDeletes = 0; + while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + if (bits.get(docID)) { + bits.clear(docID); + newDeletes++; + } + } + return newDeletes; + } + + + private boolean assertCheckIndex() throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(1024); + try (CheckIndex checkIndex = new CheckIndex(targetDirectory)) { + checkIndex.setFailFast(true); + checkIndex.setInfoStream(new PrintStream(output, false, IOUtils.UTF_8), false); + CheckIndex.Status status = checkIndex.checkIndex(); + if (status == null || status.clean == false) { + throw new RuntimeException("CheckIndex failed: " + output.toString(IOUtils.UTF_8)); + } + return true; + } + } + + DirectoryReader wrapReader(DirectoryReader reader) throws IOException { + String softDeletesField = null; + for (LeafReaderContext ctx : reader.leaves()) { + String field = ctx.reader().getFieldInfos().getSoftDeletesField(); + if (field != null) { + softDeletesField = field; + break; + } + } + return softDeletesField == null ? reader : new SoftDeletesDirectoryReaderWrapper(reader, softDeletesField); + } + + private SegmentCommitInfo syncSegment(SegmentCommitInfo segmentCommitInfo, LiveDocs liveDocs, FieldInfos fieldInfos, + Map existingSegments, List createdFiles) throws IOException { + SegmentInfo si = segmentCommitInfo.info; + Codec codec = si.getCodec(); + final String segmentSuffix = ""; + SegmentCommitInfo newInfo; + final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(targetDirectory); + BytesRef segmentId = new BytesRef(si.getId()); + boolean exists = existingSegments.containsKey(segmentId); + if (exists == false) { + SegmentInfo newSegmentInfo = new SegmentInfo(si.dir, si.getVersion(), si.getMinVersion(), si.name, si.maxDoc(), false, + si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null); + // we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore. 
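+            // delCount and softDelCount start at zero; the -1 values mean no delete, field-infos or
+            // doc-values generations exist yet for the freshly copied segment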
+ newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1); + List fieldInfoCopy = new ArrayList<>(fieldInfos.size()); + for (FieldInfo fieldInfo : fieldInfos) { + fieldInfoCopy.add(new FieldInfo(fieldInfo.name, fieldInfo.number, + false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, fieldInfo.attributes(), 0, 0, + fieldInfo.isSoftDeletesField())); + } + FieldInfos newFieldInfos = new FieldInfos(fieldInfoCopy.toArray(new FieldInfo[0])); + codec.fieldInfosFormat().write(trackingDir, newSegmentInfo, segmentSuffix, newFieldInfos, IOContext.DEFAULT); + newInfo.setFieldInfosFiles(trackingDir.getCreatedFiles()); + String idxFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_INDEX_EXTENSION); + String dataFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_EXTENSION); + Directory sourceDir = newSegmentInfo.dir; + if (si.getUseCompoundFile()) { + sourceDir = codec.compoundFormat().getCompoundReader(sourceDir, si, IOContext.DEFAULT); + } + trackingDir.copyFrom(sourceDir, idxFile, idxFile, IOContext.DEFAULT); + trackingDir.copyFrom(sourceDir, dataFile, dataFile, IOContext.DEFAULT); + if (sourceDir != newSegmentInfo.dir) { + sourceDir.close(); + } + } else { + newInfo = existingSegments.get(segmentId); + assert newInfo.info.getUseCompoundFile() == false; + } + if (liveDocs.bits != null && liveDocs.numDeletes != 0 && liveDocs.numDeletes != newInfo.getDelCount()) { + if (newInfo.getDelCount() != 0) { + assert assertLiveDocs(liveDocs.bits, liveDocs.numDeletes); + } + codec.liveDocsFormat().writeLiveDocs(liveDocs.bits, trackingDir, newInfo, liveDocs.numDeletes - newInfo.getDelCount(), + IOContext.DEFAULT); + SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(), -1, -1); + info.setFieldInfosFiles(newInfo.getFieldInfosFiles()); + info.info.setFiles(trackingDir.getCreatedFiles()); + newInfo = info; + } + if (exists == false) { + newInfo.info.setFiles(trackingDir.getCreatedFiles()); + codec.segmentInfoFormat().write(trackingDir, newInfo.info, IOContext.DEFAULT); + } + createdFiles.addAll(trackingDir.getCreatedFiles()); + return newInfo; + } + + private boolean assertLiveDocs(Bits liveDocs, int deletes) { + int actualDeletes = 0; + for (int i = 0; i < liveDocs.length(); i++ ) { + if (liveDocs.get(i) == false) { + actualDeletes++; + } + } + assert actualDeletes == deletes : " actual: " + actualDeletes + " deletes: " + deletes; + return true; + } + + private static class LiveDocs { + final int numDeletes; + final Bits bits; + + LiveDocs(int numDeletes, Bits bits) { + this.numDeletes = numDeletes; + this.bits = bits; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java new file mode 100644 index 00000000000..a75d5f488ee --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -0,0 +1,181 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.snapshots; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.SimpleFSDirectory; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.lucene.search.Queries; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.ShardLock; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.ReadOnlyEngine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.repositories.FilterRepository; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + *

+ This is a filter snapshot repository that only snapshots the minimal required information + * that is needed to recreate the index. In other words, instead of snapshotting the entire shard + * with all its Lucene indexed fields, doc values, points etc., it only snapshots the stored + * fields including _source and _routing as well as the live docs in order to distinguish between + * live and deleted docs. +
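+ * Stored fields plus the per-segment live docs are enough to rebuild every surviving document by reindexing;
+ * postings, doc values and points are recomputed on the target index at reindex time.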

+ *

+ The repository can wrap any other repository, delegating the source-only snapshot to it and reading + from it. For instance, a file repository of type fs can be used by passing settings.delegate_type=fs + at repository creation time. +

+ Snapshots restored from source only snapshots are minimal indices that are read-only and only allow + match_all scroll searches in order to reindex the data. + */ +public final class SourceOnlySnapshotRepository extends FilterRepository { + private static final Setting DELEGATE_TYPE = new Setting<>("delegate_type", "", Function.identity(), Setting.Property .NodeScope); + public static final Setting SOURCE_ONLY = Setting.boolSetting("index.source_only", false, Setting .Property.IndexScope, Setting.Property.Final, Setting.Property.PrivateIndex); + + private static final String SNAPSHOT_DIR_NAME = "_snapshot"; + + SourceOnlySnapshotRepository(Repository in) { + super(in); + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + // we process the index metadata at snapshot time. This means that if somebody tries to restore + // a _source only snapshot with a plain repository it will work just fine, since we have already set the + // required engine, marked the index as read-only, and set the mapping to a default mapping + try { + MetaData.Builder builder = MetaData.builder(metaData); + for (IndexId indexId : indices) { + IndexMetaData index = metaData.index(indexId.getName()); + IndexMetaData.Builder indexMetadataBuilder = IndexMetaData.builder(index); + // for a minimal restore we basically disable indexing on all fields and only create an index + // that is valid from an operational perspective. i.e. it will have all metadata fields like version/ + // seqID etc. and an indexed ID field such that we can potentially perform updates or delete documents. + ImmutableOpenMap mappings = index.getMappings(); + Iterator> iterator = mappings.iterator(); + while (iterator.hasNext()) { + ObjectObjectCursor next = iterator.next(); + // we don't need to obey any routing here; everything is read-only anyway and get is disabled + final String mapping = "{ \"" + next.key + "\": { \"enabled\": false, \"_meta\": " + next.value.source().string() + + " } }"; + indexMetadataBuilder.putMapping(next.key, mapping); + } + indexMetadataBuilder.settings(Settings.builder().put(index.getSettings()) + .put(SOURCE_ONLY.getKey(), true) + .put("index.blocks.write", true)); // read-only! + builder.put(indexMetadataBuilder); + } + super.initializeSnapshot(snapshotId, indices, builder.build()); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + if (shard.mapperService().documentMapper() != null // if there is no mapping this is null + && shard.mapperService().documentMapper().sourceMapper().isComplete() == false) { + throw new IllegalStateException("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled " + + "or filters the source"); + } + ShardPath shardPath = shard.shardPath(); + Path dataPath = shardPath.getDataPath(); + // TODO should we have a snapshot tmp directory per shard that is maintained by the system? + Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME); + try (FSDirectory directory = new SimpleFSDirectory(snapPath)) { + Store tempStore = new Store(store.shardId(), store.indexSettings(), directory, new ShardLock(store.shardId()) { + @Override + protected void closeInternal() { + // do nothing; + } + }, Store.OnClose.EMPTY); + Supplier querySupplier = shard.mapperService().hasNested() ?
Queries::newNestedFilter : null; + // SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here + SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier); + snapshot.syncSnapshot(snapshotIndexCommit); + // we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID + SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); + tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc()); + store.incRef(); + try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) { + IndexCommit indexCommit = reader.getIndexCommit(); + super.snapshotShard(shard, tempStore, snapshotId, indexId, indexCommit, snapshotStatus); + } finally { + store.decRef(); + } + } catch (IOException e) { + // why on earth does this super method not declare IOException + throw new UncheckedIOException(e); + } + } + + /** + * Returns an {@link EngineFactory} for the source only snapshots. + */ + public static EngineFactory getEngineFactory() { + return config -> new ReadOnlyEngine(config, null, new TranslogStats(0, 0, 0, 0, 0), true, + reader -> { + try { + return SeqIdGeneratingFilterReader.wrap(reader, config.getPrimaryTermSupplier().getAsLong()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + /** + * Returns a new source only repository factory + */ + public static Repository.Factory newRepositoryFactory() { + return new Repository.Factory() { + + @Override + public Repository create(RepositoryMetaData metadata) { + throw new UnsupportedOperationException(); + } + + @Override + public Repository create(RepositoryMetaData metaData, Function typeLookup) throws Exception { + String delegateType = DELEGATE_TYPE.get(metaData.settings()); + if (Strings.hasLength(delegateType) == false) { + throw new IllegalArgumentException(DELEGATE_TYPE.getKey() + " must be set"); + } + Repository.Factory factory = typeLookup.apply(delegateType); + return new SourceOnlySnapshotRepository(factory.create(new RepositoryMetaData(metaData.name(), + delegateType, metaData.settings()), typeLookup)); + } + }; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index aaa3effcfe8..ca76e71e052 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -31,21 +31,28 @@ import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.LicensesMetaData; import org.elasticsearch.license.Licensing; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.plugins.EnginePlugin; import 
org.elasticsearch.plugins.ExtensiblePlugin; +import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.snapshots.SourceOnlySnapshotRepository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.action.TransportXPackInfoAction; @@ -67,13 +74,15 @@ import java.security.PrivilegedAction; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, ExtensiblePlugin { +public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, ExtensiblePlugin, RepositoryPlugin, EnginePlugin { private static Logger logger = ESLoggerFactory.getLogger(XPackPlugin.class); private static DeprecationLogger deprecationLogger = new DeprecationLogger(logger); @@ -340,4 +349,23 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte } } + @Override + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); + } + + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (indexSettings.getValue(SourceOnlySnapshotRepository.SOURCE_ONLY)) { + return Optional.of(SourceOnlySnapshotRepository.getEngineFactory()); + } + return Optional.empty(); + } + + @Override + public List> getSettings() { + List> settings = super.getSettings(); + settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY); + return settings; + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java new file mode 100644 index 00000000000..6d3a17e3ebf --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java @@ -0,0 +1,291 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MockEngineFactoryPlugin; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.slice.SliceBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ESIntegTestCase; +import org.hamcrest.Matchers; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; + +public class SourceOnlySnapshotIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + Collection> classes = new ArrayList<>(super.nodePlugins()); + classes.add(MyPlugin.class); + return classes; + } + + @Override + protected Collection> getMockPlugins() { + Collection> classes = new ArrayList<>(super.getMockPlugins()); + classes.remove(MockEngineFactoryPlugin.class); + return classes; + } + + public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin { + @Override + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); + } + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (indexSettings.getValue(SourceOnlySnapshotRepository.SOURCE_ONLY)) { + return Optional.of(SourceOnlySnapshotRepository.getEngineFactory()); + } + return Optional.empty(); + } + + @Override + public List> getSettings() { + 
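+            // index.source_only is a private index setting, so the plugin has to register it; otherwise
+            // indices restored from a source-only snapshot would fail to validate their settings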
+            List<Setting<?>> settings = new ArrayList<>(super.getSettings());
+            settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY);
+            return settings;
+        }
+    }
+
+    public void testSnapshotAndRestore() throws Exception {
+        final String sourceIdx = "test-idx";
+        boolean requireRouting = randomBoolean();
+        boolean useNested = randomBoolean();
+        IndexRequestBuilder[] builders = snapshotAndRestore(sourceIdx, 1, true, requireRouting, useNested);
+        assertHits(sourceIdx, builders.length);
+        assertMappings(sourceIdx, requireRouting, useNested);
+        SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () -> {
+            client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery()
+                .addIds("" + randomIntBetween(0, builders.length))).get();
+        });
+        assertTrue(e.toString().contains("_source only indices can't be searched or filtered"));
+
+        e = expectThrows(SearchPhaseExecutionException.class, () ->
+            client().prepareSearch(sourceIdx).setQuery(QueryBuilders.termQuery("field1", "bar")).get());
+        assertTrue(e.toString().contains("_source only indices can't be searched or filtered"));
+        // make sure deletes do not work
+        String idToDelete = "" + randomIntBetween(0, builders.length);
+        expectThrows(ClusterBlockException.class, () -> client().prepareDelete(sourceIdx, "_doc", idToDelete)
+            .setRouting("r" + idToDelete).get());
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        client().admin().indices().prepareUpdateSettings(sourceIdx)
+            .setSettings(Settings.builder().put("index.number_of_replicas", 1)).get();
+        ensureGreen(sourceIdx);
+        assertHits(sourceIdx, builders.length);
+    }
+
+    public void testSnapshotAndRestoreWithNested() throws Exception {
+        final String sourceIdx = "test-idx";
+        boolean requireRouting = randomBoolean();
+        IndexRequestBuilder[] builders = snapshotAndRestore(sourceIdx, 1, true, requireRouting, true);
+        IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
+        assertThat(indicesStatsResponse.getTotal().docs.getDeleted(), Matchers.greaterThan(0L));
+        assertHits(sourceIdx, builders.length);
+        assertMappings(sourceIdx, requireRouting, true);
+        SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () ->
+            client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery().addIds("" + randomIntBetween(0, builders.length))).get());
+        assertTrue(e.toString().contains("_source only indices can't be searched or filtered"));
+        e = expectThrows(SearchPhaseExecutionException.class, () ->
+            client().prepareSearch(sourceIdx).setQuery(QueryBuilders.termQuery("field1", "bar")).get());
+        assertTrue(e.toString().contains("_source only indices can't be searched or filtered"));
+        // make sure deletes do not work
+        String idToDelete = "" + randomIntBetween(0, builders.length);
+        expectThrows(ClusterBlockException.class, () -> client().prepareDelete(sourceIdx, "_doc", idToDelete)
+            .setRouting("r" + idToDelete).get());
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        client().admin().indices().prepareUpdateSettings(sourceIdx).setSettings(Settings.builder().put("index.number_of_replicas", 1))
+            .get();
+        ensureGreen(sourceIdx);
+        assertHits(sourceIdx, builders.length);
+    }
+
+    private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
+        GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings(sourceIdx).get();
+        ImmutableOpenMap<String, MappingMetaData> mapping = getMappingsResponse
+            .getMappings().get(sourceIdx);
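+        // the assertions below expect a disabled mapping whose original version is preserved under the type's _meta element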
+        assertTrue(mapping.containsKey("_doc"));
+        String nested = useNested ?
+            ",\"incorrect\":{\"type\":\"object\"},\"nested\":{\"type\":\"nested\",\"properties\":{\"value\":{\"type\":\"long\"}}}" : "";
+        if (requireRouting) {
+            assertEquals("{\"_doc\":{\"enabled\":false," +
+                "\"_meta\":{\"_doc\":{\"_routing\":{\"required\":true}," +
+                "\"properties\":{\"field1\":{\"type\":\"text\"," +
+                "\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}" + nested +
+                "}}}}}", mapping.get("_doc").source().string());
+        } else {
+            assertEquals("{\"_doc\":{\"enabled\":false," +
+                "\"_meta\":{\"_doc\":{\"properties\":{\"field1\":{\"type\":\"text\"," +
+                "\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}" + nested + "}}}}}",
+                mapping.get("_doc").source().string());
+        }
+    }
+
+    private void assertHits(String index, int numDocsExpected) {
+        SearchResponse searchResponse = client().prepareSearch(index)
+            .addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
+            .setSize(numDocsExpected).get();
+        Consumer<SearchResponse> assertConsumer = res -> {
+            SearchHits hits = res.getHits();
+            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
+            long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
+            boolean allowHoles = deleted > 0; // we use indexRandom which might create holes ie. deleted docs
+            long i = 0;
+            for (SearchHit hit : hits) {
+                String id = hit.getId();
+                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
+                assertTrue(sourceAsMap.containsKey("field1"));
+                if (allowHoles) {
+                    long seqId = ((Number) hit.getSortValues()[0]).longValue();
+                    assertThat(i, Matchers.lessThanOrEqualTo(seqId));
+                    i = seqId + 1;
+                } else {
+                    assertEquals(i++, hit.getSortValues()[0]);
+                }
+                assertEquals("bar " + id, sourceAsMap.get("field1"));
+                assertEquals("r" + id, hit.field("_routing").getValue());
+            }
+        };
+        assertConsumer.accept(searchResponse);
+        assertEquals(numDocsExpected, searchResponse.getHits().totalHits);
+        searchResponse = client().prepareSearch(index)
+            .addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
+            .setScroll("1m")
+            .slice(new SliceBuilder(SeqNoFieldMapper.NAME, randomIntBetween(0, 1), 2))
+            .setSize(randomIntBetween(1, 10)).get();
+        do {
+            // now do a scroll with a slice
+            assertConsumer.accept(searchResponse);
+            searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
+        } while (searchResponse.getHits().getHits().length > 0);
+
+    }
+
+    private IndexRequestBuilder[] snapshotAndRestore(String sourceIdx, int numShards, boolean minimal, boolean requireRouting,
+                                                     boolean useNested)
+        throws ExecutionException, InterruptedException, IOException {
+        logger.info("--> starting a master node and a data node");
+        internalCluster().startMasterOnlyNode();
+        internalCluster().startDataOnlyNode();
+
+        final Client client = client();
+        final String repo = "test-repo";
+        final String snapshot = "test-snap";
+
+        logger.info("--> creating repository");
+        assertAcked(client.admin().cluster().preparePutRepository(repo).setType("source")
+            .setSettings(Settings.builder().put("location", randomRepoPath())
+                .put("delegate_type", "fs")
+                .put("restore_minimal", minimal)
+                .put("compress", randomBoolean())));
+
+        CreateIndexRequestBuilder createIndexRequestBuilder = prepareCreate(sourceIdx, 0, Settings.builder()
+            .put("number_of_shards", numShards).put("number_of_replicas", 0));
+        List<String> mappings = new ArrayList<>();
+        if (requireRouting) {
+            mappings.addAll(Arrays.asList("_routing", "required=true"));
+        }
+
+        if (useNested) {
+            mappings.addAll(Arrays.asList("nested", "type=nested", "incorrect", "type=object"));
+        }
+        if (mappings.isEmpty() == false) {
+            createIndexRequestBuilder.addMapping("_doc", mappings.toArray());
+        }
+        assertAcked(createIndexRequestBuilder);
+        ensureGreen();
+
+        logger.info("--> indexing some data");
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[randomIntBetween(10, 100)];
+        for (int i = 0; i < builders.length; i++) {
+            XContentBuilder source = jsonBuilder()
+                .startObject()
+                .field("field1", "bar " + i);
+            if (useNested) {
+                source.startArray("nested");
+                for (int j = 0; j < 2; ++j) {
+                    source = source.startObject().field("value", i + 1 + j).endObject();
+                }
+                source.endArray();
+            }
+            source.endObject();
+            builders[i] = client().prepareIndex(sourceIdx, "_doc",
+                Integer.toString(i)).setSource(source).setRouting("r" + i);
+        }
+        indexRandom(true, builders);
+        flushAndRefresh();
+        assertHitCount(client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery().addIds("0")).get(), 1);
+
+        logger.info("--> snapshot the index");
+        CreateSnapshotResponse createResponse = client.admin().cluster()
+            .prepareCreateSnapshot(repo, snapshot)
+            .setWaitForCompletion(true).setIndices(sourceIdx).get();
+        assertEquals(SnapshotState.SUCCESS, createResponse.getSnapshotInfo().state());
+
+        logger.info("--> delete index and stop the data node");
+        assertAcked(client.admin().indices().prepareDelete(sourceIdx).get());
+        internalCluster().stopRandomDataNode();
+        client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("1").get();
+
+        logger.info("--> start a new data node");
+        final Settings dataSettings = Settings.builder()
+            .put(Node.NODE_NAME_SETTING.getKey(), randomAlphaOfLength(5))
+            .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) // to get a new node id
+            .build();
+        internalCluster().startDataOnlyNode(dataSettings);
+        client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("2").get();
+
+        logger.info("--> restore the index and ensure all shards are allocated");
+        RestoreSnapshotResponse restoreResponse = client().admin().cluster()
+            .prepareRestoreSnapshot(repo, snapshot).setWaitForCompletion(true)
+            .setIndices(sourceIdx).get();
+        assertEquals(restoreResponse.getRestoreInfo().totalShards(),
+            restoreResponse.getRestoreInfo().successfulShards());
+        ensureYellow();
+        return builders;
+    }
+}
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
new file mode 100644
index 00000000000..7058724ecf0
--- /dev/null
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
@@ -0,0 +1,358 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.snapshots;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.FieldDoc;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.util.Bits;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.lucene.uid.Versions;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.TestEnvironment;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.EngineException;
+import org.elasticsearch.index.engine.InternalEngineFactory;
+import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.seqno.SeqNoStats;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.hamcrest.Matchers;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import static org.elasticsearch.index.mapper.SourceToParse.source;
+
+public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
+
+    public void testSourceIncomplete() throws IOException {
+        ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), true,
+            ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
+        Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .build();
+        IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName())
+            .settings(settings)
+            .primaryTerm(0, primaryTerm)
+            .putMapping("_doc",
+                "{\"_source\":{\"enabled\": false}}").build();
+        IndexShard shard = newShard(shardRouting, metaData, new InternalEngineFactory());
+        recoverShardFromStore(shard);
+
+        for (int i = 0; i < 1; i++) {
+            final String id = Integer.toString(i);
+            indexDoc(shard, "_doc", id);
+        }
+        SnapshotId snapshotId = new SnapshotId("test", "test");
+        IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
+        SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
+        repository.start();
+        try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
+            IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () ->
+                runAsSnapshot(shard.getThreadPool(),
+                    () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId,
+                        snapshotRef.getIndexCommit(), indexShardSnapshotStatus)));
+            assertEquals("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source"
+                , illegalStateException.getMessage());
+        }
+        closeShards(shard);
+    }
+
+    public void testIncrementalSnapshot() throws IOException {
+        IndexShard shard = newStartedShard();
+        for (int i = 0; i < 10; i++) {
+            final String id = Integer.toString(i);
+            indexDoc(shard, "_doc", id);
+        }
+
+        IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
+        SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
+        repository.start();
+        int totalFileCount = -1;
+        try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
+            SnapshotId snapshotId = new SnapshotId("test", "test");
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef
+                .getIndexCommit(), indexShardSnapshotStatus));
+            IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
+            assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
+            totalFileCount = copy.getTotalFileCount();
+            assertEquals(copy.getStage(), IndexShardSnapshotStatus.Stage.DONE);
+        }
+
+        indexDoc(shard, "_doc", Integer.toString(10));
+        indexDoc(shard, "_doc", Integer.toString(11));
+        try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
+            SnapshotId snapshotId = new SnapshotId("test_1", "test_1");
+
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef
+                .getIndexCommit(), indexShardSnapshotStatus));
+            IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
+            // we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt
+            assertEquals(5, copy.getIncrementalFileCount());
+            // in total we have 4 more files than the previous snap since we don't count the segments_N twice
+            assertEquals(totalFileCount+4, copy.getTotalFileCount());
+            assertEquals(copy.getStage(), IndexShardSnapshotStatus.Stage.DONE);
+        }
+        deleteDoc(shard, "_doc", Integer.toString(10));
+        try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
+            SnapshotId snapshotId = new SnapshotId("test_2", "test_2");
+
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef
+                .getIndexCommit(), indexShardSnapshotStatus));
+            IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
+            // we processed the segments_N file plus _1_1.liv
+            assertEquals(2, copy.getIncrementalFileCount());
+            // in total we have 5 more files than the first snapshot since we don't count the segments_N twice
+            assertEquals(totalFileCount+5, copy.getTotalFileCount());
+            assertEquals(copy.getStage(), IndexShardSnapshotStatus.Stage.DONE);
+        }
+        closeShards(shard);
+    }
+
+    private String randomDoc() {
+        return "{ \"value\" : \"" + randomAlphaOfLength(10) + "\"}";
+    }
+
+    public void testRestoreMinimal() throws IOException {
+        IndexShard shard = newStartedShard(true);
+        int numInitialDocs = randomIntBetween(10, 100);
+        for (int i = 0; i < numInitialDocs; i++) {
+            final String id = Integer.toString(i);
+            indexDoc(shard, "_doc", id, randomDoc());
+            if (randomBoolean()) {
+                shard.refresh("test");
+            }
+        }
+        for (int i = 0; i < numInitialDocs; i++) {
+            final String id = Integer.toString(i);
+            if (randomBoolean()) {
+                if (rarely()) {
+                    deleteDoc(shard, "_doc", id);
+                } else {
+                    indexDoc(shard, "_doc", id, randomDoc());
+                }
+            }
+            if (frequently()) {
+                shard.refresh("test");
+            }
+        }
+        SnapshotId snapshotId = new SnapshotId("test", "test");
+        IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
+        SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
+        repository.start();
+        try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
+            runAsSnapshot(shard.getThreadPool(), () -> {
+                repository.initializeSnapshot(snapshotId, Arrays.asList(indexId),
+                    MetaData.builder().put(shard.indexSettings()
+                        .getIndexMetaData(), false).build());
+                repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus);
+            });
+            IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
+            assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
+            assertEquals(copy.getStage(), IndexShardSnapshotStatus.Stage.DONE);
+        }
+        shard.refresh("test");
+        ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), true,
+            ShardRoutingState.INITIALIZING,
+            new RecoverySource.SnapshotRecoverySource(new Snapshot("src_only", snapshotId), Version.CURRENT, indexId.getId()));
+        IndexMetaData metaData = runAsSnapshot(threadPool, () -> repository.getSnapshotIndexMetaData(snapshotId, indexId));
+        IndexShard restoredShard = newShard(shardRouting, metaData, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {});
+        restoredShard.mapperService().merge(shard.indexSettings().getIndexMetaData(), MapperService.MergeReason.MAPPING_RECOVERY);
+        DiscoveryNode discoveryNode = new DiscoveryNode("node_g", buildNewFakeTransportAddress(), Version.CURRENT);
+        restoredShard.markAsRecovering("test from snap", new RecoveryState(restoredShard.routingEntry(), discoveryNode, null));
+        runAsSnapshot(shard.getThreadPool(), () ->
+            assertTrue(restoredShard.restoreFromRepository(repository)));
+        assertEquals(restoredShard.recoveryState().getStage(), RecoveryState.Stage.DONE);
+        assertEquals(restoredShard.recoveryState().getTranslog().recoveredOperations(), 0);
+        assertEquals(IndexShardState.POST_RECOVERY, restoredShard.state());
+        restoredShard.refresh("test");
+        assertEquals(restoredShard.docStats().getCount(), shard.docStats().getCount());
+        EngineException engineException = expectThrows(EngineException.class, () -> restoredShard.get(
+            new Engine.Get(false, false, "_doc", Integer.toString(0), new Term("_id", Uid.encodeId(Integer.toString(0))))));
+        assertEquals(engineException.getCause().getMessage(), "_source only indices can't be searched or filtered");
+        SeqNoStats seqNoStats = restoredShard.seqNoStats();
+        assertEquals(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint());
+        final IndexShard targetShard;
+        try (Engine.Searcher searcher = restoredShard.acquireSearcher("test")) {
+            assertEquals(searcher.reader().maxDoc(), seqNoStats.getLocalCheckpoint());
+            TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
+            assertEquals(searcher.reader().numDocs(), search.totalHits.value);
+            search = searcher.searcher().search(new MatchAllDocsQuery(), Integer.MAX_VALUE,
+                new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG)), false);
+            assertEquals(searcher.reader().numDocs(), search.totalHits.value);
+            long previous = -1;
+            for (ScoreDoc doc : search.scoreDocs) {
+                FieldDoc fieldDoc = (FieldDoc) doc;
+                assertEquals(1, fieldDoc.fields.length);
+                long current = (Long) fieldDoc.fields[0];
+                assertThat(previous, Matchers.lessThan(current));
+                previous = current;
+            }
+            expectThrows(UnsupportedOperationException.class, () -> searcher.searcher().search(new TermQuery(new Term("boom", "boom")), 1));
+            targetShard = reindex(searcher.getDirectoryReader(), new MappingMetaData("_doc",
+                restoredShard.mapperService().documentMapper("_doc").meta()));
+        }
+
+        for (int i = 0; i < numInitialDocs; i++) {
+            Engine.Get get = new Engine.Get(false, false, "_doc", Integer.toString(i), new Term("_id", Uid.encodeId(Integer.toString(i))));
+            Engine.GetResult original = shard.get(get);
+            Engine.GetResult restored = targetShard.get(get);
+            assertEquals(original.exists(), restored.exists());
+
+            if (original.exists()) {
+                Document document = original.docIdAndVersion().reader.document(original.docIdAndVersion().docId);
+                Document restoredDocument = restored.docIdAndVersion().reader.document(restored.docIdAndVersion().docId);
+                for (IndexableField field : document) {
+                    assertEquals(document.get(field.name()), restoredDocument.get(field.name()));
+                }
+            }
+            IOUtils.close(original, restored);
+        }
+
+        closeShards(shard, restoredShard, targetShard);
+    }
+
+    public IndexShard reindex(DirectoryReader reader, MappingMetaData mapping) throws IOException {
+        ShardRouting targetShardRouting = TestShardRouting.newShardRouting(new ShardId("target", "_na_", 0), randomAlphaOfLength(10), true,
+            ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
+        Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .build();
+        IndexMetaData.Builder metaData = IndexMetaData.builder(targetShardRouting.getIndexName())
+            .settings(settings)
+            .primaryTerm(0, primaryTerm);
+        metaData.putMapping(mapping);
+        IndexShard targetShard = newShard(targetShardRouting, metaData.build(), new InternalEngineFactory());
+        boolean success = false;
+        try {
+            recoverShardFromStore(targetShard);
+            String index = targetShard.shardId().getIndexName();
+            FieldsVisitor rootFieldsVisitor = new FieldsVisitor(true);
+            for (LeafReaderContext ctx : reader.leaves()) {
+                LeafReader leafReader = ctx.reader();
+                Bits liveDocs = leafReader.getLiveDocs();
+                for (int i = 0; i < leafReader.maxDoc(); i++) {
+                    if (liveDocs == null || liveDocs.get(i)) {
+                        rootFieldsVisitor.reset();
+                        leafReader.document(i, rootFieldsVisitor);
+                        rootFieldsVisitor.postProcess(targetShard.mapperService());
+                        Uid uid = rootFieldsVisitor.uid();
+                        BytesReference source = rootFieldsVisitor.source();
+                        assert source != null : "_source is null but should have been filtered out at snapshot time";
+                        Engine.Result result = targetShard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
+                            source(index, uid.type(), uid.id(), source, XContentHelper.xContentType(source))
+                                .routing(rootFieldsVisitor.routing()), 1, false);
+                        if (result.getResultType() != Engine.Result.Type.SUCCESS) {
+                            throw new IllegalStateException("failed applying post restore operation result: " + result
+                                .getResultType(), result.getFailure());
+                        }
+                    }
+                }
+            }
+            targetShard.refresh("test");
+            success = true;
+        } finally {
+            if (success == false) {
+                closeShards(targetShard);
+            }
+        }
+        return targetShard;
+    }
+
+
+    /** Create a {@link Environment} with random path.home and path.repo **/
+    private Environment createEnvironment() {
+        Path home = createTempDir();
+        return TestEnvironment.newEnvironment(Settings.builder()
+            .put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath())
+            .put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath())
+            .build());
+    }
+
+    /** Create a {@link Repository} with a random name **/
+    private Repository createRepository() throws IOException {
+        Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
+        RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
+        return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry());
+    }
+
+    private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {
+        runAsSnapshot(pool, (Callable<Void>) () -> {
+            runnable.run();
+            return null;
+        });
+    }
+
+    private static <T> T runAsSnapshot(ThreadPool pool, Callable<T> runnable) {
+        PlainActionFuture<T> future = new PlainActionFuture<>();
+        pool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
+            try {
+                future.onResponse(runnable.call());
+            } catch (Exception e) {
+                future.onFailure(e);
+            }
+        });
+        try {
+            return future.get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof Exception) {
+                throw ExceptionsHelper.convertToRuntime((Exception) e.getCause());
+            } else {
+                throw new AssertionError(e.getCause());
+            }
+        } catch (InterruptedException e) {
+            throw new AssertionError(e);
+        }
+    }
+}
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotTests.java
new file mode 100644
index 00000000000..e7d731739de
--- /dev/null
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotTests.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.snapshots;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FilterMergePolicy;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
+import org.apache.lucene.index.StandardDirectoryReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.DocValuesFieldExistsQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SourceOnlySnapshotTests extends ESTestCase {
+    public void testSourceOnlyRandom() throws IOException {
+        try (Directory dir = newDirectory(); Directory targetDir = newDirectory()) {
+            SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
+            IndexWriterConfig indexWriterConfig = newIndexWriterConfig().setIndexDeletionPolicy(deletionPolicy)
+                .setSoftDeletesField(random().nextBoolean() ? null : Lucene.SOFT_DELETES_FIELD);
+            try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir, indexWriterConfig, false)) {
+                final String softDeletesField = writer.w.getConfig().getSoftDeletesField();
+                // we either use the soft deletes directly or manually delete them to test the additional delete functionality
+                boolean modifyDeletedDocs = softDeletesField != null && randomBoolean();
+                SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir,
+                    modifyDeletedDocs ? () -> new DocValuesFieldExistsQuery(softDeletesField) : null) {
+                    @Override
+                    DirectoryReader wrapReader(DirectoryReader reader) throws IOException {
+                        return modifyDeletedDocs ? reader : super.wrapReader(reader);
+                    }
+                };
+                writer.commit();
+                int numDocs = scaledRandomIntBetween(100, 10000);
+                boolean appendOnly = randomBoolean();
+                for (int i = 0; i < numDocs; i++) {
+                    int docId = appendOnly ? i : randomIntBetween(0, 100);
+                    Document d = newRandomDocument(docId);
+                    if (appendOnly) {
+                        writer.addDocument(d);
+                    } else {
+                        writer.updateDocument(new Term("id", Integer.toString(docId)), d);
+                    }
+                    if (rarely()) {
+                        if (randomBoolean()) {
+                            writer.commit();
+                        }
+                        IndexCommit snapshot = deletionPolicy.snapshot();
+                        try {
+                            snapshoter.syncSnapshot(snapshot);
+                        } finally {
+                            deletionPolicy.release(snapshot);
+                        }
+                    }
+                }
+                if (randomBoolean()) {
+                    writer.commit();
+                }
+                IndexCommit snapshot = deletionPolicy.snapshot();
+                try {
+                    snapshoter.syncSnapshot(snapshot);
+                    try (DirectoryReader snapReader = snapshoter.wrapReader(DirectoryReader.open(targetDir));
+                         DirectoryReader wrappedReader = snapshoter.wrapReader(DirectoryReader.open(snapshot))) {
+                        DirectoryReader reader = modifyDeletedDocs
+                            ? new SoftDeletesDirectoryReaderWrapper(wrappedReader, softDeletesField) : wrappedReader;
+                        assertEquals(snapReader.maxDoc(), reader.maxDoc());
+                        assertEquals(snapReader.numDocs(), reader.numDocs());
+                        for (int i = 0; i < snapReader.maxDoc(); i++) {
+                            assertEquals(snapReader.document(i).get("_source"), reader.document(i).get("_source"));
+                        }
+                        for (LeafReaderContext ctx : snapReader.leaves()) {
+                            if (ctx.reader() instanceof SegmentReader) {
+                                assertNull(((SegmentReader) ctx.reader()).getSegmentInfo().info.getIndexSort());
+                            }
+                        }
+                    }
+                } finally {
+                    deletionPolicy.release(snapshot);
+                }
+            }
+        }
+    }
+
+    private Document newRandomDocument(int id) {
+        Document doc = new Document();
+        doc.add(new StringField("id", Integer.toString(id), Field.Store.YES));
+        doc.add(new NumericDocValuesField("id", id));
+        if (randomBoolean()) {
+            doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
+        }
+        if (randomBoolean()) {
+            doc.add(new FloatPoint("float_point", 1.3f, 3.4f));
+        }
+        if (randomBoolean()) {
+            doc.add(new NumericDocValuesField("some_value", randomLong()));
+        }
+        doc.add(new StoredField("_source", randomRealisticUnicodeOfCodepointLengthBetween(5, 10)));
+        return doc;
+    }
+
+    public void testSrcOnlySnap() throws IOException {
+        try (Directory dir = newDirectory()) {
+            SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
+            IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()
+                .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
+                .setIndexDeletionPolicy(deletionPolicy).setMergePolicy(new FilterMergePolicy(NoMergePolicy.INSTANCE) {
+                    @Override
+                    public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) {
+                        return randomBoolean();
+                    }
+                }));
+            Document doc = new Document();
+            doc.add(new StringField("id", "1", Field.Store.YES));
+            doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 1));
+            doc.add(new StoredField("src", "the quick brown fox"));
+            writer.addDocument(doc);
+            doc = new Document();
+            doc.add(new StringField("id", "2", Field.Store.YES));
+            doc.add(new TextField("text", "the quick blue fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 2));
+            doc.add(new StoredField("src", "the quick blue fox"));
+            doc.add(new StoredField("dummy", "foo")); // add a field only this segment has
+            writer.addDocument(doc);
+            writer.flush();
+            doc = new Document();
+            doc.add(new StringField("id", "1", Field.Store.YES));
+            doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 3));
+            doc.add(new StoredField("src", "the quick brown fox"));
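+            // soft update rather than delete+add: the previous revision of doc "1" stays in the index
+            // but is marked deleted through the soft-deletes field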
+            writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
+            writer.commit();
+            Directory targetDir = newDirectory();
+            IndexCommit snapshot = deletionPolicy.snapshot();
+            SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir);
+            snapshoter.syncSnapshot(snapshot);
+
+            StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(snapshot);
+            try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
+                assertEquals(snapReader.maxDoc(), 3);
+                assertEquals(snapReader.numDocs(), 2);
+                for (int i = 0; i < 3; i++) {
+                    assertEquals(snapReader.document(i).get("src"), reader.document(i).get("src"));
+                }
+                IndexSearcher searcher = new IndexSearcher(snapReader);
+                TopDocs id = searcher.search(new TermQuery(new Term("id", "1")), 10);
+                assertEquals(0, id.totalHits.value);
+            }
+
+            snapshoter = new SourceOnlySnapshot(targetDir);
+            List<String> createdFiles = snapshoter.syncSnapshot(snapshot);
+            assertEquals(0, createdFiles.size());
+            deletionPolicy.release(snapshot);
+            // now add another doc
+            doc = new Document();
+            doc.add(new StringField("id", "4", Field.Store.YES));
+            doc.add(new TextField("text", "the quick blue fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 2));
+            doc.add(new StoredField("src", "the quick blue fox"));
+            writer.addDocument(doc);
+            doc = new Document();
+            doc.add(new StringField("id", "5", Field.Store.YES));
+            doc.add(new TextField("text", "the quick blue fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 2));
+            doc.add(new StoredField("src", "the quick blue fox"));
+            writer.addDocument(doc);
+            writer.commit();
+            {
+                snapshot = deletionPolicy.snapshot();
+                snapshoter = new SourceOnlySnapshot(targetDir);
+                createdFiles = snapshoter.syncSnapshot(snapshot);
+                assertEquals(4, createdFiles.size());
+                for (String file : createdFiles) {
+                    String extension = IndexFileNames.getExtension(file);
+                    switch (extension) {
+                        case "fdt":
+                        case "fdx":
+                        case "fnm":
+                        case "si":
+                            break;
+                        default:
+                            fail("unexpected extension: " + extension);
+                    }
+                }
+                try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
+                    assertEquals(snapReader.maxDoc(), 5);
+                    assertEquals(snapReader.numDocs(), 4);
+                }
+                deletionPolicy.release(snapshot);
+            }
+            writer.deleteDocuments(new Term("id", "5"));
+            writer.commit();
+            {
+                snapshot = deletionPolicy.snapshot();
+                snapshoter = new SourceOnlySnapshot(targetDir);
+                createdFiles = snapshoter.syncSnapshot(snapshot);
+                assertEquals(1, createdFiles.size());
+                for (String file : createdFiles) {
+                    String extension = IndexFileNames.getExtension(file);
+                    switch (extension) {
+                        case "liv":
+                            break;
+                        default:
+                            fail("unexpected extension: " + extension);
+                    }
+                }
+                try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
+                    assertEquals(snapReader.maxDoc(), 5);
+                    assertEquals(snapReader.numDocs(), 3);
+                }
+                deletionPolicy.release(snapshot);
+            }
+            writer.close();
+            targetDir.close();
+            reader.close();
+        }
+    }
+}
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/snapshot/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/snapshot/10_basic.yml
new file mode 100644
index 00000000000..c0f161472b7
--- /dev/null
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/snapshot/10_basic.yml
@@ -0,0 +1,84 @@
+---
+setup:
+
+  - do:
+      snapshot.create_repository:
+        repository: test_repo_restore_1
+        body:
+          type: source
+          settings:
+            delegate_type: fs
+            location: "test_repo_restore_1_loc"
+
+  - do:
+      indices.create:
+        index: test_index
+        body:
+          settings:
+            number_of_shards: 1
+            number_of_replicas: 0
+
+  - do:
+      cluster.health:
+        wait_for_status: green
+
+---
+"Create a source only snapshot and then restore it":
+
+  - do:
+      index:
+        index: test_index
+        type: _doc
+        id: 1
+        body: { foo: bar }
+  - do:
+      indices.flush:
+        index: test_index
+
+  - do:
+      snapshot.create:
+        repository: test_repo_restore_1
+        snapshot: test_snapshot
+        wait_for_completion: true
+
+  - match: { snapshot.snapshot: test_snapshot }
+  - match: { snapshot.state: SUCCESS }
+  - match: { snapshot.shards.successful: 1 }
+  - match: { snapshot.shards.failed: 0 }
+  - is_true: snapshot.version
+  - gt: { snapshot.version_id: 0 }
+
+  - do:
+      indices.close:
+        index: test_index
+
+  - do:
+      snapshot.restore:
+        repository: test_repo_restore_1
+        snapshot: test_snapshot
+        wait_for_completion: true
+
+  - do:
+      indices.recovery:
+        index: test_index
+
+  - match: { test_index.shards.0.type: SNAPSHOT }
+  - match: { test_index.shards.0.stage: DONE }
+  - match: { test_index.shards.0.translog.recovered: 0 }
+  - match: { test_index.shards.0.translog.total: 0 }
+  - match: { test_index.shards.0.translog.total_on_start: 0 }
+  - match: { test_index.shards.0.index.files.recovered: 5 }
+  - match: { test_index.shards.0.index.files.reused: 0 }
+  - match: { test_index.shards.0.index.size.reused_in_bytes: 0 }
+  - gt: { test_index.shards.0.index.size.recovered_in_bytes: 0 }
+
+  - do:
+      search:
+        index: test_index
+        body:
+          query:
+            match_all: {}
+
+  - match: { hits.total: 1 }
+  - length: { hits.hits: 1 }
+  - match: { hits.hits.0._id: "1" }
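
The YAML test above only exercises the supported path, a `match_all` search against the restored index. A sketch of one further assertion, not part of the original patch, that mirrors the Java integration tests by checking that any other query is rejected; it assumes the REST test framework's `catch: /regex/` syntax and reuses the error message asserted in SourceOnlySnapshotIT:

  - do:
      catch: /_source only indices can't be searched or filtered/
      search:
        index: test_index
        body:
          query:
            term: { foo: bar }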