Update snapshot list when snapshot is deleted

This commit is contained in:
Igor Motov 2013-12-12 14:24:46 -05:00
parent aafd4ddfbd
commit 8c1073bb6e
3 changed files with 44 additions and 6 deletions

View File

@ -99,7 +99,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
private ImmutableBlobContainer snapshotsBlobContainer; private ImmutableBlobContainer snapshotsBlobContainer;
private final String repositoryName; protected final String repositoryName;
private static final String SNAPSHOT_PREFIX = "snapshot-"; private static final String SNAPSHOT_PREFIX = "snapshot-";
@ -244,6 +244,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
// Delete snapshot file first so we wouldn't end up with partially deleted snapshot that looks OK // Delete snapshot file first so we wouldn't end up with partially deleted snapshot that looks OK
snapshotsBlobContainer.deleteBlob(blobName); snapshotsBlobContainer.deleteBlob(blobName);
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId)); snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId));
// Delete snapshot from the snapshot list
ImmutableList<SnapshotId> snapshotIds = snapshots();
if (snapshotIds.contains(snapshotId)) {
ImmutableList.Builder<SnapshotId> builder = ImmutableList.builder();
for (SnapshotId id : snapshotIds) {
if (!snapshotId.equals(id)) {
builder.add(id);
}
}
snapshotIds = builder.build();
}
writeSnapshotList(snapshotIds);
// Now delete all indices // Now delete all indices
for (String index : snapshot.indices()) { for (String index : snapshot.indices()) {
BlobPath indexPath = basePath().add("indices").add(index); BlobPath indexPath = basePath().add("indices").add(index);
@ -268,7 +280,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
*/ */
@Override @Override
public Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures) { public Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures) {
BlobStoreSnapshot snapshot = (BlobStoreSnapshot)readSnapshot(snapshotId); BlobStoreSnapshot snapshot = (BlobStoreSnapshot) readSnapshot(snapshotId);
if (snapshot == null) { if (snapshot == null) {
throw new SnapshotMissingException(snapshotId); throw new SnapshotMissingException(snapshotId);
} }

View File

@ -19,6 +19,8 @@
package org.elasticsearch.repositories.uri; package org.elasticsearch.repositories.uri;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.url.URLBlobStore; import org.elasticsearch.common.blobstore.url.URLBlobStore;
@ -52,6 +54,8 @@ public class URLRepository extends BlobStoreRepository {
private final BlobPath basePath; private final BlobPath basePath;
private boolean listDirectories;
/** /**
* Constructs new read-only URL-based repository * Constructs new read-only URL-based repository
* *
@ -72,6 +76,7 @@ public class URLRepository extends BlobStoreRepository {
} }
int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5)); int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]")); ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
listDirectories = repositorySettings.settings().getAsBoolean("list_directories", componentSettings.getAsBoolean("list_directories", true));
blobStore = new URLBlobStore(componentSettings, concurrentStreamPool, url); blobStore = new URLBlobStore(componentSettings, concurrentStreamPool, url);
basePath = BlobPath.cleanPath(); basePath = BlobPath.cleanPath();
} }
@ -88,4 +93,17 @@ public class URLRepository extends BlobStoreRepository {
protected BlobPath basePath() { protected BlobPath basePath() {
return basePath; return basePath;
} }
@Override
public ImmutableList<SnapshotId> snapshots() {
if (listDirectories) {
return super.snapshots();
} else {
try {
return readSnapshotList();
} catch (IOException ex) {
throw new RepositoryException(repositoryName, "failed to get snapshot list in repository", ex);
}
}
}
} }

View File

@ -25,6 +25,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -784,7 +785,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> trying to create a repository with different name"); logger.info("--> trying to create a repository with different name");
putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo-2") putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo-2")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", new File(repositoryLocation, "test")) .setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", new File(repositoryLocation, "test"))
).get(); ).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> unblocking blocked node"); logger.info("--> unblocking blocked node");
@ -851,13 +852,11 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> delete index"); logger.info("--> delete index");
wipeIndices("test-idx"); wipeIndices("test-idx");
logger.info("--> delete file system repository");
wipeRepositories("test-repo");
logger.info("--> create read-only URL repository"); logger.info("--> create read-only URL repository");
putRepositoryResponse = client.admin().cluster().preparePutRepository("url-repo") putRepositoryResponse = client.admin().cluster().preparePutRepository("url-repo")
.setType("url").setSettings(ImmutableSettings.settingsBuilder() .setType("url").setSettings(ImmutableSettings.settingsBuilder()
.put("url", repositoryLocation.toURI().toURL()) .put("url", repositoryLocation.toURI().toURL())
.put("list_directories", randomBoolean())
).get(); ).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> restore index after deletion"); logger.info("--> restore index after deletion");
@ -870,6 +869,15 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("url-repo").get(); GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("url-repo").get();
assertThat(getSnapshotsResponse.getSnapshots(), notNullValue()); assertThat(getSnapshotsResponse.getSnapshots(), notNullValue());
assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(1)); assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(1));
logger.info("--> delete snapshot");
DeleteSnapshotResponse deleteSnapshotResponse = client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").get();
assertAcked(deleteSnapshotResponse);
logger.info("--> list available shapshot again, no snapshots should be returned");
getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("url-repo").get();
assertThat(getSnapshotsResponse.getSnapshots(), notNullValue());
assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(0));
} }
private boolean waitForIndex(String index, TimeValue timeout) throws InterruptedException { private boolean waitForIndex(String index, TimeValue timeout) throws InterruptedException {