SNAPSHOT: Repo Creation out of ClusterStateTask (#36157)
* Move `createRepository` call out of cluster state tasks * Now only `RepositoriesService#applyClusterState` manipulates `this.repositories` * Closes #9488
This commit is contained in:
parent
c007a42ba0
commit
3c54b413ad
|
@ -41,7 +41,6 @@ import org.elasticsearch.snapshots.SnapshotsService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -100,6 +99,14 @@ public class RepositoriesService implements ClusterStateApplier {
|
||||||
registrationListener = listener;
|
registrationListener = listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Trying to create the new repository on master to make sure it works
|
||||||
|
try {
|
||||||
|
closeRepository(createRepository(newRepositoryMetaData));
|
||||||
|
} catch (Exception e) {
|
||||||
|
registrationListener.onFailure(e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, registrationListener) {
|
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, registrationListener) {
|
||||||
@Override
|
@Override
|
||||||
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
|
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
|
||||||
|
@ -107,13 +114,8 @@ public class RepositoriesService implements ClusterStateApplier {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) throws IOException {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
ensureRepositoryNotInUse(currentState, request.name);
|
ensureRepositoryNotInUse(currentState, request.name);
|
||||||
// Trying to create the new repository on master to make sure it works
|
|
||||||
if (!registerRepository(newRepositoryMetaData)) {
|
|
||||||
// The new repository has the same settings as the old one - ignore
|
|
||||||
return currentState;
|
|
||||||
}
|
|
||||||
MetaData metaData = currentState.metaData();
|
MetaData metaData = currentState.metaData();
|
||||||
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
|
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
|
||||||
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
|
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
|
||||||
|
@ -127,6 +129,10 @@ public class RepositoriesService implements ClusterStateApplier {
|
||||||
|
|
||||||
for (RepositoryMetaData repositoryMetaData : repositories.repositories()) {
|
for (RepositoryMetaData repositoryMetaData : repositories.repositories()) {
|
||||||
if (repositoryMetaData.name().equals(newRepositoryMetaData.name())) {
|
if (repositoryMetaData.name().equals(newRepositoryMetaData.name())) {
|
||||||
|
if (newRepositoryMetaData.equals(repositoryMetaData)) {
|
||||||
|
// Previous version is the same as this one no update is needed.
|
||||||
|
return currentState;
|
||||||
|
}
|
||||||
found = true;
|
found = true;
|
||||||
repositoriesMetaData.add(newRepositoryMetaData);
|
repositoriesMetaData.add(newRepositoryMetaData);
|
||||||
} else {
|
} else {
|
||||||
|
@ -352,37 +358,8 @@ public class RepositoriesService implements ClusterStateApplier {
|
||||||
throw new RepositoryMissingException(repositoryName);
|
throw new RepositoryMissingException(repositoryName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new repository and adds it to the list of registered repositories.
|
|
||||||
* <p>
|
|
||||||
* If a repository with the same name but different types or settings already exists, it will be closed and
|
|
||||||
* replaced with the new repository. If a repository with the same name exists but it has the same type and settings
|
|
||||||
* the new repository is ignored.
|
|
||||||
*
|
|
||||||
* @param repositoryMetaData new repository metadata
|
|
||||||
* @return {@code true} if new repository was added or {@code false} if it was ignored
|
|
||||||
*/
|
|
||||||
private boolean registerRepository(RepositoryMetaData repositoryMetaData) throws IOException {
|
|
||||||
Repository previous = repositories.get(repositoryMetaData.name());
|
|
||||||
if (previous != null) {
|
|
||||||
RepositoryMetaData previousMetadata = previous.getMetadata();
|
|
||||||
if (previousMetadata.equals(repositoryMetaData)) {
|
|
||||||
// Previous version is the same as this one - ignore it
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Repository newRepo = createRepository(repositoryMetaData);
|
|
||||||
if (previous != null) {
|
|
||||||
closeRepository(previous);
|
|
||||||
}
|
|
||||||
Map<String, Repository> newRepositories = new HashMap<>(repositories);
|
|
||||||
newRepositories.put(repositoryMetaData.name(), newRepo);
|
|
||||||
repositories = newRepositories;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Closes the given repository. */
|
/** Closes the given repository. */
|
||||||
private void closeRepository(Repository repository) throws IOException {
|
private void closeRepository(Repository repository) {
|
||||||
logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name());
|
logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name());
|
||||||
repository.close();
|
repository.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,6 +97,16 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
|
||||||
assertThat(findRepository(repositoriesResponse.repositories(), "test-repo-1"), notNullValue());
|
assertThat(findRepository(repositoriesResponse.repositories(), "test-repo-1"), notNullValue());
|
||||||
assertThat(findRepository(repositoriesResponse.repositories(), "test-repo-2"), notNullValue());
|
assertThat(findRepository(repositoriesResponse.repositories(), "test-repo-2"), notNullValue());
|
||||||
|
|
||||||
|
logger.info("--> check that trying to create a repository with the same settings repeatedly does not update cluster state");
|
||||||
|
String beforeStateUuid = clusterStateResponse.getState().stateUUID();
|
||||||
|
assertThat(
|
||||||
|
client.admin().cluster().preparePutRepository("test-repo-1")
|
||||||
|
.setType("fs").setSettings(Settings.builder()
|
||||||
|
.put("location", location)
|
||||||
|
).get().isAcknowledged(),
|
||||||
|
equalTo(true));
|
||||||
|
assertEquals(beforeStateUuid, client.admin().cluster().prepareState().clear().get().getState().stateUUID());
|
||||||
|
|
||||||
logger.info("--> delete repository test-repo-1");
|
logger.info("--> delete repository test-repo-1");
|
||||||
client.admin().cluster().prepareDeleteRepository("test-repo-1").get();
|
client.admin().cluster().prepareDeleteRepository("test-repo-1").get();
|
||||||
repositoriesResponse = client.admin().cluster().prepareGetRepositories().get();
|
repositoriesResponse = client.admin().cluster().prepareGetRepositories().get();
|
||||||
|
|
Loading…
Reference in New Issue