From dd7be74bcf12cd66621acc2bc86336bd694ca738 Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Tue, 5 Jul 2016 22:12:24 -0700 Subject: [PATCH] Plugins: Simplified repository api for snapshot/restore The api for snapshot/restore was split up between two interfaces, Repository and IndexShardRepository. There was also complex initialization and injection between the two. However, there is always a one to one relationship between the two. This change moves the IndexShardRepository api into Repository, as well as updates the API so as not to require any services to be injected for sublcasses. --- .../elasticsearch/index/shard/IndexShard.java | 8 +- .../index/shard/StoreRecovery.java | 10 +- .../index/snapshots/IndexShardRepository.java | 81 -- .../BlobStoreIndexShardRepository.java | 972 ------------------ .../repositories/RepositoriesModule.java | 14 +- .../repositories/RepositoriesService.java | 32 +- .../repositories/Repository.java | 60 +- .../repositories/RepositoryModule.java | 61 -- .../repositories/RepositoryNameModule.java | 39 - .../repositories/RepositoryTypesRegistry.java | 9 +- .../VerifyNodeRepositoryAction.java | 24 +- .../blobstore/BlobStoreRepository.java | 905 ++++++++++++++-- .../repositories/fs/FsRepository.java | 6 +- .../uri/URLIndexShardRepository.java | 43 - .../repositories/uri/URLRepository.java | 9 +- .../snapshots/RestoreService.java | 3 +- .../snapshots/SnapshotShardsService.java | 8 +- .../snapshots/SnapshotsService.java | 4 +- .../index/shard/IndexShardTests.java | 124 ++- .../snapshots/RepositoriesIT.java | 2 - .../snapshots/mockstore/MockRepository.java | 49 +- .../azure/AzureRepositoryPlugin.java | 13 +- .../repositories/azure/AzureRepository.java | 4 +- .../gcs/GoogleCloudStoragePlugin.java | 14 +- .../gcs/GoogleCloudStorageRepository.java | 4 +- .../repositories/hdfs/HdfsPlugin.java | 3 +- .../repositories/hdfs/HdfsRepository.java | 5 +- .../repository/s3/S3RepositoryPlugin.java | 24 +- .../repositories/s3/S3Repository.java | 6 +- .../cloud/aws/RepositoryS3SettingsTests.java | 2 +- 30 files changed, 1087 insertions(+), 1451 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java delete mode 100644 core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java delete mode 100644 core/src/main/java/org/elasticsearch/repositories/RepositoryModule.java delete mode 100644 core/src/main/java/org/elasticsearch/repositories/RepositoryNameModule.java delete mode 100644 core/src/main/java/org/elasticsearch/repositories/uri/URLIndexShardRepository.java diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ab2fe683e81..a97aad7abfa 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -94,7 +94,6 @@ import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.similarity.SimilarityService; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store.MetadataSnapshot; import org.elasticsearch.index.store.StoreFileMetaData; @@ -111,6 +110,7 @@ import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTargetService; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.search.suggest.completion.CompletionFieldStats; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat; @@ -1161,7 +1161,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return storeRecovery.recoverFromStore(this, shouldExist); } - public boolean restoreFromRepository(IndexShardRepository repository) { + public boolean restoreFromRepository(Repository repository) { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); return storeRecovery.recoverFromRepository(this, repository); @@ -1449,9 +1449,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(() -> { try { - final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository( + final Repository repository = repositoriesService.repository( recoveryState.getRestoreSource().snapshot().getRepository()); - if (restoreFromRepository(indexShardRepository)) { + if (restoreFromRepository(repository)) { recoveryListener.onRecoveryDone(recoveryState); } } catch (Exception first) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 01513aa6bc6..c1016f0f4e3 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -40,10 +40,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.Repository; import java.io.IOException; import java.util.Arrays; @@ -219,14 +219,14 @@ final class StoreRecovery { } /** - * Recovers an index from a given {@link IndexShardRepository}. This method restores a + * Recovers an index from a given {@link Repository}. This method restores a * previously created index snapshot into an existing initializing shard. * @param indexShard the index shard instance to recovery the snapshot from * @param repository the repository holding the physical files the shard should be recovered from * @return true if the shard has been recovered successfully, false if the recovery * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. */ - boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) { + boolean recoverFromRepository(final IndexShard indexShard, Repository repository) { if (canRecover(indexShard)) { final ShardRouting shardRouting = indexShard.routingEntry(); if (shardRouting.restoreSource() == null) { @@ -380,7 +380,7 @@ final class StoreRecovery { /** * Restores shard from {@link RestoreSource} associated with this shard in routing table */ - private void restore(final IndexShard indexShard, final IndexShardRepository indexShardRepository) { + private void restore(final IndexShard indexShard, final Repository repository) { RestoreSource restoreSource = indexShard.routingEntry().restoreSource(); final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog(); if (restoreSource == null) { @@ -397,7 +397,7 @@ final class StoreRecovery { if (!shardId.getIndexName().equals(restoreSource.index())) { snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id()); } - indexShardRepository.restore(restoreSource.snapshot().getSnapshotId(), restoreSource.version(), shardId, snapshotShardId, indexShard.recoveryState()); + repository.restore(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState()); indexShard.skipTranslogRecovery(); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java deleted file mode 100644 index 5988d82def2..00000000000 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.snapshots; - -import org.apache.lucene.index.IndexCommit; -import org.elasticsearch.Version; -import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.recovery.RecoveryState; - -/** - * Shard-level snapshot repository - *

- * IndexShardRepository is used on data node to create snapshots of individual shards. See {@link org.elasticsearch.repositories.Repository} - * for more information. - */ -public interface IndexShardRepository { - - /** - * Creates a snapshot of the shard based on the index commit point. - *

- * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method. - * IndexShardRepository implementations shouldn't release the snapshot index commit point. It is done by the method caller. - *

- * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check - * {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted. - * - * @param snapshotId snapshot id - * @param shardId shard to be snapshotted - * @param snapshotIndexCommit commit point - * @param snapshotStatus snapshot status - */ - void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus); - - /** - * Restores snapshot of the shard. - *

- * The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied. - * - * @param snapshotId snapshot id - * @param shardId shard id (in the current index) - * @param version version of elasticsearch that created this snapshot - * @param snapshotShardId shard id (in the snapshot) - * @param recoveryState recovery state - */ - void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState); - - /** - * Retrieve shard snapshot status for the stored snapshot - * - * @param snapshotId snapshot id - * @param version version of elasticsearch that created this snapshot - * @param shardId shard id - * @return snapshot status - */ - IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId); - - /** - * Verifies repository settings on data node - * @param verificationToken value returned by {@link org.elasticsearch.repositories.Repository#startVerification()} - */ - void verify(String verificationToken); - -} diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java deleted file mode 100644 index 526aab7581b..00000000000 --- a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ /dev/null @@ -1,972 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.snapshots.blobstore; - -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.RateLimiter; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefBuilder; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; -import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.ParseFieldMatcher; -import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.blobstore.BlobMetaData; -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.store.InputStreamIndexInput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.iterable.Iterables; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; -import org.elasticsearch.index.snapshots.IndexShardSnapshotException; -import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; -import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.StoreFileMetaData; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.repositories.RepositoryName; -import org.elasticsearch.repositories.RepositoryVerificationException; -import org.elasticsearch.repositories.blobstore.BlobStoreFormat; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.elasticsearch.repositories.blobstore.LegacyBlobStoreFormat; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; -import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix; - -/** - * Blob store based implementation of IndexShardRepository - */ -public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository { - - private static final int BUFFER_SIZE = 4096; - private BlobStore blobStore; - - private BlobPath basePath; - - private final String repositoryName; - - private ByteSizeValue chunkSize; - - private final IndicesService indicesService; - - private final ClusterService clusterService; - - private RateLimiter snapshotRateLimiter; - - private RateLimiter restoreRateLimiter; - - private RateLimitingInputStream.Listener snapshotThrottleListener; - - private RateLimitingInputStream.Listener restoreThrottleListener; - - private boolean compress; - - private final ParseFieldMatcher parseFieldMatcher; - - protected static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-"; - - protected static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s"; - - protected static final String SNAPSHOT_PREFIX = "snap-"; - - protected static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat"; - - protected static final String SNAPSHOT_CODEC = "snapshot"; - - protected static final String SNAPSHOT_INDEX_PREFIX = "index-"; - - protected static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s"; - - protected static final String SNAPSHOT_INDEX_CODEC = "snapshots"; - - protected static final String DATA_BLOB_PREFIX = "__"; - - private ChecksumBlobStoreFormat indexShardSnapshotFormat; - - private LegacyBlobStoreFormat indexShardSnapshotLegacyFormat; - - private ChecksumBlobStoreFormat indexShardSnapshotsFormat; - - @Inject - public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) { - super(settings); - this.parseFieldMatcher = new ParseFieldMatcher(settings); - this.repositoryName = repositoryName.name(); - this.indicesService = indicesService; - this.clusterService = clusterService; - } - - /** - * Called by {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} on repository startup - * - * @param blobStore blob store - * @param basePath base path to blob store - * @param chunkSize chunk size - */ - public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize, - RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter, - final RateLimiterListener rateLimiterListener, boolean compress) { - this.blobStore = blobStore; - this.basePath = basePath; - this.chunkSize = chunkSize; - this.snapshotRateLimiter = snapshotRateLimiter; - this.restoreRateLimiter = restoreRateLimiter; - this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos); - this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos); - this.compress = compress; - indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress()); - indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher); - indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots.PROTO, parseFieldMatcher, isCompress()); - } - - /** - * {@inheritDoc} - */ - @Override - public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { - SnapshotContext snapshotContext = new SnapshotContext(snapshotId, shardId, snapshotStatus); - snapshotStatus.startTime(System.currentTimeMillis()); - - try { - snapshotContext.snapshot(snapshotIndexCommit); - snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); - } catch (Exception e) { - snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); - snapshotStatus.failure(ExceptionsHelper.detailedMessage(e)); - if (e instanceof IndexShardSnapshotFailedException) { - throw (IndexShardSnapshotFailedException) e; - } else { - throw new IndexShardSnapshotFailedException(shardId, e); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { - final RestoreContext snapshotContext = new RestoreContext(snapshotId, version, shardId, snapshotShardId, recoveryState); - try { - snapshotContext.restore(); - } catch (Exception e) { - throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { - Context context = new Context(snapshotId, version, shardId); - BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); - IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); - status.updateStage(IndexShardSnapshotStatus.Stage.DONE); - status.startTime(snapshot.startTime()); - status.files(snapshot.numberOfFiles(), snapshot.totalSize()); - // The snapshot is done which means the number of processed files is the same as total - status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize()); - status.time(snapshot.time()); - return status; - } - - @Override - public void verify(String seed) { - BlobContainer testBlobContainer = blobStore.blobContainer(basePath.add(testBlobPrefix(seed))); - DiscoveryNode localNode = clusterService.localNode(); - if (testBlobContainer.blobExists("master.dat")) { - try { - testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed)); - } catch (IOException exp) { - throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore + "] is not accessible on the node [" + localNode + "]", exp); - } - } else { - throw new RepositoryVerificationException(repositoryName, "a file written by master to the store [" + blobStore + "] cannot be accessed on the node [" + localNode + "]. " - + "This might indicate that the store [" + blobStore + "] is not shared between this node and the master node or " - + "that permissions on the store don't allow reading files written by the master node"); - } - } - - /** - * Delete shard snapshot - * - * @param snapshotId snapshot id - * @param shardId shard id - */ - public void delete(SnapshotId snapshotId, Version version, ShardId shardId) { - Context context = new Context(snapshotId, version, shardId, shardId); - context.delete(); - } - - @Override - public String toString() { - return "BlobStoreIndexShardRepository[" + - "[" + repositoryName + - "], [" + blobStore + ']' + - ']'; - } - - /** - * Returns true if metadata files should be compressed - * - * @return true if compression is needed - */ - protected boolean isCompress() { - return compress; - } - - BlobStoreFormat indexShardSnapshotFormat(Version version) { - if (BlobStoreRepository.legacyMetaData(version)) { - return indexShardSnapshotLegacyFormat; - } else { - return indexShardSnapshotFormat; - } - } - - /** - * Context for snapshot/restore operations - */ - private class Context { - - protected final SnapshotId snapshotId; - - protected final ShardId shardId; - - protected final BlobContainer blobContainer; - - protected final Version version; - - public Context(SnapshotId snapshotId, Version version, ShardId shardId) { - this(snapshotId, version, shardId, shardId); - } - - public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) { - this.snapshotId = snapshotId; - this.version = version; - this.shardId = shardId; - blobContainer = blobStore.blobContainer(basePath.add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId()))); - } - - /** - * Delete shard snapshot - */ - public void delete() { - final Map blobs; - try { - blobs = blobContainer.listBlobs(); - } catch (IOException e) { - throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e); - } - - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); - BlobStoreIndexShardSnapshots snapshots = tuple.v1(); - int fileListGeneration = tuple.v2(); - - try { - indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName()); - } catch (IOException e) { - logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId); - } - - // Build a list of snapshots that should be preserved - List newSnapshotsList = new ArrayList<>(); - for (SnapshotFiles point : snapshots) { - if (!point.snapshot().equals(snapshotId.getName())) { - newSnapshotsList.add(point); - } - } - // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index - finalize(newSnapshotsList, fileListGeneration + 1, blobs); - } - - /** - * Loads information about shard snapshot - */ - public BlobStoreIndexShardSnapshot loadSnapshot() { - try { - return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName()); - } catch (IOException ex) { - throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); - } - } - - /** - * Removes all unreferenced files from the repository and writes new index file - * - * We need to be really careful in handling index files in case of failures to make sure we have index file that - * points to files that were deleted. - * - * - * @param snapshots list of active snapshots in the container - * @param fileListGeneration the generation number of the snapshot index file - * @param blobs list of blobs in the container - */ - protected void finalize(List snapshots, int fileListGeneration, Map blobs) { - BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots); - List blobsToDelete = new ArrayList<>(); - // delete old index files first - for (String blobName : blobs.keySet()) { - // delete old file lists - if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { - blobsToDelete.add(blobName); - } - } - - try { - blobContainer.deleteBlobs(blobsToDelete); - } catch (IOException e) { - // We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up - // with references to non-existing files - throw new IndexShardSnapshotFailedException(shardId, "error deleting index files during cleanup", e); - } - - blobsToDelete = new ArrayList<>(); - // now go over all the blobs, and if they don't exists in a snapshot, delete them - for (String blobName : blobs.keySet()) { - // delete unused files - if (blobName.startsWith(DATA_BLOB_PREFIX)) { - if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) { - blobsToDelete.add(blobName); - } - } - } - try { - blobContainer.deleteBlobs(blobsToDelete); - } catch (IOException e) { - logger.debug("[{}] [{}] error deleting some of the blobs [{}] during cleanup", e, snapshotId, shardId, blobsToDelete); - } - - // If we deleted all snapshots - we don't need to create the index file - if (snapshots.size() > 0) { - try { - indexShardSnapshotsFormat.writeAtomic(newSnapshots, blobContainer, Integer.toString(fileListGeneration)); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e); - } - } - } - - /** - * Generates blob name - * - * @param generation the blob number - * @return the blob name - */ - protected String fileNameFromGeneration(long generation) { - return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX); - } - - /** - * Finds the next available blob number - * - * @param blobs list of blobs in the repository - * @return next available blob number - */ - protected long findLatestFileNameGeneration(Map blobs) { - long generation = -1; - for (String name : blobs.keySet()) { - if (!name.startsWith(DATA_BLOB_PREFIX)) { - continue; - } - name = FileInfo.canonicalName(name); - try { - long currentGen = Long.parseLong(name.substring(DATA_BLOB_PREFIX.length()), Character.MAX_RADIX); - if (currentGen > generation) { - generation = currentGen; - } - } catch (NumberFormatException e) { - logger.warn("file [{}] does not conform to the '{}' schema", name, DATA_BLOB_PREFIX); - } - } - return generation; - } - - /** - * Loads all available snapshots in the repository - * - * @param blobs list of blobs in repository - * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation - */ - protected Tuple buildBlobStoreIndexShardSnapshots(Map blobs) { - int latest = -1; - for (String name : blobs.keySet()) { - if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) { - try { - int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length())); - if (gen > latest) { - latest = gen; - } - } catch (NumberFormatException ex) { - logger.warn("failed to parse index file name [{}]", name); - } - } - } - if (latest >= 0) { - try { - final BlobStoreIndexShardSnapshots shardSnapshots = - indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest)); - return new Tuple<>(shardSnapshots, latest); - } catch (IOException e) { - logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest); - } - } - - // We couldn't load the index file - falling back to loading individual snapshots - List snapshots = new ArrayList<>(); - for (String name : blobs.keySet()) { - try { - BlobStoreIndexShardSnapshot snapshot = null; - if (name.startsWith(SNAPSHOT_PREFIX)) { - snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name); - } else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) { - snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name); - } - if (snapshot != null) { - snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); - } - } catch (IOException e) { - logger.warn("failed to read commit point [{}]", e, name); - } - } - return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1); - } - } - - /** - * Context for snapshot operations - */ - private class SnapshotContext extends Context { - - private final Store store; - - private final IndexShardSnapshotStatus snapshotStatus; - - /** - * Constructs new context - * - * @param snapshotId snapshot id - * @param shardId shard to be snapshotted - * @param snapshotStatus snapshot status to report progress - */ - public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) { - super(snapshotId, Version.CURRENT, shardId); - this.snapshotStatus = snapshotStatus; - store = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()).store(); - } - - /** - * Create snapshot from index commit point - * - * @param snapshotIndexCommit snapshot commit point - */ - public void snapshot(IndexCommit snapshotIndexCommit) { - logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName); - store.incRef(); - try { - final Map blobs; - try { - blobs = blobContainer.listBlobs(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); - } - - long generation = findLatestFileNameGeneration(blobs); - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); - BlobStoreIndexShardSnapshots snapshots = tuple.v1(); - int fileListGeneration = tuple.v2(); - - final List indexCommitPointFiles = new ArrayList<>(); - - int indexNumberOfFiles = 0; - long indexTotalFilesSize = 0; - ArrayList filesToSnapshot = new ArrayList<>(); - final Store.MetadataSnapshot metadata; - // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should - final Collection fileNames; - try { - metadata = store.getMetadata(snapshotIndexCommit); - fileNames = snapshotIndexCommit.getFileNames(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); - } - for (String fileName : fileNames) { - if (snapshotStatus.aborted()) { - logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); - final StoreFileMetaData md = metadata.get(fileName); - FileInfo existingFileInfo = null; - List filesInfo = snapshots.findPhysicalIndexFiles(fileName); - if (filesInfo != null) { - for (FileInfo fileInfo : filesInfo) { - try { - // in 1.3.3 we added additional hashes for .si / segments_N files - // to ensure we don't double the space in the repo since old snapshots - // don't have this hash we try to read that hash from the blob store - // in a bwc compatible way. - maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); - } catch (Exception e) { - logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); - } - if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) { - // a commit point file with the same name, size and checksum was already copied to repository - // we will reuse it for this snapshot - existingFileInfo = fileInfo; - break; - } - } - } - if (existingFileInfo == null) { - indexNumberOfFiles++; - indexTotalFilesSize += md.length(); - // create a new FileInfo - BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), md, chunkSize); - indexCommitPointFiles.add(snapshotFileInfo); - filesToSnapshot.add(snapshotFileInfo); - } else { - indexCommitPointFiles.add(existingFileInfo); - } - } - - snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize); - - if (snapshotStatus.aborted()) { - logger.debug("[{}] [{}] Aborted during initialization", shardId, snapshotId); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED); - - for (FileInfo snapshotFileInfo : filesToSnapshot) { - try { - snapshotFile(snapshotFileInfo); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); - } - } - - snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration()); - // now create and write the commit point - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); - - BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), - snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), - // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong - System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); - //TODO: The time stored in snapshot doesn't include cleanup time. - logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); - try { - indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName()); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); - } - - // delete all files that are not referenced by any commit point - // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones - List newSnapshotsList = new ArrayList<>(); - newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); - for (SnapshotFiles point : snapshots) { - newSnapshotsList.add(point); - } - // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index - finalize(newSnapshotsList, fileListGeneration + 1, blobs); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); - } finally { - store.decRef(); - } - } - - /** - * Snapshot individual file - *

- * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are - * added to the {@code failures} list - * - * @param fileInfo file to be snapshotted - */ - private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException { - final String file = fileInfo.physicalName(); - try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) { - for (int i = 0; i < fileInfo.numberOfParts(); i++) { - final long partBytes = fileInfo.partBytes(i); - - final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, partBytes); - InputStream inputStream = snapshotRateLimiter == null ? inputStreamIndexInput : new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener); - inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); - blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes); - } - Store.verify(indexInput); - snapshotStatus.addProcessedFile(fileInfo.length()); - } catch (Exception t) { - failStoreIfCorrupted(t); - snapshotStatus.addProcessedFile(0); - throw t; - } - } - - private void failStoreIfCorrupted(Exception e) { - if (e instanceof CorruptIndexException || e instanceof IndexFormatTooOldException || e instanceof IndexFormatTooNewException) { - try { - store.markStoreCorrupted((IOException) e); - } catch (IOException inner) { - inner.addSuppressed(e); - logger.warn("store cannot be marked as corrupted", inner); - } - } - } - - /** - * Checks if snapshot file already exists in the list of blobs - * - * @param fileInfo file to check - * @param blobs list of blobs - * @return true if file exists in the list of blobs - */ - private boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map blobs) { - BlobMetaData blobMetaData = blobs.get(fileInfo.name()); - if (blobMetaData != null) { - return blobMetaData.length() == fileInfo.length(); - } else if (blobs.containsKey(fileInfo.partName(0))) { - // multi part file sum up the size and check - int part = 0; - long totalSize = 0; - while (true) { - blobMetaData = blobs.get(fileInfo.partName(part++)); - if (blobMetaData == null) { - break; - } - totalSize += blobMetaData.length(); - } - return totalSize == fileInfo.length(); - } - // no file, not exact and not multipart - return false; - } - - private class AbortableInputStream extends FilterInputStream { - private final String fileName; - - public AbortableInputStream(InputStream delegate, String fileName) { - super(delegate); - this.fileName = fileName; - } - - @Override - public int read() throws IOException { - checkAborted(); - return in.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - checkAborted(); - return in.read(b, off, len); - } - - private void checkAborted() { - if (snapshotStatus.aborted()) { - logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - } - } - } - - /** - * This is a BWC layer to ensure we update the snapshots metadata with the corresponding hashes before we compare them. - * The new logic for StoreFileMetaData reads the entire .si and segments.n files to strengthen the - * comparison of the files on a per-segment / per-commit level. - */ - private static void maybeRecalculateMetadataHash(final BlobContainer blobContainer, final FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Exception { - final StoreFileMetaData metadata; - if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) { - if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) { - // we have a hash - check if our repo has a hash too otherwise we have - // to calculate it. - // we might have multiple parts even though the file is small... make sure we read all of it. - try (final InputStream stream = new PartSliceStream(blobContainer, fileInfo)) { - BytesRefBuilder builder = new BytesRefBuilder(); - Store.MetadataSnapshot.hashFile(builder, stream, fileInfo.length()); - BytesRef hash = fileInfo.metadata().hash(); // reset the file infos metadata hash - assert hash.length == 0; - hash.bytes = builder.bytes(); - hash.offset = 0; - hash.length = builder.length(); - } - } - } - } - - private static final class PartSliceStream extends SlicedInputStream { - - private final BlobContainer container; - private final FileInfo info; - - public PartSliceStream(BlobContainer container, FileInfo info) { - super(info.numberOfParts()); - this.info = info; - this.container = container; - } - - @Override - protected InputStream openSlice(long slice) throws IOException { - return container.readBlob(info.partName(slice)); - } - } - - /** - * Context for restore operations - */ - private class RestoreContext extends Context { - - private final Store store; - - private final RecoveryState recoveryState; - - /** - * Constructs new restore context - * - * @param snapshotId snapshot id - * @param shardId shard to be restored - * @param snapshotShardId shard in the snapshot that data should be restored from - * @param recoveryState recovery state to report progress - */ - public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { - super(snapshotId, version, shardId, snapshotShardId); - this.recoveryState = recoveryState; - store = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()).store(); - } - - /** - * Performs restore operation - */ - public void restore() throws IOException { - store.incRef(); - try { - logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); - BlobStoreIndexShardSnapshot snapshot = loadSnapshot(); - - if (snapshot.indexFiles().size() == 1 - && snapshot.indexFiles().get(0).physicalName().startsWith("segments_") - && snapshot.indexFiles().get(0).hasUnknownChecksum()) { - // If the shard has no documents, it will only contain a single segments_N file for the - // shard's snapshot. If we are restoring a snapshot created by a previous supported version, - // it is still possible that in that version, an empty shard has a segments_N file with an unsupported - // version (and no checksum), because we don't know the Lucene version to assign segments_N until we - // have written some data. Since the segments_N for an empty shard could have an incompatible Lucene - // version number and no checksum, even though the index itself is perfectly fine to restore, this - // empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty - // shard anyway, we just create the empty shard here and then exit. - IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null) - .setOpenMode(IndexWriterConfig.OpenMode.CREATE) - .setCommitOnClose(true)); - writer.close(); - return; - } - - SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - final Store.MetadataSnapshot recoveryTargetMetadata; - try { - recoveryTargetMetadata = store.getMetadataOrEmpty(); - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { - logger.warn("{} Can't read metadata from store", e, shardId); - throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e); - } - - final List filesToRecover = new ArrayList<>(); - final Map snapshotMetaData = new HashMap<>(); - final Map fileInfos = new HashMap<>(); - for (final FileInfo fileInfo : snapshot.indexFiles()) { - try { - // in 1.3.3 we added additional hashes for .si / segments_N files - // to ensure we don't double the space in the repo since old snapshots - // don't have this hash we try to read that hash from the blob store - // in a bwc compatible way. - maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata); - } catch (Exception e) { - // if the index is broken we might not be able to read it - logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); - } - snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); - fileInfos.put(fileInfo.metadata().name(), fileInfo); - } - final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0); - final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); - for (StoreFileMetaData md : diff.identical) { - FileInfo fileInfo = fileInfos.get(md.name()); - recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true); - if (logger.isTraceEnabled()) { - logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } - } - - for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) { - FileInfo fileInfo = fileInfos.get(md.name()); - filesToRecover.add(fileInfo); - recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false); - if (logger.isTraceEnabled()) { - if (md == null) { - logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } else { - logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } - } - } - final RecoveryState.Index index = recoveryState.getIndex(); - if (filesToRecover.isEmpty()) { - logger.trace("no files to recover, all exists within the local store"); - } - - if (logger.isTraceEnabled()) { - logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, - index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount())); - } - try { - for (final FileInfo fileToRecover : filesToRecover) { - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover); - } - } catch (IOException ex) { - throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); - } - final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); - if (recoveryTargetMetadata == null) { - throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); - } - assert restoredSegmentsFile != null; - // read the snapshot data persisted - final SegmentInfos segmentCommitInfos; - try { - segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); - } catch (IOException e) { - throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); - } - recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); - - /// now, go over and clean files that are in the store, but were not in the snapshot - try { - for (String storeFile : store.directory().listAll()) { - if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { - continue; //skip write.lock, checksum files and files that exist in the snapshot - } - try { - store.deleteQuiet("restore", storeFile); - store.directory().deleteFile(storeFile); - } catch (IOException e) { - logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile); - } - } - } catch (IOException e) { - logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId); - } - } finally { - store.decRef(); - } - } - - /** - * Restores a file - * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are - * added to the {@code failures} list - * - * @param fileInfo file to be restored - */ - private void restoreFile(final FileInfo fileInfo) throws IOException { - boolean success = false; - - try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) { - final InputStream stream; - if (restoreRateLimiter == null) { - stream = partSliceStream; - } else { - stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener); - } - try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { - final byte[] buffer = new byte[BUFFER_SIZE]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); - recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length); - } - Store.verify(indexOutput); - indexOutput.close(); - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - success = true; - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - try { - store.markStoreCorrupted(ex); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - throw ex; - } finally { - if (success == false) { - store.deleteQuiet(fileInfo.physicalName()); - } - } - } - } - - } - - public interface RateLimiterListener { - void onRestorePause(long nanos); - - void onSnapshotPause(long nanos); - } - -} diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/core/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index acad2753f7a..0ba9cccddaf 100644 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/core/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -21,8 +21,6 @@ package org.elasticsearch.repositories; import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.repositories.uri.URLRepository; import org.elasticsearch.snapshots.RestoreService; @@ -32,20 +30,20 @@ import org.elasticsearch.snapshots.SnapshotsService; /** * Sets up classes for Snapshot/Restore. * - * Plugins can add custom repository types by calling {@link #registerRepository(String, Class, Class)}. + * Plugins can add custom repository types by calling {@link #registerRepository(String, Class)}. */ public class RepositoriesModule extends AbstractModule { private final RepositoryTypesRegistry repositoryTypes = new RepositoryTypesRegistry(); public RepositoriesModule() { - registerRepository(FsRepository.TYPE, FsRepository.class, BlobStoreIndexShardRepository.class); - registerRepository(URLRepository.TYPE, URLRepository.class, BlobStoreIndexShardRepository.class); + registerRepository(FsRepository.TYPE, FsRepository.class); + registerRepository(URLRepository.TYPE, URLRepository.class); } - /** Registers a custom repository type to the given {@link Repository} and {@link IndexShardRepository}. */ - public void registerRepository(String type, Class repositoryType, Class shardRepositoryType) { - repositoryTypes.registerRepository(type, repositoryType, shardRepositoryType); + /** Registers a custom repository type to the given {@link Repository}. */ + public void registerRepository(String type, Class repositoryType) { + repositoryTypes.registerRepository(type, repositoryType); } @Override diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/core/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 0ed53c0a9c5..6efba054c40 100644 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/core/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -37,7 +37,6 @@ import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.transport.TransportService; @@ -336,23 +335,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta throw new RepositoryMissingException(repository); } - /** - * Returns registered index shard repository - *

- * This method is called only on data nodes - * - * @param repository repository name - * @return registered repository - * @throws RepositoryMissingException if repository with such name isn't registered - */ - public IndexShardRepository indexShardRepository(String repository) { - RepositoryHolder holder = repositories.get(repository); - if (holder != null) { - return holder.indexShardRepository; - } - throw new RepositoryMissingException(repository); - } - /** * Creates a new repository and adds it to the list of registered repositories. *

@@ -403,14 +385,16 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta try { ModulesBuilder modules = new ModulesBuilder(); RepositoryName name = new RepositoryName(repositoryMetaData.type(), repositoryMetaData.name()); - modules.add(new RepositoryNameModule(name)); - modules.add(new RepositoryModule(name, repositoryMetaData.settings(), this.settings, typesRegistry)); + modules.add(b -> { + b.bind(RepositoryName.class).toInstance(name); + typesRegistry.bindType(b, repositoryMetaData.type()); + b.bind(RepositorySettings.class).toInstance(new RepositorySettings(settings, repositoryMetaData.settings())); + }); repositoryInjector = modules.createChildInjector(injector); Repository repository = repositoryInjector.getInstance(Repository.class); - IndexShardRepository indexShardRepository = repositoryInjector.getInstance(IndexShardRepository.class); repository.start(); - return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repository, indexShardRepository); + return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repository); } catch (Exception e) { logger.warn("failed to create repository [{}][{}]", e, repositoryMetaData.type(), repositoryMetaData.name()); throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e); @@ -472,13 +456,11 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta private final String type; private final Settings settings; private final Repository repository; - private final IndexShardRepository indexShardRepository; - public RepositoryHolder(String type, Settings settings,Repository repository, IndexShardRepository indexShardRepository) { + public RepositoryHolder(String type, Settings settings,Repository repository) { this.type = type; this.settings = settings; this.repository = repository; - this.indexShardRepository = indexShardRepository; } } diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java index 47a87edd7f9..6c4cf93a7a4 100644 --- a/core/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java @@ -19,7 +19,11 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.index.shard.ShardId; @@ -29,22 +33,19 @@ import org.elasticsearch.snapshots.SnapshotShardFailure; import java.io.IOException; import java.util.List; -import java.util.function.Predicate; /** * Snapshot repository interface. *

- * Responsible for index and cluster level operations. It's called only on master. - * Shard-level operations are performed using {@link org.elasticsearch.index.snapshots.IndexShardRepository} - * interface on data nodes. + * Responsible for index and cluster and shard level operations. *

* Typical snapshot usage pattern: *

*/ public interface Repository extends LifecycleComponent { @@ -141,4 +142,49 @@ public interface Repository extends LifecycleComponent { */ boolean readOnly(); + /** + * Creates a snapshot of the shard based on the index commit point. + *

+ * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method. + * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. + *

+ * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check + * {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted. + * + * @param shard shard to be snapshotted + * @param snapshotId snapshot id + * @param snapshotIndexCommit commit point + * @param snapshotStatus snapshot status + */ + void snapshot(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus); + + /** + * Restores snapshot of the shard. + *

+ * The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied. + * + * @param shard the shard to restore the index into + * @param snapshotId snapshot id + * @param version version of elasticsearch that created this snapshot + * @param snapshotShardId shard id (in the snapshot) + * @param recoveryState recovery state + */ + void restore(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState); + + /** + * Retrieve shard snapshot status for the stored snapshot + * + * @param snapshotId snapshot id + * @param version version of elasticsearch that created this snapshot + * @param shardId shard id + * @return snapshot status + */ + IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId); + + /** + * Verifies repository settings on data node. + * @param verificationToken value returned by {@link org.elasticsearch.repositories.Repository#startVerification()} + * @param localNode the local node information, for inclusion in verification errors + */ + void verify(String verificationToken, DiscoveryNode localNode); } diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoryModule.java b/core/src/main/java/org/elasticsearch/repositories/RepositoryModule.java deleted file mode 100644 index 3ed3a78c0d1..00000000000 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoryModule.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.settings.Settings; - -/** - * Binds repository classes for the specific repository type. - */ -public class RepositoryModule extends AbstractModule { - - private RepositoryName repositoryName; - - private final Settings globalSettings; - - private final Settings settings; - - private final RepositoryTypesRegistry typesRegistry; - - /** - * Spawns module for repository with specified name, type and settings - * - * @param repositoryName repository name and type - * @param settings repository settings - * @param globalSettings global settings - * @param typesRegistry registry of repository types - */ - public RepositoryModule(RepositoryName repositoryName, Settings settings, Settings globalSettings, RepositoryTypesRegistry typesRegistry) { - this.repositoryName = repositoryName; - this.globalSettings = globalSettings; - this.settings = settings; - this.typesRegistry = typesRegistry; - } - - /** - * {@inheritDoc} - */ - @Override - protected void configure() { - typesRegistry.bindType(binder(), repositoryName.type()); - bind(RepositorySettings.class).toInstance(new RepositorySettings(globalSettings, settings)); - } -} diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoryNameModule.java b/core/src/main/java/org/elasticsearch/repositories/RepositoryNameModule.java deleted file mode 100644 index 47be67df34c..00000000000 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoryNameModule.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories; - -import org.elasticsearch.common.inject.AbstractModule; - -/** - * Binds specific instance of RepositoryName for injection to repository module - */ -public class RepositoryNameModule extends AbstractModule { - - private final RepositoryName repositoryName; - - public RepositoryNameModule(RepositoryName repositoryName) { - this.repositoryName = repositoryName; - } - - @Override - protected void configure() { - bind(RepositoryName.class).toInstance(repositoryName); - } -} diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoryTypesRegistry.java b/core/src/main/java/org/elasticsearch/repositories/RepositoryTypesRegistry.java index d2f02aa5795..63a790b77fd 100644 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoryTypesRegistry.java +++ b/core/src/main/java/org/elasticsearch/repositories/RepositoryTypesRegistry.java @@ -22,22 +22,18 @@ package org.elasticsearch.repositories; import org.elasticsearch.common.inject.Binder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.ExtensionPoint; -import org.elasticsearch.index.snapshots.IndexShardRepository; /** - * A mapping from type name to implementations of {@link Repository} and {@link IndexShardRepository}. + * A mapping from type name to implementations of {@link Repository}. */ public class RepositoryTypesRegistry { // invariant: repositories and shardRepositories have the same keyset private final ExtensionPoint.SelectedType repositoryTypes = new ExtensionPoint.SelectedType<>("repository", Repository.class); - private final ExtensionPoint.SelectedType shardRepositoryTypes = - new ExtensionPoint.SelectedType<>("index_repository", IndexShardRepository.class); /** Adds a new repository type to the registry, bound to the given implementation classes. */ - public void registerRepository(String name, Class repositoryType, Class shardRepositoryType) { + public void registerRepository(String name, Class repositoryType) { repositoryTypes.registerExtension(name, repositoryType); - shardRepositoryTypes.registerExtension(name, shardRepositoryType); } /** @@ -47,6 +43,5 @@ public class RepositoryTypesRegistry { public void bindType(Binder binder, String type) { Settings settings = Settings.builder().put("type", type).build(); repositoryTypes.bindType(binder, settings, "type", null); - shardRepositoryTypes.bindType(binder, settings, "type", null); } } diff --git a/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index 796c65e9b46..65544421c8c 100644 --- a/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -19,6 +19,12 @@ package org.elasticsearch.repositories; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + import com.carrotsearch.hppc.ObjectContainer; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.action.ActionListener; @@ -29,7 +35,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoriesService.VerifyResponse; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; @@ -40,12 +45,6 @@ import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; - public class VerifyNodeRepositoryAction extends AbstractComponent { public static final String ACTION_NAME = "internal:admin/repository/verify"; @@ -82,7 +81,7 @@ public class VerifyNodeRepositoryAction extends AbstractComponent { for (final DiscoveryNode node : nodes) { if (node.equals(localNode)) { try { - doVerify(repository, verificationToken); + doVerify(repository, verificationToken, localNode); } catch (Exception e) { logger.warn("[{}] failed to verify repository", e, repository); errors.add(new VerificationFailure(node.getId(), e)); @@ -115,9 +114,9 @@ public class VerifyNodeRepositoryAction extends AbstractComponent { listener.onResponse(new RepositoriesService.VerifyResponse(nodes.toArray(new DiscoveryNode[nodes.size()]), errors.toArray(new VerificationFailure[errors.size()]))); } - private void doVerify(String repository, String verificationToken) { - IndexShardRepository blobStoreIndexShardRepository = repositoriesService.indexShardRepository(repository); - blobStoreIndexShardRepository.verify(verificationToken); + private void doVerify(String repositoryName, String verificationToken, DiscoveryNode localNode) { + Repository repository = repositoriesService.repository(repositoryName); + repository.verify(verificationToken, localNode); } public static class VerifyNodeRepositoryRequest extends TransportRequest { @@ -151,8 +150,9 @@ public class VerifyNodeRepositoryAction extends AbstractComponent { class VerifyNodeRepositoryRequestHandler implements TransportRequestHandler { @Override public void messageReceived(VerifyNodeRepositoryRequest request, TransportChannel channel) throws Exception { + DiscoveryNode localNode = clusterService.state().nodes().getLocalNode(); try { - doVerify(request.repository, request.verificationToken); + doVerify(request.repository, request.verificationToken, localNode); } catch (Exception ex) { logger.warn("[{}] failed to verify repository", ex, request.repository); throw ex; diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index b1f1ff66cf5..4c328872742 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -19,12 +19,43 @@ package org.elasticsearch.repositories.blobstore; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Numbers; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.store.InputStreamIndexInput; +import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; +import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream; +import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; +import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.Strings; @@ -50,9 +81,6 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository.RateLimiterListener; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositorySettings; @@ -64,15 +92,21 @@ import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotShardFailure; import java.io.FileNotFoundException; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableMap; + /** * BlobStore - based implementation of Snapshot Repository *

@@ -116,19 +150,19 @@ import java.util.stream.Collectors; * } * */ -public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository, RateLimiterListener { +public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository { private BlobContainer snapshotsBlobContainer; protected final String repositoryName; + private static final int BUFFER_SIZE = 4096; + private static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-"; private static final String SNAPSHOT_PREFIX = "snap-"; - private static final String SNAPSHOT_SUFFIX = ".dat"; - - private static final String SNAPSHOT_CODEC = "snapshot"; + protected static final String SNAPSHOT_CODEC = "snapshot"; static final String SNAPSHOTS_FILE = "index"; // package private for unit testing @@ -146,11 +180,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final String INDEX_METADATA_CODEC = "index-metadata"; - private static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s" + SNAPSHOT_SUFFIX; + protected static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s"; - private static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s"; + protected static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat"; - private final BlobStoreIndexShardRepository indexShardRepository; + protected static final String SNAPSHOT_INDEX_PREFIX = "index-"; + + protected static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s"; + + protected static final String SNAPSHOT_INDEX_CODEC = "snapshots"; + + protected static final String DATA_BLOB_PREFIX = "__"; private final RateLimiter snapshotRateLimiter; @@ -174,29 +214,36 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final boolean readOnly; + private final ParseFieldMatcher parseFieldMatcher; + + private final ChecksumBlobStoreFormat indexShardSnapshotFormat; + + private final LegacyBlobStoreFormat indexShardSnapshotLegacyFormat; + + private final ChecksumBlobStoreFormat indexShardSnapshotsFormat; + /** * Constructs new BlobStoreRepository * * @param repositoryName repository name * @param repositorySettings repository settings - * @param indexShardRepository an instance of IndexShardRepository */ - protected BlobStoreRepository(String repositoryName, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) { + protected BlobStoreRepository(String repositoryName, RepositorySettings repositorySettings) { super(repositorySettings.globalSettings()); this.repositoryName = repositoryName; - this.indexShardRepository = (BlobStoreIndexShardRepository) indexShardRepository; + parseFieldMatcher = new ParseFieldMatcher(settings); snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); readOnly = repositorySettings.settings().getAsBoolean("readonly", false); + indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress()); + indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher); + indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots.PROTO, parseFieldMatcher, isCompress()); + } - /** - * {@inheritDoc} - */ @Override protected void doStart() { this.snapshotsBlobContainer = blobStore().blobContainer(basePath()); - indexShardRepository.initialize(blobStore(), basePath(), chunkSize(), snapshotRateLimiter, restoreRateLimiter, this, isCompress()); ParseFieldMatcher parseFieldMatcher = new ParseFieldMatcher(settings); globalMetaDataFormat = new ChecksumBlobStoreFormat<>(METADATA_CODEC, METADATA_NAME_FORMAT, MetaData.PROTO, parseFieldMatcher, isCompress()); @@ -209,16 +256,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp snapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, SnapshotInfo.PROTO, parseFieldMatcher); } - /** - * {@inheritDoc} - */ @Override - protected void doStop() { - } + protected void doStop() {} - /** - * {@inheritDoc} - */ @Override protected void doClose() { try { @@ -229,18 +269,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } /** - * Returns initialized and ready to use BlobStore - *

- * This method is first called in the {@link #doStart()} method. - * - * @return blob store + * Returns the BlobStore to read and write data. */ - protected abstract BlobStore blobStore(); + protected abstract BlobStore blobStore(); /** * Returns base path of the repository */ - protected abstract BlobPath basePath(); + protected abstract BlobPath basePath(); /** * Returns true if metadata and snapshot files should be compressed @@ -262,9 +298,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp return null; } - /** - * {@inheritDoc} - */ @Override public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { if (readOnly()) { @@ -293,9 +326,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * {@inheritDoc} - */ @Override public void deleteSnapshot(SnapshotId snapshotId) { if (readOnly()) { @@ -352,7 +382,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp if (indexMetaData != null) { for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { try { - indexShardRepository.delete(snapshotId, snapshot.version(), new ShardId(indexMetaData.getIndex(), shardId)); + delete(snapshotId, snapshot.version(), new ShardId(indexMetaData.getIndex(), shardId)); } catch (SnapshotException ex) { logger.warn("[{}] failed to delete shard data for shard [{}][{}]", ex, snapshotId, index, shardId); } @@ -365,9 +395,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * {@inheritDoc} - */ @Override public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, final List indices, @@ -397,9 +424,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * {@inheritDoc} - */ @Override public List snapshots() { try { @@ -412,9 +436,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * {@inheritDoc} - */ @Override public MetaData readSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException { return readSnapshotMetaData(snapshot.snapshotId(), snapshot.version(), indices, false); @@ -539,16 +560,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final String NAME = "name"; private static final String UUID = "uuid"; - @Override - public void onRestorePause(long nanos) { - restoreRateLimitingTimeInNanos.inc(nanos); - } - - @Override - public void onSnapshotPause(long nanos) { - snapshotRateLimitingTimeInNanos.inc(nanos); - } - @Override public long snapshotThrottleTimeInNanos() { return snapshotRateLimitingTimeInNanos.count(); @@ -785,4 +796,782 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } + @Override + public void snapshot(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, snapshotStatus); + snapshotStatus.startTime(System.currentTimeMillis()); + + try { + snapshotContext.snapshot(snapshotIndexCommit); + snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); + } catch (Exception e) { + snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); + snapshotStatus.failure(ExceptionsHelper.detailedMessage(e)); + if (e instanceof IndexShardSnapshotFailedException) { + throw (IndexShardSnapshotFailedException) e; + } else { + throw new IndexShardSnapshotFailedException(shard.shardId(), e); + } + } + } + + @Override + public void restore(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { + final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, snapshotShardId, recoveryState); + try { + snapshotContext.restore(); + } catch (Exception e) { + throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); + } + } + + @Override + public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { + Context context = new Context(snapshotId, version, shardId); + BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); + IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); + status.updateStage(IndexShardSnapshotStatus.Stage.DONE); + status.startTime(snapshot.startTime()); + status.files(snapshot.numberOfFiles(), snapshot.totalSize()); + // The snapshot is done which means the number of processed files is the same as total + status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize()); + status.time(snapshot.time()); + return status; + } + + @Override + public void verify(String seed, DiscoveryNode localNode) { + BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); + if (testBlobContainer.blobExists("master.dat")) { + try { + testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed)); + } catch (IOException exp) { + throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp); + } + } else { + throw new RepositoryVerificationException(repositoryName, "a file written by master to the store [" + blobStore() + "] cannot be accessed on the node [" + localNode + "]. " + + "This might indicate that the store [" + blobStore() + "] is not shared between this node and the master node or " + + "that permissions on the store don't allow reading files written by the master node"); + } + } + + /** + * Delete shard snapshot + * + * @param snapshotId snapshot id + * @param shardId shard id + */ + public void delete(SnapshotId snapshotId, Version version, ShardId shardId) { + Context context = new Context(snapshotId, version, shardId, shardId); + context.delete(); + } + + @Override + public String toString() { + return "BlobStoreRepository[" + + "[" + repositoryName + + "], [" + blobStore() + ']' + + ']'; + } + + BlobStoreFormat indexShardSnapshotFormat(Version version) { + if (BlobStoreRepository.legacyMetaData(version)) { + return indexShardSnapshotLegacyFormat; + } else { + return indexShardSnapshotFormat; + } + } + + /** + * Context for snapshot/restore operations + */ + private class Context { + + protected final SnapshotId snapshotId; + + protected final ShardId shardId; + + protected final BlobContainer blobContainer; + + protected final Version version; + + public Context(SnapshotId snapshotId, Version version, ShardId shardId) { + this(snapshotId, version, shardId, shardId); + } + + public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) { + this.snapshotId = snapshotId; + this.version = version; + this.shardId = shardId; + blobContainer = blobStore().blobContainer(basePath().add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId()))); + } + + /** + * Delete shard snapshot + */ + public void delete() { + final Map blobs; + try { + blobs = blobContainer.listBlobs(); + } catch (IOException e) { + throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e); + } + + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); + BlobStoreIndexShardSnapshots snapshots = tuple.v1(); + int fileListGeneration = tuple.v2(); + + try { + indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName()); + } catch (IOException e) { + logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId); + } + + // Build a list of snapshots that should be preserved + List newSnapshotsList = new ArrayList<>(); + for (SnapshotFiles point : snapshots) { + if (!point.snapshot().equals(snapshotId.getName())) { + newSnapshotsList.add(point); + } + } + // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index + finalize(newSnapshotsList, fileListGeneration + 1, blobs); + } + + /** + * Loads information about shard snapshot + */ + public BlobStoreIndexShardSnapshot loadSnapshot() { + try { + return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName()); + } catch (IOException ex) { + throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); + } + } + + /** + * Removes all unreferenced files from the repository and writes new index file + * + * We need to be really careful in handling index files in case of failures to make sure we have index file that + * points to files that were deleted. + * + * + * @param snapshots list of active snapshots in the container + * @param fileListGeneration the generation number of the snapshot index file + * @param blobs list of blobs in the container + */ + protected void finalize(List snapshots, int fileListGeneration, Map blobs) { + BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots); + List blobsToDelete = new ArrayList<>(); + // delete old index files first + for (String blobName : blobs.keySet()) { + // delete old file lists + if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { + blobsToDelete.add(blobName); + } + } + + try { + blobContainer.deleteBlobs(blobsToDelete); + } catch (IOException e) { + // We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up + // with references to non-existing files + throw new IndexShardSnapshotFailedException(shardId, "error deleting index files during cleanup", e); + } + + blobsToDelete = new ArrayList<>(); + // now go over all the blobs, and if they don't exists in a snapshot, delete them + for (String blobName : blobs.keySet()) { + // delete unused files + if (blobName.startsWith(DATA_BLOB_PREFIX)) { + if (newSnapshots.findNameFile(BlobStoreIndexShardSnapshot.FileInfo.canonicalName(blobName)) == null) { + blobsToDelete.add(blobName); + } + } + } + try { + blobContainer.deleteBlobs(blobsToDelete); + } catch (IOException e) { + logger.debug("[{}] [{}] error deleting some of the blobs [{}] during cleanup", e, snapshotId, shardId, blobsToDelete); + } + + // If we deleted all snapshots - we don't need to create the index file + if (snapshots.size() > 0) { + try { + indexShardSnapshotsFormat.writeAtomic(newSnapshots, blobContainer, Integer.toString(fileListGeneration)); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e); + } + } + } + + /** + * Generates blob name + * + * @param generation the blob number + * @return the blob name + */ + protected String fileNameFromGeneration(long generation) { + return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX); + } + + /** + * Finds the next available blob number + * + * @param blobs list of blobs in the repository + * @return next available blob number + */ + protected long findLatestFileNameGeneration(Map blobs) { + long generation = -1; + for (String name : blobs.keySet()) { + if (!name.startsWith(DATA_BLOB_PREFIX)) { + continue; + } + name = BlobStoreIndexShardSnapshot.FileInfo.canonicalName(name); + try { + long currentGen = Long.parseLong(name.substring(DATA_BLOB_PREFIX.length()), Character.MAX_RADIX); + if (currentGen > generation) { + generation = currentGen; + } + } catch (NumberFormatException e) { + logger.warn("file [{}] does not conform to the '{}' schema", name, DATA_BLOB_PREFIX); + } + } + return generation; + } + + /** + * Loads all available snapshots in the repository + * + * @param blobs list of blobs in repository + * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation + */ + protected Tuple buildBlobStoreIndexShardSnapshots(Map blobs) { + int latest = -1; + for (String name : blobs.keySet()) { + if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) { + try { + int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length())); + if (gen > latest) { + latest = gen; + } + } catch (NumberFormatException ex) { + logger.warn("failed to parse index file name [{}]", name); + } + } + } + if (latest >= 0) { + try { + final BlobStoreIndexShardSnapshots shardSnapshots = + indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest)); + return new Tuple<>(shardSnapshots, latest); + } catch (IOException e) { + logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest); + } + } + + // We couldn't load the index file - falling back to loading individual snapshots + List snapshots = new ArrayList<>(); + for (String name : blobs.keySet()) { + try { + BlobStoreIndexShardSnapshot snapshot = null; + if (name.startsWith(SNAPSHOT_PREFIX)) { + snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name); + } else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) { + snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name); + } + if (snapshot != null) { + snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + } + } catch (IOException e) { + logger.warn("failed to read commit point [{}]", e, name); + } + } + return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1); + } + } + + /** + * Context for snapshot operations + */ + private class SnapshotContext extends Context { + + private final Store store; + + private final IndexShardSnapshotStatus snapshotStatus; + + /** + * Constructs new context + * + * @param shard shard to be snapshotted + * @param snapshotId snapshot id + * @param snapshotStatus snapshot status to report progress + */ + public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexShardSnapshotStatus snapshotStatus) { + super(snapshotId, Version.CURRENT, shard.shardId()); + this.snapshotStatus = snapshotStatus; + this.store = shard.store(); + } + + /** + * Create snapshot from index commit point + * + * @param snapshotIndexCommit snapshot commit point + */ + public void snapshot(IndexCommit snapshotIndexCommit) { + logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName); + store.incRef(); + try { + final Map blobs; + try { + blobs = blobContainer.listBlobs(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); + } + + long generation = findLatestFileNameGeneration(blobs); + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); + BlobStoreIndexShardSnapshots snapshots = tuple.v1(); + int fileListGeneration = tuple.v2(); + + final List indexCommitPointFiles = new ArrayList<>(); + + int indexNumberOfFiles = 0; + long indexTotalFilesSize = 0; + ArrayList filesToSnapshot = new ArrayList<>(); + final Store.MetadataSnapshot metadata; + // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should + final Collection fileNames; + try { + metadata = store.getMetadata(snapshotIndexCommit); + fileNames = snapshotIndexCommit.getFileNames(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); + } + for (String fileName : fileNames) { + if (snapshotStatus.aborted()) { + logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } + logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); + final StoreFileMetaData md = metadata.get(fileName); + BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null; + List filesInfo = snapshots.findPhysicalIndexFiles(fileName); + if (filesInfo != null) { + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) { + try { + // in 1.3.3 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. + maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); + } catch (Exception e) { + logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + } + if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) { + // a commit point file with the same name, size and checksum was already copied to repository + // we will reuse it for this snapshot + existingFileInfo = fileInfo; + break; + } + } + } + if (existingFileInfo == null) { + indexNumberOfFiles++; + indexTotalFilesSize += md.length(); + // create a new FileInfo + BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), md, chunkSize()); + indexCommitPointFiles.add(snapshotFileInfo); + filesToSnapshot.add(snapshotFileInfo); + } else { + indexCommitPointFiles.add(existingFileInfo); + } + } + + snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize); + + if (snapshotStatus.aborted()) { + logger.debug("[{}] [{}] Aborted during initialization", shardId, snapshotId); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } + + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED); + + for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { + try { + snapshotFile(snapshotFileInfo); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); + } + } + + snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration()); + // now create and write the commit point + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); + + BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), + snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), + // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong + System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); + //TODO: The time stored in snapshot doesn't include cleanup time. + logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); + try { + indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName()); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); + } + + // delete all files that are not referenced by any commit point + // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones + List newSnapshotsList = new ArrayList<>(); + newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + for (SnapshotFiles point : snapshots) { + newSnapshotsList.add(point); + } + // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index + finalize(newSnapshotsList, fileListGeneration + 1, blobs); + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); + } finally { + store.decRef(); + } + } + + /** + * Snapshot individual file + *

+ * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are + * added to the {@code failures} list + * + * @param fileInfo file to be snapshotted + */ + private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException { + final String file = fileInfo.physicalName(); + try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) { + for (int i = 0; i < fileInfo.numberOfParts(); i++) { + final long partBytes = fileInfo.partBytes(i); + + final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, partBytes); + InputStream inputStream = inputStreamIndexInput; + if (snapshotRateLimiter != null) { + inputStream = new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, + snapshotRateLimitingTimeInNanos::inc); + } + inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); + blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes); + } + Store.verify(indexInput); + snapshotStatus.addProcessedFile(fileInfo.length()); + } catch (Exception t) { + failStoreIfCorrupted(t); + snapshotStatus.addProcessedFile(0); + throw t; + } + } + + private void failStoreIfCorrupted(Exception e) { + if (e instanceof CorruptIndexException || e instanceof IndexFormatTooOldException || e instanceof IndexFormatTooNewException) { + try { + store.markStoreCorrupted((IOException) e); + } catch (IOException inner) { + inner.addSuppressed(e); + logger.warn("store cannot be marked as corrupted", inner); + } + } + } + + /** + * Checks if snapshot file already exists in the list of blobs + * + * @param fileInfo file to check + * @param blobs list of blobs + * @return true if file exists in the list of blobs + */ + private boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map blobs) { + BlobMetaData blobMetaData = blobs.get(fileInfo.name()); + if (blobMetaData != null) { + return blobMetaData.length() == fileInfo.length(); + } else if (blobs.containsKey(fileInfo.partName(0))) { + // multi part file sum up the size and check + int part = 0; + long totalSize = 0; + while (true) { + blobMetaData = blobs.get(fileInfo.partName(part++)); + if (blobMetaData == null) { + break; + } + totalSize += blobMetaData.length(); + } + return totalSize == fileInfo.length(); + } + // no file, not exact and not multipart + return false; + } + + private class AbortableInputStream extends FilterInputStream { + private final String fileName; + + public AbortableInputStream(InputStream delegate, String fileName) { + super(delegate); + this.fileName = fileName; + } + + @Override + public int read() throws IOException { + checkAborted(); + return in.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkAborted(); + return in.read(b, off, len); + } + + private void checkAborted() { + if (snapshotStatus.aborted()) { + logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } + } + } + } + + /** + * This is a BWC layer to ensure we update the snapshots metadata with the corresponding hashes before we compare them. + * The new logic for StoreFileMetaData reads the entire .si and segments.n files to strengthen the + * comparison of the files on a per-segment / per-commit level. + */ + private static void maybeRecalculateMetadataHash(final BlobContainer blobContainer, final BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Exception { + final StoreFileMetaData metadata; + if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) { + if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) { + // we have a hash - check if our repo has a hash too otherwise we have + // to calculate it. + // we might have multiple parts even though the file is small... make sure we read all of it. + try (final InputStream stream = new PartSliceStream(blobContainer, fileInfo)) { + BytesRefBuilder builder = new BytesRefBuilder(); + Store.MetadataSnapshot.hashFile(builder, stream, fileInfo.length()); + BytesRef hash = fileInfo.metadata().hash(); // reset the file infos metadata hash + assert hash.length == 0; + hash.bytes = builder.bytes(); + hash.offset = 0; + hash.length = builder.length(); + } + } + } + } + + private static final class PartSliceStream extends SlicedInputStream { + + private final BlobContainer container; + private final BlobStoreIndexShardSnapshot.FileInfo info; + + public PartSliceStream(BlobContainer container, BlobStoreIndexShardSnapshot.FileInfo info) { + super(info.numberOfParts()); + this.info = info; + this.container = container; + } + + @Override + protected InputStream openSlice(long slice) throws IOException { + return container.readBlob(info.partName(slice)); + } + } + + /** + * Context for restore operations + */ + private class RestoreContext extends Context { + + private final Store store; + + private final RecoveryState recoveryState; + + /** + * Constructs new restore context + * + * @param shard shard to restore into + * @param snapshotId snapshot id + * @param snapshotShardId shard in the snapshot that data should be restored from + * @param recoveryState recovery state to report progress + */ + public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { + super(snapshotId, version, shard.shardId(), snapshotShardId); + this.recoveryState = recoveryState; + store = shard.store(); + } + + /** + * Performs restore operation + */ + public void restore() throws IOException { + store.incRef(); + try { + logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); + BlobStoreIndexShardSnapshot snapshot = loadSnapshot(); + + if (snapshot.indexFiles().size() == 1 + && snapshot.indexFiles().get(0).physicalName().startsWith("segments_") + && snapshot.indexFiles().get(0).hasUnknownChecksum()) { + // If the shard has no documents, it will only contain a single segments_N file for the + // shard's snapshot. If we are restoring a snapshot created by a previous supported version, + // it is still possible that in that version, an empty shard has a segments_N file with an unsupported + // version (and no checksum), because we don't know the Lucene version to assign segments_N until we + // have written some data. Since the segments_N for an empty shard could have an incompatible Lucene + // version number and no checksum, even though the index itself is perfectly fine to restore, this + // empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty + // shard anyway, we just create the empty shard here and then exit. + IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null) + .setOpenMode(IndexWriterConfig.OpenMode.CREATE) + .setCommitOnClose(true)); + writer.close(); + return; + } + + SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); + final Store.MetadataSnapshot recoveryTargetMetadata; + try { + recoveryTargetMetadata = store.getMetadataOrEmpty(); + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { + logger.warn("{} Can't read metadata from store", e, shardId); + throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e); + } + + final List filesToRecover = new ArrayList<>(); + final Map snapshotMetaData = new HashMap<>(); + final Map fileInfos = new HashMap<>(); + for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshot.indexFiles()) { + try { + // in 1.3.3 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. + maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata); + } catch (Exception e) { + // if the index is broken we might not be able to read it + logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + } + snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); + fileInfos.put(fileInfo.metadata().name(), fileInfo); + } + final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0); + final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); + for (StoreFileMetaData md : diff.identical) { + BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); + recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true); + if (logger.isTraceEnabled()) { + logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); + } + } + + for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) { + BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); + filesToRecover.add(fileInfo); + recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false); + if (logger.isTraceEnabled()) { + if (md == null) { + logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); + } else { + logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); + } + } + } + final RecoveryState.Index index = recoveryState.getIndex(); + if (filesToRecover.isEmpty()) { + logger.trace("no files to recover, all exists within the local store"); + } + + if (logger.isTraceEnabled()) { + logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, + index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount())); + } + try { + for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); + restoreFile(fileToRecover); + } + } catch (IOException ex) { + throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); + } + final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); + if (recoveryTargetMetadata == null) { + throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); + } + assert restoredSegmentsFile != null; + // read the snapshot data persisted + final SegmentInfos segmentCommitInfos; + try { + segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); + } catch (IOException e) { + throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); + } + recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); + + /// now, go over and clean files that are in the store, but were not in the snapshot + try { + for (String storeFile : store.directory().listAll()) { + if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { + continue; //skip write.lock, checksum files and files that exist in the snapshot + } + try { + store.deleteQuiet("restore", storeFile); + store.directory().deleteFile(storeFile); + } catch (IOException e) { + logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile); + } + } + } catch (IOException e) { + logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId); + } + } finally { + store.decRef(); + } + } + + /** + * Restores a file + * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are + * added to the {@code failures} list + * + * @param fileInfo file to be restored + */ + private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException { + boolean success = false; + + try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) { + final InputStream stream; + if (restoreRateLimiter == null) { + stream = partSliceStream; + } else { + stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); + } + try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { + final byte[] buffer = new byte[BUFFER_SIZE]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length); + } + Store.verify(indexOutput); + indexOutput.close(); + store.directory().sync(Collections.singleton(fileInfo.physicalName())); + success = true; + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + try { + store.markStoreCorrupted(ex); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + throw ex; + } finally { + if (success == false) { + store.deleteQuiet(fileInfo.physicalName()); + } + } + } + } + + } } diff --git a/core/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/core/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index 4e57c431f14..5be7115a25b 100644 --- a/core/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.env.Environment; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; @@ -77,11 +76,10 @@ public class FsRepository extends BlobStoreRepository { * * @param name repository name * @param repositorySettings repository settings - * @param indexShardRepository index shard repository */ @Inject - public FsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, Environment environment) throws IOException { - super(name.getName(), repositorySettings, indexShardRepository); + public FsRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException { + super(name.getName(), repositorySettings); Path locationFile; String location = REPOSITORIES_LOCATION_SETTING.get(repositorySettings.settings()); if (location.isEmpty()) { diff --git a/core/src/main/java/org/elasticsearch/repositories/uri/URLIndexShardRepository.java b/core/src/main/java/org/elasticsearch/repositories/uri/URLIndexShardRepository.java deleted file mode 100644 index 616a36d5066..00000000000 --- a/core/src/main/java/org/elasticsearch/repositories/uri/URLIndexShardRepository.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories.uri; - -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.repositories.RepositoryName; - -/** - */ -public class URLIndexShardRepository extends BlobStoreIndexShardRepository { - - @Inject - public URLIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) { - super(settings, repositoryName, indicesService, clusterService); - } - - - @Override - public void verify(String seed) { - //TODO: Add verification that URL is accessible - } -} diff --git a/core/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java b/core/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java index 0eb3006c5c3..f08faa2c0d6 100644 --- a/core/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.util.URIPattern; import org.elasticsearch.env.Environment; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; @@ -82,11 +81,10 @@ public class URLRepository extends BlobStoreRepository { * * @param name repository name * @param repositorySettings repository settings - * @param indexShardRepository shard repository */ @Inject - public URLRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, Environment environment) throws IOException { - super(name.getName(), repositorySettings, indexShardRepository); + public URLRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException { + super(name.getName(), repositorySettings); if (URL_SETTING.exists(repositorySettings.settings()) == false && REPOSITORIES_URL_SETTING.exists(settings) == false) { throw new RepositoryException(name.name(), "missing url"); @@ -101,9 +99,6 @@ public class URLRepository extends BlobStoreRepository { basePath = BlobPath.cleanPath(); } - /** - * {@inheritDoc} - */ @Override protected BlobStore blobStore() { return blobStore; diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 53fb6f4781e..67a5ae361bb 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -63,7 +63,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; @@ -112,7 +111,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet; * method. *

* Individual shards are getting restored as part of normal recovery process in - * {@link IndexShard#restoreFromRepository(IndexShardRepository)} )} + * {@link IndexShard#restoreFromRepository(Repository)} )} * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. *

diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 91db508b7e3..b24627bb634 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -43,10 +43,10 @@ import org.elasticsearch.index.engine.SnapshotFailedEngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportChannel; @@ -322,7 +322,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements * @param snapshotStatus snapshot status */ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexShardSnapshotStatus snapshotStatus) { - IndexShardRepository indexShardRepository = snapshotsService.getRepositoriesService().indexShardRepository(snapshot.getRepository()); + Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); ShardId shardId = indexShard.shardId(); if (!indexShard.routingEntry().primary()) { throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); @@ -340,11 +340,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements // we flush first to make sure we get the latest writes snapshotted IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true); try { - indexShardRepository.snapshot(snapshot.getSnapshotId(), shardId, snapshotIndexCommit, snapshotStatus); + repository.snapshot(indexShard, snapshot.getSnapshotId(), snapshotIndexCommit, snapshotStatus); if (logger.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); - logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, indexShardRepository, + logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository, TimeValue.timeValueMillis(snapshotStatus.time()), sb); } } finally { diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5f5ac65e38e..8d2407dd2e4 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -55,7 +55,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -547,7 +546,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final SnapshotInfo snapshotInfo) throws IOException { Map shardStatus = new HashMap<>(); Repository repository = repositoriesService.repository(repositoryName); - IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(repositoryName); MetaData metaData = repository.readSnapshotMetaData(snapshotInfo, snapshotInfo.indices()); for (String index : snapshotInfo.indices()) { IndexMetaData indexMetaData = metaData.indices().get(index); @@ -563,7 +561,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus shardStatus.put(shardId, shardSnapshotStatus); } else { IndexShardSnapshotStatus shardSnapshotStatus = - indexShardRepository.snapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId); + repository.snapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId); shardStatus.put(shardId, shardSnapshotStatus); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a0813fb572f..acec53da24b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -18,6 +18,27 @@ */ package org.elasticsearch.index.shard; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.CorruptIndexException; @@ -50,6 +71,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.RestoreSource; @@ -63,6 +85,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; @@ -91,15 +114,17 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.FieldMaskingReader; @@ -108,27 +133,6 @@ import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -1178,13 +1182,9 @@ public class IndexShardTests extends ESSingleNodeTestCase { test_target_shard.updateRoutingEntry(routing); DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode)); - assertTrue(test_target_shard.restoreFromRepository(new IndexShardRepository() { + assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository() { @Override - public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { - } - - @Override - public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { + public void restore(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { try { cleanLuceneIndex(targetStore.directory()); for (String file : sourceStore.directory().listAll()) { @@ -1197,15 +1197,6 @@ public class IndexShardTests extends ESSingleNodeTestCase { throw new RuntimeException(ex); } } - - @Override - public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { - return null; - } - - @Override - public void verify(String verificationToken) { - } })); test_target_shard.updateRoutingEntry(routing.moveToStarted()); @@ -1649,4 +1640,63 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(mappings.get("index_1").get("test").get().source().string(), "{\"test\":{\"properties\":{\"foo\":{\"type\":\"text\"}}}}"); } + + /** A dummy repository for testing which just needs restore overridden */ + private static abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { + public RestoreOnlyRepository() { + super(Settings.EMPTY); + } + @Override + protected void doStart() {} + @Override + protected void doStop() {} + @Override + protected void doClose() {} + @Override + public SnapshotInfo readSnapshot(SnapshotId snapshotId) { + return null; + } + @Override + public MetaData readSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException { + return null; + } + @Override + public List snapshots() { + return null; + } + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) {} + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures) { + return null; + } + @Override + public void deleteSnapshot(SnapshotId snapshotId) {} + @Override + public long snapshotThrottleTimeInNanos() { + return 0; + } + @Override + public long restoreThrottleTimeInNanos() { + return 0; + } + @Override + public String startVerification() { + return null; + } + @Override + public void endVerification(String verificationToken) {} + @Override + public boolean readOnly() { + return false; + } + @Override + public void snapshot(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {} + @Override + public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { + return null; + } + @Override + public void verify(String verificationToken, DiscoveryNode localNode) {} + } } diff --git a/core/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java b/core/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java index 48a1cc6081e..639b60d6d09 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -44,8 +44,6 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; -/** - */ @ESIntegTestCase.ClusterScope(minNumDataNodes = 2) public class RepositoriesIT extends AbstractSnapshotIntegTestCase { public void testRepositoryCreation() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index bb6f5cc4f74..762be7a3684 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -19,29 +19,6 @@ package org.elasticsearch.snapshots.mockstore; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.blobstore.BlobMetaData; -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Setting.Property; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.SettingsModule; -import org.elasticsearch.env.Environment; -import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; -import org.elasticsearch.repositories.RepositoriesModule; -import org.elasticsearch.repositories.RepositoryName; -import org.elasticsearch.repositories.RepositorySettings; -import org.elasticsearch.repositories.fs.FsRepository; - import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; @@ -55,6 +32,26 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.repositories.RepositoriesModule; +import org.elasticsearch.repositories.RepositoryName; +import org.elasticsearch.repositories.RepositorySettings; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotId; + public class MockRepository extends FsRepository { public static class Plugin extends org.elasticsearch.plugins.Plugin { @@ -64,7 +61,7 @@ public class MockRepository extends FsRepository { Setting.simpleString("secret.mock.password", Property.NodeScope, Property.Filtered); public void onModule(RepositoriesModule repositoriesModule) { - repositoriesModule.registerRepository("mock", MockRepository.class, BlobStoreIndexShardRepository.class); + repositoriesModule.registerRepository("mock", MockRepository.class); } @Override @@ -100,8 +97,8 @@ public class MockRepository extends FsRepository { private volatile boolean blocked = false; @Inject - public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ClusterService clusterService, Environment environment) throws IOException { - super(name, overrideSettings(repositorySettings, clusterService), indexShardRepository, environment); + public MockRepository(RepositoryName name, RepositorySettings repositorySettings, ClusterService clusterService, Environment environment) throws IOException { + super(name, overrideSettings(repositorySettings, clusterService), environment); randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); maximumNumberOfFailures = repositorySettings.settings().getAsLong("max_failure_number", 100L); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/plugin/repository/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/plugin/repository/azure/AzureRepositoryPlugin.java index b04b613df21..e7701a828ac 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/plugin/repository/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/plugin/repository/azure/AzureRepositoryPlugin.java @@ -19,6 +19,11 @@ package org.elasticsearch.plugin.repository.azure; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + import org.elasticsearch.cloud.azure.AzureRepositoryModule; import org.elasticsearch.cloud.azure.storage.AzureStorageService; import org.elasticsearch.common.inject.Module; @@ -26,16 +31,10 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.azure.AzureRepository; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - /** * */ @@ -56,7 +55,7 @@ public class AzureRepositoryPlugin extends Plugin { public void onModule(RepositoriesModule module) { logger.debug("registering repository type [{}]", AzureRepository.TYPE); - module.registerRepository(AzureRepository.TYPE, AzureRepository.class, BlobStoreIndexShardRepository.class); + module.registerRepository(AzureRepository.TYPE, AzureRepository.class); } @Override diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 1333c755e7c..47ec4128c7c 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.RepositoryVerificationException; @@ -86,9 +85,8 @@ public class AzureRepository extends BlobStoreRepository { @Inject public AzureRepository(RepositoryName name, RepositorySettings repositorySettings, - IndexShardRepository indexShardRepository, AzureBlobStore azureBlobStore) throws IOException, URISyntaxException, StorageException { - super(name.getName(), repositorySettings, indexShardRepository); + super(name.getName(), repositorySettings); String container = getValue(repositorySettings, Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/plugin/repository/gcs/GoogleCloudStoragePlugin.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/plugin/repository/gcs/GoogleCloudStoragePlugin.java index 77d8e23c9e5..33022ff449e 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/plugin/repository/gcs/GoogleCloudStoragePlugin.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/plugin/repository/gcs/GoogleCloudStoragePlugin.java @@ -19,6 +19,11 @@ package org.elasticsearch.plugin.repository.gcs; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Collection; +import java.util.Collections; + import com.google.api.client.auth.oauth2.TokenRequest; import com.google.api.client.auth.oauth2.TokenResponse; import com.google.api.client.googleapis.json.GoogleJsonError; @@ -35,16 +40,10 @@ import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.inject.Module; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Collection; -import java.util.Collections; - public class GoogleCloudStoragePlugin extends Plugin { public static final String NAME = "repository-gcs"; @@ -115,7 +114,6 @@ public class GoogleCloudStoragePlugin extends Plugin { } public void onModule(RepositoriesModule repositoriesModule) { - repositoriesModule.registerRepository(GoogleCloudStorageRepository.TYPE, - GoogleCloudStorageRepository.class, BlobStoreIndexShardRepository.class); + repositoriesModule.registerRepository(GoogleCloudStorageRepository.TYPE, GoogleCloudStorageRepository.class); } } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 3616e18e83d..df2f054fa91 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.plugin.repository.gcs.GoogleCloudStoragePlugin; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; @@ -75,9 +74,8 @@ public class GoogleCloudStorageRepository extends BlobStoreRepository { @Inject public GoogleCloudStorageRepository(RepositoryName repositoryName, RepositorySettings repositorySettings, - IndexShardRepository indexShardRepository, GoogleCloudStorageService storageService) throws Exception { - super(repositoryName.getName(), repositorySettings, indexShardRepository); + super(repositoryName.getName(), repositorySettings); String bucket = get(BUCKET, repositoryName, repositorySettings); String application = get(APPLICATION_NAME, repositoryName, repositorySettings); diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java index ab539aeea1c..3e8f484e3e7 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java @@ -26,7 +26,6 @@ import java.security.PrivilegedAction; import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesModule; @@ -85,6 +84,6 @@ public final class HdfsPlugin extends Plugin { } public void onModule(RepositoriesModule repositoriesModule) { - repositoriesModule.registerRepository("hdfs", HdfsRepository.class, BlobStoreIndexShardRepository.class); + repositoriesModule.registerRepository("hdfs", HdfsRepository.class); } } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index 1e8e267bd41..de8fb155abb 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; @@ -63,8 +62,8 @@ public final class HdfsRepository extends BlobStoreRepository { private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB); @Inject - public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { - super(name.getName(), repositorySettings, indexShardRepository); + public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings) throws IOException { + super(name.getName(), repositorySettings); this.repositorySettings = repositorySettings; this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/plugin/repository/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/elasticsearch/plugin/repository/s3/S3RepositoryPlugin.java index 7d52a067b65..fc55e1b0ff1 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/plugin/repository/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/plugin/repository/s3/S3RepositoryPlugin.java @@ -19,18 +19,6 @@ package org.elasticsearch.plugin.repository.s3; -import org.elasticsearch.SpecialPermission; -import org.elasticsearch.cloud.aws.AwsS3Service; -import org.elasticsearch.cloud.aws.S3Module; -import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.SettingsModule; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.RepositoriesModule; -import org.elasticsearch.repositories.s3.S3Repository; - import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -39,6 +27,16 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.cloud.aws.AwsS3Service; +import org.elasticsearch.cloud.aws.S3Module; +import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesModule; +import org.elasticsearch.repositories.s3.S3Repository; + /** * */ @@ -78,7 +76,7 @@ public class S3RepositoryPlugin extends Plugin { } public void onModule(RepositoriesModule repositoriesModule) { - repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class, BlobStoreIndexShardRepository.class); + repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class); } @Override diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index e3b7e8296e0..182612041f1 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; @@ -250,12 +249,11 @@ public class S3Repository extends BlobStoreRepository { * * @param name repository name * @param repositorySettings repository settings - * @param indexShardRepository index shard repository * @param s3Service S3 service */ @Inject - public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException { - super(name.getName(), repositorySettings, indexShardRepository); + public S3Repository(RepositoryName name, RepositorySettings repositorySettings, AwsS3Service s3Service) throws IOException { + super(name.getName(), repositorySettings); String bucket = getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING); if (bucket == null) { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/cloud/aws/RepositoryS3SettingsTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/cloud/aws/RepositoryS3SettingsTests.java index 4cb8e4d3abb..c11cb969570 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/cloud/aws/RepositoryS3SettingsTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/cloud/aws/RepositoryS3SettingsTests.java @@ -336,7 +336,7 @@ public class RepositoryS3SettingsTests extends ESTestCase { .build()); try { - new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null, null); + new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null); fail("We should either raise a NPE or a RepositoryException or a IllegalArgumentException"); } catch (RepositoryException e) { assertThat(e.getDetailedMessage(), containsString(expectedMessage));