Remove Redundant Request Wrappers from RepositoryService (#40192) (#40404)

This commit is contained in:
Armin Braun 2019-03-25 16:36:02 +01:00 committed by GitHub
parent 61f49af497
commit 3968d46a17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 198 deletions

View File

@ -69,18 +69,17 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeAction<D
protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.unregisterRepository(
new RepositoriesService.UnregisterRepositoryRequest("delete_repository [" + request.name() + "]", request.name())
.masterNodeTimeout(request.masterNodeTimeout()).ackTimeout(request.timeout()),
new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) {
listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()));
}
request,
new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) {
listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}

View File

@ -68,13 +68,7 @@ public class TransportPutRepositoryAction extends TransportMasterNodeAction<PutR
@Override
protected void masterOperation(final PutRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.registerRepository(
new RepositoriesService.RegisterRepositoryRequest("put_repository [" + request.name() + "]",
request.name(), request.type(), request.verify())
.settings(request.settings())
.masterNodeTimeout(request.masterNodeTimeout())
.ackTimeout(request.timeout()), new ActionListener<ClusterStateUpdateResponse>() {
repositoriesService.registerRepository(request, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
@ -87,5 +81,4 @@ public class TransportPutRepositoryAction extends TransportMasterNodeAction<PutR
}
});
}
}

View File

@ -26,13 +26,15 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
/**
* Transport action for verifying repository operation
*/
@ -68,14 +70,10 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
@Override
protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state,
final ActionListener<VerifyRepositoryResponse> listener) {
repositoriesService.verifyRepository(request.name(), new ActionListener<RepositoriesService.VerifyResponse>() {
repositoriesService.verifyRepository(request.name(), new ActionListener<List<DiscoveryNode>>() {
@Override
public void onResponse(RepositoriesService.VerifyResponse verifyResponse) {
if (verifyResponse.failed()) {
listener.onFailure(new RepositoryVerificationException(request.name(), verifyResponse.failureDescription()));
} else {
listener.onResponse(new VerifyRepositoryResponse(verifyResponse.nodes()));
}
public void onResponse(List<DiscoveryNode> verifyResponse) {
listener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])));
}
@Override

View File

@ -23,11 +23,12 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
@ -43,12 +44,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Service responsible for maintaining and providing access to snapshot repositories on nodes.
@ -93,12 +92,12 @@ public class RepositoriesService implements ClusterStateApplier {
* @param request register repository request
* @param listener register repository listener
*/
public void registerRepository(final RegisterRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
final RepositoryMetaData newRepositoryMetaData = new RepositoryMetaData(request.name, request.type, request.settings);
public void registerRepository(final PutRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
final RepositoryMetaData newRepositoryMetaData = new RepositoryMetaData(request.name(), request.type(), request.settings());
final ActionListener<ClusterStateUpdateResponse> registrationListener;
if (request.verify) {
registrationListener = new VerifyingRegisterRepositoryListener(request.name, listener);
if (request.verify()) {
registrationListener = new VerifyingRegisterRepositoryListener(request.name(), listener);
} else {
registrationListener = listener;
}
@ -111,7 +110,7 @@ public class RepositoriesService implements ClusterStateApplier {
return;
}
clusterService.submitStateUpdateTask(request.cause,
clusterService.submitStateUpdateTask("put_repository [" + request.name() + "]",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, registrationListener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
@ -120,14 +119,14 @@ public class RepositoriesService implements ClusterStateApplier {
@Override
public ClusterState execute(ClusterState currentState) {
ensureRepositoryNotInUse(currentState, request.name);
ensureRepositoryNotInUse(currentState, request.name());
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
if (repositories == null) {
logger.info("put repository [{}]", request.name);
logger.info("put repository [{}]", request.name());
repositories = new RepositoriesMetaData(
Collections.singletonList(new RepositoryMetaData(request.name, request.type, request.settings)));
Collections.singletonList(new RepositoryMetaData(request.name(), request.type(), request.settings())));
} else {
boolean found = false;
List<RepositoryMetaData> repositoriesMetaData = new ArrayList<>(repositories.repositories().size() + 1);
@ -145,10 +144,10 @@ public class RepositoriesService implements ClusterStateApplier {
}
}
if (!found) {
logger.info("put repository [{}]", request.name);
repositoriesMetaData.add(new RepositoryMetaData(request.name, request.type, request.settings));
logger.info("put repository [{}]", request.name());
repositoriesMetaData.add(new RepositoryMetaData(request.name(), request.type(), request.settings()));
} else {
logger.info("update repository [{}]", request.name);
logger.info("update repository [{}]", request.name());
}
repositories = new RepositoriesMetaData(repositoriesMetaData);
}
@ -158,7 +157,7 @@ public class RepositoriesService implements ClusterStateApplier {
@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", request.name), e);
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", request.name()), e);
super.onFailure(source, e);
}
@ -177,51 +176,52 @@ public class RepositoriesService implements ClusterStateApplier {
* @param request unregister repository request
* @param listener unregister repository listener
*/
public void unregisterRepository(final UnregisterRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
public void unregisterRepository(final DeleteRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("delete_repository [" + request.name() + "]",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
public ClusterState execute(ClusterState currentState) {
ensureRepositoryNotInUse(currentState, request.name);
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
if (repositories != null && repositories.repositories().size() > 0) {
List<RepositoryMetaData> repositoriesMetaData = new ArrayList<>(repositories.repositories().size());
boolean changed = false;
for (RepositoryMetaData repositoryMetaData : repositories.repositories()) {
if (Regex.simpleMatch(request.name, repositoryMetaData.name())) {
logger.info("delete repository [{}]", repositoryMetaData.name());
changed = true;
} else {
repositoriesMetaData.add(repositoryMetaData);
@Override
public ClusterState execute(ClusterState currentState) {
ensureRepositoryNotInUse(currentState, request.name());
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
if (repositories != null && repositories.repositories().size() > 0) {
List<RepositoryMetaData> repositoriesMetaData = new ArrayList<>(repositories.repositories().size());
boolean changed = false;
for (RepositoryMetaData repositoryMetaData : repositories.repositories()) {
if (Regex.simpleMatch(request.name(), repositoryMetaData.name())) {
logger.info("delete repository [{}]", repositoryMetaData.name());
changed = true;
} else {
repositoriesMetaData.add(repositoryMetaData);
}
}
if (changed) {
repositories = new RepositoriesMetaData(repositoriesMetaData);
mdBuilder.putCustom(RepositoriesMetaData.TYPE, repositories);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
}
}
if (changed) {
repositories = new RepositoriesMetaData(repositoriesMetaData);
mdBuilder.putCustom(RepositoriesMetaData.TYPE, repositories);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
if (Regex.isMatchAllPattern(request.name())) { // we use a wildcard so we don't barf if it's not present.
return currentState;
}
throw new RepositoryMissingException(request.name());
}
if (Regex.isMatchAllPattern(request.name)) { // we use a wildcard so we don't barf if it's not present.
return currentState;
}
throw new RepositoryMissingException(request.name);
}
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
// repository was created on both master and data nodes
return discoveryNode.isMasterNode() || discoveryNode.isDataNode();
}
});
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
// repository was created on both master and data nodes
return discoveryNode.isMasterNode() || discoveryNode.isDataNode();
}
});
}
public void verifyRepository(final String repositoryName, final ActionListener<VerifyResponse> listener) {
public void verifyRepository(final String repositoryName, final ActionListener<List<DiscoveryNode>> listener) {
final Repository repository = repository(repositoryName);
try {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
@ -229,9 +229,9 @@ public class RepositoriesService implements ClusterStateApplier {
final String verificationToken = repository.startVerification();
if (verificationToken != null) {
try {
verifyAction.verify(repositoryName, verificationToken, new ActionListener<VerifyResponse>() {
verifyAction.verify(repositoryName, verificationToken, new ActionListener<List<DiscoveryNode>>() {
@Override
public void onResponse(VerifyResponse verifyResponse) {
public void onResponse(List<DiscoveryNode> verifyResponse) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
repository.endVerification(verificationToken);
@ -263,7 +263,7 @@ public class RepositoriesService implements ClusterStateApplier {
});
}
} else {
listener.onResponse(new VerifyResponse(new DiscoveryNode[0], new VerificationFailure[0]));
listener.onResponse(Collections.emptyList());
}
} catch (Exception e) {
listener.onFailure(e);
@ -440,14 +440,10 @@ public class RepositoriesService implements ClusterStateApplier {
public void onResponse(final ClusterStateUpdateResponse clusterStateUpdateResponse) {
if (clusterStateUpdateResponse.isAcknowledged()) {
// The response was acknowledged - all nodes should know about the new repository, let's verify them
verifyRepository(name, new ActionListener<VerifyResponse>() {
verifyRepository(name, new ActionListener<List<DiscoveryNode>>() {
@Override
public void onResponse(VerifyResponse verifyResponse) {
if (verifyResponse.failed()) {
listener.onFailure(new RepositoryVerificationException(name, verifyResponse.failureDescription()));
} else {
listener.onResponse(clusterStateUpdateResponse);
}
public void onResponse(List<DiscoveryNode> verifyResponse) {
listener.onResponse(clusterStateUpdateResponse);
}
@Override
@ -465,104 +461,4 @@ public class RepositoriesService implements ClusterStateApplier {
listener.onFailure(e);
}
}
/**
* Register repository request
*/
public static class RegisterRepositoryRequest extends ClusterStateUpdateRequest<RegisterRepositoryRequest> {
final String cause;
final String name;
final String type;
final boolean verify;
Settings settings = Settings.EMPTY;
/**
* Constructs new register repository request
*
* @param cause repository registration cause
* @param name repository name
* @param type repository type
* @param verify verify repository after creation
*/
public RegisterRepositoryRequest(String cause, String name, String type, boolean verify) {
this.cause = cause;
this.name = name;
this.type = type;
this.verify = verify;
}
/**
* Sets repository settings
*
* @param settings repository settings
* @return this request
*/
public RegisterRepositoryRequest settings(Settings settings) {
this.settings = settings;
return this;
}
}
/**
* Unregister repository request
*/
public static class UnregisterRepositoryRequest extends ClusterStateUpdateRequest<UnregisterRepositoryRequest> {
final String cause;
final String name;
/**
* Creates a new unregister repository request
*
* @param cause repository unregistration cause
* @param name repository name
*/
public UnregisterRepositoryRequest(String cause, String name) {
this.cause = cause;
this.name = name;
}
}
/**
* Verify repository request
*/
public static class VerifyResponse {
private VerificationFailure[] failures;
private DiscoveryNode[] nodes;
public VerifyResponse(DiscoveryNode[] nodes, VerificationFailure[] failures) {
this.nodes = nodes;
this.failures = failures;
}
public VerificationFailure[] failures() {
return failures;
}
public DiscoveryNode[] nodes() {
return nodes;
}
public boolean failed() {
return failures.length > 0;
}
public String failureDescription() {
return Arrays
.stream(failures)
.map(failure -> failure.toString())
.collect(Collectors.joining(", ", "[", "]"));
}
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.repositories.RepositoriesService.VerifyResponse;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@ -68,7 +67,7 @@ public class VerifyNodeRepositoryAction {
new VerifyNodeRepositoryRequestHandler());
}
public void verify(String repository, String verificationToken, final ActionListener<VerifyResponse> listener) {
public void verify(String repository, String verificationToken, final ActionListener<List<DiscoveryNode>> listener) {
final DiscoveryNodes discoNodes = clusterService.state().nodes();
final DiscoveryNode localNode = discoNodes.getLocalNode();
@ -89,7 +88,7 @@ public class VerifyNodeRepositoryAction {
errors.add(new VerificationFailure(node.getId(), e));
}
if (counter.decrementAndGet() == 0) {
finishVerification(listener, nodes, errors);
finishVerification(repository, listener, nodes, errors);
}
} else {
transportService.sendRequest(node, ACTION_NAME, new VerifyNodeRepositoryRequest(repository, verificationToken),
@ -97,7 +96,7 @@ public class VerifyNodeRepositoryAction {
@Override
public void handleResponse(TransportResponse.Empty response) {
if (counter.decrementAndGet() == 0) {
finishVerification(listener, nodes, errors);
finishVerification(repository, listener, nodes, errors);
}
}
@ -105,7 +104,7 @@ public class VerifyNodeRepositoryAction {
public void handleException(TransportException exp) {
errors.add(new VerificationFailure(node.getId(), exp));
if (counter.decrementAndGet() == 0) {
finishVerification(listener, nodes, errors);
finishVerification(repository, listener, nodes, errors);
}
}
});
@ -113,10 +112,13 @@ public class VerifyNodeRepositoryAction {
}
}
public void finishVerification(ActionListener<VerifyResponse> listener, List<DiscoveryNode> nodes,
private static void finishVerification(String repositoryName, ActionListener<List<DiscoveryNode>> listener, List<DiscoveryNode> nodes,
CopyOnWriteArrayList<VerificationFailure> errors) {
listener.onResponse(new RepositoriesService.VerifyResponse(nodes.toArray(new DiscoveryNode[nodes.size()]),
errors.toArray(new VerificationFailure[errors.size()])));
if (errors.isEmpty() == false) {
listener.onFailure(new RepositoryVerificationException(repositoryName, errors.toString()));
} else {
listener.onResponse(nodes);
}
}
private void doVerify(String repositoryName, String verificationToken, DiscoveryNode localNode) {