diff --git a/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 0f9d59d89c3..9b649fb02fe 100644 --- a/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories; import com.google.common.collect.ImmutableMap; +import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; @@ -38,6 +39,8 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.snapshots.IndexShardRepository; +import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.SnapshotsService; import java.util.ArrayList; import java.util.List; @@ -87,6 +90,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { + ensureRepositoryNotInUse(currentState, request.name); // Trying to create the new repository on master to make sure it works if (!registerRepository(newRepositoryMetaData)) { // The new repository has the same settings as the old one - ignore @@ -172,6 +176,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { + ensureRepositoryNotInUse(currentState, request.name); MetaData metaData = currentState.metaData(); MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE); @@ -260,22 +265,23 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta } ImmutableMap.Builder builder = ImmutableMap.builder(); - // Now go through all repositories and update existing or create missing - for (RepositoryMetaData repositoryMetaData : newMetaData.repositories()) { - RepositoryHolder holder = survivors.get(repositoryMetaData.name()); - if (holder != null) { - // Found previous version of this repository - if (!holder.type.equals(repositoryMetaData.type()) || !holder.settings.equals(repositoryMetaData.settings())) { - // Previous version is different from the version in settings - closeRepository(repositoryMetaData.name(), holder); + if (newMetaData != null) { + // Now go through all repositories and update existing or create missing + for (RepositoryMetaData repositoryMetaData : newMetaData.repositories()) { + RepositoryHolder holder = survivors.get(repositoryMetaData.name()); + if (holder != null) { + // Found previous version of this repository + if (!holder.type.equals(repositoryMetaData.type()) || !holder.settings.equals(repositoryMetaData.settings())) { + // Previous version is different from the version in settings + closeRepository(repositoryMetaData.name(), holder); + holder = createRepositoryHolder(repositoryMetaData); + } + } else { holder = createRepositoryHolder(repositoryMetaData); - //TODO: Error handling and proper Injector cleanup } - } else { - holder = createRepositoryHolder(repositoryMetaData); - } - if (holder != null) { - builder.put(repositoryMetaData.name(), holder); + if (holder != null) { + builder.put(repositoryMetaData.name(), holder); + } } } repositories = builder.build(); @@ -389,6 +395,12 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta } } + private void ensureRepositoryNotInUse(ClusterState clusterState, String repository) { + if (SnapshotsService.isRepositoryInUse(clusterState, repository) || RestoreService.isRepositoryInUse(clusterState, repository)) { + throw new ElasticSearchIllegalStateException("trying to modify or unregister repository that is currently used "); + } + } + /** * Internal data structure for holding repository with its configuration information and injector */ diff --git a/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 55d4dc90a76..89047881192 100644 --- a/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -423,6 +423,24 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } } + /** + * Checks if a repository is currently in use by one of the snapshots + * @param clusterState cluster state + * @param repository repository id + * @return true if repository is currently in use by one of the running snapshots + */ + public static boolean isRepositoryInUse(ClusterState clusterState, String repository) { + MetaData metaData = clusterState.metaData(); + RestoreMetaData snapshots = metaData.custom(RestoreMetaData.TYPE); + if (snapshots != null) { + for(RestoreMetaData.Entry snapshot : snapshots.entries()) { + if(repository.equals(snapshot.snapshotId().getRepository())) { + return true; + } + } + } + return false; + } /** * Restore snapshot request diff --git a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 7835ba6be26..016e24132a6 100644 --- a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -292,8 +292,7 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL } } mdBuilder.putCustom(SnapshotMetaData.TYPE, new SnapshotMetaData(entries.build())); - ClusterState newState = ClusterState.builder(currentState).metaData(mdBuilder).build(); - return newState; + return ClusterState.builder(currentState).metaData(mdBuilder).build(); } @Override @@ -840,6 +839,25 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL }); } + /** + * Checks if a repository is currently in use by one of the snapshots + * @param clusterState cluster state + * @param repository repository id + * @return true if repository is currently in use by one of the running snapshots + */ + public static boolean isRepositoryInUse(ClusterState clusterState, String repository) { + MetaData metaData = clusterState.metaData(); + SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE); + if (snapshots != null) { + for(SnapshotMetaData.Entry snapshot : snapshots.entries()) { + if(repository.equals(snapshot.snapshotId().getRepository())) { + return true; + } + } + } + return false; + } + /** * Deletes snapshot from repository * @@ -941,8 +959,6 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL /** * Called if delete operation failed - * - * @param t */ void onFailure(Throwable t); } diff --git a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java index 7f3445c0f41..3503fec95ff 100644 --- a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.snapshots; import com.carrotsearch.randomizedtesting.LifecycleScope; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.client.Client; diff --git a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java index 2d6040dc71f..b1cf306e3c3 100644 --- a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.snapshots; import com.carrotsearch.randomizedtesting.LifecycleScope; import com.google.common.collect.ImmutableList; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; @@ -729,6 +728,95 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); } + @Test + @TestLogging("cluster.routing.allocation.decider:TRACE") + public void deleteRepositoryWhileSnapshottingTest() throws Exception { + Client client = client(); + File repositoryLocation = newTempDir(LifecycleScope.TEST); + logger.info("--> creating repository"); + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType(MockRepositoryModule.class.getCanonicalName()).setSettings( + ImmutableSettings.settingsBuilder() + .put("location", repositoryLocation) + .put("random", randomAsciiOfLength(10)) + .put("wait_after_unblock", 200) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + // Create index on 2 nodes and make sure each node has a primary by setting no replicas + assertAcked(prepareCreate("test-idx", 2, ImmutableSettings.builder().put("number_of_replicas", 0))); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); + + // Pick one node and block it + String blockedNode = blockNodeWithIndex("test-idx"); + + logger.info("--> snapshot"); + client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); + + logger.info("--> waiting for block to kick in"); + waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + + logger.info("--> execution was blocked on node [{}], trying to delete repository", blockedNode); + + try { + client.admin().cluster().prepareDeleteRepository("test-repo").execute().get(); + fail("shouldn't be able to delete in-use repository"); + } catch (Exception ex) { + logger.info("--> in-use repository deletion failed"); + } + + logger.info("--> trying to move repository to another location"); + try { + client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", new File(repositoryLocation, "test")) + ).get(); + fail("shouldn't be able to replace in-use repository"); + } catch (Exception ex) { + logger.info("--> in-use repository replacement failed"); + } + + logger.info("--> trying to create a repository with different name"); + putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo-2") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", new File(repositoryLocation, "test")) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + logger.info("--> unblocking blocked node"); + unblockNode(blockedNode); + logger.info("--> waiting for completion"); + SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)); + logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size()); + logger.info("--> done"); + + ImmutableList snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots(); + + assertThat(snapshotInfos.size(), equalTo(1)); + assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0)); + + logger.info("--> delete index"); + wipeIndices("test-idx"); + + logger.info("--> replace mock repository with real one at the same location"); + putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", repositoryLocation) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + logger.info("--> restore index"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + ensureGreen(); + assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); + } + @Test public void urlRepositoryTest() throws Exception { Client client = client();