Merge pull request #19292 from rjernst/repository_deguice

Simplified repository api for snapshot/restore
This commit is contained in:
Ryan Ernst 2016-07-07 13:03:58 -07:00 committed by GitHub
commit 89d69ea5a2
32 changed files with 1132 additions and 1493 deletions

View File

@ -94,7 +94,6 @@ import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.StoreFileMetaData;
@ -111,6 +110,7 @@ import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat;
@ -1161,7 +1161,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return storeRecovery.recoverFromStore(this, shouldExist);
}
public boolean restoreFromRepository(IndexShardRepository repository) {
public boolean restoreFromRepository(Repository repository) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository);
@ -1449,9 +1449,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(
final Repository repository = repositoriesService.repository(
recoveryState.getRestoreSource().snapshot().getRepository());
if (restoreFromRepository(indexShardRepository)) {
if (restoreFromRepository(repository)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception first) {

View File

@ -40,10 +40,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.Repository;
import java.io.IOException;
import java.util.Arrays;
@ -219,14 +219,14 @@ final class StoreRecovery {
}
/**
* Recovers an index from a given {@link IndexShardRepository}. This method restores a
* Recovers an index from a given {@link Repository}. This method restores a
* previously created index snapshot into an existing initializing shard.
* @param indexShard the index shard instance to recovery the snapshot from
* @param repository the repository holding the physical files the shard should be recovered from
* @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
*/
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) {
boolean recoverFromRepository(final IndexShard indexShard, Repository repository) {
if (canRecover(indexShard)) {
final ShardRouting shardRouting = indexShard.routingEntry();
if (shardRouting.restoreSource() == null) {
@ -380,7 +380,7 @@ final class StoreRecovery {
/**
* Restores shard from {@link RestoreSource} associated with this shard in routing table
*/
private void restore(final IndexShard indexShard, final IndexShardRepository indexShardRepository) {
private void restore(final IndexShard indexShard, final Repository repository) {
RestoreSource restoreSource = indexShard.routingEntry().restoreSource();
final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
if (restoreSource == null) {
@ -397,7 +397,7 @@ final class StoreRecovery {
if (!shardId.getIndexName().equals(restoreSource.index())) {
snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
}
indexShardRepository.restore(restoreSource.snapshot().getSnapshotId(), restoreSource.version(), shardId, snapshotShardId, indexShard.recoveryState());
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState());
indexShard.skipTranslogRecovery();
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");

View File

@ -1,81 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.snapshots;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState;
/**
* Shard-level snapshot repository
* <p>
* IndexShardRepository is used on data node to create snapshots of individual shards. See {@link org.elasticsearch.repositories.Repository}
* for more information.
*/
public interface IndexShardRepository {
/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method.
* IndexShardRepository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted.
*
* @param snapshotId snapshot id
* @param shardId shard to be snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
/**
* Restores snapshot of the shard.
* <p>
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
*
* @param snapshotId snapshot id
* @param shardId shard id (in the current index)
* @param version version of elasticsearch that created this snapshot
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
*/
void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState);
/**
* Retrieve shard snapshot status for the stored snapshot
*
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param shardId shard id
* @return snapshot status
*/
IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
/**
* Verifies repository settings on data node
* @param verificationToken value returned by {@link org.elasticsearch.repositories.Repository#startVerification()}
*/
void verify(String verificationToken);
}

View File

@ -1,972 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.snapshots.blobstore;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.blobstore.BlobStoreFormat;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.elasticsearch.repositories.blobstore.LegacyBlobStoreFormat;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix;
/**
* Blob store based implementation of IndexShardRepository
*/
public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository {
private static final int BUFFER_SIZE = 4096;
private BlobStore blobStore;
private BlobPath basePath;
private final String repositoryName;
private ByteSizeValue chunkSize;
private final IndicesService indicesService;
private final ClusterService clusterService;
private RateLimiter snapshotRateLimiter;
private RateLimiter restoreRateLimiter;
private RateLimitingInputStream.Listener snapshotThrottleListener;
private RateLimitingInputStream.Listener restoreThrottleListener;
private boolean compress;
private final ParseFieldMatcher parseFieldMatcher;
protected static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-";
protected static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s";
protected static final String SNAPSHOT_PREFIX = "snap-";
protected static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat";
protected static final String SNAPSHOT_CODEC = "snapshot";
protected static final String SNAPSHOT_INDEX_PREFIX = "index-";
protected static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";
protected static final String SNAPSHOT_INDEX_CODEC = "snapshots";
protected static final String DATA_BLOB_PREFIX = "__";
private ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotFormat;
private LegacyBlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotLegacyFormat;
private ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshots> indexShardSnapshotsFormat;
@Inject
public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
super(settings);
this.parseFieldMatcher = new ParseFieldMatcher(settings);
this.repositoryName = repositoryName.name();
this.indicesService = indicesService;
this.clusterService = clusterService;
}
/**
* Called by {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} on repository startup
*
* @param blobStore blob store
* @param basePath base path to blob store
* @param chunkSize chunk size
*/
public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize,
RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter,
final RateLimiterListener rateLimiterListener, boolean compress) {
this.blobStore = blobStore;
this.basePath = basePath;
this.chunkSize = chunkSize;
this.snapshotRateLimiter = snapshotRateLimiter;
this.restoreRateLimiter = restoreRateLimiter;
this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos);
this.compress = compress;
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots.PROTO, parseFieldMatcher, isCompress());
}
/**
* {@inheritDoc}
*/
@Override
public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
SnapshotContext snapshotContext = new SnapshotContext(snapshotId, shardId, snapshotStatus);
snapshotStatus.startTime(System.currentTimeMillis());
try {
snapshotContext.snapshot(snapshotIndexCommit);
snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime());
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE);
} catch (Exception e) {
snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime());
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE);
snapshotStatus.failure(ExceptionsHelper.detailedMessage(e));
if (e instanceof IndexShardSnapshotFailedException) {
throw (IndexShardSnapshotFailedException) e;
} else {
throw new IndexShardSnapshotFailedException(shardId, e);
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
final RestoreContext snapshotContext = new RestoreContext(snapshotId, version, shardId, snapshotShardId, recoveryState);
try {
snapshotContext.restore();
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
}
}
/**
* {@inheritDoc}
*/
@Override
public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
Context context = new Context(snapshotId, version, shardId);
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
IndexShardSnapshotStatus status = new IndexShardSnapshotStatus();
status.updateStage(IndexShardSnapshotStatus.Stage.DONE);
status.startTime(snapshot.startTime());
status.files(snapshot.numberOfFiles(), snapshot.totalSize());
// The snapshot is done which means the number of processed files is the same as total
status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize());
status.time(snapshot.time());
return status;
}
@Override
public void verify(String seed) {
BlobContainer testBlobContainer = blobStore.blobContainer(basePath.add(testBlobPrefix(seed)));
DiscoveryNode localNode = clusterService.localNode();
if (testBlobContainer.blobExists("master.dat")) {
try {
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed));
} catch (IOException exp) {
throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore + "] is not accessible on the node [" + localNode + "]", exp);
}
} else {
throw new RepositoryVerificationException(repositoryName, "a file written by master to the store [" + blobStore + "] cannot be accessed on the node [" + localNode + "]. "
+ "This might indicate that the store [" + blobStore + "] is not shared between this node and the master node or "
+ "that permissions on the store don't allow reading files written by the master node");
}
}
/**
* Delete shard snapshot
*
* @param snapshotId snapshot id
* @param shardId shard id
*/
public void delete(SnapshotId snapshotId, Version version, ShardId shardId) {
Context context = new Context(snapshotId, version, shardId, shardId);
context.delete();
}
@Override
public String toString() {
return "BlobStoreIndexShardRepository[" +
"[" + repositoryName +
"], [" + blobStore + ']' +
']';
}
/**
* Returns true if metadata files should be compressed
*
* @return true if compression is needed
*/
protected boolean isCompress() {
return compress;
}
BlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotFormat(Version version) {
if (BlobStoreRepository.legacyMetaData(version)) {
return indexShardSnapshotLegacyFormat;
} else {
return indexShardSnapshotFormat;
}
}
/**
* Context for snapshot/restore operations
*/
private class Context {
protected final SnapshotId snapshotId;
protected final ShardId shardId;
protected final BlobContainer blobContainer;
protected final Version version;
public Context(SnapshotId snapshotId, Version version, ShardId shardId) {
this(snapshotId, version, shardId, shardId);
}
public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) {
this.snapshotId = snapshotId;
this.version = version;
this.shardId = shardId;
blobContainer = blobStore.blobContainer(basePath.add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId())));
}
/**
* Delete shard snapshot
*/
public void delete() {
final Map<String, BlobMetaData> blobs;
try {
blobs = blobContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e);
}
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
try {
indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName());
} catch (IOException e) {
logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId);
}
// Build a list of snapshots that should be preserved
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
for (SnapshotFiles point : snapshots) {
if (!point.snapshot().equals(snapshotId.getName())) {
newSnapshotsList.add(point);
}
}
// finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalize(newSnapshotsList, fileListGeneration + 1, blobs);
}
/**
* Loads information about shard snapshot
*/
public BlobStoreIndexShardSnapshot loadSnapshot() {
try {
return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName());
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex);
}
}
/**
* Removes all unreferenced files from the repository and writes new index file
*
* We need to be really careful in handling index files in case of failures to make sure we have index file that
* points to files that were deleted.
*
*
* @param snapshots list of active snapshots in the container
* @param fileListGeneration the generation number of the snapshot index file
* @param blobs list of blobs in the container
*/
protected void finalize(List<SnapshotFiles> snapshots, int fileListGeneration, Map<String, BlobMetaData> blobs) {
BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
List<String> blobsToDelete = new ArrayList<>();
// delete old index files first
for (String blobName : blobs.keySet()) {
// delete old file lists
if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
blobsToDelete.add(blobName);
}
}
try {
blobContainer.deleteBlobs(blobsToDelete);
} catch (IOException e) {
// We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up
// with references to non-existing files
throw new IndexShardSnapshotFailedException(shardId, "error deleting index files during cleanup", e);
}
blobsToDelete = new ArrayList<>();
// now go over all the blobs, and if they don't exists in a snapshot, delete them
for (String blobName : blobs.keySet()) {
// delete unused files
if (blobName.startsWith(DATA_BLOB_PREFIX)) {
if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) {
blobsToDelete.add(blobName);
}
}
}
try {
blobContainer.deleteBlobs(blobsToDelete);
} catch (IOException e) {
logger.debug("[{}] [{}] error deleting some of the blobs [{}] during cleanup", e, snapshotId, shardId, blobsToDelete);
}
// If we deleted all snapshots - we don't need to create the index file
if (snapshots.size() > 0) {
try {
indexShardSnapshotsFormat.writeAtomic(newSnapshots, blobContainer, Integer.toString(fileListGeneration));
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e);
}
}
}
/**
* Generates blob name
*
* @param generation the blob number
* @return the blob name
*/
protected String fileNameFromGeneration(long generation) {
return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX);
}
/**
* Finds the next available blob number
*
* @param blobs list of blobs in the repository
* @return next available blob number
*/
protected long findLatestFileNameGeneration(Map<String, BlobMetaData> blobs) {
long generation = -1;
for (String name : blobs.keySet()) {
if (!name.startsWith(DATA_BLOB_PREFIX)) {
continue;
}
name = FileInfo.canonicalName(name);
try {
long currentGen = Long.parseLong(name.substring(DATA_BLOB_PREFIX.length()), Character.MAX_RADIX);
if (currentGen > generation) {
generation = currentGen;
}
} catch (NumberFormatException e) {
logger.warn("file [{}] does not conform to the '{}' schema", name, DATA_BLOB_PREFIX);
}
}
return generation;
}
/**
* Loads all available snapshots in the repository
*
* @param blobs list of blobs in repository
* @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
*/
protected Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(Map<String, BlobMetaData> blobs) {
int latest = -1;
for (String name : blobs.keySet()) {
if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
if (gen > latest) {
latest = gen;
}
} catch (NumberFormatException ex) {
logger.warn("failed to parse index file name [{}]", name);
}
}
}
if (latest >= 0) {
try {
final BlobStoreIndexShardSnapshots shardSnapshots =
indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest));
return new Tuple<>(shardSnapshots, latest);
} catch (IOException e) {
logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest);
}
}
// We couldn't load the index file - falling back to loading individual snapshots
List<SnapshotFiles> snapshots = new ArrayList<>();
for (String name : blobs.keySet()) {
try {
BlobStoreIndexShardSnapshot snapshot = null;
if (name.startsWith(SNAPSHOT_PREFIX)) {
snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name);
} else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) {
snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name);
}
if (snapshot != null) {
snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
}
} catch (IOException e) {
logger.warn("failed to read commit point [{}]", e, name);
}
}
return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1);
}
}
/**
* Context for snapshot operations
*/
private class SnapshotContext extends Context {
private final Store store;
private final IndexShardSnapshotStatus snapshotStatus;
/**
* Constructs new context
*
* @param snapshotId snapshot id
* @param shardId shard to be snapshotted
* @param snapshotStatus snapshot status to report progress
*/
public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) {
super(snapshotId, Version.CURRENT, shardId);
this.snapshotStatus = snapshotStatus;
store = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()).store();
}
/**
* Create snapshot from index commit point
*
* @param snapshotIndexCommit snapshot commit point
*/
public void snapshot(IndexCommit snapshotIndexCommit) {
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName);
store.incRef();
try {
final Map<String, BlobMetaData> blobs;
try {
blobs = blobContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
}
long generation = findLatestFileNameGeneration(blobs);
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
ArrayList<FileInfo> filesToSnapshot = new ArrayList<>();
final Store.MetadataSnapshot metadata;
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
final Collection<String> fileNames;
try {
metadata = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
for (String fileName : fileNames) {
if (snapshotStatus.aborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadata.get(fileName);
FileInfo existingFileInfo = null;
List<FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (FileInfo fileInfo : filesInfo) {
try {
// in 1.3.3 we added additional hashes for .si / segments_N files
// to ensure we don't double the space in the repo since old snapshots
// don't have this hash we try to read that hash from the blob store
// in a bwc compatible way.
maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata);
} catch (Exception e) {
logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
}
if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
}
}
}
if (existingFileInfo == null) {
indexNumberOfFiles++;
indexTotalFilesSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), md, chunkSize);
indexCommitPointFiles.add(snapshotFileInfo);
filesToSnapshot.add(snapshotFileInfo);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
}
snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize);
if (snapshotStatus.aborted()) {
logger.debug("[{}] [{}] Aborted during initialization", shardId, snapshotId);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED);
for (FileInfo snapshotFileInfo : filesToSnapshot) {
try {
snapshotFile(snapshotFileInfo);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
}
}
snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration());
// now create and write the commit point
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);
BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(),
snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(),
// snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time.
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName());
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
// delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
// finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalize(newSnapshotsList, fileListGeneration + 1, blobs);
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE);
} finally {
store.decRef();
}
}
/**
* Snapshot individual file
* <p>
* This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are
* added to the {@code failures} list
*
* @param fileInfo file to be snapshotted
*/
private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException {
final String file = fileInfo.physicalName();
try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
for (int i = 0; i < fileInfo.numberOfParts(); i++) {
final long partBytes = fileInfo.partBytes(i);
final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, partBytes);
InputStream inputStream = snapshotRateLimiter == null ? inputStreamIndexInput : new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener);
inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName());
blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes);
}
Store.verify(indexInput);
snapshotStatus.addProcessedFile(fileInfo.length());
} catch (Exception t) {
failStoreIfCorrupted(t);
snapshotStatus.addProcessedFile(0);
throw t;
}
}
private void failStoreIfCorrupted(Exception e) {
if (e instanceof CorruptIndexException || e instanceof IndexFormatTooOldException || e instanceof IndexFormatTooNewException) {
try {
store.markStoreCorrupted((IOException) e);
} catch (IOException inner) {
inner.addSuppressed(e);
logger.warn("store cannot be marked as corrupted", inner);
}
}
}
/**
* Checks if snapshot file already exists in the list of blobs
*
* @param fileInfo file to check
* @param blobs list of blobs
* @return true if file exists in the list of blobs
*/
private boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map<String, BlobMetaData> blobs) {
BlobMetaData blobMetaData = blobs.get(fileInfo.name());
if (blobMetaData != null) {
return blobMetaData.length() == fileInfo.length();
} else if (blobs.containsKey(fileInfo.partName(0))) {
// multi part file sum up the size and check
int part = 0;
long totalSize = 0;
while (true) {
blobMetaData = blobs.get(fileInfo.partName(part++));
if (blobMetaData == null) {
break;
}
totalSize += blobMetaData.length();
}
return totalSize == fileInfo.length();
}
// no file, not exact and not multipart
return false;
}
private class AbortableInputStream extends FilterInputStream {
private final String fileName;
public AbortableInputStream(InputStream delegate, String fileName) {
super(delegate);
this.fileName = fileName;
}
@Override
public int read() throws IOException {
checkAborted();
return in.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
checkAborted();
return in.read(b, off, len);
}
private void checkAborted() {
if (snapshotStatus.aborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
}
}
}
/**
* This is a BWC layer to ensure we update the snapshots metadata with the corresponding hashes before we compare them.
* The new logic for StoreFileMetaData reads the entire <tt>.si</tt> and <tt>segments.n</tt> files to strengthen the
* comparison of the files on a per-segment / per-commit level.
*/
private static void maybeRecalculateMetadataHash(final BlobContainer blobContainer, final FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Exception {
final StoreFileMetaData metadata;
if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) {
if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) {
// we have a hash - check if our repo has a hash too otherwise we have
// to calculate it.
// we might have multiple parts even though the file is small... make sure we read all of it.
try (final InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
BytesRefBuilder builder = new BytesRefBuilder();
Store.MetadataSnapshot.hashFile(builder, stream, fileInfo.length());
BytesRef hash = fileInfo.metadata().hash(); // reset the file infos metadata hash
assert hash.length == 0;
hash.bytes = builder.bytes();
hash.offset = 0;
hash.length = builder.length();
}
}
}
}
private static final class PartSliceStream extends SlicedInputStream {
private final BlobContainer container;
private final FileInfo info;
public PartSliceStream(BlobContainer container, FileInfo info) {
super(info.numberOfParts());
this.info = info;
this.container = container;
}
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(info.partName(slice));
}
}
/**
* Context for restore operations
*/
private class RestoreContext extends Context {
private final Store store;
private final RecoveryState recoveryState;
/**
* Constructs new restore context
*
* @param snapshotId snapshot id
* @param shardId shard to be restored
* @param snapshotShardId shard in the snapshot that data should be restored from
* @param recoveryState recovery state to report progress
*/
public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
super(snapshotId, version, shardId, snapshotShardId);
this.recoveryState = recoveryState;
store = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()).store();
}
/**
* Performs restore operation
*/
public void restore() throws IOException {
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
if (snapshot.indexFiles().size() == 1
&& snapshot.indexFiles().get(0).physicalName().startsWith("segments_")
&& snapshot.indexFiles().get(0).hasUnknownChecksum()) {
// If the shard has no documents, it will only contain a single segments_N file for the
// shard's snapshot. If we are restoring a snapshot created by a previous supported version,
// it is still possible that in that version, an empty shard has a segments_N file with an unsupported
// version (and no checksum), because we don't know the Lucene version to assign segments_N until we
// have written some data. Since the segments_N for an empty shard could have an incompatible Lucene
// version number and no checksum, even though the index itself is perfectly fine to restore, this
// empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty
// shard anyway, we just create the empty shard here and then exit.
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null)
.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setCommitOnClose(true));
writer.close();
return;
}
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
final Store.MetadataSnapshot recoveryTargetMetadata;
try {
recoveryTargetMetadata = store.getMetadataOrEmpty();
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
logger.warn("{} Can't read metadata from store", e, shardId);
throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e);
}
final List<FileInfo> filesToRecover = new ArrayList<>();
final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
final Map<String, FileInfo> fileInfos = new HashMap<>();
for (final FileInfo fileInfo : snapshot.indexFiles()) {
try {
// in 1.3.3 we added additional hashes for .si / segments_N files
// to ensure we don't double the space in the repo since old snapshots
// don't have this hash we try to read that hash from the blob store
// in a bwc compatible way.
maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata);
} catch (Exception e) {
// if the index is broken we might not be able to read it
logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
}
snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
fileInfos.put(fileInfo.metadata().name(), fileInfo);
}
final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0);
final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetaData md : diff.identical) {
FileInfo fileInfo = fileInfos.get(md.name());
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
}
}
for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
FileInfo fileInfo = fileInfos.get(md.name());
filesToRecover.add(fileInfo);
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
if (logger.isTraceEnabled()) {
if (md == null) {
logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
} else {
logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
}
}
}
final RecoveryState.Index index = recoveryState.getIndex();
if (filesToRecover.isEmpty()) {
logger.trace("no files to recover, all exists within the local store");
}
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId,
index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount()));
}
try {
for (final FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover);
}
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
}
final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
if (recoveryTargetMetadata == null) {
throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
}
assert restoredSegmentsFile != null;
// read the snapshot data persisted
final SegmentInfos segmentCommitInfos;
try {
segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion());
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
continue; //skip write.lock, checksum files and files that exist in the snapshot
}
try {
store.deleteQuiet("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile);
}
}
} catch (IOException e) {
logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId);
}
} finally {
store.decRef();
}
}
/**
* Restores a file
* This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are
* added to the {@code failures} list
*
* @param fileInfo file to be restored
*/
private void restoreFile(final FileInfo fileInfo) throws IOException {
boolean success = false;
try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
final InputStream stream;
if (restoreRateLimiter == null) {
stream = partSliceStream;
} else {
stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener);
}
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
}
Store.verify(indexOutput);
indexOutput.close();
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {
store.markStoreCorrupted(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
throw ex;
} finally {
if (success == false) {
store.deleteQuiet(fileInfo.physicalName());
}
}
}
}
}
public interface RateLimiterListener {
void onRestorePause(long nanos);
void onSnapshotPause(long nanos);
}
}

View File

@ -21,8 +21,6 @@ package org.elasticsearch.repositories;
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.repositories.uri.URLRepository;
import org.elasticsearch.snapshots.RestoreService;
@ -32,20 +30,20 @@ import org.elasticsearch.snapshots.SnapshotsService;
/**
* Sets up classes for Snapshot/Restore.
*
* Plugins can add custom repository types by calling {@link #registerRepository(String, Class, Class)}.
* Plugins can add custom repository types by calling {@link #registerRepository(String, Class)}.
*/
public class RepositoriesModule extends AbstractModule {
private final RepositoryTypesRegistry repositoryTypes = new RepositoryTypesRegistry();
public RepositoriesModule() {
registerRepository(FsRepository.TYPE, FsRepository.class, BlobStoreIndexShardRepository.class);
registerRepository(URLRepository.TYPE, URLRepository.class, BlobStoreIndexShardRepository.class);
registerRepository(FsRepository.TYPE, FsRepository.class);
registerRepository(URLRepository.TYPE, URLRepository.class);
}
/** Registers a custom repository type to the given {@link Repository} and {@link IndexShardRepository}. */
public void registerRepository(String type, Class<? extends Repository> repositoryType, Class<? extends IndexShardRepository> shardRepositoryType) {
repositoryTypes.registerRepository(type, repositoryType, shardRepositoryType);
/** Registers a custom repository type to the given {@link Repository}. */
public void registerRepository(String type, Class<? extends Repository> repositoryType) {
repositoryTypes.registerRepository(type, repositoryType);
}
@Override

View File

@ -37,7 +37,6 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.transport.TransportService;
@ -336,23 +335,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
throw new RepositoryMissingException(repository);
}
/**
* Returns registered index shard repository
* <p>
* This method is called only on data nodes
*
* @param repository repository name
* @return registered repository
* @throws RepositoryMissingException if repository with such name isn't registered
*/
public IndexShardRepository indexShardRepository(String repository) {
RepositoryHolder holder = repositories.get(repository);
if (holder != null) {
return holder.indexShardRepository;
}
throw new RepositoryMissingException(repository);
}
/**
* Creates a new repository and adds it to the list of registered repositories.
* <p>
@ -403,14 +385,16 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
try {
ModulesBuilder modules = new ModulesBuilder();
RepositoryName name = new RepositoryName(repositoryMetaData.type(), repositoryMetaData.name());
modules.add(new RepositoryNameModule(name));
modules.add(new RepositoryModule(name, repositoryMetaData.settings(), this.settings, typesRegistry));
modules.add(b -> {
b.bind(RepositoryName.class).toInstance(name);
typesRegistry.bindType(b, repositoryMetaData.type());
b.bind(RepositorySettings.class).toInstance(new RepositorySettings(settings, repositoryMetaData.settings()));
});
repositoryInjector = modules.createChildInjector(injector);
Repository repository = repositoryInjector.getInstance(Repository.class);
IndexShardRepository indexShardRepository = repositoryInjector.getInstance(IndexShardRepository.class);
repository.start();
return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repository, indexShardRepository);
return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repository);
} catch (Exception e) {
logger.warn("failed to create repository [{}][{}]", e, repositoryMetaData.type(), repositoryMetaData.name());
throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e);
@ -472,13 +456,11 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
private final String type;
private final Settings settings;
private final Repository repository;
private final IndexShardRepository indexShardRepository;
public RepositoryHolder(String type, Settings settings,Repository repository, IndexShardRepository indexShardRepository) {
public RepositoryHolder(String type, Settings settings,Repository repository) {
this.type = type;
this.settings = settings;
this.repository = repository;
this.indexShardRepository = indexShardRepository;
}
}

View File

@ -19,7 +19,11 @@
package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.shard.ShardId;
@ -29,22 +33,20 @@ import org.elasticsearch.snapshots.SnapshotShardFailure;
import java.io.IOException;
import java.util.List;
import java.util.function.Predicate;
/**
* Snapshot repository interface.
* An interface for interacting with a repository in snapshot and restore.
* <p>
* Responsible for index and cluster level operations. It's called only on master.
* Shard-level operations are performed using {@link org.elasticsearch.index.snapshots.IndexShardRepository}
* interface on data nodes.
* Implementations are responsible for reading and writing both metadata and actual shard data to and from
* a repository backend.
* <p>
* Typical snapshot usage pattern:
* To perform a snapshot:
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(SnapshotId, ShardId, IndexCommit, IndexShardSnapshotStatus)} for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot}
* with possible list of failures</li>
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexCommit, IndexShardSnapshotStatus)}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
*/
public interface Repository extends LifecycleComponent {
@ -55,7 +57,7 @@ public interface Repository extends LifecycleComponent {
* @param snapshotId snapshot id
* @return information about snapshot
*/
SnapshotInfo readSnapshot(SnapshotId snapshotId);
SnapshotInfo getSnapshotInfo(SnapshotId snapshotId);
/**
* Returns global metadata associate with the snapshot.
@ -66,7 +68,7 @@ public interface Repository extends LifecycleComponent {
* @param indices list of indices
* @return information about snapshot
*/
MetaData readSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException;
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException;
/**
* Returns the list of snapshots currently stored in the repository that match the given predicate on the snapshot name.
@ -74,7 +76,7 @@ public interface Repository extends LifecycleComponent {
*
* @return snapshot list
*/
List<SnapshotId> snapshots();
List<SnapshotId> getSnapshots();
/**
* Starts snapshotting process
@ -108,12 +110,12 @@ public interface Repository extends LifecycleComponent {
/**
* Returns snapshot throttle time in nanoseconds
*/
long snapshotThrottleTimeInNanos();
long getSnapshotThrottleTimeInNanos();
/**
* Returns restore throttle time in nanoseconds
*/
long restoreThrottleTimeInNanos();
long getRestoreThrottleTimeInNanos();
/**
@ -135,10 +137,57 @@ public interface Repository extends LifecycleComponent {
*/
void endVerification(String verificationToken);
/**
* Verifies repository settings on data node.
* @param verificationToken value returned by {@link org.elasticsearch.repositories.Repository#startVerification()}
* @param localNode the local node information, for inclusion in verification errors
*/
void verify(String verificationToken, DiscoveryNode localNode);
/**
* Returns true if the repository supports only read operations
* @return true if the repository is read/only
*/
boolean readOnly();
boolean isReadOnly();
/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted.
*
* @param shard shard to be snapshotted
* @param snapshotId snapshot id
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
/**
* Restores snapshot of the shard.
* <p>
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
*
* @param shard the shard to restore the index into
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
*/
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState);
/**
* Retrieve shard snapshot status for the stored snapshot
*
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param shardId shard id
* @return snapshot status
*/
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
}

View File

@ -1,61 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
/**
* Binds repository classes for the specific repository type.
*/
public class RepositoryModule extends AbstractModule {
private RepositoryName repositoryName;
private final Settings globalSettings;
private final Settings settings;
private final RepositoryTypesRegistry typesRegistry;
/**
* Spawns module for repository with specified name, type and settings
*
* @param repositoryName repository name and type
* @param settings repository settings
* @param globalSettings global settings
* @param typesRegistry registry of repository types
*/
public RepositoryModule(RepositoryName repositoryName, Settings settings, Settings globalSettings, RepositoryTypesRegistry typesRegistry) {
this.repositoryName = repositoryName;
this.globalSettings = globalSettings;
this.settings = settings;
this.typesRegistry = typesRegistry;
}
/**
* {@inheritDoc}
*/
@Override
protected void configure() {
typesRegistry.bindType(binder(), repositoryName.type());
bind(RepositorySettings.class).toInstance(new RepositorySettings(globalSettings, settings));
}
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.inject.AbstractModule;
/**
* Binds specific instance of RepositoryName for injection to repository module
*/
public class RepositoryNameModule extends AbstractModule {
private final RepositoryName repositoryName;
public RepositoryNameModule(RepositoryName repositoryName) {
this.repositoryName = repositoryName;
}
@Override
protected void configure() {
bind(RepositoryName.class).toInstance(repositoryName);
}
}

View File

@ -22,22 +22,18 @@ package org.elasticsearch.repositories;
import org.elasticsearch.common.inject.Binder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.index.snapshots.IndexShardRepository;
/**
* A mapping from type name to implementations of {@link Repository} and {@link IndexShardRepository}.
* A mapping from type name to implementations of {@link Repository}.
*/
public class RepositoryTypesRegistry {
// invariant: repositories and shardRepositories have the same keyset
private final ExtensionPoint.SelectedType<Repository> repositoryTypes =
new ExtensionPoint.SelectedType<>("repository", Repository.class);
private final ExtensionPoint.SelectedType<IndexShardRepository> shardRepositoryTypes =
new ExtensionPoint.SelectedType<>("index_repository", IndexShardRepository.class);
/** Adds a new repository type to the registry, bound to the given implementation classes. */
public void registerRepository(String name, Class<? extends Repository> repositoryType, Class<? extends IndexShardRepository> shardRepositoryType) {
public void registerRepository(String name, Class<? extends Repository> repositoryType) {
repositoryTypes.registerExtension(name, repositoryType);
shardRepositoryTypes.registerExtension(name, shardRepositoryType);
}
/**
@ -47,6 +43,5 @@ public class RepositoryTypesRegistry {
public void bindType(Binder binder, String type) {
Settings settings = Settings.builder().put("type", type).build();
repositoryTypes.bindType(binder, settings, "type", null);
shardRepositoryTypes.bindType(binder, settings, "type", null);
}
}

View File

@ -19,6 +19,12 @@
package org.elasticsearch.repositories;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import com.carrotsearch.hppc.ObjectContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.ActionListener;
@ -29,7 +35,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoriesService.VerifyResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@ -40,12 +45,6 @@ import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
public class VerifyNodeRepositoryAction extends AbstractComponent {
public static final String ACTION_NAME = "internal:admin/repository/verify";
@ -82,7 +81,7 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
for (final DiscoveryNode node : nodes) {
if (node.equals(localNode)) {
try {
doVerify(repository, verificationToken);
doVerify(repository, verificationToken, localNode);
} catch (Exception e) {
logger.warn("[{}] failed to verify repository", e, repository);
errors.add(new VerificationFailure(node.getId(), e));
@ -115,9 +114,9 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
listener.onResponse(new RepositoriesService.VerifyResponse(nodes.toArray(new DiscoveryNode[nodes.size()]), errors.toArray(new VerificationFailure[errors.size()])));
}
private void doVerify(String repository, String verificationToken) {
IndexShardRepository blobStoreIndexShardRepository = repositoriesService.indexShardRepository(repository);
blobStoreIndexShardRepository.verify(verificationToken);
private void doVerify(String repositoryName, String verificationToken, DiscoveryNode localNode) {
Repository repository = repositoriesService.repository(repositoryName);
repository.verify(verificationToken, localNode);
}
public static class VerifyNodeRepositoryRequest extends TransportRequest {
@ -151,8 +150,9 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
class VerifyNodeRepositoryRequestHandler implements TransportRequestHandler<VerifyNodeRepositoryRequest> {
@Override
public void messageReceived(VerifyNodeRepositoryRequest request, TransportChannel channel) throws Exception {
DiscoveryNode localNode = clusterService.state().nodes().getLocalNode();
try {
doVerify(request.repository, request.verificationToken);
doVerify(request.repository, request.verificationToken, localNode);
} catch (Exception ex) {
logger.warn("[{}] failed to verify repository", ex, request.repository);
throw ex;

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
@ -77,11 +76,10 @@ public class FsRepository extends BlobStoreRepository {
*
* @param name repository name
* @param repositorySettings repository settings
* @param indexShardRepository index shard repository
*/
@Inject
public FsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, Environment environment) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
public FsRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException {
super(name.getName(), repositorySettings);
Path locationFile;
String location = REPOSITORIES_LOCATION_SETTING.get(repositorySettings.settings());
if (location.isEmpty()) {

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.uri;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.RepositoryName;
/**
*/
public class URLIndexShardRepository extends BlobStoreIndexShardRepository {
@Inject
public URLIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
super(settings, repositoryName, indicesService, clusterService);
}
@Override
public void verify(String seed) {
//TODO: Add verification that URL is accessible
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.URIPattern;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
@ -82,11 +81,10 @@ public class URLRepository extends BlobStoreRepository {
*
* @param name repository name
* @param repositorySettings repository settings
* @param indexShardRepository shard repository
*/
@Inject
public URLRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, Environment environment) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
public URLRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException {
super(name.getName(), repositorySettings);
if (URL_SETTING.exists(repositorySettings.settings()) == false && REPOSITORIES_URL_SETTING.exists(settings) == false) {
throw new RepositoryException(name.name(), "missing url");
@ -101,9 +99,6 @@ public class URLRepository extends BlobStoreRepository {
basePath = BlobPath.cleanPath();
}
/**
* {@inheritDoc}
*/
@Override
protected BlobStore blobStore() {
return blobStore;
@ -146,7 +141,7 @@ public class URLRepository extends BlobStoreRepository {
}
@Override
public boolean readOnly() {
public boolean isReadOnly() {
return true;
}

View File

@ -63,7 +63,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
@ -112,7 +111,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet;
* method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link IndexShard#restoreFromRepository(IndexShardRepository)} )}
* {@link IndexShard#restoreFromRepository(Repository)} )}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property.
* <p>
@ -186,16 +185,16 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
try {
// Read snapshot info and metadata from the repository
Repository repository = repositoriesService.repository(request.repositoryName);
final Optional<SnapshotId> matchingSnapshotId = repository.snapshots().stream()
final Optional<SnapshotId> matchingSnapshotId = repository.getSnapshots().stream()
.filter(s -> request.snapshotName.equals(s.getName())).findFirst();
if (matchingSnapshotId.isPresent() == false) {
throw new SnapshotRestoreException(request.repositoryName, request.snapshotName, "snapshot does not exist");
}
final SnapshotId snapshotId = matchingSnapshotId.get();
final SnapshotInfo snapshotInfo = repository.readSnapshot(snapshotId);
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
final Snapshot snapshot = new Snapshot(request.repositoryName, snapshotId);
List<String> filteredIndices = SnapshotUtils.filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
MetaData metaDataIn = repository.readSnapshotMetaData(snapshotInfo, filteredIndices);
MetaData metaDataIn = repository.getSnapshotMetaData(snapshotInfo, filteredIndices);
final MetaData metaData;
if (snapshotInfo.version().before(Version.V_2_0_0_beta1)) {

View File

@ -43,10 +43,10 @@ import org.elasticsearch.index.engine.SnapshotFailedEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -322,7 +322,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
* @param snapshotStatus snapshot status
*/
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexShardSnapshotStatus snapshotStatus) {
IndexShardRepository indexShardRepository = snapshotsService.getRepositoriesService().indexShardRepository(snapshot.getRepository());
Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
ShardId shardId = indexShard.shardId();
if (!indexShard.routingEntry().primary()) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
@ -340,11 +340,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
// we flush first to make sure we get the latest writes snapshotted
IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
try {
indexShardRepository.snapshot(snapshot.getSnapshotId(), shardId, snapshotIndexCommit, snapshotStatus);
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), snapshotIndexCommit, snapshotStatus);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");
logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, indexShardRepository,
logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository,
TimeValue.timeValueMillis(snapshotStatus.time()), sb);
}
} finally {

View File

@ -55,7 +55,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@ -133,7 +132,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public List<SnapshotId> snapshotIds(final String repositoryName) {
Repository repository = repositoriesService.repository(repositoryName);
assert repository != null; // should only be called once we've validated the repository exists
return repository.snapshots();
return repository.getSnapshots();
}
/**
@ -149,7 +148,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (!entries.isEmpty()) {
return inProgressSnapshot(entries.iterator().next());
}
return repositoriesService.repository(repositoryName).readSnapshot(snapshotId);
return repositoriesService.repository(repositoryName).getSnapshotInfo(snapshotId);
}
/**
@ -175,7 +174,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final Repository repository = repositoriesService.repository(repositoryName);
for (SnapshotId snapshotId : snapshotIdsToIterate) {
try {
snapshotSet.add(repository.readSnapshot(snapshotId));
snapshotSet.add(repository.getSnapshotInfo(snapshotId));
} catch (Exception ex) {
if (ignoreUnavailable) {
logger.warn("failed to get snapshot [{}]", ex, snapshotId);
@ -547,8 +546,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final SnapshotInfo snapshotInfo) throws IOException {
Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
Repository repository = repositoriesService.repository(repositoryName);
IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(repositoryName);
MetaData metaData = repository.readSnapshotMetaData(snapshotInfo, snapshotInfo.indices());
MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, snapshotInfo.indices());
for (String index : snapshotInfo.indices()) {
IndexMetaData indexMetaData = metaData.indices().get(index);
if (indexMetaData != null) {
@ -563,7 +561,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
shardStatus.put(shardId, shardSnapshotStatus);
} else {
IndexShardSnapshotStatus shardSnapshotStatus =
indexShardRepository.snapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId);
repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId);
shardStatus.put(shardId, shardSnapshotStatus);
}
}
@ -955,7 +953,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener) {
// First, look for the snapshot in the repository
final Repository repository = repositoriesService.repository(repositoryName);
Optional<SnapshotId> matchedEntry = repository.snapshots().stream().filter(s -> s.getName().equals(snapshotName)).findFirst();
Optional<SnapshotId> matchedEntry = repository.getSnapshots().stream().filter(s -> s.getName().equals(snapshotName)).findFirst();
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
if (matchedEntry.isPresent() == false) {
matchedEntry = currentSnapshots(repositoryName, Collections.emptyList()).stream()

View File

@ -18,6 +18,27 @@
*/
package org.elasticsearch.index.shard;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
@ -50,6 +71,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.RestoreSource;
@ -63,6 +85,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
@ -91,15 +114,17 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.FieldMaskingReader;
@ -108,27 +133,6 @@ import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
@ -1178,13 +1182,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
test_target_shard.updateRoutingEntry(routing);
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode));
assertTrue(test_target_shard.restoreFromRepository(new IndexShardRepository() {
assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository() {
@Override
public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
}
@Override
public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
try {
cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
@ -1197,15 +1197,6 @@ public class IndexShardTests extends ESSingleNodeTestCase {
throw new RuntimeException(ex);
}
}
@Override
public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
return null;
}
@Override
public void verify(String verificationToken) {
}
}));
test_target_shard.updateRoutingEntry(routing.moveToStarted());
@ -1649,4 +1640,63 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertEquals(mappings.get("index_1").get("test").get().source().string(), "{\"test\":{\"properties\":{\"foo\":{\"type\":\"text\"}}}}");
}
/** A dummy repository for testing which just needs restore overridden */
private static abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
public RestoreOnlyRepository() {
super(Settings.EMPTY);
}
@Override
protected void doStart() {}
@Override
protected void doStop() {}
@Override
protected void doClose() {}
@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
return null;
}
@Override
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException {
return null;
}
@Override
public List<SnapshotId> getSnapshots() {
return null;
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {}
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
return null;
}
@Override
public void deleteSnapshot(SnapshotId snapshotId) {}
@Override
public long getSnapshotThrottleTimeInNanos() {
return 0;
}
@Override
public long getRestoreThrottleTimeInNanos() {
return 0;
}
@Override
public String startVerification() {
return null;
}
@Override
public void endVerification(String verificationToken) {}
@Override
public boolean isReadOnly() {
return false;
}
@Override
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {}
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
return null;
}
@Override
public void verify(String verificationToken, DiscoveryNode localNode) {}
}
}

View File

@ -100,7 +100,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
(BlobStoreRepository) repositoriesService.repository(repositoryName);
final List<SnapshotId> originalSnapshots = Arrays.asList(snapshotId1, snapshotId2);
List<SnapshotId> snapshotIds = repository.snapshots().stream()
List<SnapshotId> snapshotIds = repository.getSnapshots().stream()
.sorted((s1, s2) -> s1.getName().compareTo(s2.getName()))
.collect(Collectors.toList());
assertThat(snapshotIds, equalTo(originalSnapshots));
@ -110,9 +110,9 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
final BlobStoreRepository repository = setupRepo();
// write to and read from a snapshot file with no entries
assertThat(repository.snapshots().size(), equalTo(0));
assertThat(repository.getSnapshots().size(), equalTo(0));
repository.writeSnapshotsToIndexGen(Collections.emptyList());
assertThat(repository.snapshots().size(), equalTo(0));
assertThat(repository.getSnapshots().size(), equalTo(0));
// write to and read from a snapshot file with a random number of entries
final int numSnapshots = randomIntBetween(1, 1000);
@ -121,7 +121,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(repository.snapshots(), equalTo(snapshotIds));
assertThat(repository.getSnapshots(), equalTo(snapshotIds));
}
public void testIndexGenerationalFiles() throws Exception {
@ -165,7 +165,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), SnapshotId.UNASSIGNED_UUID));
}
writeOldFormat(repository, snapshotIds.stream().map(SnapshotId::getName).collect(Collectors.toList()));
assertThat(Sets.newHashSet(repository.snapshots()), equalTo(Sets.newHashSet(snapshotIds)));
assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds)));
// write to and read from a snapshot file with a random number of new entries added
final int numSnapshots = randomIntBetween(1, 1000);
@ -173,7 +173,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(Sets.newHashSet(repository.snapshots()), equalTo(Sets.newHashSet(snapshotIds)));
assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds)));
}
public void testBlobId() {

View File

@ -44,8 +44,6 @@ import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
/**
*/
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
public void testRepositoryCreation() throws Exception {

View File

@ -1397,8 +1397,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
long snapshotPause = 0L;
long restorePause = 0L;
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
snapshotPause += repositoriesService.repository("test-repo").snapshotThrottleTimeInNanos();
restorePause += repositoriesService.repository("test-repo").restoreThrottleTimeInNanos();
snapshotPause += repositoriesService.repository("test-repo").getSnapshotThrottleTimeInNanos();
restorePause += repositoriesService.repository("test-repo").getRestoreThrottleTimeInNanos();
}
if (throttleSnapshot) {

View File

@ -19,29 +19,6 @@
package org.elasticsearch.snapshots.mockstore;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.fs.FsRepository;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
@ -55,6 +32,26 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
public class MockRepository extends FsRepository {
public static class Plugin extends org.elasticsearch.plugins.Plugin {
@ -64,7 +61,7 @@ public class MockRepository extends FsRepository {
Setting.simpleString("secret.mock.password", Property.NodeScope, Property.Filtered);
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository("mock", MockRepository.class, BlobStoreIndexShardRepository.class);
repositoriesModule.registerRepository("mock", MockRepository.class);
}
@Override
@ -100,8 +97,8 @@ public class MockRepository extends FsRepository {
private volatile boolean blocked = false;
@Inject
public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ClusterService clusterService, Environment environment) throws IOException {
super(name, overrideSettings(repositorySettings, clusterService), indexShardRepository, environment);
public MockRepository(RepositoryName name, RepositorySettings repositorySettings, ClusterService clusterService, Environment environment) throws IOException {
super(name, overrideSettings(repositorySettings, clusterService), environment);
randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0);
randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
maximumNumberOfFailures = repositorySettings.settings().getAsLong("max_failure_number", 100L);

View File

@ -19,6 +19,11 @@
package org.elasticsearch.plugin.repository.azure;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.cloud.azure.AzureRepositoryModule;
import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.common.inject.Module;
@ -26,16 +31,10 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.azure.AzureRepository;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
*
*/
@ -56,7 +55,7 @@ public class AzureRepositoryPlugin extends Plugin {
public void onModule(RepositoriesModule module) {
logger.debug("registering repository type [{}]", AzureRepository.TYPE);
module.registerRepository(AzureRepository.TYPE, AzureRepository.class, BlobStoreIndexShardRepository.class);
module.registerRepository(AzureRepository.TYPE, AzureRepository.class);
}
@Override

View File

@ -34,7 +34,6 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.RepositoryVerificationException;
@ -86,9 +85,8 @@ public class AzureRepository extends BlobStoreRepository {
@Inject
public AzureRepository(RepositoryName name, RepositorySettings repositorySettings,
IndexShardRepository indexShardRepository,
AzureBlobStore azureBlobStore) throws IOException, URISyntaxException, StorageException {
super(name.getName(), repositorySettings, indexShardRepository);
super(name.getName(), repositorySettings);
String container = getValue(repositorySettings, Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING);
@ -187,7 +185,7 @@ public class AzureRepository extends BlobStoreRepository {
}
@Override
public boolean readOnly() {
public boolean isReadOnly() {
return readonly;
}
}

View File

@ -19,6 +19,11 @@
package org.elasticsearch.plugin.repository.gcs;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
import com.google.api.client.auth.oauth2.TokenRequest;
import com.google.api.client.auth.oauth2.TokenResponse;
import com.google.api.client.googleapis.json.GoogleJsonError;
@ -35,16 +40,10 @@ import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
public class GoogleCloudStoragePlugin extends Plugin {
public static final String NAME = "repository-gcs";
@ -115,7 +114,6 @@ public class GoogleCloudStoragePlugin extends Plugin {
}
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository(GoogleCloudStorageRepository.TYPE,
GoogleCloudStorageRepository.class, BlobStoreIndexShardRepository.class);
repositoriesModule.registerRepository(GoogleCloudStorageRepository.TYPE, GoogleCloudStorageRepository.class);
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.plugin.repository.gcs.GoogleCloudStoragePlugin;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
@ -75,9 +74,8 @@ public class GoogleCloudStorageRepository extends BlobStoreRepository {
@Inject
public GoogleCloudStorageRepository(RepositoryName repositoryName, RepositorySettings repositorySettings,
IndexShardRepository indexShardRepository,
GoogleCloudStorageService storageService) throws Exception {
super(repositoryName.getName(), repositorySettings, indexShardRepository);
super(repositoryName.getName(), repositorySettings);
String bucket = get(BUCKET, repositoryName, repositorySettings);
String application = get(APPLICATION_NAME, repositoryName, repositorySettings);

View File

@ -26,7 +26,6 @@ import java.security.PrivilegedAction;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
@ -85,6 +84,6 @@ public final class HdfsPlugin extends Plugin {
}
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository("hdfs", HdfsRepository.class, BlobStoreIndexShardRepository.class);
repositoriesModule.registerRepository("hdfs", HdfsRepository.class);
}
}

View File

@ -44,7 +44,6 @@ import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@ -63,8 +62,8 @@ public final class HdfsRepository extends BlobStoreRepository {
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);
@Inject
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings) throws IOException {
super(name.getName(), repositorySettings);
this.repositorySettings = repositorySettings;
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", null);

View File

@ -19,18 +19,6 @@
package org.elasticsearch.plugin.repository.s3;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.S3Module;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.s3.S3Repository;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@ -39,6 +27,16 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.S3Module;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.s3.S3Repository;
/**
*
*/
@ -78,7 +76,7 @@ public class S3RepositoryPlugin extends Plugin {
}
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class, BlobStoreIndexShardRepository.class);
repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class);
}
@Override

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
@ -250,12 +249,11 @@ public class S3Repository extends BlobStoreRepository {
*
* @param name repository name
* @param repositorySettings repository settings
* @param indexShardRepository index shard repository
* @param s3Service S3 service
*/
@Inject
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, AwsS3Service s3Service) throws IOException {
super(name.getName(), repositorySettings);
String bucket = getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING);
if (bucket == null) {

View File

@ -336,7 +336,7 @@ public class RepositoryS3SettingsTests extends ESTestCase {
.build());
try {
new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null, null);
new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null);
fail("We should either raise a NPE or a RepositoryException or a IllegalArgumentException");
} catch (RepositoryException e) {
assertThat(e.getDetailedMessage(), containsString(expectedMessage));