Prevent modification or deletion of repositories while snapshots are running

This commit is contained in:
Igor Motov 2013-11-19 19:20:27 -05:00
parent 2f66bf0720
commit 1d0bae0f50
5 changed files with 153 additions and 20 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.repositories; package org.elasticsearch.repositories;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
@ -38,6 +39,8 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -87,6 +90,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() { clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
ensureRepositoryNotInUse(currentState, request.name);
// Trying to create the new repository on master to make sure it works // Trying to create the new repository on master to make sure it works
if (!registerRepository(newRepositoryMetaData)) { if (!registerRepository(newRepositoryMetaData)) {
// The new repository has the same settings as the old one - ignore // The new repository has the same settings as the old one - ignore
@ -172,6 +176,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() { clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
ensureRepositoryNotInUse(currentState, request.name);
MetaData metaData = currentState.metaData(); MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE); RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
@ -260,22 +265,23 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
} }
ImmutableMap.Builder<String, RepositoryHolder> builder = ImmutableMap.builder(); ImmutableMap.Builder<String, RepositoryHolder> builder = ImmutableMap.builder();
// Now go through all repositories and update existing or create missing if (newMetaData != null) {
for (RepositoryMetaData repositoryMetaData : newMetaData.repositories()) { // Now go through all repositories and update existing or create missing
RepositoryHolder holder = survivors.get(repositoryMetaData.name()); for (RepositoryMetaData repositoryMetaData : newMetaData.repositories()) {
if (holder != null) { RepositoryHolder holder = survivors.get(repositoryMetaData.name());
// Found previous version of this repository if (holder != null) {
if (!holder.type.equals(repositoryMetaData.type()) || !holder.settings.equals(repositoryMetaData.settings())) { // Found previous version of this repository
// Previous version is different from the version in settings if (!holder.type.equals(repositoryMetaData.type()) || !holder.settings.equals(repositoryMetaData.settings())) {
closeRepository(repositoryMetaData.name(), holder); // Previous version is different from the version in settings
closeRepository(repositoryMetaData.name(), holder);
holder = createRepositoryHolder(repositoryMetaData);
}
} else {
holder = createRepositoryHolder(repositoryMetaData); holder = createRepositoryHolder(repositoryMetaData);
//TODO: Error handling and proper Injector cleanup
} }
} else { if (holder != null) {
holder = createRepositoryHolder(repositoryMetaData); builder.put(repositoryMetaData.name(), holder);
} }
if (holder != null) {
builder.put(repositoryMetaData.name(), holder);
} }
} }
repositories = builder.build(); repositories = builder.build();
@ -389,6 +395,12 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
} }
} }
private void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
if (SnapshotsService.isRepositoryInUse(clusterState, repository) || RestoreService.isRepositoryInUse(clusterState, repository)) {
throw new ElasticSearchIllegalStateException("trying to modify or unregister repository that is currently used ");
}
}
/** /**
* Internal data structure for holding repository with its configuration information and injector * Internal data structure for holding repository with its configuration information and injector
*/ */

View File

@ -423,6 +423,24 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
} }
} }
/**
* Checks if a repository is currently in use by one of the snapshots
* @param clusterState cluster state
* @param repository repository id
* @return true if repository is currently in use by one of the running snapshots
*/
public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
MetaData metaData = clusterState.metaData();
RestoreMetaData snapshots = metaData.custom(RestoreMetaData.TYPE);
if (snapshots != null) {
for(RestoreMetaData.Entry snapshot : snapshots.entries()) {
if(repository.equals(snapshot.snapshotId().getRepository())) {
return true;
}
}
}
return false;
}
/** /**
* Restore snapshot request * Restore snapshot request

View File

@ -292,8 +292,7 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
} }
} }
mdBuilder.putCustom(SnapshotMetaData.TYPE, new SnapshotMetaData(entries.build())); mdBuilder.putCustom(SnapshotMetaData.TYPE, new SnapshotMetaData(entries.build()));
ClusterState newState = ClusterState.builder(currentState).metaData(mdBuilder).build(); return ClusterState.builder(currentState).metaData(mdBuilder).build();
return newState;
} }
@Override @Override
@ -840,6 +839,25 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
}); });
} }
/**
* Checks if a repository is currently in use by one of the snapshots
* @param clusterState cluster state
* @param repository repository id
* @return true if repository is currently in use by one of the running snapshots
*/
public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
MetaData metaData = clusterState.metaData();
SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
if (snapshots != null) {
for(SnapshotMetaData.Entry snapshot : snapshots.entries()) {
if(repository.equals(snapshot.snapshotId().getRepository())) {
return true;
}
}
}
return false;
}
/** /**
* Deletes snapshot from repository * Deletes snapshot from repository
* *
@ -941,8 +959,6 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
/** /**
* Called if delete operation failed * Called if delete operation failed
*
* @param t
*/ */
void onFailure(Throwable t); void onFailure(Throwable t);
} }

View File

@ -20,7 +20,6 @@
package org.elasticsearch.snapshots; package org.elasticsearch.snapshots;
import com.carrotsearch.randomizedtesting.LifecycleScope; import com.carrotsearch.randomizedtesting.LifecycleScope;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;

View File

@ -21,7 +21,6 @@ package org.elasticsearch.snapshots;
import com.carrotsearch.randomizedtesting.LifecycleScope; import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
@ -729,6 +728,95 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
} }
@Test
@TestLogging("cluster.routing.allocation.decider:TRACE")
public void deleteRepositoryWhileSnapshottingTest() throws Exception {
Client client = client();
File repositoryLocation = newTempDir(LifecycleScope.TEST);
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
ImmutableSettings.settingsBuilder()
.put("location", repositoryLocation)
.put("random", randomAsciiOfLength(10))
.put("wait_after_unblock", 200)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
// Create index on 2 nodes and make sure each node has a primary by setting no replicas
assertAcked(prepareCreate("test-idx", 2, ImmutableSettings.builder().put("number_of_replicas", 0)));
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
// Pick one node and block it
String blockedNode = blockNodeWithIndex("test-idx");
logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
logger.info("--> waiting for block to kick in");
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
logger.info("--> execution was blocked on node [{}], trying to delete repository", blockedNode);
try {
client.admin().cluster().prepareDeleteRepository("test-repo").execute().get();
fail("shouldn't be able to delete in-use repository");
} catch (Exception ex) {
logger.info("--> in-use repository deletion failed");
}
logger.info("--> trying to move repository to another location");
try {
client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", new File(repositoryLocation, "test"))
).get();
fail("shouldn't be able to replace in-use repository");
} catch (Exception ex) {
logger.info("--> in-use repository replacement failed");
}
logger.info("--> trying to create a repository with different name");
putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo-2")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", new File(repositoryLocation, "test"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> unblocking blocked node");
unblockNode(blockedNode);
logger.info("--> waiting for completion");
SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
logger.info("--> done");
ImmutableList<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0));
logger.info("--> delete index");
wipeIndices("test-idx");
logger.info("--> replace mock repository with real one at the same location");
putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", repositoryLocation)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> restore index");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
}
@Test @Test
public void urlRepositoryTest() throws Exception { public void urlRepositoryTest() throws Exception {
Client client = client(); Client client = client();