Plugins: Simplified repository api for snapshot/restore

The api for snapshot/restore was split up between two interfaces,
Repository and IndexShardRepository. There was also complex
initialization and injection between the two. However, there is always a
one to one relationship between the two.

This change moves the IndexShardRepository api into Repository, as well
as updates the API so as not to require any services to be injected for
sublcasses.
This commit is contained in:
Ryan Ernst 2016-07-05 22:12:24 -07:00
parent 76057e6b05
commit dd7be74bcf
30 changed files with 1087 additions and 1451 deletions

View File

@ -94,7 +94,6 @@ import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.StoreFileMetaData;
@ -111,6 +110,7 @@ import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat;
@ -1161,7 +1161,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return storeRecovery.recoverFromStore(this, shouldExist);
}
public boolean restoreFromRepository(IndexShardRepository repository) {
public boolean restoreFromRepository(Repository repository) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository);
@ -1449,9 +1449,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(
final Repository repository = repositoriesService.repository(
recoveryState.getRestoreSource().snapshot().getRepository());
if (restoreFromRepository(indexShardRepository)) {
if (restoreFromRepository(repository)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception first) {

View File

@ -40,10 +40,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.Repository;
import java.io.IOException;
import java.util.Arrays;
@ -219,14 +219,14 @@ final class StoreRecovery {
}
/**
* Recovers an index from a given {@link IndexShardRepository}. This method restores a
* Recovers an index from a given {@link Repository}. This method restores a
* previously created index snapshot into an existing initializing shard.
* @param indexShard the index shard instance to recovery the snapshot from
* @param repository the repository holding the physical files the shard should be recovered from
* @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
*/
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) {
boolean recoverFromRepository(final IndexShard indexShard, Repository repository) {
if (canRecover(indexShard)) {
final ShardRouting shardRouting = indexShard.routingEntry();
if (shardRouting.restoreSource() == null) {
@ -380,7 +380,7 @@ final class StoreRecovery {
/**
* Restores shard from {@link RestoreSource} associated with this shard in routing table
*/
private void restore(final IndexShard indexShard, final IndexShardRepository indexShardRepository) {
private void restore(final IndexShard indexShard, final Repository repository) {
RestoreSource restoreSource = indexShard.routingEntry().restoreSource();
final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
if (restoreSource == null) {
@ -397,7 +397,7 @@ final class StoreRecovery {
if (!shardId.getIndexName().equals(restoreSource.index())) {
snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
}
indexShardRepository.restore(restoreSource.snapshot().getSnapshotId(), restoreSource.version(), shardId, snapshotShardId, indexShard.recoveryState());
repository.restore(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState());
indexShard.skipTranslogRecovery();
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");

View File

@ -1,81 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.snapshots;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState;
/**
* Shard-level snapshot repository
* <p>
* IndexShardRepository is used on data node to create snapshots of individual shards. See {@link org.elasticsearch.repositories.Repository}
* for more information.
*/
public interface IndexShardRepository {
/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method.
* IndexShardRepository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted.
*
* @param snapshotId snapshot id
* @param shardId shard to be snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
/**
* Restores snapshot of the shard.
* <p>
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
*
* @param snapshotId snapshot id
* @param shardId shard id (in the current index)
* @param version version of elasticsearch that created this snapshot
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
*/
void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState);
/**
* Retrieve shard snapshot status for the stored snapshot
*
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param shardId shard id
* @return snapshot status
*/
IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
/**
* Verifies repository settings on data node
* @param verificationToken value returned by {@link org.elasticsearch.repositories.Repository#startVerification()}
*/
void verify(String verificationToken);
}

View File

@ -1,972 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.snapshots.blobstore;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.blobstore.BlobStoreFormat;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.elasticsearch.repositories.blobstore.LegacyBlobStoreFormat;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix;
/**
* Blob store based implementation of IndexShardRepository
*/
public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository {
private static final int BUFFER_SIZE = 4096;
private BlobStore blobStore;
private BlobPath basePath;
private final String repositoryName;
private ByteSizeValue chunkSize;
private final IndicesService indicesService;
private final ClusterService clusterService;
private RateLimiter snapshotRateLimiter;
private RateLimiter restoreRateLimiter;
private RateLimitingInputStream.Listener snapshotThrottleListener;
private RateLimitingInputStream.Listener restoreThrottleListener;
private boolean compress;
private final ParseFieldMatcher parseFieldMatcher;
protected static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-";
protected static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s";
protected static final String SNAPSHOT_PREFIX = "snap-";
protected static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat";
protected static final String SNAPSHOT_CODEC = "snapshot";
protected static final String SNAPSHOT_INDEX_PREFIX = "index-";
protected static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";
protected static final String SNAPSHOT_INDEX_CODEC = "snapshots";
protected static final String DATA_BLOB_PREFIX = "__";
private ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotFormat;
private LegacyBlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotLegacyFormat;
private ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshots> indexShardSnapshotsFormat;
@Inject
public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
super(settings);
this.parseFieldMatcher = new ParseFieldMatcher(settings);
this.repositoryName = repositoryName.name();
this.indicesService = indicesService;
this.clusterService = clusterService;
}
/**
* Called by {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} on repository startup
*
* @param blobStore blob store
* @param basePath base path to blob store
* @param chunkSize chunk size
*/
public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize,
RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter,
final RateLimiterListener rateLimiterListener, boolean compress) {
this.blobStore = blobStore;
this.basePath = basePath;
this.chunkSize = chunkSize;
this.snapshotRateLimiter = snapshotRateLimiter;
this.restoreRateLimiter = restoreRateLimiter;
this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos);
this.compress = compress;
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots.PROTO, parseFieldMatcher, isCompress());
}
/**
* {@inheritDoc}
*/
@Override
public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
SnapshotContext snapshotContext = new SnapshotContext(snapshotId, shardId, snapshotStatus);
snapshotStatus.startTime(System.currentTimeMillis());
try {
snapshotContext.snapshot(snapshotIndexCommit);
snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime());
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE);
} catch (Exception e) {
snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime());
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE);
snapshotStatus.failure(ExceptionsHelper.detailedMessage(e));
if (e instanceof IndexShardSnapshotFailedException) {
throw (IndexShardSnapshotFailedException) e;
} else {
throw new IndexShardSnapshotFailedException(shardId, e);
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
final RestoreContext snapshotContext = new RestoreContext(snapshotId, version, shardId, snapshotShardId, recoveryState);
try {
snapshotContext.restore();
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
}
}
/**
* {@inheritDoc}
*/
@Override
public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
Context context = new Context(snapshotId, version, shardId);
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
IndexShardSnapshotStatus status = new IndexShardSnapshotStatus();
status.updateStage(IndexShardSnapshotStatus.Stage.DONE);
status.startTime(snapshot.startTime());
status.files(snapshot.numberOfFiles(), snapshot.totalSize());
// The snapshot is done which means the number of processed files is the same as total
status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize());
status.time(snapshot.time());
return status;
}
@Override
public void verify(String seed) {
BlobContainer testBlobContainer = blobStore.blobContainer(basePath.add(testBlobPrefix(seed)));
DiscoveryNode localNode = clusterService.localNode();
if (testBlobContainer.blobExists("master.dat")) {
try {
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed));
} catch (IOException exp) {
throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore + "] is not accessible on the node [" + localNode + "]", exp);
}
} else {
throw new RepositoryVerificationException(repositoryName, "a file written by master to the store [" + blobStore + "] cannot be accessed on the node [" + localNode + "]. "
+ "This might indicate that the store [" + blobStore + "] is not shared between this node and the master node or "
+ "that permissions on the store don't allow reading files written by the master node");
}
}
/**
* Delete shard snapshot
*
* @param snapshotId snapshot id
* @param shardId shard id
*/
public void delete(SnapshotId snapshotId, Version version, ShardId shardId) {
Context context = new Context(snapshotId, version, shardId, shardId);
context.delete();
}
@Override
public String toString() {
return "BlobStoreIndexShardRepository[" +
"[" + repositoryName +
"], [" + blobStore + ']' +
']';
}
/**
* Returns true if metadata files should be compressed
*
* @return true if compression is needed
*/
protected boolean isCompress() {
return compress;
}
BlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotFormat(Version version) {
if (BlobStoreRepository.legacyMetaData(version)) {
return indexShardSnapshotLegacyFormat;
} else {
return indexShardSnapshotFormat;
}
}
/**
* Context for snapshot/restore operations
*/
private class Context {
protected final SnapshotId snapshotId;
protected final ShardId shardId;
protected final BlobContainer blobContainer;
protected final Version version;
public Context(SnapshotId snapshotId, Version version, ShardId shardId) {
this(snapshotId, version, shardId, shardId);
}
public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) {
this.snapshotId = snapshotId;
this.version = version;
this.shardId = shardId;
blobContainer = blobStore.blobContainer(basePath.add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId())));
}
/**
* Delete shard snapshot
*/
public void delete() {
final Map<String, BlobMetaData> blobs;
try {
blobs = blobContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e);
}
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
try {
indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName());
} catch (IOException e) {
logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId);
}
// Build a list of snapshots that should be preserved
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
for (SnapshotFiles point : snapshots) {
if (!point.snapshot().equals(snapshotId.getName())) {
newSnapshotsList.add(point);
}
}
// finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalize(newSnapshotsList, fileListGeneration + 1, blobs);
}
/**
* Loads information about shard snapshot
*/
public BlobStoreIndexShardSnapshot loadSnapshot() {
try {
return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName());
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex);
}
}
/**
* Removes all unreferenced files from the repository and writes new index file
*
* We need to be really careful in handling index files in case of failures to make sure we have index file that
* points to files that were deleted.
*
*
* @param snapshots list of active snapshots in the container
* @param fileListGeneration the generation number of the snapshot index file
* @param blobs list of blobs in the container
*/
protected void finalize(List<SnapshotFiles> snapshots, int fileListGeneration, Map<String, BlobMetaData> blobs) {
BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
List<String> blobsToDelete = new ArrayList<>();
// delete old index files first
for (String blobName : blobs.keySet()) {
// delete old file lists
if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
blobsToDelete.add(blobName);
}
}
try {
blobContainer.deleteBlobs(blobsToDelete);
} catch (IOException e) {
// We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up
// with references to non-existing files
throw new IndexShardSnapshotFailedException(shardId, "error deleting index files during cleanup", e);
}
blobsToDelete = new ArrayList<>();
// now go over all the blobs, and if they don't exists in a snapshot, delete them
for (String blobName : blobs.keySet()) {
// delete unused files
if (blobName.startsWith(DATA_BLOB_PREFIX)) {
if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) {
blobsToDelete.add(blobName);
}
}
}
try {
blobContainer.deleteBlobs(blobsToDelete);
} catch (IOException e) {
logger.debug("[{}] [{}] error deleting some of the blobs [{}] during cleanup", e, snapshotId, shardId, blobsToDelete);
}
// If we deleted all snapshots - we don't need to create the index file
if (snapshots.size() > 0) {
try {
indexShardSnapshotsFormat.writeAtomic(newSnapshots, blobContainer, Integer.toString(fileListGeneration));
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e);
}
}
}
/**
* Generates blob name
*
* @param generation the blob number
* @return the blob name
*/
protected String fileNameFromGeneration(long generation) {
return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX);
}
/**
* Finds the next available blob number
*
* @param blobs list of blobs in the repository
* @return next available blob number
*/
protected long findLatestFileNameGeneration(Map<String, BlobMetaData> blobs) {
long generation = -1;
for (String name : blobs.keySet()) {
if (!name.startsWith(DATA_BLOB_PREFIX)) {
continue;
}
name = FileInfo.canonicalName(name);
try {
long currentGen = Long.parseLong(name.substring(DATA_BLOB_PREFIX.length()), Character.MAX_RADIX);
if (currentGen > generation) {
generation = currentGen;
}
} catch (NumberFormatException e) {
logger.warn("file [{}] does not conform to the '{}' schema", name, DATA_BLOB_PREFIX);
}
}
return generation;
}
/**
* Loads all available snapshots in the repository
*
* @param blobs list of blobs in repository
* @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
*/
protected Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(Map<String, BlobMetaData> blobs) {
int latest = -1;
for (String name : blobs.keySet()) {
if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
if (gen > latest) {
latest = gen;
}
} catch (NumberFormatException ex) {
logger.warn("failed to parse index file name [{}]", name);
}
}
}
if (latest >= 0) {
try {
final BlobStoreIndexShardSnapshots shardSnapshots =
indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest));
return new Tuple<>(shardSnapshots, latest);
} catch (IOException e) {
logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest);
}
}
// We couldn't load the index file - falling back to loading individual snapshots
List<SnapshotFiles> snapshots = new ArrayList<>();
for (String name : blobs.keySet()) {
try {
BlobStoreIndexShardSnapshot snapshot = null;
if (name.startsWith(SNAPSHOT_PREFIX)) {
snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name);
} else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) {
snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name);
}
if (snapshot != null) {
snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
}
} catch (IOException e) {
logger.warn("failed to read commit point [{}]", e, name);
}
}
return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1);
}
}
/**
* Context for snapshot operations
*/
private class SnapshotContext extends Context {
private final Store store;
private final IndexShardSnapshotStatus snapshotStatus;
/**
* Constructs new context
*
* @param snapshotId snapshot id
* @param shardId shard to be snapshotted
* @param snapshotStatus snapshot status to report progress
*/
public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) {
super(snapshotId, Version.CURRENT, shardId);
this.snapshotStatus = snapshotStatus;
store = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()).store();
}
/**
* Create snapshot from index commit point
*
* @param snapshotIndexCommit snapshot commit point
*/
public void snapshot(IndexCommit snapshotIndexCommit) {
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName);
store.incRef();
try {
final Map<String, BlobMetaData> blobs;
try {
blobs = blobContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
}
long generation = findLatestFileNameGeneration(blobs);
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
ArrayList<FileInfo> filesToSnapshot = new ArrayList<>();
final Store.MetadataSnapshot metadata;
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
final Collection<String> fileNames;
try {
metadata = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
for (String fileName : fileNames) {
if (snapshotStatus.aborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadata.get(fileName);
FileInfo existingFileInfo = null;
List<FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (FileInfo fileInfo : filesInfo) {
try {
// in 1.3.3 we added additional hashes for .si / segments_N files
// to ensure we don't double the space in the repo since old snapshots
// don't have this hash we try to read that hash from the blob store
// in a bwc compatible way.
maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata);
} catch (Exception e) {
logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
}
if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
}
}
}
if (existingFileInfo == null) {
indexNumberOfFiles++;
indexTotalFilesSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), md, chunkSize);
indexCommitPointFiles.add(snapshotFileInfo);
filesToSnapshot.add(snapshotFileInfo);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
}
snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize);
if (snapshotStatus.aborted()) {
logger.debug("[{}] [{}] Aborted during initialization", shardId, snapshotId);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED);
for (FileInfo snapshotFileInfo : filesToSnapshot) {
try {
snapshotFile(snapshotFileInfo);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
}
}
snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration());
// now create and write the commit point
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);
BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(),
snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(),
// snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time.
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName());
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
// delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
// finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalize(newSnapshotsList, fileListGeneration + 1, blobs);
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE);
} finally {
store.decRef();
}
}
/**
* Snapshot individual file
* <p>
* This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are
* added to the {@code failures} list
*
* @param fileInfo file to be snapshotted
*/
private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException {
final String file = fileInfo.physicalName();
try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
for (int i = 0; i < fileInfo.numberOfParts(); i++) {
final long partBytes = fileInfo.partBytes(i);
final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, partBytes);
InputStream inputStream = snapshotRateLimiter == null ? inputStreamIndexInput : new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener);
inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName());
blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes);
}
Store.verify(indexInput);
snapshotStatus.addProcessedFile(fileInfo.length());
} catch (Exception t) {
failStoreIfCorrupted(t);
snapshotStatus.addProcessedFile(0);
throw t;
}
}
private void failStoreIfCorrupted(Exception e) {
if (e instanceof CorruptIndexException || e instanceof IndexFormatTooOldException || e instanceof IndexFormatTooNewException) {
try {
store.markStoreCorrupted((IOException) e);
} catch (IOException inner) {
inner.addSuppressed(e);
logger.warn("store cannot be marked as corrupted", inner);
}
}
}
/**
* Checks if snapshot file already exists in the list of blobs
*
* @param fileInfo file to check
* @param blobs list of blobs
* @return true if file exists in the list of blobs
*/
private boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map<String, BlobMetaData> blobs) {
BlobMetaData blobMetaData = blobs.get(fileInfo.name());
if (blobMetaData != null) {
return blobMetaData.length() == fileInfo.length();
} else if (blobs.containsKey(fileInfo.partName(0))) {
// multi part file sum up the size and check
int part = 0;
long totalSize = 0;
while (true) {
blobMetaData = blobs.get(fileInfo.partName(part++));
if (blobMetaData == null) {
break;
}
totalSize += blobMetaData.length();
}
return totalSize == fileInfo.length();
}
// no file, not exact and not multipart
return false;
}
private class AbortableInputStream extends FilterInputStream {
private final String fileName;
public AbortableInputStream(InputStream delegate, String fileName) {
super(delegate);
this.fileName = fileName;
}
@Override
public int read() throws IOException {
checkAborted();
return in.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
checkAborted();
return in.read(b, off, len);
}
private void checkAborted() {
if (snapshotStatus.aborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
}
}
}
/**
* This is a BWC layer to ensure we update the snapshots metadata with the corresponding hashes before we compare them.
* The new logic for StoreFileMetaData reads the entire <tt>.si</tt> and <tt>segments.n</tt> files to strengthen the
* comparison of the files on a per-segment / per-commit level.
*/
private static void maybeRecalculateMetadataHash(final BlobContainer blobContainer, final FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Exception {
final StoreFileMetaData metadata;
if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) {
if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) {
// we have a hash - check if our repo has a hash too otherwise we have
// to calculate it.
// we might have multiple parts even though the file is small... make sure we read all of it.
try (final InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
BytesRefBuilder builder = new BytesRefBuilder();
Store.MetadataSnapshot.hashFile(builder, stream, fileInfo.length());
BytesRef hash = fileInfo.metadata().hash(); // reset the file infos metadata hash
assert hash.length == 0;
hash.bytes = builder.bytes();
hash.offset = 0;
hash.length = builder.length();
}
}
}
}
private static final class PartSliceStream extends SlicedInputStream {
private final BlobContainer container;
private final FileInfo info;
public PartSliceStream(BlobContainer container, FileInfo info) {
super(info.numberOfParts());
this.info = info;
this.container = container;
}
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(info.partName(slice));
}
}
/**
* Context for restore operations
*/
private class RestoreContext extends Context {
private final Store store;
private final RecoveryState recoveryState;
/**
* Constructs new restore context
*
* @param snapshotId snapshot id
* @param shardId shard to be restored
* @param snapshotShardId shard in the snapshot that data should be restored from
* @param recoveryState recovery state to report progress
*/
public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
super(snapshotId, version, shardId, snapshotShardId);
this.recoveryState = recoveryState;
store = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()).store();
}
/**
* Performs restore operation
*/
public void restore() throws IOException {
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
if (snapshot.indexFiles().size() == 1
&& snapshot.indexFiles().get(0).physicalName().startsWith("segments_")
&& snapshot.indexFiles().get(0).hasUnknownChecksum()) {
// If the shard has no documents, it will only contain a single segments_N file for the
// shard's snapshot. If we are restoring a snapshot created by a previous supported version,
// it is still possible that in that version, an empty shard has a segments_N file with an unsupported
// version (and no checksum), because we don't know the Lucene version to assign segments_N until we
// have written some data. Since the segments_N for an empty shard could have an incompatible Lucene
// version number and no checksum, even though the index itself is perfectly fine to restore, this
// empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty
// shard anyway, we just create the empty shard here and then exit.
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null)
.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setCommitOnClose(true));
writer.close();
return;
}
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
final Store.MetadataSnapshot recoveryTargetMetadata;
try {
recoveryTargetMetadata = store.getMetadataOrEmpty();
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
logger.warn("{} Can't read metadata from store", e, shardId);
throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e);
}
final List<FileInfo> filesToRecover = new ArrayList<>();
final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
final Map<String, FileInfo> fileInfos = new HashMap<>();
for (final FileInfo fileInfo : snapshot.indexFiles()) {
try {
// in 1.3.3 we added additional hashes for .si / segments_N files
// to ensure we don't double the space in the repo since old snapshots
// don't have this hash we try to read that hash from the blob store
// in a bwc compatible way.
maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata);
} catch (Exception e) {
// if the index is broken we might not be able to read it
logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
}
snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
fileInfos.put(fileInfo.metadata().name(), fileInfo);
}
final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0);
final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetaData md : diff.identical) {
FileInfo fileInfo = fileInfos.get(md.name());
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
}
}
for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
FileInfo fileInfo = fileInfos.get(md.name());
filesToRecover.add(fileInfo);
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
if (logger.isTraceEnabled()) {
if (md == null) {
logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
} else {
logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
}
}
}
final RecoveryState.Index index = recoveryState.getIndex();
if (filesToRecover.isEmpty()) {
logger.trace("no files to recover, all exists within the local store");
}
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId,
index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount()));
}
try {
for (final FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover);
}
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
}
final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
if (recoveryTargetMetadata == null) {
throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
}
assert restoredSegmentsFile != null;
// read the snapshot data persisted
final SegmentInfos segmentCommitInfos;
try {
segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion());
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
continue; //skip write.lock, checksum files and files that exist in the snapshot
}
try {
store.deleteQuiet("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile);
}
}
} catch (IOException e) {
logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId);
}
} finally {
store.decRef();
}
}
/**
* Restores a file
* This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are
* added to the {@code failures} list
*
* @param fileInfo file to be restored
*/
private void restoreFile(final FileInfo fileInfo) throws IOException {
boolean success = false;
try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
final InputStream stream;
if (restoreRateLimiter == null) {
stream = partSliceStream;
} else {
stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener);
}
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
}
Store.verify(indexOutput);
indexOutput.close();
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {
store.markStoreCorrupted(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
throw ex;
} finally {
if (success == false) {
store.deleteQuiet(fileInfo.physicalName());
}
}
}
}
}
public interface RateLimiterListener {
void onRestorePause(long nanos);
void onSnapshotPause(long nanos);
}
}

View File

@ -21,8 +21,6 @@ package org.elasticsearch.repositories;
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.repositories.uri.URLRepository;
import org.elasticsearch.snapshots.RestoreService;
@ -32,20 +30,20 @@ import org.elasticsearch.snapshots.SnapshotsService;
/**
* Sets up classes for Snapshot/Restore.
*
* Plugins can add custom repository types by calling {@link #registerRepository(String, Class, Class)}.
* Plugins can add custom repository types by calling {@link #registerRepository(String, Class)}.
*/
public class RepositoriesModule extends AbstractModule {
private final RepositoryTypesRegistry repositoryTypes = new RepositoryTypesRegistry();
public RepositoriesModule() {
registerRepository(FsRepository.TYPE, FsRepository.class, BlobStoreIndexShardRepository.class);
registerRepository(URLRepository.TYPE, URLRepository.class, BlobStoreIndexShardRepository.class);
registerRepository(FsRepository.TYPE, FsRepository.class);
registerRepository(URLRepository.TYPE, URLRepository.class);
}
/** Registers a custom repository type to the given {@link Repository} and {@link IndexShardRepository}. */
public void registerRepository(String type, Class<? extends Repository> repositoryType, Class<? extends IndexShardRepository> shardRepositoryType) {
repositoryTypes.registerRepository(type, repositoryType, shardRepositoryType);
/** Registers a custom repository type to the given {@link Repository}. */
public void registerRepository(String type, Class<? extends Repository> repositoryType) {
repositoryTypes.registerRepository(type, repositoryType);
}
@Override

View File

@ -37,7 +37,6 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.transport.TransportService;
@ -336,23 +335,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
throw new RepositoryMissingException(repository);
}
/**
* Returns registered index shard repository
* <p>
* This method is called only on data nodes
*
* @param repository repository name
* @return registered repository
* @throws RepositoryMissingException if repository with such name isn't registered
*/
public IndexShardRepository indexShardRepository(String repository) {
RepositoryHolder holder = repositories.get(repository);
if (holder != null) {
return holder.indexShardRepository;
}
throw new RepositoryMissingException(repository);
}
/**
* Creates a new repository and adds it to the list of registered repositories.
* <p>
@ -403,14 +385,16 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
try {
ModulesBuilder modules = new ModulesBuilder();
RepositoryName name = new RepositoryName(repositoryMetaData.type(), repositoryMetaData.name());
modules.add(new RepositoryNameModule(name));
modules.add(new RepositoryModule(name, repositoryMetaData.settings(), this.settings, typesRegistry));
modules.add(b -> {
b.bind(RepositoryName.class).toInstance(name);
typesRegistry.bindType(b, repositoryMetaData.type());
b.bind(RepositorySettings.class).toInstance(new RepositorySettings(settings, repositoryMetaData.settings()));
});
repositoryInjector = modules.createChildInjector(injector);
Repository repository = repositoryInjector.getInstance(Repository.class);
IndexShardRepository indexShardRepository = repositoryInjector.getInstance(IndexShardRepository.class);
repository.start();
return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repository, indexShardRepository);
return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repository);
} catch (Exception e) {
logger.warn("failed to create repository [{}][{}]", e, repositoryMetaData.type(), repositoryMetaData.name());
throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e);
@ -472,13 +456,11 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
private final String type;
private final Settings settings;
private final Repository repository;
private final IndexShardRepository indexShardRepository;
public RepositoryHolder(String type, Settings settings,Repository repository, IndexShardRepository indexShardRepository) {
public RepositoryHolder(String type, Settings settings,Repository repository) {
this.type = type;
this.settings = settings;
this.repository = repository;
this.indexShardRepository = indexShardRepository;
}
}

View File

@ -19,7 +19,11 @@
package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.shard.ShardId;
@ -29,22 +33,19 @@ import org.elasticsearch.snapshots.SnapshotShardFailure;
import java.io.IOException;
import java.util.List;
import java.util.function.Predicate;
/**
* Snapshot repository interface.
* <p>
* Responsible for index and cluster level operations. It's called only on master.
* Shard-level operations are performed using {@link org.elasticsearch.index.snapshots.IndexShardRepository}
* interface on data nodes.
* Responsible for index and cluster and shard level operations.
* <p>
* Typical snapshot usage pattern:
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(SnapshotId, ShardId, IndexCommit, IndexShardSnapshotStatus)} for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot}
* with possible list of failures</li>
* <li>Data nodes call {@link Repository#snapshot(IndexShard, SnapshotId, IndexCommit, IndexShardSnapshotStatus)}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
*/
public interface Repository extends LifecycleComponent {
@ -141,4 +142,49 @@ public interface Repository extends LifecycleComponent {
*/
boolean readOnly();
/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted.
*
* @param shard shard to be snapshotted
* @param snapshotId snapshot id
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
void snapshot(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
/**
* Restores snapshot of the shard.
* <p>
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
*
* @param shard the shard to restore the index into
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
*/
void restore(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState);
/**
* Retrieve shard snapshot status for the stored snapshot
*
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param shardId shard id
* @return snapshot status
*/
IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
/**
* Verifies repository settings on data node.
* @param verificationToken value returned by {@link org.elasticsearch.repositories.Repository#startVerification()}
* @param localNode the local node information, for inclusion in verification errors
*/
void verify(String verificationToken, DiscoveryNode localNode);
}

View File

@ -1,61 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
/**
* Binds repository classes for the specific repository type.
*/
public class RepositoryModule extends AbstractModule {
private RepositoryName repositoryName;
private final Settings globalSettings;
private final Settings settings;
private final RepositoryTypesRegistry typesRegistry;
/**
* Spawns module for repository with specified name, type and settings
*
* @param repositoryName repository name and type
* @param settings repository settings
* @param globalSettings global settings
* @param typesRegistry registry of repository types
*/
public RepositoryModule(RepositoryName repositoryName, Settings settings, Settings globalSettings, RepositoryTypesRegistry typesRegistry) {
this.repositoryName = repositoryName;
this.globalSettings = globalSettings;
this.settings = settings;
this.typesRegistry = typesRegistry;
}
/**
* {@inheritDoc}
*/
@Override
protected void configure() {
typesRegistry.bindType(binder(), repositoryName.type());
bind(RepositorySettings.class).toInstance(new RepositorySettings(globalSettings, settings));
}
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.inject.AbstractModule;
/**
* Binds specific instance of RepositoryName for injection to repository module
*/
public class RepositoryNameModule extends AbstractModule {
private final RepositoryName repositoryName;
public RepositoryNameModule(RepositoryName repositoryName) {
this.repositoryName = repositoryName;
}
@Override
protected void configure() {
bind(RepositoryName.class).toInstance(repositoryName);
}
}

View File

@ -22,22 +22,18 @@ package org.elasticsearch.repositories;
import org.elasticsearch.common.inject.Binder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.index.snapshots.IndexShardRepository;
/**
* A mapping from type name to implementations of {@link Repository} and {@link IndexShardRepository}.
* A mapping from type name to implementations of {@link Repository}.
*/
public class RepositoryTypesRegistry {
// invariant: repositories and shardRepositories have the same keyset
private final ExtensionPoint.SelectedType<Repository> repositoryTypes =
new ExtensionPoint.SelectedType<>("repository", Repository.class);
private final ExtensionPoint.SelectedType<IndexShardRepository> shardRepositoryTypes =
new ExtensionPoint.SelectedType<>("index_repository", IndexShardRepository.class);
/** Adds a new repository type to the registry, bound to the given implementation classes. */
public void registerRepository(String name, Class<? extends Repository> repositoryType, Class<? extends IndexShardRepository> shardRepositoryType) {
public void registerRepository(String name, Class<? extends Repository> repositoryType) {
repositoryTypes.registerExtension(name, repositoryType);
shardRepositoryTypes.registerExtension(name, shardRepositoryType);
}
/**
@ -47,6 +43,5 @@ public class RepositoryTypesRegistry {
public void bindType(Binder binder, String type) {
Settings settings = Settings.builder().put("type", type).build();
repositoryTypes.bindType(binder, settings, "type", null);
shardRepositoryTypes.bindType(binder, settings, "type", null);
}
}

View File

@ -19,6 +19,12 @@
package org.elasticsearch.repositories;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import com.carrotsearch.hppc.ObjectContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.ActionListener;
@ -29,7 +35,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoriesService.VerifyResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@ -40,12 +45,6 @@ import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
public class VerifyNodeRepositoryAction extends AbstractComponent {
public static final String ACTION_NAME = "internal:admin/repository/verify";
@ -82,7 +81,7 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
for (final DiscoveryNode node : nodes) {
if (node.equals(localNode)) {
try {
doVerify(repository, verificationToken);
doVerify(repository, verificationToken, localNode);
} catch (Exception e) {
logger.warn("[{}] failed to verify repository", e, repository);
errors.add(new VerificationFailure(node.getId(), e));
@ -115,9 +114,9 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
listener.onResponse(new RepositoriesService.VerifyResponse(nodes.toArray(new DiscoveryNode[nodes.size()]), errors.toArray(new VerificationFailure[errors.size()])));
}
private void doVerify(String repository, String verificationToken) {
IndexShardRepository blobStoreIndexShardRepository = repositoriesService.indexShardRepository(repository);
blobStoreIndexShardRepository.verify(verificationToken);
private void doVerify(String repositoryName, String verificationToken, DiscoveryNode localNode) {
Repository repository = repositoriesService.repository(repositoryName);
repository.verify(verificationToken, localNode);
}
public static class VerifyNodeRepositoryRequest extends TransportRequest {
@ -151,8 +150,9 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
class VerifyNodeRepositoryRequestHandler implements TransportRequestHandler<VerifyNodeRepositoryRequest> {
@Override
public void messageReceived(VerifyNodeRepositoryRequest request, TransportChannel channel) throws Exception {
DiscoveryNode localNode = clusterService.state().nodes().getLocalNode();
try {
doVerify(request.repository, request.verificationToken);
doVerify(request.repository, request.verificationToken, localNode);
} catch (Exception ex) {
logger.warn("[{}] failed to verify repository", ex, request.repository);
throw ex;

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
@ -77,11 +76,10 @@ public class FsRepository extends BlobStoreRepository {
*
* @param name repository name
* @param repositorySettings repository settings
* @param indexShardRepository index shard repository
*/
@Inject
public FsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, Environment environment) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
public FsRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException {
super(name.getName(), repositorySettings);
Path locationFile;
String location = REPOSITORIES_LOCATION_SETTING.get(repositorySettings.settings());
if (location.isEmpty()) {

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.uri;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.RepositoryName;
/**
*/
public class URLIndexShardRepository extends BlobStoreIndexShardRepository {
@Inject
public URLIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
super(settings, repositoryName, indicesService, clusterService);
}
@Override
public void verify(String seed) {
//TODO: Add verification that URL is accessible
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.URIPattern;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
@ -82,11 +81,10 @@ public class URLRepository extends BlobStoreRepository {
*
* @param name repository name
* @param repositorySettings repository settings
* @param indexShardRepository shard repository
*/
@Inject
public URLRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, Environment environment) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
public URLRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException {
super(name.getName(), repositorySettings);
if (URL_SETTING.exists(repositorySettings.settings()) == false && REPOSITORIES_URL_SETTING.exists(settings) == false) {
throw new RepositoryException(name.name(), "missing url");
@ -101,9 +99,6 @@ public class URLRepository extends BlobStoreRepository {
basePath = BlobPath.cleanPath();
}
/**
* {@inheritDoc}
*/
@Override
protected BlobStore blobStore() {
return blobStore;

View File

@ -63,7 +63,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
@ -112,7 +111,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet;
* method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link IndexShard#restoreFromRepository(IndexShardRepository)} )}
* {@link IndexShard#restoreFromRepository(Repository)} )}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property.
* <p>

View File

@ -43,10 +43,10 @@ import org.elasticsearch.index.engine.SnapshotFailedEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -322,7 +322,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
* @param snapshotStatus snapshot status
*/
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexShardSnapshotStatus snapshotStatus) {
IndexShardRepository indexShardRepository = snapshotsService.getRepositoriesService().indexShardRepository(snapshot.getRepository());
Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
ShardId shardId = indexShard.shardId();
if (!indexShard.routingEntry().primary()) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
@ -340,11 +340,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
// we flush first to make sure we get the latest writes snapshotted
IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
try {
indexShardRepository.snapshot(snapshot.getSnapshotId(), shardId, snapshotIndexCommit, snapshotStatus);
repository.snapshot(indexShard, snapshot.getSnapshotId(), snapshotIndexCommit, snapshotStatus);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");
logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, indexShardRepository,
logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository,
TimeValue.timeValueMillis(snapshotStatus.time()), sb);
}
} finally {

View File

@ -55,7 +55,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@ -547,7 +546,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final SnapshotInfo snapshotInfo) throws IOException {
Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
Repository repository = repositoriesService.repository(repositoryName);
IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(repositoryName);
MetaData metaData = repository.readSnapshotMetaData(snapshotInfo, snapshotInfo.indices());
for (String index : snapshotInfo.indices()) {
IndexMetaData indexMetaData = metaData.indices().get(index);
@ -563,7 +561,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
shardStatus.put(shardId, shardSnapshotStatus);
} else {
IndexShardSnapshotStatus shardSnapshotStatus =
indexShardRepository.snapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId);
repository.snapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId);
shardStatus.put(shardId, shardSnapshotStatus);
}
}

View File

@ -18,6 +18,27 @@
*/
package org.elasticsearch.index.shard;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
@ -50,6 +71,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.RestoreSource;
@ -63,6 +85,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
@ -91,15 +114,17 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.FieldMaskingReader;
@ -108,27 +133,6 @@ import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
@ -1178,13 +1182,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
test_target_shard.updateRoutingEntry(routing);
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode));
assertTrue(test_target_shard.restoreFromRepository(new IndexShardRepository() {
assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository() {
@Override
public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
}
@Override
public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
public void restore(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
try {
cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
@ -1197,15 +1197,6 @@ public class IndexShardTests extends ESSingleNodeTestCase {
throw new RuntimeException(ex);
}
}
@Override
public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
return null;
}
@Override
public void verify(String verificationToken) {
}
}));
test_target_shard.updateRoutingEntry(routing.moveToStarted());
@ -1649,4 +1640,63 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertEquals(mappings.get("index_1").get("test").get().source().string(), "{\"test\":{\"properties\":{\"foo\":{\"type\":\"text\"}}}}");
}
/** A dummy repository for testing which just needs restore overridden */
private static abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
public RestoreOnlyRepository() {
super(Settings.EMPTY);
}
@Override
protected void doStart() {}
@Override
protected void doStop() {}
@Override
protected void doClose() {}
@Override
public SnapshotInfo readSnapshot(SnapshotId snapshotId) {
return null;
}
@Override
public MetaData readSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException {
return null;
}
@Override
public List<SnapshotId> snapshots() {
return null;
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {}
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
return null;
}
@Override
public void deleteSnapshot(SnapshotId snapshotId) {}
@Override
public long snapshotThrottleTimeInNanos() {
return 0;
}
@Override
public long restoreThrottleTimeInNanos() {
return 0;
}
@Override
public String startVerification() {
return null;
}
@Override
public void endVerification(String verificationToken) {}
@Override
public boolean readOnly() {
return false;
}
@Override
public void snapshot(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {}
@Override
public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
return null;
}
@Override
public void verify(String verificationToken, DiscoveryNode localNode) {}
}
}

View File

@ -44,8 +44,6 @@ import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
/**
*/
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
public void testRepositoryCreation() throws Exception {

View File

@ -19,29 +19,6 @@
package org.elasticsearch.snapshots.mockstore;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.fs.FsRepository;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
@ -55,6 +32,26 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
public class MockRepository extends FsRepository {
public static class Plugin extends org.elasticsearch.plugins.Plugin {
@ -64,7 +61,7 @@ public class MockRepository extends FsRepository {
Setting.simpleString("secret.mock.password", Property.NodeScope, Property.Filtered);
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository("mock", MockRepository.class, BlobStoreIndexShardRepository.class);
repositoriesModule.registerRepository("mock", MockRepository.class);
}
@Override
@ -100,8 +97,8 @@ public class MockRepository extends FsRepository {
private volatile boolean blocked = false;
@Inject
public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ClusterService clusterService, Environment environment) throws IOException {
super(name, overrideSettings(repositorySettings, clusterService), indexShardRepository, environment);
public MockRepository(RepositoryName name, RepositorySettings repositorySettings, ClusterService clusterService, Environment environment) throws IOException {
super(name, overrideSettings(repositorySettings, clusterService), environment);
randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0);
randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
maximumNumberOfFailures = repositorySettings.settings().getAsLong("max_failure_number", 100L);

View File

@ -19,6 +19,11 @@
package org.elasticsearch.plugin.repository.azure;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.cloud.azure.AzureRepositoryModule;
import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.common.inject.Module;
@ -26,16 +31,10 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.azure.AzureRepository;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
*
*/
@ -56,7 +55,7 @@ public class AzureRepositoryPlugin extends Plugin {
public void onModule(RepositoriesModule module) {
logger.debug("registering repository type [{}]", AzureRepository.TYPE);
module.registerRepository(AzureRepository.TYPE, AzureRepository.class, BlobStoreIndexShardRepository.class);
module.registerRepository(AzureRepository.TYPE, AzureRepository.class);
}
@Override

View File

@ -34,7 +34,6 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.RepositoryVerificationException;
@ -86,9 +85,8 @@ public class AzureRepository extends BlobStoreRepository {
@Inject
public AzureRepository(RepositoryName name, RepositorySettings repositorySettings,
IndexShardRepository indexShardRepository,
AzureBlobStore azureBlobStore) throws IOException, URISyntaxException, StorageException {
super(name.getName(), repositorySettings, indexShardRepository);
super(name.getName(), repositorySettings);
String container = getValue(repositorySettings, Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING);

View File

@ -19,6 +19,11 @@
package org.elasticsearch.plugin.repository.gcs;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
import com.google.api.client.auth.oauth2.TokenRequest;
import com.google.api.client.auth.oauth2.TokenResponse;
import com.google.api.client.googleapis.json.GoogleJsonError;
@ -35,16 +40,10 @@ import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
public class GoogleCloudStoragePlugin extends Plugin {
public static final String NAME = "repository-gcs";
@ -115,7 +114,6 @@ public class GoogleCloudStoragePlugin extends Plugin {
}
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository(GoogleCloudStorageRepository.TYPE,
GoogleCloudStorageRepository.class, BlobStoreIndexShardRepository.class);
repositoriesModule.registerRepository(GoogleCloudStorageRepository.TYPE, GoogleCloudStorageRepository.class);
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.plugin.repository.gcs.GoogleCloudStoragePlugin;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
@ -75,9 +74,8 @@ public class GoogleCloudStorageRepository extends BlobStoreRepository {
@Inject
public GoogleCloudStorageRepository(RepositoryName repositoryName, RepositorySettings repositorySettings,
IndexShardRepository indexShardRepository,
GoogleCloudStorageService storageService) throws Exception {
super(repositoryName.getName(), repositorySettings, indexShardRepository);
super(repositoryName.getName(), repositorySettings);
String bucket = get(BUCKET, repositoryName, repositorySettings);
String application = get(APPLICATION_NAME, repositoryName, repositorySettings);

View File

@ -26,7 +26,6 @@ import java.security.PrivilegedAction;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
@ -85,6 +84,6 @@ public final class HdfsPlugin extends Plugin {
}
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository("hdfs", HdfsRepository.class, BlobStoreIndexShardRepository.class);
repositoriesModule.registerRepository("hdfs", HdfsRepository.class);
}
}

View File

@ -44,7 +44,6 @@ import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@ -63,8 +62,8 @@ public final class HdfsRepository extends BlobStoreRepository {
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);
@Inject
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings) throws IOException {
super(name.getName(), repositorySettings);
this.repositorySettings = repositorySettings;
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", null);

View File

@ -19,18 +19,6 @@
package org.elasticsearch.plugin.repository.s3;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.S3Module;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.s3.S3Repository;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@ -39,6 +27,16 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.S3Module;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.s3.S3Repository;
/**
*
*/
@ -78,7 +76,7 @@ public class S3RepositoryPlugin extends Plugin {
}
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class, BlobStoreIndexShardRepository.class);
repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class);
}
@Override

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
@ -250,12 +249,11 @@ public class S3Repository extends BlobStoreRepository {
*
* @param name repository name
* @param repositorySettings repository settings
* @param indexShardRepository index shard repository
* @param s3Service S3 service
*/
@Inject
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, AwsS3Service s3Service) throws IOException {
super(name.getName(), repositorySettings);
String bucket = getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING);
if (bucket == null) {

View File

@ -336,7 +336,7 @@ public class RepositoryS3SettingsTests extends ESTestCase {
.build());
try {
new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null, null);
new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null);
fail("We should either raise a NPE or a RepositoryException or a IllegalArgumentException");
} catch (RepositoryException e) {
assertThat(e.getDetailedMessage(), containsString(expectedMessage));