diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ab2fe683e81..a97aad7abfa 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -94,7 +94,6 @@ import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.similarity.SimilarityService; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store.MetadataSnapshot; import org.elasticsearch.index.store.StoreFileMetaData; @@ -111,6 +110,7 @@ import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTargetService; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.search.suggest.completion.CompletionFieldStats; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat; @@ -1161,7 +1161,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return storeRecovery.recoverFromStore(this, shouldExist); } - public boolean restoreFromRepository(IndexShardRepository repository) { + public boolean restoreFromRepository(Repository repository) { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); return storeRecovery.recoverFromRepository(this, repository); @@ -1449,9 +1449,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(() -> { try { - final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository( + final Repository repository = repositoriesService.repository( recoveryState.getRestoreSource().snapshot().getRepository()); - if (restoreFromRepository(indexShardRepository)) { + if (restoreFromRepository(repository)) { recoveryListener.onRecoveryDone(recoveryState); } } catch (Exception first) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 01513aa6bc6..c1016f0f4e3 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -40,10 +40,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.Repository; import java.io.IOException; import java.util.Arrays; @@ -219,14 +219,14 @@ final class StoreRecovery { } /** - * Recovers an index from a given {@link IndexShardRepository}. This method restores a + * Recovers an index from a given {@link Repository}. This method restores a * previously created index snapshot into an existing initializing shard. * @param indexShard the index shard instance to recovery the snapshot from * @param repository the repository holding the physical files the shard should be recovered from * @return true if the shard has been recovered successfully, false if the recovery * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. */ - boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) { + boolean recoverFromRepository(final IndexShard indexShard, Repository repository) { if (canRecover(indexShard)) { final ShardRouting shardRouting = indexShard.routingEntry(); if (shardRouting.restoreSource() == null) { @@ -380,7 +380,7 @@ final class StoreRecovery { /** * Restores shard from {@link RestoreSource} associated with this shard in routing table */ - private void restore(final IndexShard indexShard, final IndexShardRepository indexShardRepository) { + private void restore(final IndexShard indexShard, final Repository repository) { RestoreSource restoreSource = indexShard.routingEntry().restoreSource(); final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog(); if (restoreSource == null) { @@ -397,7 +397,7 @@ final class StoreRecovery { if (!shardId.getIndexName().equals(restoreSource.index())) { snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id()); } - indexShardRepository.restore(restoreSource.snapshot().getSnapshotId(), restoreSource.version(), shardId, snapshotShardId, indexShard.recoveryState()); + repository.restore(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState()); indexShard.skipTranslogRecovery(); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java deleted file mode 100644 index 5988d82def2..00000000000 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.snapshots; - -import org.apache.lucene.index.IndexCommit; -import org.elasticsearch.Version; -import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.recovery.RecoveryState; - -/** - * Shard-level snapshot repository - *

- * IndexShardRepository is used on data node to create snapshots of individual shards. See {@link org.elasticsearch.repositories.Repository} - * for more information. - */ -public interface IndexShardRepository { - - /** - * Creates a snapshot of the shard based on the index commit point. - *

- * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method. - * IndexShardRepository implementations shouldn't release the snapshot index commit point. It is done by the method caller. - *

- * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check - * {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted. - * - * @param snapshotId snapshot id - * @param shardId shard to be snapshotted - * @param snapshotIndexCommit commit point - * @param snapshotStatus snapshot status - */ - void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus); - - /** - * Restores snapshot of the shard. - *

- * The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied. - * - * @param snapshotId snapshot id - * @param shardId shard id (in the current index) - * @param version version of elasticsearch that created this snapshot - * @param snapshotShardId shard id (in the snapshot) - * @param recoveryState recovery state - */ - void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState); - - /** - * Retrieve shard snapshot status for the stored snapshot - * - * @param snapshotId snapshot id - * @param version version of elasticsearch that created this snapshot - * @param shardId shard id - * @return snapshot status - */ - IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId); - - /** - * Verifies repository settings on data node - * @param verificationToken value returned by {@link org.elasticsearch.repositories.Repository#startVerification()} - */ - void verify(String verificationToken); - -} diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java deleted file mode 100644 index 526aab7581b..00000000000 --- a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ /dev/null @@ -1,972 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.snapshots.blobstore; - -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.RateLimiter; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefBuilder; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; -import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.ParseFieldMatcher; -import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.blobstore.BlobMetaData; -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.store.InputStreamIndexInput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.iterable.Iterables; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; -import org.elasticsearch.index.snapshots.IndexShardSnapshotException; -import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; -import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.StoreFileMetaData; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.repositories.RepositoryName; -import org.elasticsearch.repositories.RepositoryVerificationException; -import org.elasticsearch.repositories.blobstore.BlobStoreFormat; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.elasticsearch.repositories.blobstore.LegacyBlobStoreFormat; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; -import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix; - -/** - * Blob store based implementation of IndexShardRepository - */ -public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository { - - private static final int BUFFER_SIZE = 4096; - private BlobStore blobStore; - - private BlobPath basePath; - - private final String repositoryName; - - private ByteSizeValue chunkSize; - - private final IndicesService indicesService; - - private final ClusterService clusterService; - - private RateLimiter snapshotRateLimiter; - - private RateLimiter restoreRateLimiter; - - private RateLimitingInputStream.Listener snapshotThrottleListener; - - private RateLimitingInputStream.Listener restoreThrottleListener; - - private boolean compress; - - private final ParseFieldMatcher parseFieldMatcher; - - protected static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-"; - - protected static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s"; - - protected static final String SNAPSHOT_PREFIX = "snap-"; - - protected static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat"; - - protected static final String SNAPSHOT_CODEC = "snapshot"; - - protected static final String SNAPSHOT_INDEX_PREFIX = "index-"; - - protected static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s"; - - protected static final String SNAPSHOT_INDEX_CODEC = "snapshots"; - - protected static final String DATA_BLOB_PREFIX = "__"; - - private ChecksumBlobStoreFormat indexShardSnapshotFormat; - - private LegacyBlobStoreFormat indexShardSnapshotLegacyFormat; - - private ChecksumBlobStoreFormat indexShardSnapshotsFormat; - - @Inject - public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) { - super(settings); - this.parseFieldMatcher = new ParseFieldMatcher(settings); - this.repositoryName = repositoryName.name(); - this.indicesService = indicesService; - this.clusterService = clusterService; - } - - /** - * Called by {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} on repository startup - * - * @param blobStore blob store - * @param basePath base path to blob store - * @param chunkSize chunk size - */ - public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize, - RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter, - final RateLimiterListener rateLimiterListener, boolean compress) { - this.blobStore = blobStore; - this.basePath = basePath; - this.chunkSize = chunkSize; - this.snapshotRateLimiter = snapshotRateLimiter; - this.restoreRateLimiter = restoreRateLimiter; - this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos); - this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos); - this.compress = compress; - indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress()); - indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher); - indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots.PROTO, parseFieldMatcher, isCompress()); - } - - /** - * {@inheritDoc} - */ - @Override - public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { - SnapshotContext snapshotContext = new SnapshotContext(snapshotId, shardId, snapshotStatus); - snapshotStatus.startTime(System.currentTimeMillis()); - - try { - snapshotContext.snapshot(snapshotIndexCommit); - snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); - } catch (Exception e) { - snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); - snapshotStatus.failure(ExceptionsHelper.detailedMessage(e)); - if (e instanceof IndexShardSnapshotFailedException) { - throw (IndexShardSnapshotFailedException) e; - } else { - throw new IndexShardSnapshotFailedException(shardId, e); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { - final RestoreContext snapshotContext = new RestoreContext(snapshotId, version, shardId, snapshotShardId, recoveryState); - try { - snapshotContext.restore(); - } catch (Exception e) { - throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { - Context context = new Context(snapshotId, version, shardId); - BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); - IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); - status.updateStage(IndexShardSnapshotStatus.Stage.DONE); - status.startTime(snapshot.startTime()); - status.files(snapshot.numberOfFiles(), snapshot.totalSize()); - // The snapshot is done which means the number of processed files is the same as total - status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize()); - status.time(snapshot.time()); - return status; - } - - @Override - public void verify(String seed) { - BlobContainer testBlobContainer = blobStore.blobContainer(basePath.add(testBlobPrefix(seed))); - DiscoveryNode localNode = clusterService.localNode(); - if (testBlobContainer.blobExists("master.dat")) { - try { - testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed)); - } catch (IOException exp) { - throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore + "] is not accessible on the node [" + localNode + "]", exp); - } - } else { - throw new RepositoryVerificationException(repositoryName, "a file written by master to the store [" + blobStore + "] cannot be accessed on the node [" + localNode + "]. " - + "This might indicate that the store [" + blobStore + "] is not shared between this node and the master node or " - + "that permissions on the store don't allow reading files written by the master node"); - } - } - - /** - * Delete shard snapshot - * - * @param snapshotId snapshot id - * @param shardId shard id - */ - public void delete(SnapshotId snapshotId, Version version, ShardId shardId) { - Context context = new Context(snapshotId, version, shardId, shardId); - context.delete(); - } - - @Override - public String toString() { - return "BlobStoreIndexShardRepository[" + - "[" + repositoryName + - "], [" + blobStore + ']' + - ']'; - } - - /** - * Returns true if metadata files should be compressed - * - * @return true if compression is needed - */ - protected boolean isCompress() { - return compress; - } - - BlobStoreFormat indexShardSnapshotFormat(Version version) { - if (BlobStoreRepository.legacyMetaData(version)) { - return indexShardSnapshotLegacyFormat; - } else { - return indexShardSnapshotFormat; - } - } - - /** - * Context for snapshot/restore operations - */ - private class Context { - - protected final SnapshotId snapshotId; - - protected final ShardId shardId; - - protected final BlobContainer blobContainer; - - protected final Version version; - - public Context(SnapshotId snapshotId, Version version, ShardId shardId) { - this(snapshotId, version, shardId, shardId); - } - - public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) { - this.snapshotId = snapshotId; - this.version = version; - this.shardId = shardId; - blobContainer = blobStore.blobContainer(basePath.add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId()))); - } - - /** - * Delete shard snapshot - */ - public void delete() { - final Map blobs; - try { - blobs = blobContainer.listBlobs(); - } catch (IOException e) { - throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e); - } - - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); - BlobStoreIndexShardSnapshots snapshots = tuple.v1(); - int fileListGeneration = tuple.v2(); - - try { - indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName()); - } catch (IOException e) { - logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId); - } - - // Build a list of snapshots that should be preserved - List newSnapshotsList = new ArrayList<>(); - for (SnapshotFiles point : snapshots) { - if (!point.snapshot().equals(snapshotId.getName())) { - newSnapshotsList.add(point); - } - } - // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index - finalize(newSnapshotsList, fileListGeneration + 1, blobs); - } - - /** - * Loads information about shard snapshot - */ - public BlobStoreIndexShardSnapshot loadSnapshot() { - try { - return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName()); - } catch (IOException ex) { - throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); - } - } - - /** - * Removes all unreferenced files from the repository and writes new index file - * - * We need to be really careful in handling index files in case of failures to make sure we have index file that - * points to files that were deleted. - * - * - * @param snapshots list of active snapshots in the container - * @param fileListGeneration the generation number of the snapshot index file - * @param blobs list of blobs in the container - */ - protected void finalize(List snapshots, int fileListGeneration, Map blobs) { - BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots); - List blobsToDelete = new ArrayList<>(); - // delete old index files first - for (String blobName : blobs.keySet()) { - // delete old file lists - if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { - blobsToDelete.add(blobName); - } - } - - try { - blobContainer.deleteBlobs(blobsToDelete); - } catch (IOException e) { - // We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up - // with references to non-existing files - throw new IndexShardSnapshotFailedException(shardId, "error deleting index files during cleanup", e); - } - - blobsToDelete = new ArrayList<>(); - // now go over all the blobs, and if they don't exists in a snapshot, delete them - for (String blobName : blobs.keySet()) { - // delete unused files - if (blobName.startsWith(DATA_BLOB_PREFIX)) { - if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) { - blobsToDelete.add(blobName); - } - } - } - try { - blobContainer.deleteBlobs(blobsToDelete); - } catch (IOException e) { - logger.debug("[{}] [{}] error deleting some of the blobs [{}] during cleanup", e, snapshotId, shardId, blobsToDelete); - } - - // If we deleted all snapshots - we don't need to create the index file - if (snapshots.size() > 0) { - try { - indexShardSnapshotsFormat.writeAtomic(newSnapshots, blobContainer, Integer.toString(fileListGeneration)); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e); - } - } - } - - /** - * Generates blob name - * - * @param generation the blob number - * @return the blob name - */ - protected String fileNameFromGeneration(long generation) { - return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX); - } - - /** - * Finds the next available blob number - * - * @param blobs list of blobs in the repository - * @return next available blob number - */ - protected long findLatestFileNameGeneration(Map blobs) { - long generation = -1; - for (String name : blobs.keySet()) { - if (!name.startsWith(DATA_BLOB_PREFIX)) { - continue; - } - name = FileInfo.canonicalName(name); - try { - long currentGen = Long.parseLong(name.substring(DATA_BLOB_PREFIX.length()), Character.MAX_RADIX); - if (currentGen > generation) { - generation = currentGen; - } - } catch (NumberFormatException e) { - logger.warn("file [{}] does not conform to the '{}' schema", name, DATA_BLOB_PREFIX); - } - } - return generation; - } - - /** - * Loads all available snapshots in the repository - * - * @param blobs list of blobs in repository - * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation - */ - protected Tuple buildBlobStoreIndexShardSnapshots(Map blobs) { - int latest = -1; - for (String name : blobs.keySet()) { - if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) { - try { - int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length())); - if (gen > latest) { - latest = gen; - } - } catch (NumberFormatException ex) { - logger.warn("failed to parse index file name [{}]", name); - } - } - } - if (latest >= 0) { - try { - final BlobStoreIndexShardSnapshots shardSnapshots = - indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest)); - return new Tuple<>(shardSnapshots, latest); - } catch (IOException e) { - logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest); - } - } - - // We couldn't load the index file - falling back to loading individual snapshots - List snapshots = new ArrayList<>(); - for (String name : blobs.keySet()) { - try { - BlobStoreIndexShardSnapshot snapshot = null; - if (name.startsWith(SNAPSHOT_PREFIX)) { - snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name); - } else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) { - snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name); - } - if (snapshot != null) { - snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); - } - } catch (IOException e) { - logger.warn("failed to read commit point [{}]", e, name); - } - } - return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1); - } - } - - /** - * Context for snapshot operations - */ - private class SnapshotContext extends Context { - - private final Store store; - - private final IndexShardSnapshotStatus snapshotStatus; - - /** - * Constructs new context - * - * @param snapshotId snapshot id - * @param shardId shard to be snapshotted - * @param snapshotStatus snapshot status to report progress - */ - public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) { - super(snapshotId, Version.CURRENT, shardId); - this.snapshotStatus = snapshotStatus; - store = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()).store(); - } - - /** - * Create snapshot from index commit point - * - * @param snapshotIndexCommit snapshot commit point - */ - public void snapshot(IndexCommit snapshotIndexCommit) { - logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName); - store.incRef(); - try { - final Map blobs; - try { - blobs = blobContainer.listBlobs(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); - } - - long generation = findLatestFileNameGeneration(blobs); - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); - BlobStoreIndexShardSnapshots snapshots = tuple.v1(); - int fileListGeneration = tuple.v2(); - - final List indexCommitPointFiles = new ArrayList<>(); - - int indexNumberOfFiles = 0; - long indexTotalFilesSize = 0; - ArrayList filesToSnapshot = new ArrayList<>(); - final Store.MetadataSnapshot metadata; - // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should - final Collection fileNames; - try { - metadata = store.getMetadata(snapshotIndexCommit); - fileNames = snapshotIndexCommit.getFileNames(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); - } - for (String fileName : fileNames) { - if (snapshotStatus.aborted()) { - logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); - final StoreFileMetaData md = metadata.get(fileName); - FileInfo existingFileInfo = null; - List filesInfo = snapshots.findPhysicalIndexFiles(fileName); - if (filesInfo != null) { - for (FileInfo fileInfo : filesInfo) { - try { - // in 1.3.3 we added additional hashes for .si / segments_N files - // to ensure we don't double the space in the repo since old snapshots - // don't have this hash we try to read that hash from the blob store - // in a bwc compatible way. - maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); - } catch (Exception e) { - logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); - } - if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) { - // a commit point file with the same name, size and checksum was already copied to repository - // we will reuse it for this snapshot - existingFileInfo = fileInfo; - break; - } - } - } - if (existingFileInfo == null) { - indexNumberOfFiles++; - indexTotalFilesSize += md.length(); - // create a new FileInfo - BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), md, chunkSize); - indexCommitPointFiles.add(snapshotFileInfo); - filesToSnapshot.add(snapshotFileInfo); - } else { - indexCommitPointFiles.add(existingFileInfo); - } - } - - snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize); - - if (snapshotStatus.aborted()) { - logger.debug("[{}] [{}] Aborted during initialization", shardId, snapshotId); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED); - - for (FileInfo snapshotFileInfo : filesToSnapshot) { - try { - snapshotFile(snapshotFileInfo); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); - } - } - - snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration()); - // now create and write the commit point - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); - - BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), - snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), - // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong - System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); - //TODO: The time stored in snapshot doesn't include cleanup time. - logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); - try { - indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName()); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); - } - - // delete all files that are not referenced by any commit point - // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones - List newSnapshotsList = new ArrayList<>(); - newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); - for (SnapshotFiles point : snapshots) { - newSnapshotsList.add(point); - } - // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index - finalize(newSnapshotsList, fileListGeneration + 1, blobs); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); - } finally { - store.decRef(); - } - } - - /** - * Snapshot individual file - *

- * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are - * added to the {@code failures} list - * - * @param fileInfo file to be snapshotted - */ - private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException { - final String file = fileInfo.physicalName(); - try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) { - for (int i = 0; i < fileInfo.numberOfParts(); i++) { - final long partBytes = fileInfo.partBytes(i); - - final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, partBytes); - InputStream inputStream = snapshotRateLimiter == null ? inputStreamIndexInput : new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener); - inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); - blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes); - } - Store.verify(indexInput); - snapshotStatus.addProcessedFile(fileInfo.length()); - } catch (Exception t) { - failStoreIfCorrupted(t); - snapshotStatus.addProcessedFile(0); - throw t; - } - } - - private void failStoreIfCorrupted(Exception e) { - if (e instanceof CorruptIndexException || e instanceof IndexFormatTooOldException || e instanceof IndexFormatTooNewException) { - try { - store.markStoreCorrupted((IOException) e); - } catch (IOException inner) { - inner.addSuppressed(e); - logger.warn("store cannot be marked as corrupted", inner); - } - } - } - - /** - * Checks if snapshot file already exists in the list of blobs - * - * @param fileInfo file to check - * @param blobs list of blobs - * @return true if file exists in the list of blobs - */ - private boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map blobs) { - BlobMetaData blobMetaData = blobs.get(fileInfo.name()); - if (blobMetaData != null) { - return blobMetaData.length() == fileInfo.length(); - } else if (blobs.containsKey(fileInfo.partName(0))) { - // multi part file sum up the size and check - int part = 0; - long totalSize = 0; - while (true) { - blobMetaData = blobs.get(fileInfo.partName(part++)); - if (blobMetaData == null) { - break; - } - totalSize += blobMetaData.length(); - } - return totalSize == fileInfo.length(); - } - // no file, not exact and not multipart - return false; - } - - private class AbortableInputStream extends FilterInputStream { - private final String fileName; - - public AbortableInputStream(InputStream delegate, String fileName) { - super(delegate); - this.fileName = fileName; - } - - @Override - public int read() throws IOException { - checkAborted(); - return in.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - checkAborted(); - return in.read(b, off, len); - } - - private void checkAborted() { - if (snapshotStatus.aborted()) { - logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - } - } - } - - /** - * This is a BWC layer to ensure we update the snapshots metadata with the corresponding hashes before we compare them. - * The new logic for StoreFileMetaData reads the entire .si and segments.n files to strengthen the - * comparison of the files on a per-segment / per-commit level. - */ - private static void maybeRecalculateMetadataHash(final BlobContainer blobContainer, final FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Exception { - final StoreFileMetaData metadata; - if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) { - if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) { - // we have a hash - check if our repo has a hash too otherwise we have - // to calculate it. - // we might have multiple parts even though the file is small... make sure we read all of it. - try (final InputStream stream = new PartSliceStream(blobContainer, fileInfo)) { - BytesRefBuilder builder = new BytesRefBuilder(); - Store.MetadataSnapshot.hashFile(builder, stream, fileInfo.length()); - BytesRef hash = fileInfo.metadata().hash(); // reset the file infos metadata hash - assert hash.length == 0; - hash.bytes = builder.bytes(); - hash.offset = 0; - hash.length = builder.length(); - } - } - } - } - - private static final class PartSliceStream extends SlicedInputStream { - - private final BlobContainer container; - private final FileInfo info; - - public PartSliceStream(BlobContainer container, FileInfo info) { - super(info.numberOfParts()); - this.info = info; - this.container = container; - } - - @Override - protected InputStream openSlice(long slice) throws IOException { - return container.readBlob(info.partName(slice)); - } - } - - /** - * Context for restore operations - */ - private class RestoreContext extends Context { - - private final Store store; - - private final RecoveryState recoveryState; - - /** - * Constructs new restore context - * - * @param snapshotId snapshot id - * @param shardId shard to be restored - * @param snapshotShardId shard in the snapshot that data should be restored from - * @param recoveryState recovery state to report progress - */ - public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { - super(snapshotId, version, shardId, snapshotShardId); - this.recoveryState = recoveryState; - store = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()).store(); - } - - /** - * Performs restore operation - */ - public void restore() throws IOException { - store.incRef(); - try { - logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); - BlobStoreIndexShardSnapshot snapshot = loadSnapshot(); - - if (snapshot.indexFiles().size() == 1 - && snapshot.indexFiles().get(0).physicalName().startsWith("segments_") - && snapshot.indexFiles().get(0).hasUnknownChecksum()) { - // If the shard has no documents, it will only contain a single segments_N file for the - // shard's snapshot. If we are restoring a snapshot created by a previous supported version, - // it is still possible that in that version, an empty shard has a segments_N file with an unsupported - // version (and no checksum), because we don't know the Lucene version to assign segments_N until we - // have written some data. Since the segments_N for an empty shard could have an incompatible Lucene - // version number and no checksum, even though the index itself is perfectly fine to restore, this - // empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty - // shard anyway, we just create the empty shard here and then exit. - IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null) - .setOpenMode(IndexWriterConfig.OpenMode.CREATE) - .setCommitOnClose(true)); - writer.close(); - return; - } - - SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - final Store.MetadataSnapshot recoveryTargetMetadata; - try { - recoveryTargetMetadata = store.getMetadataOrEmpty(); - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { - logger.warn("{} Can't read metadata from store", e, shardId); - throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e); - } - - final List filesToRecover = new ArrayList<>(); - final Map snapshotMetaData = new HashMap<>(); - final Map fileInfos = new HashMap<>(); - for (final FileInfo fileInfo : snapshot.indexFiles()) { - try { - // in 1.3.3 we added additional hashes for .si / segments_N files - // to ensure we don't double the space in the repo since old snapshots - // don't have this hash we try to read that hash from the blob store - // in a bwc compatible way. - maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata); - } catch (Exception e) { - // if the index is broken we might not be able to read it - logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); - } - snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); - fileInfos.put(fileInfo.metadata().name(), fileInfo); - } - final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0); - final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); - for (StoreFileMetaData md : diff.identical) { - FileInfo fileInfo = fileInfos.get(md.name()); - recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true); - if (logger.isTraceEnabled()) { - logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } - } - - for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) { - FileInfo fileInfo = fileInfos.get(md.name()); - filesToRecover.add(fileInfo); - recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false); - if (logger.isTraceEnabled()) { - if (md == null) { - logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } else { - logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } - } - } - final RecoveryState.Index index = recoveryState.getIndex(); - if (filesToRecover.isEmpty()) { - logger.trace("no files to recover, all exists within the local store"); - } - - if (logger.isTraceEnabled()) { - logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, - index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount())); - } - try { - for (final FileInfo fileToRecover : filesToRecover) { - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover); - } - } catch (IOException ex) { - throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); - } - final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); - if (recoveryTargetMetadata == null) { - throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); - } - assert restoredSegmentsFile != null; - // read the snapshot data persisted - final SegmentInfos segmentCommitInfos; - try { - segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); - } catch (IOException e) { - throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); - } - recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); - - /// now, go over and clean files that are in the store, but were not in the snapshot - try { - for (String storeFile : store.directory().listAll()) { - if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { - continue; //skip write.lock, checksum files and files that exist in the snapshot - } - try { - store.deleteQuiet("restore", storeFile); - store.directory().deleteFile(storeFile); - } catch (IOException e) { - logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile); - } - } - } catch (IOException e) { - logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId); - } - } finally { - store.decRef(); - } - } - - /** - * Restores a file - * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are - * added to the {@code failures} list - * - * @param fileInfo file to be restored - */ - private void restoreFile(final FileInfo fileInfo) throws IOException { - boolean success = false; - - try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) { - final InputStream stream; - if (restoreRateLimiter == null) { - stream = partSliceStream; - } else { - stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener); - } - try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { - final byte[] buffer = new byte[BUFFER_SIZE]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); - recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length); - } - Store.verify(indexOutput); - indexOutput.close(); - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - success = true; - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - try { - store.markStoreCorrupted(ex); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - throw ex; - } finally { - if (success == false) { - store.deleteQuiet(fileInfo.physicalName()); - } - } - } - } - - } - - public interface RateLimiterListener { - void onRestorePause(long nanos); - - void onSnapshotPause(long nanos); - } - -} diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/core/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index acad2753f7a..0ba9cccddaf 100644 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/core/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -21,8 +21,6 @@ package org.elasticsearch.repositories; import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.repositories.uri.URLRepository; import org.elasticsearch.snapshots.RestoreService; @@ -32,20 +30,20 @@ import org.elasticsearch.snapshots.SnapshotsService; /** * Sets up classes for Snapshot/Restore. * - * Plugins can add custom repository types by calling {@link #registerRepository(String, Class, Class)}. + * Plugins can add custom repository types by calling {@link #registerRepository(String, Class)}. */ public class RepositoriesModule extends AbstractModule { private final RepositoryTypesRegistry repositoryTypes = new RepositoryTypesRegistry(); public RepositoriesModule() { - registerRepository(FsRepository.TYPE, FsRepository.class, BlobStoreIndexShardRepository.class); - registerRepository(URLRepository.TYPE, URLRepository.class, BlobStoreIndexShardRepository.class); + registerRepository(FsRepository.TYPE, FsRepository.class); + registerRepository(URLRepository.TYPE, URLRepository.class); } - /** Registers a custom repository type to the given {@link Repository} and {@link IndexShardRepository}. */ - public void registerRepository(String type, Class repositoryType, Class shardRepositoryType) { - repositoryTypes.registerRepository(type, repositoryType, shardRepositoryType); + /** Registers a custom repository type to the given {@link Repository}. */ + public void registerRepository(String type, Class repositoryType) { + repositoryTypes.registerRepository(type, repositoryType); } @Override diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/core/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 0ed53c0a9c5..6efba054c40 100644 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/core/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -37,7 +37,6 @@ import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.transport.TransportService; @@ -336,23 +335,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta throw new RepositoryMissingException(repository); } - /** - * Returns registered index shard repository - *

- * This method is called only on data nodes - * - * @param repository repository name - * @return registered repository - * @throws RepositoryMissingException if repository with such name isn't registered - */ - public IndexShardRepository indexShardRepository(String repository) { - RepositoryHolder holder = repositories.get(repository); - if (holder != null) { - return holder.indexShardRepository; - } - throw new RepositoryMissingException(repository); - } - /** * Creates a new repository and adds it to the list of registered repositories. *

@@ -403,14 +385,16 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta try { ModulesBuilder modules = new ModulesBuilder(); RepositoryName name = new RepositoryName(repositoryMetaData.type(), repositoryMetaData.name()); - modules.add(new RepositoryNameModule(name)); - modules.add(new RepositoryModule(name, repositoryMetaData.settings(), this.settings, typesRegistry)); + modules.add(b -> { + b.bind(RepositoryName.class).toInstance(name); + typesRegistry.bindType(b, repositoryMetaData.type()); + b.bind(RepositorySettings.class).toInstance(new RepositorySettings(settings, repositoryMetaData.settings())); + }); repositoryInjector = modules.createChildInjector(injector); Repository repository = repositoryInjector.getInstance(Repository.class); - IndexShardRepository indexShardRepository = repositoryInjector.getInstance(IndexShardRepository.class); repository.start(); - return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repository, indexShardRepository); + return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repository); } catch (Exception e) { logger.warn("failed to create repository [{}][{}]", e, repositoryMetaData.type(), repositoryMetaData.name()); throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e); @@ -472,13 +456,11 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta private final String type; private final Settings settings; private final Repository repository; - private final IndexShardRepository indexShardRepository; - public RepositoryHolder(String type, Settings settings,Repository repository, IndexShardRepository indexShardRepository) { + public RepositoryHolder(String type, Settings settings,Repository repository) { this.type = type; this.settings = settings; this.repository = repository; - this.indexShardRepository = indexShardRepository; } } diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java index 47a87edd7f9..6c4cf93a7a4 100644 --- a/core/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java @@ -19,7 +19,11 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.index.shard.ShardId; @@ -29,22 +33,19 @@ import org.elasticsearch.snapshots.SnapshotShardFailure; import java.io.IOException; import java.util.List; -import java.util.function.Predicate; /** * Snapshot repository interface. *

- * Responsible for index and cluster level operations. It's called only on master. - * Shard-level operations are performed using {@link org.elasticsearch.index.snapshots.IndexShardRepository} - * interface on data nodes. + * Responsible for index and cluster and shard level operations. *

* Typical snapshot usage pattern: *

*/ public interface Repository extends LifecycleComponent { @@ -141,4 +142,49 @@ public interface Repository extends LifecycleComponent { */ boolean readOnly(); + /** + * Creates a snapshot of the shard based on the index commit point. + *

+ * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method. + * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. + *

+ * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check + * {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted. + * + * @param shard shard to be snapshotted + * @param snapshotId snapshot id + * @param snapshotIndexCommit commit point + * @param snapshotStatus snapshot status + */ + void snapshot(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus); + + /** + * Restores snapshot of the shard. + *

+ * The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied. + * + * @param shard the shard to restore the index into + * @param snapshotId snapshot id + * @param version version of elasticsearch that created this snapshot + * @param snapshotShardId shard id (in the snapshot) + * @param recoveryState recovery state + */ + void restore(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState); + + /** + * Retrieve shard snapshot status for the stored snapshot + * + * @param snapshotId snapshot id + * @param version version of elasticsearch that created this snapshot + * @param shardId shard id + * @return snapshot status + */ + IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId); + + /** + * Verifies repository settings on data node. + * @param verificationToken value returned by {@link org.elasticsearch.repositories.Repository#startVerification()} + * @param localNode the local node information, for inclusion in verification errors + */ + void verify(String verificationToken, DiscoveryNode localNode); } diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoryModule.java b/core/src/main/java/org/elasticsearch/repositories/RepositoryModule.java deleted file mode 100644 index 3ed3a78c0d1..00000000000 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoryModule.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.settings.Settings; - -/** - * Binds repository classes for the specific repository type. - */ -public class RepositoryModule extends AbstractModule { - - private RepositoryName repositoryName; - - private final Settings globalSettings; - - private final Settings settings; - - private final RepositoryTypesRegistry typesRegistry; - - /** - * Spawns module for repository with specified name, type and settings - * - * @param repositoryName repository name and type - * @param settings repository settings - * @param globalSettings global settings - * @param typesRegistry registry of repository types - */ - public RepositoryModule(RepositoryName repositoryName, Settings settings, Settings globalSettings, RepositoryTypesRegistry typesRegistry) { - this.repositoryName = repositoryName; - this.globalSettings = globalSettings; - this.settings = settings; - this.typesRegistry = typesRegistry; - } - - /** - * {@inheritDoc} - */ - @Override - protected void configure() { - typesRegistry.bindType(binder(), repositoryName.type()); - bind(RepositorySettings.class).toInstance(new RepositorySettings(globalSettings, settings)); - } -} diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoryNameModule.java b/core/src/main/java/org/elasticsearch/repositories/RepositoryNameModule.java deleted file mode 100644 index 47be67df34c..00000000000 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoryNameModule.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories; - -import org.elasticsearch.common.inject.AbstractModule; - -/** - * Binds specific instance of RepositoryName for injection to repository module - */ -public class RepositoryNameModule extends AbstractModule { - - private final RepositoryName repositoryName; - - public RepositoryNameModule(RepositoryName repositoryName) { - this.repositoryName = repositoryName; - } - - @Override - protected void configure() { - bind(RepositoryName.class).toInstance(repositoryName); - } -} diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoryTypesRegistry.java b/core/src/main/java/org/elasticsearch/repositories/RepositoryTypesRegistry.java index d2f02aa5795..63a790b77fd 100644 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoryTypesRegistry.java +++ b/core/src/main/java/org/elasticsearch/repositories/RepositoryTypesRegistry.java @@ -22,22 +22,18 @@ package org.elasticsearch.repositories; import org.elasticsearch.common.inject.Binder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.ExtensionPoint; -import org.elasticsearch.index.snapshots.IndexShardRepository; /** - * A mapping from type name to implementations of {@link Repository} and {@link IndexShardRepository}. + * A mapping from type name to implementations of {@link Repository}. */ public class RepositoryTypesRegistry { // invariant: repositories and shardRepositories have the same keyset private final ExtensionPoint.SelectedType repositoryTypes = new ExtensionPoint.SelectedType<>("repository", Repository.class); - private final ExtensionPoint.SelectedType shardRepositoryTypes = - new ExtensionPoint.SelectedType<>("index_repository", IndexShardRepository.class); /** Adds a new repository type to the registry, bound to the given implementation classes. */ - public void registerRepository(String name, Class repositoryType, Class shardRepositoryType) { + public void registerRepository(String name, Class repositoryType) { repositoryTypes.registerExtension(name, repositoryType); - shardRepositoryTypes.registerExtension(name, shardRepositoryType); } /** @@ -47,6 +43,5 @@ public class RepositoryTypesRegistry { public void bindType(Binder binder, String type) { Settings settings = Settings.builder().put("type", type).build(); repositoryTypes.bindType(binder, settings, "type", null); - shardRepositoryTypes.bindType(binder, settings, "type", null); } } diff --git a/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index 796c65e9b46..65544421c8c 100644 --- a/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -19,6 +19,12 @@ package org.elasticsearch.repositories; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + import com.carrotsearch.hppc.ObjectContainer; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.action.ActionListener; @@ -29,7 +35,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoriesService.VerifyResponse; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; @@ -40,12 +45,6 @@ import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; - public class VerifyNodeRepositoryAction extends AbstractComponent { public static final String ACTION_NAME = "internal:admin/repository/verify"; @@ -82,7 +81,7 @@ public class VerifyNodeRepositoryAction extends AbstractComponent { for (final DiscoveryNode node : nodes) { if (node.equals(localNode)) { try { - doVerify(repository, verificationToken); + doVerify(repository, verificationToken, localNode); } catch (Exception e) { logger.warn("[{}] failed to verify repository", e, repository); errors.add(new VerificationFailure(node.getId(), e)); @@ -115,9 +114,9 @@ public class VerifyNodeRepositoryAction extends AbstractComponent { listener.onResponse(new RepositoriesService.VerifyResponse(nodes.toArray(new DiscoveryNode[nodes.size()]), errors.toArray(new VerificationFailure[errors.size()]))); } - private void doVerify(String repository, String verificationToken) { - IndexShardRepository blobStoreIndexShardRepository = repositoriesService.indexShardRepository(repository); - blobStoreIndexShardRepository.verify(verificationToken); + private void doVerify(String repositoryName, String verificationToken, DiscoveryNode localNode) { + Repository repository = repositoriesService.repository(repositoryName); + repository.verify(verificationToken, localNode); } public static class VerifyNodeRepositoryRequest extends TransportRequest { @@ -151,8 +150,9 @@ public class VerifyNodeRepositoryAction extends AbstractComponent { class VerifyNodeRepositoryRequestHandler implements TransportRequestHandler { @Override public void messageReceived(VerifyNodeRepositoryRequest request, TransportChannel channel) throws Exception { + DiscoveryNode localNode = clusterService.state().nodes().getLocalNode(); try { - doVerify(request.repository, request.verificationToken); + doVerify(request.repository, request.verificationToken, localNode); } catch (Exception ex) { logger.warn("[{}] failed to verify repository", ex, request.repository); throw ex; diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index b1f1ff66cf5..4c328872742 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -19,12 +19,43 @@ package org.elasticsearch.repositories.blobstore; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Numbers; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.store.InputStreamIndexInput; +import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; +import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream; +import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; +import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.Strings; @@ -50,9 +81,6 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository.RateLimiterListener; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositorySettings; @@ -64,15 +92,21 @@ import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotShardFailure; import java.io.FileNotFoundException; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableMap; + /** * BlobStore - based implementation of Snapshot Repository *

@@ -116,19 +150,19 @@ import java.util.stream.Collectors; * } * */ -public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository, RateLimiterListener { +public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository { private BlobContainer snapshotsBlobContainer; protected final String repositoryName; + private static final int BUFFER_SIZE = 4096; + private static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-"; private static final String SNAPSHOT_PREFIX = "snap-"; - private static final String SNAPSHOT_SUFFIX = ".dat"; - - private static final String SNAPSHOT_CODEC = "snapshot"; + protected static final String SNAPSHOT_CODEC = "snapshot"; static final String SNAPSHOTS_FILE = "index"; // package private for unit testing @@ -146,11 +180,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final String INDEX_METADATA_CODEC = "index-metadata"; - private static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s" + SNAPSHOT_SUFFIX; + protected static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s"; - private static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s"; + protected static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat"; - private final BlobStoreIndexShardRepository indexShardRepository; + protected static final String SNAPSHOT_INDEX_PREFIX = "index-"; + + protected static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s"; + + protected static final String SNAPSHOT_INDEX_CODEC = "snapshots"; + + protected static final String DATA_BLOB_PREFIX = "__"; private final RateLimiter snapshotRateLimiter; @@ -174,29 +214,36 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final boolean readOnly; + private final ParseFieldMatcher parseFieldMatcher; + + private final ChecksumBlobStoreFormat indexShardSnapshotFormat; + + private final LegacyBlobStoreFormat indexShardSnapshotLegacyFormat; + + private final ChecksumBlobStoreFormat indexShardSnapshotsFormat; + /** * Constructs new BlobStoreRepository * * @param repositoryName repository name * @param repositorySettings repository settings - * @param indexShardRepository an instance of IndexShardRepository */ - protected BlobStoreRepository(String repositoryName, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) { + protected BlobStoreRepository(String repositoryName, RepositorySettings repositorySettings) { super(repositorySettings.globalSettings()); this.repositoryName = repositoryName; - this.indexShardRepository = (BlobStoreIndexShardRepository) indexShardRepository; + parseFieldMatcher = new ParseFieldMatcher(settings); snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); readOnly = repositorySettings.settings().getAsBoolean("readonly", false); + indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress()); + indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher); + indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots.PROTO, parseFieldMatcher, isCompress()); + } - /** - * {@inheritDoc} - */ @Override protected void doStart() { this.snapshotsBlobContainer = blobStore().blobContainer(basePath()); - indexShardRepository.initialize(blobStore(), basePath(), chunkSize(), snapshotRateLimiter, restoreRateLimiter, this, isCompress()); ParseFieldMatcher parseFieldMatcher = new ParseFieldMatcher(settings); globalMetaDataFormat = new ChecksumBlobStoreFormat<>(METADATA_CODEC, METADATA_NAME_FORMAT, MetaData.PROTO, parseFieldMatcher, isCompress()); @@ -209,16 +256,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp snapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, SnapshotInfo.PROTO, parseFieldMatcher); } - /** - * {@inheritDoc} - */ @Override - protected void doStop() { - } + protected void doStop() {} - /** - * {@inheritDoc} - */ @Override protected void doClose() { try { @@ -229,18 +269,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } /** - * Returns initialized and ready to use BlobStore - *

- * This method is first called in the {@link #doStart()} method. - * - * @return blob store + * Returns the BlobStore to read and write data. */ - protected abstract BlobStore blobStore(); + protected abstract BlobStore blobStore(); /** * Returns base path of the repository */ - protected abstract BlobPath basePath(); + protected abstract BlobPath basePath(); /** * Returns true if metadata and snapshot files should be compressed @@ -262,9 +298,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp return null; } - /** - * {@inheritDoc} - */ @Override public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { if (readOnly()) { @@ -293,9 +326,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * {@inheritDoc} - */ @Override public void deleteSnapshot(SnapshotId snapshotId) { if (readOnly()) { @@ -352,7 +382,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp if (indexMetaData != null) { for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { try { - indexShardRepository.delete(snapshotId, snapshot.version(), new ShardId(indexMetaData.getIndex(), shardId)); + delete(snapshotId, snapshot.version(), new ShardId(indexMetaData.getIndex(), shardId)); } catch (SnapshotException ex) { logger.warn("[{}] failed to delete shard data for shard [{}][{}]", ex, snapshotId, index, shardId); } @@ -365,9 +395,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * {@inheritDoc} - */ @Override public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, final List indices, @@ -397,9 +424,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * {@inheritDoc} - */ @Override public List snapshots() { try { @@ -412,9 +436,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * {@inheritDoc} - */ @Override public MetaData readSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException { return readSnapshotMetaData(snapshot.snapshotId(), snapshot.version(), indices, false); @@ -539,16 +560,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final String NAME = "name"; private static final String UUID = "uuid"; - @Override - public void onRestorePause(long nanos) { - restoreRateLimitingTimeInNanos.inc(nanos); - } - - @Override - public void onSnapshotPause(long nanos) { - snapshotRateLimitingTimeInNanos.inc(nanos); - } - @Override public long snapshotThrottleTimeInNanos() { return snapshotRateLimitingTimeInNanos.count(); @@ -785,4 +796,782 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } + @Override + public void snapshot(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, snapshotStatus); + snapshotStatus.startTime(System.currentTimeMillis()); + + try { + snapshotContext.snapshot(snapshotIndexCommit); + snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); + } catch (Exception e) { + snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); + snapshotStatus.failure(ExceptionsHelper.detailedMessage(e)); + if (e instanceof IndexShardSnapshotFailedException) { + throw (IndexShardSnapshotFailedException) e; + } else { + throw new IndexShardSnapshotFailedException(shard.shardId(), e); + } + } + } + + @Override + public void restore(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { + final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, snapshotShardId, recoveryState); + try { + snapshotContext.restore(); + } catch (Exception e) { + throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); + } + } + + @Override + public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { + Context context = new Context(snapshotId, version, shardId); + BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); + IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); + status.updateStage(IndexShardSnapshotStatus.Stage.DONE); + status.startTime(snapshot.startTime()); + status.files(snapshot.numberOfFiles(), snapshot.totalSize()); + // The snapshot is done which means the number of processed files is the same as total + status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize()); + status.time(snapshot.time()); + return status; + } + + @Override + public void verify(String seed, DiscoveryNode localNode) { + BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); + if (testBlobContainer.blobExists("master.dat")) { + try { + testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed)); + } catch (IOException exp) { + throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp); + } + } else { + throw new RepositoryVerificationException(repositoryName, "a file written by master to the store [" + blobStore() + "] cannot be accessed on the node [" + localNode + "]. " + + "This might indicate that the store [" + blobStore() + "] is not shared between this node and the master node or " + + "that permissions on the store don't allow reading files written by the master node"); + } + } + + /** + * Delete shard snapshot + * + * @param snapshotId snapshot id + * @param shardId shard id + */ + public void delete(SnapshotId snapshotId, Version version, ShardId shardId) { + Context context = new Context(snapshotId, version, shardId, shardId); + context.delete(); + } + + @Override + public String toString() { + return "BlobStoreRepository[" + + "[" + repositoryName + + "], [" + blobStore() + ']' + + ']'; + } + + BlobStoreFormat indexShardSnapshotFormat(Version version) { + if (BlobStoreRepository.legacyMetaData(version)) { + return indexShardSnapshotLegacyFormat; + } else { + return indexShardSnapshotFormat; + } + } + + /** + * Context for snapshot/restore operations + */ + private class Context { + + protected final SnapshotId snapshotId; + + protected final ShardId shardId; + + protected final BlobContainer blobContainer; + + protected final Version version; + + public Context(SnapshotId snapshotId, Version version, ShardId shardId) { + this(snapshotId, version, shardId, shardId); + } + + public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) { + this.snapshotId = snapshotId; + this.version = version; + this.shardId = shardId; + blobContainer = blobStore().blobContainer(basePath().add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId()))); + } + + /** + * Delete shard snapshot + */ + public void delete() { + final Map blobs; + try { + blobs = blobContainer.listBlobs(); + } catch (IOException e) { + throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e); + } + + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); + BlobStoreIndexShardSnapshots snapshots = tuple.v1(); + int fileListGeneration = tuple.v2(); + + try { + indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName()); + } catch (IOException e) { + logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId); + } + + // Build a list of snapshots that should be preserved + List newSnapshotsList = new ArrayList<>(); + for (SnapshotFiles point : snapshots) { + if (!point.snapshot().equals(snapshotId.getName())) { + newSnapshotsList.add(point); + } + } + // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index + finalize(newSnapshotsList, fileListGeneration + 1, blobs); + } + + /** + * Loads information about shard snapshot + */ + public BlobStoreIndexShardSnapshot loadSnapshot() { + try { + return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName()); + } catch (IOException ex) { + throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); + } + } + + /** + * Removes all unreferenced files from the repository and writes new index file + * + * We need to be really careful in handling index files in case of failures to make sure we have index file that + * points to files that were deleted. + * + * + * @param snapshots list of active snapshots in the container + * @param fileListGeneration the generation number of the snapshot index file + * @param blobs list of blobs in the container + */ + protected void finalize(List snapshots, int fileListGeneration, Map blobs) { + BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots); + List blobsToDelete = new ArrayList<>(); + // delete old index files first + for (String blobName : blobs.keySet()) { + // delete old file lists + if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { + blobsToDelete.add(blobName); + } + } + + try { + blobContainer.deleteBlobs(blobsToDelete); + } catch (IOException e) { + // We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up + // with references to non-existing files + throw new IndexShardSnapshotFailedException(shardId, "error deleting index files during cleanup", e); + } + + blobsToDelete = new ArrayList<>(); + // now go over all the blobs, and if they don't exists in a snapshot, delete them + for (String blobName : blobs.keySet()) { + // delete unused files + if (blobName.startsWith(DATA_BLOB_PREFIX)) { + if (newSnapshots.findNameFile(BlobStoreIndexShardSnapshot.FileInfo.canonicalName(blobName)) == null) { + blobsToDelete.add(blobName); + } + } + } + try { + blobContainer.deleteBlobs(blobsToDelete); + } catch (IOException e) { + logger.debug("[{}] [{}] error deleting some of the blobs [{}] during cleanup", e, snapshotId, shardId, blobsToDelete); + } + + // If we deleted all snapshots - we don't need to create the index file + if (snapshots.size() > 0) { + try { + indexShardSnapshotsFormat.writeAtomic(newSnapshots, blobContainer, Integer.toString(fileListGeneration)); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e); + } + } + } + + /** + * Generates blob name + * + * @param generation the blob number + * @return the blob name + */ + protected String fileNameFromGeneration(long generation) { + return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX); + } + + /** + * Finds the next available blob number + * + * @param blobs list of blobs in the repository + * @return next available blob number + */ + protected long findLatestFileNameGeneration(Map blobs) { + long generation = -1; + for (String name : blobs.keySet()) { + if (!name.startsWith(DATA_BLOB_PREFIX)) { + continue; + } + name = BlobStoreIndexShardSnapshot.FileInfo.canonicalName(name); + try { + long currentGen = Long.parseLong(name.substring(DATA_BLOB_PREFIX.length()), Character.MAX_RADIX); + if (currentGen > generation) { + generation = currentGen; + } + } catch (NumberFormatException e) { + logger.warn("file [{}] does not conform to the '{}' schema", name, DATA_BLOB_PREFIX); + } + } + return generation; + } + + /** + * Loads all available snapshots in the repository + * + * @param blobs list of blobs in repository + * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation + */ + protected Tuple buildBlobStoreIndexShardSnapshots(Map blobs) { + int latest = -1; + for (String name : blobs.keySet()) { + if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) { + try { + int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length())); + if (gen > latest) { + latest = gen; + } + } catch (NumberFormatException ex) { + logger.warn("failed to parse index file name [{}]", name); + } + } + } + if (latest >= 0) { + try { + final BlobStoreIndexShardSnapshots shardSnapshots = + indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest)); + return new Tuple<>(shardSnapshots, latest); + } catch (IOException e) { + logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest); + } + } + + // We couldn't load the index file - falling back to loading individual snapshots + List snapshots = new ArrayList<>(); + for (String name : blobs.keySet()) { + try { + BlobStoreIndexShardSnapshot snapshot = null; + if (name.startsWith(SNAPSHOT_PREFIX)) { + snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name); + } else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) { + snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name); + } + if (snapshot != null) { + snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + } + } catch (IOException e) { + logger.warn("failed to read commit point [{}]", e, name); + } + } + return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1); + } + } + + /** + * Context for snapshot operations + */ + private class SnapshotContext extends Context { + + private final Store store; + + private final IndexShardSnapshotStatus snapshotStatus; + + /** + * Constructs new context + * + * @param shard shard to be snapshotted + * @param snapshotId snapshot id + * @param snapshotStatus snapshot status to report progress + */ + public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexShardSnapshotStatus snapshotStatus) { + super(snapshotId, Version.CURRENT, shard.shardId()); + this.snapshotStatus = snapshotStatus; + this.store = shard.store(); + } + + /** + * Create snapshot from index commit point + * + * @param snapshotIndexCommit snapshot commit point + */ + public void snapshot(IndexCommit snapshotIndexCommit) { + logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName); + store.incRef(); + try { + final Map blobs; + try { + blobs = blobContainer.listBlobs(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); + } + + long generation = findLatestFileNameGeneration(blobs); + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); + BlobStoreIndexShardSnapshots snapshots = tuple.v1(); + int fileListGeneration = tuple.v2(); + + final List indexCommitPointFiles = new ArrayList<>(); + + int indexNumberOfFiles = 0; + long indexTotalFilesSize = 0; + ArrayList filesToSnapshot = new ArrayList<>(); + final Store.MetadataSnapshot metadata; + // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should + final Collection fileNames; + try { + metadata = store.getMetadata(snapshotIndexCommit); + fileNames = snapshotIndexCommit.getFileNames(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); + } + for (String fileName : fileNames) { + if (snapshotStatus.aborted()) { + logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } + logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); + final StoreFileMetaData md = metadata.get(fileName); + BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null; + List filesInfo = snapshots.findPhysicalIndexFiles(fileName); + if (filesInfo != null) { + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) { + try { + // in 1.3.3 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. + maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); + } catch (Exception e) { + logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + } + if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) { + // a commit point file with the same name, size and checksum was already copied to repository + // we will reuse it for this snapshot + existingFileInfo = fileInfo; + break; + } + } + } + if (existingFileInfo == null) { + indexNumberOfFiles++; + indexTotalFilesSize += md.length(); + // create a new FileInfo + BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), md, chunkSize()); + indexCommitPointFiles.add(snapshotFileInfo); + filesToSnapshot.add(snapshotFileInfo); + } else { + indexCommitPointFiles.add(existingFileInfo); + } + } + + snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize); + + if (snapshotStatus.aborted()) { + logger.debug("[{}] [{}] Aborted during initialization", shardId, snapshotId); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } + + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED); + + for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { + try { + snapshotFile(snapshotFileInfo); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); + } + } + + snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration()); + // now create and write the commit point + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); + + BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), + snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), + // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong + System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); + //TODO: The time stored in snapshot doesn't include cleanup time. + logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); + try { + indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName()); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); + } + + // delete all files that are not referenced by any commit point + // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones + List newSnapshotsList = new ArrayList<>(); + newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + for (SnapshotFiles point : snapshots) { + newSnapshotsList.add(point); + } + // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index + finalize(newSnapshotsList, fileListGeneration + 1, blobs); + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); + } finally { + store.decRef(); + } + } + + /** + * Snapshot individual file + *

+ * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are + * added to the {@code failures} list + * + * @param fileInfo file to be snapshotted + */ + private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException { + final String file = fileInfo.physicalName(); + try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) { + for (int i = 0; i < fileInfo.numberOfParts(); i++) { + final long partBytes = fileInfo.partBytes(i); + + final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, partBytes); + InputStream inputStream = inputStreamIndexInput; + if (snapshotRateLimiter != null) { + inputStream = new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, + snapshotRateLimitingTimeInNanos::inc); + } + inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); + blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes); + } + Store.verify(indexInput); + snapshotStatus.addProcessedFile(fileInfo.length()); + } catch (Exception t) { + failStoreIfCorrupted(t); + snapshotStatus.addProcessedFile(0); + throw t; + } + } + + private void failStoreIfCorrupted(Exception e) { + if (e instanceof CorruptIndexException || e instanceof IndexFormatTooOldException || e instanceof IndexFormatTooNewException) { + try { + store.markStoreCorrupted((IOException) e); + } catch (IOException inner) { + inner.addSuppressed(e); + logger.warn("store cannot be marked as corrupted", inner); + } + } + } + + /** + * Checks if snapshot file already exists in the list of blobs + * + * @param fileInfo file to check + * @param blobs list of blobs + * @return true if file exists in the list of blobs + */ + private boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map blobs) { + BlobMetaData blobMetaData = blobs.get(fileInfo.name()); + if (blobMetaData != null) { + return blobMetaData.length() == fileInfo.length(); + } else if (blobs.containsKey(fileInfo.partName(0))) { + // multi part file sum up the size and check + int part = 0; + long totalSize = 0; + while (true) { + blobMetaData = blobs.get(fileInfo.partName(part++)); + if (blobMetaData == null) { + break; + } + totalSize += blobMetaData.length(); + } + return totalSize == fileInfo.length(); + } + // no file, not exact and not multipart + return false; + } + + private class AbortableInputStream extends FilterInputStream { + private final String fileName; + + public AbortableInputStream(InputStream delegate, String fileName) { + super(delegate); + this.fileName = fileName; + } + + @Override + public int read() throws IOException { + checkAborted(); + return in.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkAborted(); + return in.read(b, off, len); + } + + private void checkAborted() { + if (snapshotStatus.aborted()) { + logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } + } + } + } + + /** + * This is a BWC layer to ensure we update the snapshots metadata with the corresponding hashes before we compare them. + * The new logic for StoreFileMetaData reads the entire .si and segments.n files to strengthen the + * comparison of the files on a per-segment / per-commit level. + */ + private static void maybeRecalculateMetadataHash(final BlobContainer blobContainer, final BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Exception { + final StoreFileMetaData metadata; + if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) { + if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) { + // we have a hash - check if our repo has a hash too otherwise we have + // to calculate it. + // we might have multiple parts even though the file is small... make sure we read all of it. + try (final InputStream stream = new PartSliceStream(blobContainer, fileInfo)) { + BytesRefBuilder builder = new BytesRefBuilder(); + Store.MetadataSnapshot.hashFile(builder, stream, fileInfo.length()); + BytesRef hash = fileInfo.metadata().hash(); // reset the file infos metadata hash + assert hash.length == 0; + hash.bytes = builder.bytes(); + hash.offset = 0; + hash.length = builder.length(); + } + } + } + } + + private static final class PartSliceStream extends SlicedInputStream { + + private final BlobContainer container; + private final BlobStoreIndexShardSnapshot.FileInfo info; + + public PartSliceStream(BlobContainer container, BlobStoreIndexShardSnapshot.FileInfo info) { + super(info.numberOfParts()); + this.info = info; + this.container = container; + } + + @Override + protected InputStream openSlice(long slice) throws IOException { + return container.readBlob(info.partName(slice)); + } + } + + /** + * Context for restore operations + */ + private class RestoreContext extends Context { + + private final Store store; + + private final RecoveryState recoveryState; + + /** + * Constructs new restore context + * + * @param shard shard to restore into + * @param snapshotId snapshot id + * @param snapshotShardId shard in the snapshot that data should be restored from + * @param recoveryState recovery state to report progress + */ + public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { + super(snapshotId, version, shard.shardId(), snapshotShardId); + this.recoveryState = recoveryState; + store = shard.store(); + } + + /** + * Performs restore operation + */ + public void restore() throws IOException { + store.incRef(); + try { + logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); + BlobStoreIndexShardSnapshot snapshot = loadSnapshot(); + + if (snapshot.indexFiles().size() == 1 + && snapshot.indexFiles().get(0).physicalName().startsWith("segments_") + && snapshot.indexFiles().get(0).hasUnknownChecksum()) { + // If the shard has no documents, it will only contain a single segments_N file for the + // shard's snapshot. If we are restoring a snapshot created by a previous supported version, + // it is still possible that in that version, an empty shard has a segments_N file with an unsupported + // version (and no checksum), because we don't know the Lucene version to assign segments_N until we + // have written some data. Since the segments_N for an empty shard could have an incompatible Lucene + // version number and no checksum, even though the index itself is perfectly fine to restore, this + // empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty + // shard anyway, we just create the empty shard here and then exit. + IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null) + .setOpenMode(IndexWriterConfig.OpenMode.CREATE) + .setCommitOnClose(true)); + writer.close(); + return; + } + + SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); + final Store.MetadataSnapshot recoveryTargetMetadata; + try { + recoveryTargetMetadata = store.getMetadataOrEmpty(); + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { + logger.warn("{} Can't read metadata from store", e, shardId); + throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e); + } + + final List filesToRecover = new ArrayList<>(); + final Map snapshotMetaData = new HashMap<>(); + final Map fileInfos = new HashMap<>(); + for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshot.indexFiles()) { + try { + // in 1.3.3 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. + maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata); + } catch (Exception e) { + // if the index is broken we might not be able to read it + logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + } + snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); + fileInfos.put(fileInfo.metadata().name(), fileInfo); + } + final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0); + final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); + for (StoreFileMetaData md : diff.identical) { + BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); + recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true); + if (logger.isTraceEnabled()) { + logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); + } + } + + for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) { + BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); + filesToRecover.add(fileInfo); + recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false); + if (logger.isTraceEnabled()) { + if (md == null) { + logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); + } else { + logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); + } + } + } + final RecoveryState.Index index = recoveryState.getIndex(); + if (filesToRecover.isEmpty()) { + logger.trace("no files to recover, all exists within the local store"); + } + + if (logger.isTraceEnabled()) { + logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, + index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount())); + } + try { + for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); + restoreFile(fileToRecover); + } + } catch (IOException ex) { + throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); + } + final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); + if (recoveryTargetMetadata == null) { + throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); + } + assert restoredSegmentsFile != null; + // read the snapshot data persisted + final SegmentInfos segmentCommitInfos; + try { + segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); + } catch (IOException e) { + throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); + } + recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); + + /// now, go over and clean files that are in the store, but were not in the snapshot + try { + for (String storeFile : store.directory().listAll()) { + if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { + continue; //skip write.lock, checksum files and files that exist in the snapshot + } + try { + store.deleteQuiet("restore", storeFile); + store.directory().deleteFile(storeFile); + } catch (IOException e) { + logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile); + } + } + } catch (IOException e) { + logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId); + } + } finally { + store.decRef(); + } + } + + /** + * Restores a file + * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are + * added to the {@code failures} list + * + * @param fileInfo file to be restored + */ + private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException { + boolean success = false; + + try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) { + final InputStream stream; + if (restoreRateLimiter == null) { + stream = partSliceStream; + } else { + stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); + } + try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { + final byte[] buffer = new byte[BUFFER_SIZE]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length); + } + Store.verify(indexOutput); + indexOutput.close(); + store.directory().sync(Collections.singleton(fileInfo.physicalName())); + success = true; + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + try { + store.markStoreCorrupted(ex); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + throw ex; + } finally { + if (success == false) { + store.deleteQuiet(fileInfo.physicalName()); + } + } + } + } + + } } diff --git a/core/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/core/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index 4e57c431f14..5be7115a25b 100644 --- a/core/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.env.Environment; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; @@ -77,11 +76,10 @@ public class FsRepository extends BlobStoreRepository { * * @param name repository name * @param repositorySettings repository settings - * @param indexShardRepository index shard repository */ @Inject - public FsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, Environment environment) throws IOException { - super(name.getName(), repositorySettings, indexShardRepository); + public FsRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException { + super(name.getName(), repositorySettings); Path locationFile; String location = REPOSITORIES_LOCATION_SETTING.get(repositorySettings.settings()); if (location.isEmpty()) { diff --git a/core/src/main/java/org/elasticsearch/repositories/uri/URLIndexShardRepository.java b/core/src/main/java/org/elasticsearch/repositories/uri/URLIndexShardRepository.java deleted file mode 100644 index 616a36d5066..00000000000 --- a/core/src/main/java/org/elasticsearch/repositories/uri/URLIndexShardRepository.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories.uri; - -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.repositories.RepositoryName; - -/** - */ -public class URLIndexShardRepository extends BlobStoreIndexShardRepository { - - @Inject - public URLIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) { - super(settings, repositoryName, indicesService, clusterService); - } - - - @Override - public void verify(String seed) { - //TODO: Add verification that URL is accessible - } -} diff --git a/core/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java b/core/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java index 0eb3006c5c3..f08faa2c0d6 100644 --- a/core/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.util.URIPattern; import org.elasticsearch.env.Environment; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; @@ -82,11 +81,10 @@ public class URLRepository extends BlobStoreRepository { * * @param name repository name * @param repositorySettings repository settings - * @param indexShardRepository shard repository */ @Inject - public URLRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, Environment environment) throws IOException { - super(name.getName(), repositorySettings, indexShardRepository); + public URLRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException { + super(name.getName(), repositorySettings); if (URL_SETTING.exists(repositorySettings.settings()) == false && REPOSITORIES_URL_SETTING.exists(settings) == false) { throw new RepositoryException(name.name(), "missing url"); @@ -101,9 +99,6 @@ public class URLRepository extends BlobStoreRepository { basePath = BlobPath.cleanPath(); } - /** - * {@inheritDoc} - */ @Override protected BlobStore blobStore() { return blobStore; diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 53fb6f4781e..67a5ae361bb 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -63,7 +63,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; @@ -112,7 +111,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet; * method. *

* Individual shards are getting restored as part of normal recovery process in - * {@link IndexShard#restoreFromRepository(IndexShardRepository)} )} + * {@link IndexShard#restoreFromRepository(Repository)} )} * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. *

diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 91db508b7e3..b24627bb634 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -43,10 +43,10 @@ import org.elasticsearch.index.engine.SnapshotFailedEngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportChannel; @@ -322,7 +322,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements * @param snapshotStatus snapshot status */ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexShardSnapshotStatus snapshotStatus) { - IndexShardRepository indexShardRepository = snapshotsService.getRepositoriesService().indexShardRepository(snapshot.getRepository()); + Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); ShardId shardId = indexShard.shardId(); if (!indexShard.routingEntry().primary()) { throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); @@ -340,11 +340,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements // we flush first to make sure we get the latest writes snapshotted IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true); try { - indexShardRepository.snapshot(snapshot.getSnapshotId(), shardId, snapshotIndexCommit, snapshotStatus); + repository.snapshot(indexShard, snapshot.getSnapshotId(), snapshotIndexCommit, snapshotStatus); if (logger.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); - logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, indexShardRepository, + logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository, TimeValue.timeValueMillis(snapshotStatus.time()), sb); } } finally { diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5f5ac65e38e..8d2407dd2e4 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -55,7 +55,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -547,7 +546,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final SnapshotInfo snapshotInfo) throws IOException { Map shardStatus = new HashMap<>(); Repository repository = repositoriesService.repository(repositoryName); - IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(repositoryName); MetaData metaData = repository.readSnapshotMetaData(snapshotInfo, snapshotInfo.indices()); for (String index : snapshotInfo.indices()) { IndexMetaData indexMetaData = metaData.indices().get(index); @@ -563,7 +561,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus shardStatus.put(shardId, shardSnapshotStatus); } else { IndexShardSnapshotStatus shardSnapshotStatus = - indexShardRepository.snapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId); + repository.snapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId); shardStatus.put(shardId, shardSnapshotStatus); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a0813fb572f..acec53da24b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -18,6 +18,27 @@ */ package org.elasticsearch.index.shard; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.CorruptIndexException; @@ -50,6 +71,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.RestoreSource; @@ -63,6 +85,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; @@ -91,15 +114,17 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.FieldMaskingReader; @@ -108,27 +133,6 @@ import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -1178,13 +1182,9 @@ public class IndexShardTests extends ESSingleNodeTestCase { test_target_shard.updateRoutingEntry(routing); DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode)); - assertTrue(test_target_shard.restoreFromRepository(new IndexShardRepository() { + assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository() { @Override - public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { - } - - @Override - public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { + public void restore(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { try { cleanLuceneIndex(targetStore.directory()); for (String file : sourceStore.directory().listAll()) { @@ -1197,15 +1197,6 @@ public class IndexShardTests extends ESSingleNodeTestCase { throw new RuntimeException(ex); } } - - @Override - public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { - return null; - } - - @Override - public void verify(String verificationToken) { - } })); test_target_shard.updateRoutingEntry(routing.moveToStarted()); @@ -1649,4 +1640,63 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(mappings.get("index_1").get("test").get().source().string(), "{\"test\":{\"properties\":{\"foo\":{\"type\":\"text\"}}}}"); } + + /** A dummy repository for testing which just needs restore overridden */ + private static abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { + public RestoreOnlyRepository() { + super(Settings.EMPTY); + } + @Override + protected void doStart() {} + @Override + protected void doStop() {} + @Override + protected void doClose() {} + @Override + public SnapshotInfo readSnapshot(SnapshotId snapshotId) { + return null; + } + @Override + public MetaData readSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException { + return null; + } + @Override + public List snapshots() { + return null; + } + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) {} + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures) { + return null; + } + @Override + public void deleteSnapshot(SnapshotId snapshotId) {} + @Override + public long snapshotThrottleTimeInNanos() { + return 0; + } + @Override + public long restoreThrottleTimeInNanos() { + return 0; + } + @Override + public String startVerification() { + return null; + } + @Override + public void endVerification(String verificationToken) {} + @Override + public boolean readOnly() { + return false; + } + @Override + public void snapshot(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {} + @Override + public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { + return null; + } + @Override + public void verify(String verificationToken, DiscoveryNode localNode) {} + } } diff --git a/core/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java b/core/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java index 48a1cc6081e..639b60d6d09 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -44,8 +44,6 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; -/** - */ @ESIntegTestCase.ClusterScope(minNumDataNodes = 2) public class RepositoriesIT extends AbstractSnapshotIntegTestCase { public void testRepositoryCreation() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index bb6f5cc4f74..762be7a3684 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -19,29 +19,6 @@ package org.elasticsearch.snapshots.mockstore; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.blobstore.BlobMetaData; -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Setting.Property; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.SettingsModule; -import org.elasticsearch.env.Environment; -import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; -import org.elasticsearch.repositories.RepositoriesModule; -import org.elasticsearch.repositories.RepositoryName; -import org.elasticsearch.repositories.RepositorySettings; -import org.elasticsearch.repositories.fs.FsRepository; - import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; @@ -55,6 +32,26 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.repositories.RepositoriesModule; +import org.elasticsearch.repositories.RepositoryName; +import org.elasticsearch.repositories.RepositorySettings; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotId; + public class MockRepository extends FsRepository { public static class Plugin extends org.elasticsearch.plugins.Plugin { @@ -64,7 +61,7 @@ public class MockRepository extends FsRepository { Setting.simpleString("secret.mock.password", Property.NodeScope, Property.Filtered); public void onModule(RepositoriesModule repositoriesModule) { - repositoriesModule.registerRepository("mock", MockRepository.class, BlobStoreIndexShardRepository.class); + repositoriesModule.registerRepository("mock", MockRepository.class); } @Override @@ -100,8 +97,8 @@ public class MockRepository extends FsRepository { private volatile boolean blocked = false; @Inject - public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ClusterService clusterService, Environment environment) throws IOException { - super(name, overrideSettings(repositorySettings, clusterService), indexShardRepository, environment); + public MockRepository(RepositoryName name, RepositorySettings repositorySettings, ClusterService clusterService, Environment environment) throws IOException { + super(name, overrideSettings(repositorySettings, clusterService), environment); randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); maximumNumberOfFailures = repositorySettings.settings().getAsLong("max_failure_number", 100L); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/plugin/repository/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/plugin/repository/azure/AzureRepositoryPlugin.java index b04b613df21..e7701a828ac 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/plugin/repository/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/plugin/repository/azure/AzureRepositoryPlugin.java @@ -19,6 +19,11 @@ package org.elasticsearch.plugin.repository.azure; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + import org.elasticsearch.cloud.azure.AzureRepositoryModule; import org.elasticsearch.cloud.azure.storage.AzureStorageService; import org.elasticsearch.common.inject.Module; @@ -26,16 +31,10 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.azure.AzureRepository; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - /** * */ @@ -56,7 +55,7 @@ public class AzureRepositoryPlugin extends Plugin { public void onModule(RepositoriesModule module) { logger.debug("registering repository type [{}]", AzureRepository.TYPE); - module.registerRepository(AzureRepository.TYPE, AzureRepository.class, BlobStoreIndexShardRepository.class); + module.registerRepository(AzureRepository.TYPE, AzureRepository.class); } @Override diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 1333c755e7c..47ec4128c7c 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.RepositoryVerificationException; @@ -86,9 +85,8 @@ public class AzureRepository extends BlobStoreRepository { @Inject public AzureRepository(RepositoryName name, RepositorySettings repositorySettings, - IndexShardRepository indexShardRepository, AzureBlobStore azureBlobStore) throws IOException, URISyntaxException, StorageException { - super(name.getName(), repositorySettings, indexShardRepository); + super(name.getName(), repositorySettings); String container = getValue(repositorySettings, Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/plugin/repository/gcs/GoogleCloudStoragePlugin.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/plugin/repository/gcs/GoogleCloudStoragePlugin.java index 77d8e23c9e5..33022ff449e 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/plugin/repository/gcs/GoogleCloudStoragePlugin.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/plugin/repository/gcs/GoogleCloudStoragePlugin.java @@ -19,6 +19,11 @@ package org.elasticsearch.plugin.repository.gcs; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Collection; +import java.util.Collections; + import com.google.api.client.auth.oauth2.TokenRequest; import com.google.api.client.auth.oauth2.TokenResponse; import com.google.api.client.googleapis.json.GoogleJsonError; @@ -35,16 +40,10 @@ import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.inject.Module; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Collection; -import java.util.Collections; - public class GoogleCloudStoragePlugin extends Plugin { public static final String NAME = "repository-gcs"; @@ -115,7 +114,6 @@ public class GoogleCloudStoragePlugin extends Plugin { } public void onModule(RepositoriesModule repositoriesModule) { - repositoriesModule.registerRepository(GoogleCloudStorageRepository.TYPE, - GoogleCloudStorageRepository.class, BlobStoreIndexShardRepository.class); + repositoriesModule.registerRepository(GoogleCloudStorageRepository.TYPE, GoogleCloudStorageRepository.class); } } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 3616e18e83d..df2f054fa91 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.plugin.repository.gcs.GoogleCloudStoragePlugin; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; @@ -75,9 +74,8 @@ public class GoogleCloudStorageRepository extends BlobStoreRepository { @Inject public GoogleCloudStorageRepository(RepositoryName repositoryName, RepositorySettings repositorySettings, - IndexShardRepository indexShardRepository, GoogleCloudStorageService storageService) throws Exception { - super(repositoryName.getName(), repositorySettings, indexShardRepository); + super(repositoryName.getName(), repositorySettings); String bucket = get(BUCKET, repositoryName, repositorySettings); String application = get(APPLICATION_NAME, repositoryName, repositorySettings); diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java index ab539aeea1c..3e8f484e3e7 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java @@ -26,7 +26,6 @@ import java.security.PrivilegedAction; import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesModule; @@ -85,6 +84,6 @@ public final class HdfsPlugin extends Plugin { } public void onModule(RepositoriesModule repositoriesModule) { - repositoriesModule.registerRepository("hdfs", HdfsRepository.class, BlobStoreIndexShardRepository.class); + repositoriesModule.registerRepository("hdfs", HdfsRepository.class); } } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index 1e8e267bd41..de8fb155abb 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; @@ -63,8 +62,8 @@ public final class HdfsRepository extends BlobStoreRepository { private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB); @Inject - public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { - super(name.getName(), repositorySettings, indexShardRepository); + public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings) throws IOException { + super(name.getName(), repositorySettings); this.repositorySettings = repositorySettings; this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/plugin/repository/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/elasticsearch/plugin/repository/s3/S3RepositoryPlugin.java index 7d52a067b65..fc55e1b0ff1 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/plugin/repository/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/plugin/repository/s3/S3RepositoryPlugin.java @@ -19,18 +19,6 @@ package org.elasticsearch.plugin.repository.s3; -import org.elasticsearch.SpecialPermission; -import org.elasticsearch.cloud.aws.AwsS3Service; -import org.elasticsearch.cloud.aws.S3Module; -import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.SettingsModule; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.RepositoriesModule; -import org.elasticsearch.repositories.s3.S3Repository; - import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -39,6 +27,16 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.cloud.aws.AwsS3Service; +import org.elasticsearch.cloud.aws.S3Module; +import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesModule; +import org.elasticsearch.repositories.s3.S3Repository; + /** * */ @@ -78,7 +76,7 @@ public class S3RepositoryPlugin extends Plugin { } public void onModule(RepositoriesModule repositoriesModule) { - repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class, BlobStoreIndexShardRepository.class); + repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class); } @Override diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index e3b7e8296e0..182612041f1 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; @@ -250,12 +249,11 @@ public class S3Repository extends BlobStoreRepository { * * @param name repository name * @param repositorySettings repository settings - * @param indexShardRepository index shard repository * @param s3Service S3 service */ @Inject - public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException { - super(name.getName(), repositorySettings, indexShardRepository); + public S3Repository(RepositoryName name, RepositorySettings repositorySettings, AwsS3Service s3Service) throws IOException { + super(name.getName(), repositorySettings); String bucket = getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING); if (bucket == null) { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/cloud/aws/RepositoryS3SettingsTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/cloud/aws/RepositoryS3SettingsTests.java index 4cb8e4d3abb..c11cb969570 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/cloud/aws/RepositoryS3SettingsTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/cloud/aws/RepositoryS3SettingsTests.java @@ -336,7 +336,7 @@ public class RepositoryS3SettingsTests extends ESTestCase { .build()); try { - new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null, null); + new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null); fail("We should either raise a NPE or a RepositoryException or a IllegalArgumentException"); } catch (RepositoryException e) { assertThat(e.getDetailedMessage(), containsString(expectedMessage));