Add `_source`-only snapshot repository (#32844)

This change adds a `_source`-only snapshot repository that allows any existing repository
to be wrapped as a _backend_ in order to snapshot only the `_source` part of each shard,
including the live-docs markers. Snapshots taken with the `source` repository
won't include any inverted index structures, doc values or points. The snapshot is reduced in size and
functionality such that it requires a full reindex after it has been successfully restored.

The restore process copies the `_source` data locally and starts a special shard and engine
to allow `match_all` scrolls and searches. Any other query or get call will fail with an unsupported-operation exception. The restored index is also marked as read-only.

This feature is aimed mainly at disaster-recovery use cases where snapshot size is
a concern or where time to restore is less of an issue.

**NOTE**: The snapshot produced by this repository is still a valid Lucene index. This change doesn't add any longer-term retention policies; those are out of scope for this change.
Simon Willnauer 2018-09-12 17:47:10 +02:00 committed by GitHub
parent 141c6ef93e
commit c783488e97
25 changed files with 1885 additions and 25 deletions

View File

@ -207,6 +207,51 @@ repositories.url.allowed_urls: ["http://www.example.org/root/*", "https://*.mydo
URL repositories with `file:` URLs can only point to locations registered in the `path.repo` setting similar to
shared file system repository.
[float]
[role="xpack"]
[testenv="basic"]
===== Source Only Repository
A source repository enables you to create minimal, source-only snapshots that take up to 50% less space on disk.
Source only snapshots contain stored fields and index metadata. They do not include index or doc values structures
and are not searchable when restored. After restoring a source-only snapshot, you must <<docs-reindex,reindex>>
the data into a new index.
Source repositories delegate to another snapshot repository for storage.
[IMPORTANT]
==================================================
Source only snapshots are only supported if the `_source` field is enabled and no source-filtering is applied.
When you restore a source only snapshot:
* The restored index is read-only and can only serve `match_all` search or scroll requests to enable reindexing.
* Queries other than `match_all` and `_get` requests are not supported.
* The mapping of the restored index is empty, but the original mapping is available from the type's
top-level `_meta` element, as shown in the example below.
==================================================
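For illustration, a `GET restored_index/_mapping` response might look like the following sketch, based on the
tests in this change; the index name and the `field1` mapping are only examples.
[source,js]
-----------------------------------
{
  "restored_index": {
    "mappings": {
      "_doc": {
        "enabled": false,
        "_meta": {
          "_doc": {
            "properties": {
              "field1": { "type": "text" }
            }
          }
        }
      }
    }
  }
}
-----------------------------------
// NOTCONSOLE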
When you create a source repository, you must specify the type and name of the delegate repository
where the snapshots will be stored:
[source,js]
-----------------------------------
PUT _snapshot/my_src_only_repository
{
"type": "source",
"settings": {
"delegate_type": "fs",
"location": "my_backup_location"
}
}
-----------------------------------
// CONSOLE
// TEST[continued]
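After the restored index is available, you can reindex its data into a new, fully searchable index. A minimal
sketch of that step is shown below; the index names `restored_index` and `new_index` are only examples.
[source,js]
-----------------------------------
POST _reindex
{
  "source": {
    "index": "restored_index"
  },
  "dest": {
    "index": "new_index"
  }
}
-----------------------------------
// NOTCONSOLE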
[float]
===== Repository plugins

View File

@ -20,6 +20,7 @@ package org.elasticsearch.core.internal.io;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
@ -36,6 +37,14 @@ import java.util.Map;
*/
public final class IOUtils {
/**
* UTF-8 charset string.
* <p>Where possible, use {@link StandardCharsets#UTF_8} instead,
* as using the String constant may slow things down.
* @see StandardCharsets#UTF_8
*/
public static final String UTF_8 = StandardCharsets.UTF_8.name();
private IOUtils() {
// Static utils methods
}

View File

@ -1594,7 +1594,7 @@ public abstract class Engine implements Closeable {
private final CheckedRunnable<IOException> onClose;
private final IndexCommit indexCommit;
IndexCommitRef(IndexCommit indexCommit, CheckedRunnable<IOException> onClose) {
public IndexCommitRef(IndexCommit indexCommit, CheckedRunnable<IOException> onClose) {
this.indexCommit = indexCommit;
this.onClose = onClose;
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.engine;
/**
* Simple Engine Factory
*/
@FunctionalInterface
public interface EngineFactory {
Engine newReadWriteEngine(EngineConfig config);

View File

@ -91,5 +91,4 @@ public class SeqNoStats implements ToXContentFragment, Writeable {
", globalCheckpoint=" + globalCheckpoint +
'}';
}
}

View File

@ -51,6 +51,4 @@ public abstract class AbstractIndexShardComponent implements IndexShardComponent
public String nodeName() {
return indexSettings.getNodeName();
}
}

View File

@ -1439,11 +1439,28 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public void bootstrapNewHistory() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
final Map<String, String> userData = getUserData(writer);
try {
Map<String, String> userData = readLastCommittedSegmentsInfo().getUserData();
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
bootstrapNewHistory(maxSeqNo);
} finally {
metadataLock.writeLock().unlock();
}
}
/**
* Marks an existing Lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint
* as well as the maximum sequence number.
* This is used to make sure no existing shard will recover from this index using ops-based recovery.
* @see SequenceNumbers#LOCAL_CHECKPOINT_KEY
* @see SequenceNumbers#MAX_SEQ_NO
*/
public void bootstrapNewHistory(long maxSeqNo) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
updateCommitData(writer, map);
} finally {

View File

@ -396,7 +396,6 @@ public class IndicesService extends AbstractLifecycleComponent
public IndexService indexService(Index index) {
return indices.get(index.getUUID());
}
/**
* Returns an IndexService for the specified index if exists otherwise a {@link IndexNotFoundException} is thrown.
*/

View File

@ -0,0 +1,167 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import java.io.IOException;
import java.util.List;
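/**
 * A {@link Repository} implementation that delegates every call to another repository.
 * It is meant to be used as a base class for repositories that wrap an existing repository
 * and only override selected methods, such as the source only snapshot repository added in this change.
 */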
public class FilterRepository implements Repository {
private final Repository in;
public FilterRepository(Repository in) {
this.in = in;
}
@Override
public RepositoryMetaData getMetadata() {
return in.getMetadata();
}
@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
return in.getSnapshotInfo(snapshotId);
}
@Override
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
return in.getSnapshotGlobalMetaData(snapshotId);
}
@Override
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
return in.getSnapshotIndexMetaData(snapshotId, index);
}
@Override
public RepositoryData getRepositoryData() {
return in.getRepositoryData();
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
in.initializeSnapshot(snapshotId, indices, metaData);
}
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState) {
return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState);
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
in.deleteSnapshot(snapshotId, repositoryStateId);
}
@Override
public long getSnapshotThrottleTimeInNanos() {
return in.getSnapshotThrottleTimeInNanos();
}
@Override
public long getRestoreThrottleTimeInNanos() {
return in.getRestoreThrottleTimeInNanos();
}
@Override
public String startVerification() {
return in.startVerification();
}
@Override
public void endVerification(String verificationToken) {
in.endVerification(verificationToken);
}
@Override
public void verify(String verificationToken, DiscoveryNode localNode) {
in.verify(verificationToken, localNode);
}
@Override
public boolean isReadOnly() {
return in.isReadOnly();
}
@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}
@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
in.restoreShard(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
}
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
return in.getShardSnapshotStatus(snapshotId, version, indexId, shardId);
}
@Override
public Lifecycle.State lifecycleState() {
return in.lifecycleState();
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
in.addLifecycleListener(listener);
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
in.removeLifecycleListener(listener);
}
@Override
public void start() {
in.start();
}
@Override
public void stop() {
in.stop();
}
@Override
public void close() {
in.close();
}
}

View File

@ -398,7 +398,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
"repository type [" + repositoryMetaData.type() + "] does not exist");
}
try {
Repository repository = factory.create(repositoryMetaData);
Repository repository = factory.create(repositoryMetaData, typesRegistry::get);
repository.start();
return repository;
} catch (Exception e) {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
@ -35,6 +36,7 @@ import org.elasticsearch.snapshots.SnapshotShardFailure;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
/**
* An interface for interacting with a repository in snapshot and restore.
@ -46,7 +48,7 @@ import java.util.List;
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, Store, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
@ -63,6 +65,10 @@ public interface Repository extends LifecycleComponent {
* @param metadata metadata for the repository including name and settings
*/
Repository create(RepositoryMetaData metadata) throws Exception;
default Repository create(RepositoryMetaData metaData, Function<String, Repository.Factory> typeLookup) throws Exception {
return create(metaData);
}
}
/**
@ -188,14 +194,15 @@ public interface Repository extends LifecycleComponent {
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
*
* @param shard shard to be snapshotted
* @param store store to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus);
/**
* Restores snapshot of the shard.

View File

@ -845,8 +845,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
try {
snapshotContext.snapshot(snapshotIndexCommit);
} catch (Exception e) {
@ -854,7 +855,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (e instanceof IndexShardSnapshotFailedException) {
throw (IndexShardSnapshotFailedException) e;
} else {
throw new IndexShardSnapshotFailedException(shard.shardId(), e);
throw new IndexShardSnapshotFailedException(store.shardId(), e);
}
}
}
@ -1157,15 +1158,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
/**
* Constructs new context
*
* @param shard shard to be snapshotted
* @param store store to be snapshotted
* @param snapshotId snapshot id
* @param indexId the id of the index being snapshotted
* @param snapshotStatus snapshot status to report progress
*/
SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) {
super(snapshotId, Version.CURRENT, indexId, shard.shardId());
SnapshotContext(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) {
super(snapshotId, Version.CURRENT, indexId, store.shardId());
this.snapshotStatus = snapshotStatus;
this.store = shard.store();
this.store = store;
this.startTime = startTime;
}

View File

@ -389,7 +389,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
repository.snapshotShard(indexShard, indexShard.store(), snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(),
snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus);

View File

@ -2969,7 +2969,8 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
}
@Override

View File

@ -833,7 +833,8 @@ public abstract class EngineTestCase extends ESTestCase {
* Asserts the provided engine has a consistent document history between translog and Lucene index.
*/
public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
if (mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false) {
if (mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false
|| (engine instanceof InternalEngine) == false) {
return;
}
final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();

View File

@ -126,7 +126,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
};
protected ThreadPool threadPool;
private long primaryTerm;
protected long primaryTerm;
@Override
public void setUp() throws Exception {
@ -753,7 +753,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
Index index = shard.shardId().getIndex();
IndexId indexId = new IndexId(index.getName(), index.getUUID());
repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus);
repository.snapshotShard(shard, shard.store(), snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(),
snapshotStatus);
}
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();

View File

@ -75,6 +75,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
@ -1199,7 +1200,9 @@ public final class InternalTestCluster extends TestCluster {
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
try {
IndexShardTestCase.getTranslog(indexShard).getDeletionPolicy().assertNoOpenTranslogRefs();
if (IndexShardTestCase.getEngine(indexShard) instanceof InternalEngine) {
IndexShardTestCase.getTranslog(indexShard).getDeletionPolicy().assertNoOpenTranslogRefs();
}
} catch (AlreadyClosedException ok) {
// all good
}

View File

@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.snapshots;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.Terms;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.Map;
/**
* This filter reader fakes sequence ID, primary term and version
* for a source only index.
*/
final class SeqIdGeneratingFilterReader extends FilterDirectoryReader {
private final long primaryTerm;
private SeqIdGeneratingFilterReader(DirectoryReader in, SeqIdGeneratingSubReaderWrapper wrapper) throws IOException {
super(in, wrapper);
primaryTerm = wrapper.primaryTerm;
}
@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return wrap(in, primaryTerm);
}
static DirectoryReader wrap(DirectoryReader in, long primaryTerm) throws IOException {
Map<LeafReader, LeafReaderContext> ctxMap = new IdentityHashMap<>();
for (LeafReaderContext leave : in.leaves()) {
ctxMap.put(leave.reader(), leave);
}
return new SeqIdGeneratingFilterReader(in, new SeqIdGeneratingSubReaderWrapper(ctxMap, primaryTerm));
}
@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
private abstract static class FakeNumericDocValues extends NumericDocValues {
private final int maxDoc;
int docID = -1;
FakeNumericDocValues(int maxDoc) {
this.maxDoc = maxDoc;
}
@Override
public int docID() {
return docID;
}
@Override
public int nextDoc() {
if (docID+1 < maxDoc) {
docID++;
} else {
docID = NO_MORE_DOCS;
}
return docID;
}
@Override
public int advance(int target) {
if (target >= maxDoc) {
docID = NO_MORE_DOCS;
} else {
docID = target;
}
return docID;
}
@Override
public long cost() {
return maxDoc;
}
@Override
public boolean advanceExact(int target) {
advance(target);
return docID != NO_MORE_DOCS;
}
}
private static class SeqIdGeneratingSubReaderWrapper extends SubReaderWrapper {
private final Map<LeafReader, LeafReaderContext> ctxMap;
private final long primaryTerm;
SeqIdGeneratingSubReaderWrapper(Map<LeafReader, LeafReaderContext> ctxMap, long primaryTerm) {
this.ctxMap = ctxMap;
this.primaryTerm = primaryTerm;
}
@Override
public LeafReader wrap(LeafReader reader) {
LeafReaderContext leafReaderContext = ctxMap.get(reader);
final int docBase = leafReaderContext.docBase;
return new FilterLeafReader(reader) {
@Override
public NumericDocValues getNumericDocValues(String field) throws IOException {
if (SeqNoFieldMapper.NAME.equals(field)) {
return new FakeNumericDocValues(maxDoc()) {
@Override
public long longValue() {
return docBase + docID;
}
};
} else if (SeqNoFieldMapper.PRIMARY_TERM_NAME.equals(field)) {
return new FakeNumericDocValues(maxDoc()) {
@Override
public long longValue() {
return primaryTerm;
}
};
} else if (VersionFieldMapper.NAME.equals(field)) {
return new FakeNumericDocValues(maxDoc()) {
@Override
public long longValue() {
return 1;
}
};
}
return super.getNumericDocValues(field);
}
@Override
public CacheHelper getCoreCacheHelper() {
return reader.getCoreCacheHelper();
}
@Override
public CacheHelper getReaderCacheHelper() {
return reader.getReaderCacheHelper();
}
@Override
public Terms terms(String field) {
throw new UnsupportedOperationException("_source only indices can't be searched or filtered");
}
@Override
public PointValues getPointValues(String field) {
throw new UnsupportedOperationException("_source only indices can't be searched or filtered");
}
};
}
}
}

View File

@ -0,0 +1,261 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.snapshots;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.internal.io.IOUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.FIELDS_EXTENSION;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.FIELDS_INDEX_EXTENSION;
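/**
 * Creates a source-only copy of an index commit in a target directory: only the stored fields
 * (e.g. _source and _routing), the field infos and the live docs are written, while postings,
 * doc values and points are dropped.
 */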
public class SourceOnlySnapshot {
private final Directory targetDirectory;
private final Supplier<Query> deleteByQuerySupplier;
public SourceOnlySnapshot(Directory targetDirectory, Supplier<Query> deleteByQuerySupplier) {
this.targetDirectory = targetDirectory;
this.deleteByQuerySupplier = deleteByQuerySupplier;
}
public SourceOnlySnapshot(Directory targetDirectory) {
this(targetDirectory, null);
}
public synchronized List<String> syncSnapshot(IndexCommit commit) throws IOException {
long generation;
Map<BytesRef, SegmentCommitInfo> existingSegments = new HashMap<>();
if (Lucene.indexExists(targetDirectory)) {
SegmentInfos existingsSegmentInfos = Lucene.readSegmentInfos(targetDirectory);
for (SegmentCommitInfo info : existingsSegmentInfos) {
existingSegments.put(new BytesRef(info.info.getId()), info);
}
generation = existingsSegmentInfos.getGeneration();
} else {
generation = 1;
}
List<String> createdFiles = new ArrayList<>();
String segmentFileName;
try (Lock writeLock = targetDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) {
SegmentInfos segmentInfos = reader.getSegmentInfos();
DirectoryReader wrapper = wrapReader(reader);
List<SegmentCommitInfo> newInfos = new ArrayList<>();
for (LeafReaderContext ctx : wrapper.leaves()) {
SegmentCommitInfo info = segmentInfos.info(ctx.ord);
LeafReader leafReader = ctx.reader();
LiveDocs liveDocs = getLiveDocs(leafReader);
if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed
SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles);
newInfos.add(newInfo);
}
}
segmentInfos.clear();
segmentInfos.addAll(newInfos);
segmentInfos.setNextWriteGeneration(Math.max(segmentInfos.getGeneration(), generation)+1);
String pendingSegmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS,
"", segmentInfos.getGeneration());
try (IndexOutput segnOutput = targetDirectory.createOutput(pendingSegmentFileName, IOContext.DEFAULT)) {
segmentInfos.write(targetDirectory, segnOutput);
}
targetDirectory.sync(Collections.singleton(pendingSegmentFileName));
targetDirectory.sync(createdFiles);
segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", segmentInfos.getGeneration());
targetDirectory.rename(pendingSegmentFileName, segmentFileName);
}
Lucene.pruneUnreferencedFiles(segmentFileName, targetDirectory);
assert assertCheckIndex();
return Collections.unmodifiableList(createdFiles);
}
private LiveDocs getLiveDocs(LeafReader reader) throws IOException {
if (deleteByQuerySupplier != null) {
// we have this additional delete by query functionality to filter out documents before we snapshot them
// we can't filter after the fact since we don't have an index anymore.
Query query = deleteByQuerySupplier.get();
IndexSearcher s = new IndexSearcher(reader);
s.setQueryCache(null);
Query rewrite = s.rewrite(query);
Weight weight = s.createWeight(rewrite, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
Scorer scorer = weight.scorer(reader.getContext());
if (scorer != null) {
DocIdSetIterator iterator = scorer.iterator();
if (iterator != null) {
Bits liveDocs = reader.getLiveDocs();
final FixedBitSet bits;
if (liveDocs != null) {
bits = FixedBitSet.copyOf(liveDocs);
} else {
bits = new FixedBitSet(reader.maxDoc());
bits.set(0, reader.maxDoc());
}
int newDeletes = apply(iterator, bits);
if (newDeletes != 0) {
int numDeletes = reader.numDeletedDocs() + newDeletes;
return new LiveDocs(numDeletes, bits);
}
}
}
}
return new LiveDocs(reader.numDeletedDocs(), reader.getLiveDocs());
}
private int apply(DocIdSetIterator iterator, FixedBitSet bits) throws IOException {
int docID = -1;
int newDeletes = 0;
while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (bits.get(docID)) {
bits.clear(docID);
newDeletes++;
}
}
return newDeletes;
}
private boolean assertCheckIndex() throws IOException {
ByteArrayOutputStream output = new ByteArrayOutputStream(1024);
try (CheckIndex checkIndex = new CheckIndex(targetDirectory)) {
checkIndex.setFailFast(true);
checkIndex.setInfoStream(new PrintStream(output, false, IOUtils.UTF_8), false);
CheckIndex.Status status = checkIndex.checkIndex();
if (status == null || status.clean == false) {
throw new RuntimeException("CheckIndex failed: " + output.toString(IOUtils.UTF_8));
}
return true;
}
}
DirectoryReader wrapReader(DirectoryReader reader) throws IOException {
String softDeletesField = null;
for (LeafReaderContext ctx : reader.leaves()) {
String field = ctx.reader().getFieldInfos().getSoftDeletesField();
if (field != null) {
softDeletesField = field;
break;
}
}
return softDeletesField == null ? reader : new SoftDeletesDirectoryReaderWrapper(reader, softDeletesField);
}
private SegmentCommitInfo syncSegment(SegmentCommitInfo segmentCommitInfo, LiveDocs liveDocs, FieldInfos fieldInfos,
Map<BytesRef, SegmentCommitInfo> existingSegments, List<String> createdFiles) throws IOException {
SegmentInfo si = segmentCommitInfo.info;
Codec codec = si.getCodec();
final String segmentSuffix = "";
SegmentCommitInfo newInfo;
final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(targetDirectory);
BytesRef segmentId = new BytesRef(si.getId());
boolean exists = existingSegments.containsKey(segmentId);
if (exists == false) {
SegmentInfo newSegmentInfo = new SegmentInfo(si.dir, si.getVersion(), si.getMinVersion(), si.name, si.maxDoc(), false,
si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null);
// we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1);
List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
for (FieldInfo fieldInfo : fieldInfos) {
fieldInfoCopy.add(new FieldInfo(fieldInfo.name, fieldInfo.number,
false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, fieldInfo.attributes(), 0, 0,
fieldInfo.isSoftDeletesField()));
}
FieldInfos newFieldInfos = new FieldInfos(fieldInfoCopy.toArray(new FieldInfo[0]));
codec.fieldInfosFormat().write(trackingDir, newSegmentInfo, segmentSuffix, newFieldInfos, IOContext.DEFAULT);
newInfo.setFieldInfosFiles(trackingDir.getCreatedFiles());
String idxFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_INDEX_EXTENSION);
String dataFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_EXTENSION);
Directory sourceDir = newSegmentInfo.dir;
if (si.getUseCompoundFile()) {
sourceDir = codec.compoundFormat().getCompoundReader(sourceDir, si, IOContext.DEFAULT);
}
trackingDir.copyFrom(sourceDir, idxFile, idxFile, IOContext.DEFAULT);
trackingDir.copyFrom(sourceDir, dataFile, dataFile, IOContext.DEFAULT);
if (sourceDir != newSegmentInfo.dir) {
sourceDir.close();
}
} else {
newInfo = existingSegments.get(segmentId);
assert newInfo.info.getUseCompoundFile() == false;
}
if (liveDocs.bits != null && liveDocs.numDeletes != 0 && liveDocs.numDeletes != newInfo.getDelCount()) {
if (newInfo.getDelCount() != 0) {
assert assertLiveDocs(liveDocs.bits, liveDocs.numDeletes);
}
codec.liveDocsFormat().writeLiveDocs(liveDocs.bits, trackingDir, newInfo, liveDocs.numDeletes - newInfo.getDelCount(),
IOContext.DEFAULT);
SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(), -1, -1);
info.setFieldInfosFiles(newInfo.getFieldInfosFiles());
info.info.setFiles(trackingDir.getCreatedFiles());
newInfo = info;
}
if (exists == false) {
newInfo.info.setFiles(trackingDir.getCreatedFiles());
codec.segmentInfoFormat().write(trackingDir, newInfo.info, IOContext.DEFAULT);
}
createdFiles.addAll(trackingDir.getCreatedFiles());
return newInfo;
}
private boolean assertLiveDocs(Bits liveDocs, int deletes) {
int actualDeletes = 0;
for (int i = 0; i < liveDocs.length(); i++ ) {
if (liveDocs.get(i) == false) {
actualDeletes++;
}
}
assert actualDeletes == deletes : " actual: " + actualDeletes + " deletes: " + deletes;
return true;
}
private static class LiveDocs {
final int numDeletes;
final Bits bits;
LiveDocs(int numDeletes, Bits bits) {
this.numDeletes = numDeletes;
this.bits = bits;
}
}
}

View File

@ -0,0 +1,181 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.repositories.FilterRepository;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* <p>
* This is a filter snapshot repository that only snapshots the minimal information required to
* recreate the index. In other words, instead of snapshotting the entire shard with all its Lucene
* indexed fields, doc values, points etc., it only snapshots the stored fields, including _source
* and _routing, as well as the live docs in order to distinguish between live and deleted docs.
* </p>
* <p>
* The repository can wrap any other repository, delegating the source only snapshot to it and reading
* from it, for instance a file repository of type <i>fs</i>, by passing <i>settings.delegate_type=fs</i>
* at repository creation time.
* </p>
* Snapshots restored from source only snapshots are minimal indices that are read-only and only allow
* match_all scroll searches in order to reindex the data.
*/
public final class SourceOnlySnapshotRepository extends FilterRepository {
private static final Setting<String> DELEGATE_TYPE = new Setting<>("delegate_type", "", Function.identity(), Setting.Property
.NodeScope);
public static final Setting<Boolean> SOURCE_ONLY = Setting.boolSetting("index.source_only", false, Setting
.Property.IndexScope, Setting.Property.Final, Setting.Property.PrivateIndex);
private static final String SNAPSHOT_DIR_NAME = "_snapshot";
SourceOnlySnapshotRepository(Repository in) {
super(in);
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
// we process the index metadata at snapshot time. This means if somebody tries to restore
// a _source only snapshot with a plain repository it will be just fine since we already set the
// required engine, made the index read-only, and set the mapping to a default mapping
try {
MetaData.Builder builder = MetaData.builder(metaData);
for (IndexId indexId : indices) {
IndexMetaData index = metaData.index(indexId.getName());
IndexMetaData.Builder indexMetadataBuilder = IndexMetaData.builder(index);
// for a minimal restore we basically disable indexing on all fields and only create an index
// that is valid from an operational perspective. ie. it will have all metadata fields like version/
// seqID etc. and an indexed ID field such that we can potentially perform updates on them or delete documents.
ImmutableOpenMap<String, MappingMetaData> mappings = index.getMappings();
Iterator<ObjectObjectCursor<String, MappingMetaData>> iterator = mappings.iterator();
while (iterator.hasNext()) {
ObjectObjectCursor<String, MappingMetaData> next = iterator.next();
// we don't need to obey any routing here, stuff is read-only anyway and get is disabled
final String mapping = "{ \"" + next.key + "\": { \"enabled\": false, \"_meta\": " + next.value.source().string()
+ " } }";
indexMetadataBuilder.putMapping(next.key, mapping);
}
indexMetadataBuilder.settings(Settings.builder().put(index.getSettings())
.put(SOURCE_ONLY.getKey(), true)
.put("index.blocks.write", true)); // read-only!
builder.put(indexMetadataBuilder);
}
super.initializeSnapshot(snapshotId, indices, builder.build());
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
if (shard.mapperService().documentMapper() != null // if there is no mapping this is null
&& shard.mapperService().documentMapper().sourceMapper().isComplete() == false) {
throw new IllegalStateException("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled " +
"or filters the source");
}
ShardPath shardPath = shard.shardPath();
Path dataPath = shardPath.getDataPath();
// TODO should we have a snapshot tmp directory per shard that is maintained by the system?
Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME);
try (FSDirectory directory = new SimpleFSDirectory(snapPath)) {
Store tempStore = new Store(store.shardId(), store.indexSettings(), directory, new ShardLock(store.shardId()) {
@Override
protected void closeInternal() {
// do nothing;
}
}, Store.OnClose.EMPTY);
Supplier<Query> querySupplier = shard.mapperService().hasNested() ? Queries::newNestedFilter : null;
// SourceOnlySnapshot will take care of soft- and hard-deletes, no special casing needed here
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier);
snapshot.syncSnapshot(snapshotIndexCommit);
// we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc());
store.incRef();
try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) {
IndexCommit indexCommit = reader.getIndexCommit();
super.snapshotShard(shard, tempStore, snapshotId, indexId, indexCommit, snapshotStatus);
} finally {
store.decRef();
}
} catch (IOException e) {
// why on earth does this super method not declare IOException
throw new UncheckedIOException(e);
}
}
/**
* Returns an {@link EngineFactory} for the source only snapshots.
*/
public static EngineFactory getEngineFactory() {
return config -> new ReadOnlyEngine(config, null, new TranslogStats(0, 0, 0, 0, 0), true,
reader -> {
try {
return SeqIdGeneratingFilterReader.wrap(reader, config.getPrimaryTermSupplier().getAsLong());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
/**
* Returns a new source only repository factory
*/
public static Repository.Factory newRepositoryFactory() {
return new Repository.Factory() {
@Override
public Repository create(RepositoryMetaData metadata) {
throw new UnsupportedOperationException();
}
@Override
public Repository create(RepositoryMetaData metaData, Function<String, Repository.Factory> typeLookup) throws Exception {
String delegateType = DELEGATE_TYPE.get(metaData.settings());
if (Strings.hasLength(delegateType) == false) {
throw new IllegalArgumentException(DELEGATE_TYPE.getKey() + " must be set");
}
Repository.Factory factory = typeLookup.apply(delegateType);
return new SourceOnlySnapshotRepository(factory.create(new RepositoryMetaData(metaData.name(),
delegateType, metaData.settings()), typeLookup));
}
};
}
}

View File

@ -31,21 +31,28 @@ import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.LicensesMetaData;
import org.elasticsearch.license.Licensing;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.snapshots.SourceOnlySnapshotRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
@ -67,13 +74,15 @@ import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, ExtensiblePlugin {
public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, ExtensiblePlugin, RepositoryPlugin, EnginePlugin {
private static Logger logger = ESLoggerFactory.getLogger(XPackPlugin.class);
private static DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
@ -340,4 +349,23 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte
}
}
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
}
@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (indexSettings.getValue(SourceOnlySnapshotRepository.SOURCE_ONLY)) {
return Optional.of(SourceOnlySnapshotRepository.getEngineFactory());
}
return Optional.empty();
}
@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = super.getSettings();
settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY);
return settings;
}
}

View File

@ -0,0 +1,291 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
public class SourceOnlySnapshotIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Collection<Class<? extends Plugin>> classes = new ArrayList<>(super.nodePlugins());
classes.add(MyPlugin.class);
return classes;
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
Collection<Class<? extends Plugin>> classes = new ArrayList<>(super.getMockPlugins());
classes.remove(MockEngineFactoryPlugin.class);
return classes;
}
public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
}
@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (indexSettings.getValue(SourceOnlySnapshotRepository.SOURCE_ONLY)) {
return Optional.of(SourceOnlySnapshotRepository.getEngineFactory());
}
return Optional.empty();
}
@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = new ArrayList<>(super.getSettings());
settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY);
return settings;
}
}
public void testSnapshotAndRestore() throws Exception {
final String sourceIdx = "test-idx";
boolean requireRouting = randomBoolean();
boolean useNested = randomBoolean();
IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, useNested);
assertHits(sourceIdx, builders.length);
assertMappings(sourceIdx, requireRouting, useNested);
SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () -> {
client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery()
.addIds("" + randomIntBetween(0, builders.length))).get();
});
assertTrue(e.toString().contains("_source only indices can't be searched or filtered"));
e = expectThrows(SearchPhaseExecutionException.class, () ->
client().prepareSearch(sourceIdx).setQuery(QueryBuilders.termQuery("field1", "bar")).get());
assertTrue(e.toString().contains("_source only indices can't be searched or filtered"));
// make sure deletes do not work
String idToDelete = "" + randomIntBetween(0, builders.length);
expectThrows(ClusterBlockException.class, () -> client().prepareDelete(sourceIdx, "_doc", idToDelete)
.setRouting("r" + idToDelete).get());
internalCluster().ensureAtLeastNumDataNodes(2);
client().admin().indices().prepareUpdateSettings(sourceIdx)
.setSettings(Settings.builder().put("index.number_of_replicas", 1)).get();
ensureGreen(sourceIdx);
assertHits(sourceIdx, builders.length);
}
public void testSnapshotAndRestoreWithNested() throws Exception {
final String sourceIdx = "test-idx";
boolean requireRouting = randomBoolean();
IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, true);
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
assertThat(indicesStatsResponse.getTotal().docs.getDeleted(), Matchers.greaterThan(0L));
assertHits(sourceIdx, builders.length);
assertMappings(sourceIdx, requireRouting, true);
SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () ->
client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery().addIds("" + randomIntBetween(0, builders.length))).get());
assertTrue(e.toString().contains("_source only indices can't be searched or filtered"));
e = expectThrows(SearchPhaseExecutionException.class, () ->
client().prepareSearch(sourceIdx).setQuery(QueryBuilders.termQuery("field1", "bar")).get());
assertTrue(e.toString().contains("_source only indices can't be searched or filtered"));
// make sure deletes do not work
String idToDelete = "" + randomIntBetween(0, builders.length);
expectThrows(ClusterBlockException.class, () -> client().prepareDelete(sourceIdx, "_doc", idToDelete)
.setRouting("r" + idToDelete).get());
internalCluster().ensureAtLeastNumDataNodes(2);
client().admin().indices().prepareUpdateSettings(sourceIdx).setSettings(Settings.builder().put("index.number_of_replicas", 1))
.get();
ensureGreen(sourceIdx);
assertHits(sourceIdx, builders.length);
}
private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings(sourceIdx).get();
ImmutableOpenMap<String, MappingMetaData> mapping = getMappingsResponse
.getMappings().get(sourceIdx);
assertTrue(mapping.containsKey("_doc"));
String nested = useNested ?
",\"incorrect\":{\"type\":\"object\"},\"nested\":{\"type\":\"nested\",\"properties\":{\"value\":{\"type\":\"long\"}}}" : "";
if (requireRouting) {
assertEquals("{\"_doc\":{\"enabled\":false," +
"\"_meta\":{\"_doc\":{\"_routing\":{\"required\":true}," +
"\"properties\":{\"field1\":{\"type\":\"text\"," +
"\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}" + nested +
"}}}}}", mapping.get("_doc").source().string());
} else {
assertEquals("{\"_doc\":{\"enabled\":false," +
"\"_meta\":{\"_doc\":{\"properties\":{\"field1\":{\"type\":\"text\"," +
"\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}" + nested + "}}}}}",
mapping.get("_doc").source().string());
}
}
private void assertHits(String index, int numDocsExpected) {
SearchResponse searchResponse = client().prepareSearch(index)
.addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
.setSize(numDocsExpected).get();
Consumer<SearchResponse> assertConsumer = res -> {
SearchHits hits = res.getHits();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
boolean allowHoles = deleted > 0; // we use indexRandom which might create holes ie. deleted docs
long i = 0;
for (SearchHit hit : hits) {
String id = hit.getId();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
assertTrue(sourceAsMap.containsKey("field1"));
if (allowHoles) {
long seqId = ((Number) hit.getSortValues()[0]).longValue();
assertThat(i, Matchers.lessThanOrEqualTo(seqId));
i = seqId + 1;
} else {
assertEquals(i++, hit.getSortValues()[0]);
}
assertEquals("bar " + id, sourceAsMap.get("field1"));
assertEquals("r" + id, hit.field("_routing").getValue());
}
};
assertConsumer.accept(searchResponse);
assertEquals(numDocsExpected, searchResponse.getHits().totalHits);
searchResponse = client().prepareSearch(index)
.addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
.setScroll("1m")
.slice(new SliceBuilder(SeqNoFieldMapper.NAME, randomIntBetween(0,1), 2))
.setSize(randomIntBetween(1, 10)).get();
do {
// now do a scroll with a slice
assertConsumer.accept(searchResponse);
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
} while (searchResponse.getHits().getHits().length > 0);
}
private IndexRequestBuilder[] snashotAndRestore(String sourceIdx, int numShards, boolean minimal, boolean requireRouting, boolean
useNested)
throws ExecutionException, InterruptedException, IOException {
logger.info("--> starting a master node and a data node");
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final Client client = client();
final String repo = "test-repo";
final String snapshot = "test-snap";
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository(repo).setType("source")
.setSettings(Settings.builder().put("location", randomRepoPath())
.put("delegate_type", "fs")
.put("restore_minimal", minimal)
.put("compress", randomBoolean())));
CreateIndexRequestBuilder createIndexRequestBuilder = prepareCreate(sourceIdx, 0, Settings.builder()
.put("number_of_shards", numShards).put("number_of_replicas", 0));
List<Object> mappings = new ArrayList<>();
if (requireRouting) {
mappings.addAll(Arrays.asList("_routing", "required=true"));
}
if (useNested) {
mappings.addAll(Arrays.asList("nested", "type=nested", "incorrect", "type=object"));
}
if (mappings.isEmpty() == false) {
createIndexRequestBuilder.addMapping("_doc", mappings.toArray());
}
assertAcked(createIndexRequestBuilder);
ensureGreen();
logger.info("--> indexing some data");
IndexRequestBuilder[] builders = new IndexRequestBuilder[randomIntBetween(10, 100)];
for (int i = 0; i < builders.length; i++) {
XContentBuilder source = jsonBuilder()
.startObject()
.field("field1", "bar " + i);
if (useNested) {
source.startArray("nested");
for (int j = 0; j < 2; ++j) {
source = source.startObject().field("value", i + 1 + j).endObject();
}
source.endArray();
}
source.endObject();
builders[i] = client().prepareIndex(sourceIdx, "_doc",
Integer.toString(i)).setSource(source).setRouting("r" + i);
}
indexRandom(true, builders);
flushAndRefresh();
assertHitCount(client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery().addIds("0")).get(), 1);
logger.info("--> snapshot the index");
CreateSnapshotResponse createResponse = client.admin().cluster()
.prepareCreateSnapshot(repo, snapshot)
.setWaitForCompletion(true).setIndices(sourceIdx).get();
assertEquals(SnapshotState.SUCCESS, createResponse.getSnapshotInfo().state());
logger.info("--> delete index and stop the data node");
assertAcked(client.admin().indices().prepareDelete(sourceIdx).get());
internalCluster().stopRandomDataNode();
client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("1").get();
logger.info("--> start a new data node");
final Settings dataSettings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), randomAlphaOfLength(5))
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) // to get a new node id
.build();
internalCluster().startDataOnlyNode(dataSettings);
client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("2").get();
logger.info("--> restore the index and ensure all shards are allocated");
RestoreSnapshotResponse restoreResponse = client().admin().cluster()
.prepareRestoreSnapshot(repo, snapshot).setWaitForCompletion(true)
.setIndices(sourceIdx).get();
assertEquals(restoreResponse.getRestoreInfo().totalShards(),
restoreResponse.getRestoreInfo().successfulShards());
ensureYellow();
return builders;
}
}
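For readers following along, here is a minimal sketch (not part of this change) of the repository registration and snapshot calls the helper above issues through the admin client. A running cluster `Client` handle is assumed, and the repository, location, snapshot and index names are placeholders.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

// Sketch only: register a source-only repository backed by a shared file system repository
// and snapshot a single index, mirroring the calls made in the helper above.
// "my_src_repo", "my_backup_location", "my_snapshot" and "my_index" are illustrative names.
void snapshotSourceOnly(Client client) {
    client.admin().cluster().preparePutRepository("my_src_repo")
        .setType("source")                          // the source-only repository type
        .setSettings(Settings.builder()
            .put("delegate_type", "fs")             // delegate actual storage to an fs repository
            .put("location", "my_backup_location"))
        .get();
    client.admin().cluster().prepareCreateSnapshot("my_src_repo", "my_snapshot")
        .setWaitForCompletion(true)
        .setIndices("my_index")
        .get();
}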

View File

@ -0,0 +1,358 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.snapshots;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.Bits;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.index.mapper.SourceToParse.source;
public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
public void testSourceIncomplete() throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), true,
ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName())
.settings(settings)
.primaryTerm(0, primaryTerm)
.putMapping("_doc",
"{\"_source\":{\"enabled\": false}}").build();
IndexShard shard = newShard(shardRouting, metaData, new InternalEngineFactory());
recoverShardFromStore(shard);
for (int i = 0; i < 1; i++) { // a single document is enough to trigger the incomplete _source check
final String id = Integer.toString(i);
indexDoc(shard, "_doc", id);
}
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
repository.start();
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () ->
runAsSnapshot(shard.getThreadPool(),
() -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus)));
assertEquals("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source"
, illegalStateException.getMessage());
}
closeShards(shard);
}
public void testIncrementalSnapshot() throws IOException {
IndexShard shard = newStartedShard();
for (int i = 0; i < 10; i++) {
final String id = Integer.toString(i);
indexDoc(shard, "_doc", id);
}
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
repository.start();
int totalFileCount = -1;
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
SnapshotId snapshotId = new SnapshotId("test", "test");
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef
.getIndexCommit(), indexShardSnapshotStatus));
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
totalFileCount = copy.getTotalFileCount();
assertEquals(copy.getStage(), IndexShardSnapshotStatus.Stage.DONE);
}
indexDoc(shard, "_doc", Integer.toString(10));
indexDoc(shard, "_doc", Integer.toString(11));
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
SnapshotId snapshotId = new SnapshotId("test_1", "test_1");
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef
.getIndexCommit(), indexShardSnapshotStatus));
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
// we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt
assertEquals(5, copy.getIncrementalFileCount());
// in total we have 4 more files than the previous snap since we don't count the segments_N twice
assertEquals(totalFileCount+4, copy.getTotalFileCount());
assertEquals(copy.getStage(), IndexShardSnapshotStatus.Stage.DONE);
}
deleteDoc(shard, "_doc", Integer.toString(10));
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
SnapshotId snapshotId = new SnapshotId("test_2", "test_2");
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef
.getIndexCommit(), indexShardSnapshotStatus));
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
// we processed the segments_N file plus _1_1.liv
assertEquals(2, copy.getIncrementalFileCount());
// in total we have 5 more files than the previous snap since we don't count the segments_N twice
assertEquals(totalFileCount+5, copy.getTotalFileCount());
assertEquals(copy.getStage(), IndexShardSnapshotStatus.Stage.DONE);
}
closeShards(shard);
}
private String randomDoc() {
return "{ \"value\" : \"" + randomAlphaOfLength(10) + "\"}";
}
public void testRestoreMinimal() throws IOException {
IndexShard shard = newStartedShard(true);
int numInitialDocs = randomIntBetween(10, 100);
for (int i = 0; i < numInitialDocs; i++) {
final String id = Integer.toString(i);
indexDoc(shard, "_doc", id, randomDoc());
if (randomBoolean()) {
shard.refresh("test");
}
}
for (int i = 0; i < numInitialDocs; i++) {
final String id = Integer.toString(i);
if (randomBoolean()) {
if (rarely()) {
deleteDoc(shard, "_doc", id);
} else {
indexDoc(shard, "_doc", id, randomDoc());
}
}
if (frequently()) {
shard.refresh("test");
}
}
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
repository.start();
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
runAsSnapshot(shard.getThreadPool(), () -> {
repository.initializeSnapshot(snapshotId, Arrays.asList(indexId),
MetaData.builder().put(shard.indexSettings()
.getIndexMetaData(), false).build());
repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus);
});
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
assertEquals(copy.getStage(), IndexShardSnapshotStatus.Stage.DONE);
}
shard.refresh("test");
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), true,
ShardRoutingState.INITIALIZING,
new RecoverySource.SnapshotRecoverySource(new Snapshot("src_only", snapshotId), Version.CURRENT, indexId.getId()));
IndexMetaData metaData = runAsSnapshot(threadPool, () -> repository.getSnapshotIndexMetaData(snapshotId, indexId));
IndexShard restoredShard = newShard(shardRouting, metaData, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {});
restoredShard.mapperService().merge(shard.indexSettings().getIndexMetaData(), MapperService.MergeReason.MAPPING_RECOVERY);
DiscoveryNode discoveryNode = new DiscoveryNode("node_g", buildNewFakeTransportAddress(), Version.CURRENT);
restoredShard.markAsRecovering("test from snap", new RecoveryState(restoredShard.routingEntry(), discoveryNode, null));
runAsSnapshot(shard.getThreadPool(), () ->
assertTrue(restoredShard.restoreFromRepository(repository)));
assertEquals(restoredShard.recoveryState().getStage(), RecoveryState.Stage.DONE);
assertEquals(restoredShard.recoveryState().getTranslog().recoveredOperations(), 0);
assertEquals(IndexShardState.POST_RECOVERY, restoredShard.state());
restoredShard.refresh("test");
assertEquals(restoredShard.docStats().getCount(), shard.docStats().getCount());
EngineException engineException = expectThrows(EngineException.class, () -> restoredShard.get(
new Engine.Get(false, false, "_doc", Integer.toString(0), new Term("_id", Uid.encodeId(Integer.toString(0))))));
assertEquals(engineException.getCause().getMessage(), "_source only indices can't be searched or filtered");
SeqNoStats seqNoStats = restoredShard.seqNoStats();
assertEquals(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint());
final IndexShard targetShard;
try (Engine.Searcher searcher = restoredShard.acquireSearcher("test")) {
assertEquals(searcher.reader().maxDoc(), seqNoStats.getLocalCheckpoint());
TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
assertEquals(searcher.reader().numDocs(), search.totalHits.value);
search = searcher.searcher().search(new MatchAllDocsQuery(), Integer.MAX_VALUE,
new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG)), false);
assertEquals(searcher.reader().numDocs(), search.totalHits.value);
long previous = -1;
for (ScoreDoc doc : search.scoreDocs) {
FieldDoc fieldDoc = (FieldDoc) doc;
assertEquals(1, fieldDoc.fields.length);
long current = (Long)fieldDoc.fields[0];
assertThat(previous, Matchers.lessThan(current));
previous = current;
}
expectThrows(UnsupportedOperationException.class, () -> searcher.searcher().search(new TermQuery(new Term("boom", "boom")), 1));
targetShard = reindex(searcher.getDirectoryReader(), new MappingMetaData("_doc",
restoredShard.mapperService().documentMapper("_doc").meta()));
}
for (int i = 0; i < numInitialDocs; i++) {
Engine.Get get = new Engine.Get(false, false, "_doc", Integer.toString(i), new Term("_id", Uid.encodeId(Integer.toString(i))));
Engine.GetResult original = shard.get(get);
Engine.GetResult restored = targetShard.get(get);
assertEquals(original.exists(), restored.exists());
if (original.exists()) {
Document document = original.docIdAndVersion().reader.document(original.docIdAndVersion().docId);
Document restoredDocument = restored.docIdAndVersion().reader.document(restored.docIdAndVersion().docId);
for (IndexableField field : document) {
assertEquals(document.get(field.name()), restoredDocument.get(field.name()));
}
}
IOUtils.close(original, restored);
}
closeShards(shard, restoredShard, targetShard);
}
public IndexShard reindex(DirectoryReader reader, MappingMetaData mapping) throws IOException {
ShardRouting targetShardRouting = TestShardRouting.newShardRouting(new ShardId("target", "_na_", 0), randomAlphaOfLength(10), true,
ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData.Builder metaData = IndexMetaData.builder(targetShardRouting.getIndexName())
.settings(settings)
.primaryTerm(0, primaryTerm);
metaData.putMapping(mapping);
IndexShard targetShard = newShard(targetShardRouting, metaData.build(), new InternalEngineFactory());
boolean success = false;
try {
recoverShardFromStore(targetShard);
String index = targetShard.shardId().getIndexName();
FieldsVisitor rootFieldsVisitor = new FieldsVisitor(true);
for (LeafReaderContext ctx : reader.leaves()) {
LeafReader leafReader = ctx.reader();
Bits liveDocs = leafReader.getLiveDocs();
for (int i = 0; i < leafReader.maxDoc(); i++) {
if (liveDocs == null || liveDocs.get(i)) {
rootFieldsVisitor.reset();
leafReader.document(i, rootFieldsVisitor);
rootFieldsVisitor.postProcess(targetShard.mapperService());
Uid uid = rootFieldsVisitor.uid();
BytesReference source = rootFieldsVisitor.source();
assert source != null : "_source is null but should have been filtered out at snapshot time";
Engine.Result result = targetShard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, source
(index, uid.type(), uid.id(), source, XContentHelper.xContentType(source))
.routing(rootFieldsVisitor.routing()), 1, false);
if (result.getResultType() != Engine.Result.Type.SUCCESS) {
throw new IllegalStateException("failed applying post restore operation result: " + result
.getResultType(), result.getFailure());
}
}
}
}
targetShard.refresh("test");
success = true;
} finally {
if (success == false) {
closeShards(targetShard);
}
}
return targetShard;
}
/** Create an {@link Environment} with random path.home and path.repo */
private Environment createEnvironment() {
Path home = createTempDir();
return TestEnvironment.newEnvironment(Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath())
.put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath())
.build());
}
/** Create a {@link Repository} with a random name */
private Repository createRepository() throws IOException {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry());
}
private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {
runAsSnapshot(pool, (Callable<Void>) () -> {
runnable.run();
return null;
});
}
private static <T> T runAsSnapshot(ThreadPool pool, Callable<T> runnable) {
PlainActionFuture<T> future = new PlainActionFuture<>();
pool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
future.onResponse(runnable.call());
} catch (Exception e) {
future.onFailure(e);
}
});
try {
return future.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof Exception) {
throw ExceptionsHelper.convertToRuntime((Exception) e.getCause());
} else {
throw new AssertionError(e.getCause());
}
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
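A practical consequence exercised by these shard tests is that a restored source-only shard only serves match_all searches and scrolls, so getting the data back into a regular index means re-indexing it. Below is a minimal, illustrative client-side sketch of that loop (not part of this change); it mirrors the scroll pattern used in assertHits above, and the index names "restored_idx" and "new_idx" are placeholders.

import java.util.Map;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

// Sketch only: scroll the restored source-only index with match_all (the only supported query)
// and re-index every hit's _source into a regular index, carrying routing over when present.
void reindexFromSourceOnly(Client client) {
    SearchResponse resp = client.prepareSearch("restored_idx")
        .setQuery(QueryBuilders.matchAllQuery())
        .setScroll(TimeValue.timeValueMinutes(1))
        .setSize(100)
        .get();
    while (resp.getHits().getHits().length > 0) {
        for (SearchHit hit : resp.getHits()) {
            Map<String, Object> source = hit.getSourceAsMap();
            String routing = null;
            if (hit.field("_routing") != null) {
                routing = hit.field("_routing").getValue();
            }
            client.prepareIndex("new_idx", "_doc", hit.getId())
                .setSource(source)
                .setRouting(routing)
                .get();
        }
        resp = client.prepareSearchScroll(resp.getScrollId())
            .setScroll(TimeValue.timeValueMinutes(1))
            .get();
    }
}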

View File

@ -0,0 +1,245 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.snapshots;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.List;
public class SourceOnlySnapshotTests extends ESTestCase {
public void testSourceOnlyRandom() throws IOException {
try (Directory dir = newDirectory(); Directory targetDir = newDirectory()) {
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
IndexWriterConfig indexWriterConfig = newIndexWriterConfig().setIndexDeletionPolicy
(deletionPolicy).setSoftDeletesField(random().nextBoolean() ? null : Lucene.SOFT_DELETES_FIELD);
try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir, indexWriterConfig, false)) {
final String softDeletesField = writer.w.getConfig().getSoftDeletesField();
// we either use the soft deletes directly or manually delete them to test the additional delete functionality
boolean modifyDeletedDocs = softDeletesField != null && randomBoolean();
SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir,
modifyDeletedDocs ? () -> new DocValuesFieldExistsQuery(softDeletesField) : null) {
@Override
DirectoryReader wrapReader(DirectoryReader reader) throws IOException {
return modifyDeletedDocs ? reader : super.wrapReader(reader);
}
};
writer.commit();
int numDocs = scaledRandomIntBetween(100, 10000);
boolean appendOnly = randomBoolean();
for (int i = 0; i < numDocs; i++) {
int docId = appendOnly ? i : randomIntBetween(0, 100);
Document d = newRandomDocument(docId);
if (appendOnly) {
writer.addDocument(d);
} else {
writer.updateDocument(new Term("id", Integer.toString(docId)), d);
}
if (rarely()) {
if (randomBoolean()) {
writer.commit();
}
IndexCommit snapshot = deletionPolicy.snapshot();
try {
snapshoter.syncSnapshot(snapshot);
} finally {
deletionPolicy.release(snapshot);
}
}
}
if (randomBoolean()) {
writer.commit();
}
IndexCommit snapshot = deletionPolicy.snapshot();
try {
snapshoter.syncSnapshot(snapshot);
try (DirectoryReader snapReader = snapshoter.wrapReader(DirectoryReader.open(targetDir));
DirectoryReader wrappedReader = snapshoter.wrapReader(DirectoryReader.open(snapshot))) {
DirectoryReader reader = modifyDeletedDocs
? new SoftDeletesDirectoryReaderWrapper(wrappedReader, softDeletesField) : wrappedReader;
assertEquals(snapReader.maxDoc(), reader.maxDoc());
assertEquals(snapReader.numDocs(), reader.numDocs());
for (int i = 0; i < snapReader.maxDoc(); i++) {
assertEquals(snapReader.document(i).get("_source"), reader.document(i).get("_source"));
}
for (LeafReaderContext ctx : snapReader.leaves()) {
if (ctx.reader() instanceof SegmentReader) {
assertNull(((SegmentReader) ctx.reader()).getSegmentInfo().info.getIndexSort());
}
}
}
} finally {
deletionPolicy.release(snapshot);
}
}
}
}
private Document newRandomDocument(int id) {
Document doc = new Document();
doc.add(new StringField("id", Integer.toString(id), Field.Store.YES));
doc.add(new NumericDocValuesField("id", id));
if (randomBoolean()) {
doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
}
if (randomBoolean()) {
doc.add(new FloatPoint("float_point", 1.3f, 3.4f));
}
if (randomBoolean()) {
doc.add(new NumericDocValuesField("some_value", randomLong()));
}
doc.add(new StoredField("_source", randomRealisticUnicodeOfCodepointLengthBetween(5, 10)));
return doc;
}
public void testSrcOnlySnap() throws IOException {
try (Directory dir = newDirectory()) {
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setIndexDeletionPolicy(deletionPolicy).setMergePolicy(new FilterMergePolicy(NoMergePolicy.INSTANCE) {
@Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) {
return randomBoolean();
}
}));
Document doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
doc.add(new NumericDocValuesField("rank", 1));
doc.add(new StoredField("src", "the quick brown fox"));
writer.addDocument(doc);
doc = new Document();
doc.add(new StringField("id", "2", Field.Store.YES));
doc.add(new TextField("text", "the quick blue fox", Field.Store.NO));
doc.add(new NumericDocValuesField("rank", 2));
doc.add(new StoredField("src", "the quick blue fox"));
doc.add(new StoredField("dummy", "foo")); // add a field only this segment has
writer.addDocument(doc);
writer.flush();
doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
doc.add(new NumericDocValuesField("rank", 3));
doc.add(new StoredField("src", "the quick brown fox"));
writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
writer.commit();
Directory targetDir = newDirectory();
IndexCommit snapshot = deletionPolicy.snapshot();
SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir);
snapshoter.syncSnapshot(snapshot);
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(snapshot);
try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
assertEquals(snapReader.maxDoc(), 3);
assertEquals(snapReader.numDocs(), 2);
for (int i = 0; i < 3; i++) {
assertEquals(snapReader.document(i).get("src"), reader.document(i).get("src"));
}
IndexSearcher searcher = new IndexSearcher(snapReader);
TopDocs id = searcher.search(new TermQuery(new Term("id", "1")), 10);
assertEquals(0, id.totalHits.value);
}
snapshoter = new SourceOnlySnapshot(targetDir);
List<String> createdFiles = snapshoter.syncSnapshot(snapshot);
assertEquals(0, createdFiles.size());
deletionPolicy.release(snapshot);
// now add another doc
doc = new Document();
doc.add(new StringField("id", "4", Field.Store.YES));
doc.add(new TextField("text", "the quick blue fox", Field.Store.NO));
doc.add(new NumericDocValuesField("rank", 2));
doc.add(new StoredField("src", "the quick blue fox"));
writer.addDocument(doc);
doc = new Document();
doc.add(new StringField("id", "5", Field.Store.YES));
doc.add(new TextField("text", "the quick blue fox", Field.Store.NO));
doc.add(new NumericDocValuesField("rank", 2));
doc.add(new StoredField("src", "the quick blue fox"));
writer.addDocument(doc);
writer.commit();
{
snapshot = deletionPolicy.snapshot();
snapshoter = new SourceOnlySnapshot(targetDir);
createdFiles = snapshoter.syncSnapshot(snapshot);
assertEquals(4, createdFiles.size());
for (String file : createdFiles) {
String extension = IndexFileNames.getExtension(file);
switch (extension) {
case "fdt":
case "fdx":
case "fnm":
case "si":
break;
default:
fail("unexpected extension: " + extension);
}
}
try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
assertEquals(snapReader.maxDoc(), 5);
assertEquals(snapReader.numDocs(), 4);
}
deletionPolicy.release(snapshot);
}
writer.deleteDocuments(new Term("id", "5"));
writer.commit();
{
snapshot = deletionPolicy.snapshot();
snapshoter = new SourceOnlySnapshot(targetDir);
createdFiles = snapshoter.syncSnapshot(snapshot);
assertEquals(1, createdFiles.size());
for (String file : createdFiles) {
String extension = IndexFileNames.getExtension(file);
switch (extension) {
case "liv":
break;
default:
fail("unexpected extension: " + extension);
}
}
try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
assertEquals(snapReader.maxDoc(), 5);
assertEquals(snapReader.numDocs(), 3);
}
deletionPolicy.release(snapshot);
}
writer.close();
targetDir.close();
reader.close();
}
}
}
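Since only stored fields and live-doc markers survive in the snapshot directory these tests write, any consumer has to walk the reader segment by segment and pull the stored _source back out; there are no postings or doc values left to query. The following is a small, illustrative Lucene sketch of that read path (the method and its name are not part of the change); the stored field name "_source" matches what the randomized test above writes.

import java.io.IOException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;

// Sketch only: sequentially read the stored "_source" field of every live document in a
// source-only snapshot directory, honoring the .liv files the snapshot preserves.
void readStoredSource(Directory snapshotDir) throws IOException {
    try (DirectoryReader reader = DirectoryReader.open(snapshotDir)) {
        for (LeafReaderContext ctx : reader.leaves()) {
            LeafReader leaf = ctx.reader();
            Bits liveDocs = leaf.getLiveDocs();      // null means the segment has no deletions
            for (int i = 0; i < leaf.maxDoc(); i++) {
                if (liveDocs == null || liveDocs.get(i)) {
                    String source = leaf.document(i).get("_source"); // stored-field read, no index lookup
                    // hand 'source' off to whatever re-indexes or inspects the document
                }
            }
        }
    }
}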

View File

@ -0,0 +1,84 @@
---
setup:
- do:
snapshot.create_repository:
repository: test_repo_restore_1
body:
type: source
settings:
delegate_type: fs
location: "test_repo_restore_1_loc"
- do:
indices.create:
index: test_index
body:
settings:
number_of_shards: 1
number_of_replicas: 0
- do:
cluster.health:
wait_for_status: green
---
"Create a source only snapshot and then restore it":
- do:
index:
index: test_index
type: _doc
id: 1
body: { foo: bar }
- do:
indices.flush:
index: test_index
- do:
snapshot.create:
repository: test_repo_restore_1
snapshot: test_snapshot
wait_for_completion: true
- match: { snapshot.snapshot: test_snapshot }
- match: { snapshot.state : SUCCESS }
- match: { snapshot.shards.successful: 1 }
- match: { snapshot.shards.failed : 0 }
- is_true: snapshot.version
- gt: { snapshot.version_id: 0}
- do:
indices.close:
index : test_index
- do:
snapshot.restore:
repository: test_repo_restore_1
snapshot: test_snapshot
wait_for_completion: true
- do:
indices.recovery:
index: test_index
- match: { test_index.shards.0.type: SNAPSHOT }
- match: { test_index.shards.0.stage: DONE }
- match: { test_index.shards.0.translog.recovered: 0}
- match: { test_index.shards.0.translog.total: 0}
- match: { test_index.shards.0.translog.total_on_start: 0}
- match: { test_index.shards.0.index.files.recovered: 5}
- match: { test_index.shards.0.index.files.reused: 0}
- match: { test_index.shards.0.index.size.reused_in_bytes: 0}
- gt: { test_index.shards.0.index.size.recovered_in_bytes: 0}
- do:
search:
index: test_index
body:
query:
match_all: {}
- match: {hits.total: 1 }
- length: {hits.hits: 1 }
- match: {hits.hits.0._id: "1" }