Refactored AckedClusterStateUpdateTask & co. to remove code repetitions in subclasses

Made AckedClusterStateUpdateTask an abstract class instead of an interface, which contains the common methods.
Also introduced the AckedRequest interface to mark both AcknowledgedRequest & ClusterStateUpdateRequest so that the different ways of updating the cluster state (with or without a MetaData*Service) can share the same code.
Removed ClusterStateUpdateListener as we can just use its base class ActionListener instead.

Closes #6559
This commit is contained in:
javanna 2014-06-18 23:29:13 +02:00 committed by Luca Cavanna
parent e52364a95a
commit f16451a446
25 changed files with 170 additions and 512 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
@ -76,10 +77,9 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeOperatio
repositoriesService.unregisterRepository(
new RepositoriesService.UnregisterRepositoryRequest("delete_repository [" + request.name() + "]", request.name())
.masterNodeTimeout(request.masterNodeTimeout()).ackTimeout(request.timeout()),
new ActionListener<RepositoriesService.UnregisterRepositoryResponse>() {
new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(RepositoriesService.UnregisterRepositoryResponse unregisterRepositoryResponse) {
public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) {
listener.onResponse(new DeleteRepositoryResponse(unregisterRepositoryResponse.isAcknowledged()));
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
@ -77,10 +78,10 @@ public class TransportPutRepositoryAction extends TransportMasterNodeOperationAc
repositoriesService.registerRepository(new RepositoriesService.RegisterRepositoryRequest("put_repository [" + request.name() + "]", request.name(), request.type())
.settings(request.settings())
.masterNodeTimeout(request.masterNodeTimeout())
.ackTimeout(request.timeout()), new ActionListener<RepositoriesService.RegisterRepositoryResponse>() {
.ackTimeout(request.timeout()), new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(RepositoriesService.RegisterRepositoryResponse response) {
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new PutRepositoryResponse(response.isAcknowledged()));
}

View File

@ -25,15 +25,12 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -73,19 +70,14 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) throws ElasticsearchException {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask<ClusterRerouteResponse>(request, listener) {
private volatile ClusterState clusterStateToSend;
private volatile RoutingExplanations explanations;
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterRerouteResponse(true, clusterStateToSend, explanations));
protected ClusterRerouteResponse newResponse(boolean acknowledged) {
return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations);
}
@Override
@ -93,20 +85,10 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend, new RoutingExplanations()));
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to perform [{}]", t, source);
listener.onFailure(t);
super.onFailure(source, t);
}
@Override
@ -120,11 +102,6 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
}
return newState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
}

View File

@ -37,7 +37,6 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -87,13 +86,13 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
final ImmutableSettings.Builder transientUpdates = ImmutableSettings.settingsBuilder();
final ImmutableSettings.Builder persistentUpdates = ImmutableSettings.settingsBuilder();
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(request, listener) {
private volatile boolean changed = false;
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(acknowledged, transientUpdates.build(), persistentUpdates.build());
}
@Override
@ -101,9 +100,8 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
if (changed) {
reroute(true);
} else {
listener.onResponse(new ClusterUpdateSettingsResponse(true, transientUpdates.build(), persistentUpdates.build()));
super.onAllNodesAcked(t);
}
}
@Override
@ -111,7 +109,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
if (changed) {
reroute(false);
} else {
listener.onResponse(new ClusterUpdateSettingsResponse(false, transientUpdates.build(), persistentUpdates.build()));
super.onAckTimeout();
}
}
@ -129,7 +127,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
// in the components (e.g. FilterAllocationDecider), so the changes made by the first call aren't visible
// to the components until the ClusterStateListener instances have been invoked, but are visible after
// the first update task has been completed.
clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
@ -138,25 +136,9 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
//we return when the cluster reroute is acked (the acknowledged flag depends on whether the update settings was acknowledged)
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
}
@Override
public void onAckTimeout() {
//we return when the cluster reroute ack times out (acknowledged false)
listener.onResponse(new ClusterUpdateSettingsResponse(false, transientUpdates.build(), persistentUpdates.build()));
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
//we return when the cluster reroute is acked or it times out but the acknowledged flag depends on whether the update settings was acknowledged
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, transientUpdates.build(), persistentUpdates.build());
}
@Override
@ -175,27 +157,13 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
}
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to perform [{}]", t, source);
listener.onFailure(t);
super.onFailure(source, t);
}
@Override
@ -252,11 +220,6 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
return builder(currentState).metaData(metaData).blocks(blocks).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasA
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -123,7 +122,7 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.actions(finalActions.toArray(new AliasAction[finalActions.size()]));
indexAliasesService.indicesAliases(updateRequest, new ClusterStateUpdateListener() {
indexAliasesService.indicesAliases(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new IndicesAliasesResponse(response.isAcknowledged()));

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -91,7 +90,7 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(request.indices());
indexStateService.closeIndex(updateRequest, new ClusterStateUpdateListener() {
indexStateService.closeIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -87,7 +86,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases()).customs(request.customs());
createIndexService.createIndex(updateRequest, new ClusterStateUpdateListener() {
createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {

View File

@ -37,7 +37,6 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -179,7 +178,7 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());
metaDataMappingService.removeMapping(clusterStateUpdateRequest, new ClusterStateUpdateListener() {
metaDataMappingService.removeMapping(clusterStateUpdateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new DeleteMappingResponse(response.isAcknowledged()));

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -87,7 +86,7 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
.indices(request.indices()).type(request.type())
.source(request.source()).ignoreConflicts(request.ignoreConflicts());
metaDataMappingService.putMapping(updateRequest, new ClusterStateUpdateListener() {
metaDataMappingService.putMapping(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -91,7 +90,7 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(request.indices());
indexStateService.openIndex(updateRequest, new ClusterStateUpdateListener() {
indexStateService.openIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.common.inject.Inject;
@ -81,7 +80,7 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());
updateSettingsService.updateSettings(clusterStateUpdateRequest, new ClusterStateUpdateListener() {
updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new UpdateSettingsResponse(response.isAcknowledged()));

View File

@ -29,12 +29,9 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.warmer.IndexWarmerMissingException;
@ -90,37 +87,17 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct
@Override
protected void masterOperation(final DeleteWarmerRequest request, final ClusterState state, final ActionListener<DeleteWarmerResponse> listener) throws ElasticsearchException {
clusterService.submitStateUpdateTask("delete_warmer [" + Arrays.toString(request.names()) + "]", new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("delete_warmer [" + Arrays.toString(request.names()) + "]", new AckedClusterStateUpdateTask<DeleteWarmerResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new DeleteWarmerResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new DeleteWarmerResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
protected DeleteWarmerResponse newResponse(boolean acknowledged) {
return new DeleteWarmerResponse(acknowledged);
}
@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to delete warmer [{}] on indices [{}]", t, Arrays.toString(request.names()), request.indices());
listener.onFailure(t);
super.onFailure(source, t);
}
@Override
@ -184,11 +161,6 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct
return ClusterState.builder(currentState).metaData(mdBuilder).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
}

View File

@ -31,12 +31,9 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
@ -98,37 +95,17 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction
return;
}
clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask<PutWarmerResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new PutWarmerResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new PutWarmerResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
protected PutWarmerResponse newResponse(boolean acknowledged) {
return new PutWarmerResponse(acknowledged);
}
@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to put warmer [{}] on indices [{}]", t, request.name(), request.searchRequest().indices());
listener.onFailure(t);
super.onFailure(source, t);
}
@Override
@ -180,11 +157,6 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction
return ClusterState.builder(currentState).metaData(mdBuilder).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.action.support.master;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
@ -31,7 +32,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
* Abstract class that allows to mark action requests that support acknowledgements.
* Facilitates consistency across different api.
*/
public abstract class AcknowledgedRequest<T extends MasterNodeOperationRequest> extends MasterNodeOperationRequest<T> {
public abstract class AcknowledgedRequest<T extends MasterNodeOperationRequest> extends MasterNodeOperationRequest<T> implements AckedRequest {
public static final TimeValue DEFAULT_ACK_TIMEOUT = timeValueSeconds(30);
@ -83,4 +84,9 @@ public abstract class AcknowledgedRequest<T extends MasterNodeOperationRequest>
protected void writeTimeout(StreamOutput out) throws IOException {
timeout.writeTo(out);
}
@Override
public TimeValue ackTimeout() {
return timeout;
}
}

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.cluster;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
@ -26,30 +28,63 @@ import org.elasticsearch.common.unit.TimeValue;
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* all the nodes have acknowledged a cluster state update request
*/
public interface AckedClusterStateUpdateTask extends TimeoutClusterStateUpdateTask {
public abstract class AckedClusterStateUpdateTask<Response> implements TimeoutClusterStateUpdateTask {
private final ActionListener<Response> listener;
private final AckedRequest request;
protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Response> listener) {
this.listener = listener;
this.request = request;
}
/**
* Called to determine which nodes the acknowledgement is expected from
* @param discoveryNode a node
* @return true if the node is expected to send ack back, false otherwise
*/
boolean mustAck(DiscoveryNode discoveryNode);
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
/**
* Called once all the nodes have acknowledged the cluster state update request. Must be
* very lightweight execution, since it gets executed on the cluster service thread.
* @param t optional error that might have been thrown
*/
void onAllNodesAcked(@Nullable Throwable t);
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(newResponse(true));
}
protected abstract Response newResponse(boolean acknowledged);
/**
* Called once the acknowledgement timeout defined by
* {@link AckedClusterStateUpdateTask#ackTimeout()} has expired
*/
void onAckTimeout();
public void onAckTimeout() {
listener.onResponse(newResponse(false));
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
/**
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
*/
TimeValue ackTimeout();
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
}

View File

@ -19,19 +19,20 @@
package org.elasticsearch.cluster.ack;
import org.elasticsearch.common.unit.TimeValue;
/**
* Listener used for cluster state updates processing
* Supports acknowledgement logic
* Identifies a cluster state update request with acknowledgement support
*/
public interface ClusterStateUpdateListener {
public interface AckedRequest {
/**
* Called when the cluster state update is acknowledged
* Returns the acknowledgement timeout
*/
void onResponse(ClusterStateUpdateResponse response);
TimeValue ackTimeout();
/**
* Called when any error is thrown during the cluster state update processing
* Returns the timeout for the request to be completed on the master node
*/
void onFailure(Throwable t);
TimeValue masterNodeTimeout();
}

View File

@ -25,7 +25,7 @@ import org.elasticsearch.common.unit.TimeValue;
* Base class to be used when needing to update the cluster state
* Contains the basic fields that are always needed
*/
public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequest<T>> {
public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequest<T>> implements AckedRequest {
private TimeValue ackTimeout;
private TimeValue masterNodeTimeout;

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
@ -148,7 +147,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
@Override
protected void masterOperation(final MappingUpdatedRequest request, final ClusterState state, final ActionListener<MappingUpdatedResponse> listener) throws ElasticsearchException {
metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), request.order, request.nodeId, new ClusterStateUpdateListener() {
metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), request.order, request.nodeId, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new MappingUpdatedResponse());

View File

@ -28,16 +28,15 @@ import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -51,7 +50,6 @@ import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
@ -112,7 +110,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
this.aliasValidator = aliasValidator;
}
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
for (Map.Entry<String, String> entry : request.settings().getAsMap().entrySet()) {
if (!entry.getKey().startsWith("index.")) {
@ -176,40 +174,30 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
}
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener, final Semaphore mdLock) {
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask() {
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
mdLock.release();
listener.onResponse(new ClusterStateUpdateResponse(true));
super.onAllNodesAcked(t);
}
@Override
public void onAckTimeout() {
mdLock.release();
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
super.onAckTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
mdLock.release();
listener.onFailure(t);
super.onFailure(source, t);
}
@Override
@ -446,10 +434,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}

View File

@ -22,20 +22,17 @@ package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.service.IndexService;
@ -64,37 +61,11 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
this.aliasValidator = aliasValidator;
}
public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new AckedClusterStateUpdateTask() {
public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
@ -186,11 +157,6 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
}
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
}

View File

@ -20,28 +20,25 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
@ -69,42 +66,16 @@ public class MetaDataIndexStateService extends AbstractComponent {
this.allocationService = allocationService;
}
public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
if (request.indices() == null || request.indices().length == 0) {
throw new ElasticsearchIllegalArgumentException("Index name is required");
}
final String indicesAsString = Arrays.toString(request.indices());
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
@ -152,50 +123,19 @@ public class MetaDataIndexStateService extends AbstractComponent {
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
if (request.indices() == null || request.indices().length == 0) {
throw new ElasticsearchIllegalArgumentException("Index name is required");
}
final String indicesAsString = Arrays.toString(request.indices());
clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
@ -236,11 +176,6 @@ public class MetaDataIndexStateService extends AbstractComponent {
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}

View File

@ -22,22 +22,19 @@ package org.elasticsearch.cluster.metadata;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
@ -102,9 +99,9 @@ public class MetaDataMappingService extends AbstractComponent {
final CompressedString mappingSource;
final long order; // -1 for unknown
final String nodeId; // null fr unknown
final ClusterStateUpdateListener listener;
final ActionListener<ClusterStateUpdateResponse> listener;
UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, long order, String nodeId, ClusterStateUpdateListener listener) {
UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, long order, String nodeId, ActionListener<ClusterStateUpdateResponse> listener) {
super(index, indexUUID);
this.type = type;
this.mappingSource = mappingSource;
@ -365,7 +362,7 @@ public class MetaDataMappingService extends AbstractComponent {
});
}
public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final long order, final String nodeId, final ClusterStateUpdateListener listener) {
public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final long order, final String nodeId, final ActionListener<ClusterStateUpdateResponse> listener) {
final long insertOrder;
synchronized (refreshOrUpdateMutex) {
insertOrder = ++refreshOrUpdateInsertOrder;
@ -384,37 +381,11 @@ public class MetaDataMappingService extends AbstractComponent {
});
}
public void removeMapping(final DeleteMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
clusterService.submitStateUpdateTask("remove-mapping [" + Arrays.toString(request.types()) + "]", Priority.HIGH, new AckedClusterStateUpdateTask() {
public void removeMapping(final DeleteMappingClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("remove-mapping [" + Arrays.toString(request.types()) + "]", Priority.HIGH, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
@ -455,46 +426,16 @@ public class MetaDataMappingService extends AbstractComponent {
return ClusterState.builder(currentState).metaData(builder).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
public void putMapping(final PutMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
@ -627,11 +568,6 @@ public class MetaDataMappingService extends AbstractComponent {
}
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
}

View File

@ -21,19 +21,17 @@ package org.elasticsearch.cluster.metadata;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -149,7 +147,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
.ackTimeout(TimeValue.timeValueMillis(0)) //no need to wait for ack here
.masterNodeTimeout(TimeValue.timeValueMinutes(10));
updateSettings(updateRequest, new ClusterStateUpdateListener() {
updateSettings(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
for (String index : indices) {
@ -168,7 +166,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
}
public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
for (Map.Entry<String, String> entry : request.settings().getAsMap().entrySet()) {
if (entry.getKey().equals("index")) {
@ -215,36 +213,11 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
final Settings openSettings = updatedSettingsBuilder.build();
clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
@ -343,10 +316,6 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
return updatedState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
@ -37,7 +36,6 @@ import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
@ -84,10 +82,15 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
* @param request register repository request
* @param listener register repository listener
*/
public void registerRepository(final RegisterRepositoryRequest request, final ActionListener<RegisterRepositoryResponse> listener) {
public void registerRepository(final RegisterRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
final RepositoryMetaData newRepositoryMetaData = new RepositoryMetaData(request.name, request.type, request.settings);
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
public ClusterState execute(ClusterState currentState) {
ensureRepositoryNotInUse(currentState, request.name);
@ -129,38 +132,13 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
@Override
public void onFailure(String source, Throwable t) {
logger.warn("failed to create repository [{}]", t, request.name);
listener.onFailure(t);
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
super.onFailure(source, t);
}
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return discoveryNode.masterNode();
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new RegisterRepositoryResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new RegisterRepositoryResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
});
}
@ -172,8 +150,13 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
* @param request unregister repository request
* @param listener unregister repository listener
*/
public void unregisterRepository(final UnregisterRepositoryRequest request, final ActionListener<UnregisterRepositoryResponse> listener) {
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() {
public void unregisterRepository(final UnregisterRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
public ClusterState execute(ClusterState currentState) {
ensureRepositoryNotInUse(currentState, request.name);
@ -200,40 +183,11 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
throw new RepositoryMissingException(request.name);
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
// Since operation occurs only on masters, it's enough that only master-eligible nodes acked
return discoveryNode.masterNode();
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new UnregisterRepositoryResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new UnregisterRepositoryResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
});
}
@ -464,17 +418,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
}
}
/**
* Register repository response
*/
public static class RegisterRepositoryResponse extends ClusterStateUpdateResponse {
RegisterRepositoryResponse(boolean acknowledged) {
super(acknowledged);
}
}
/**
* Unregister repository request
*/
@ -496,13 +439,4 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
}
}
/**
* Unregister repository response
*/
public static class UnregisterRepositoryResponse extends ClusterStateUpdateResponse {
UnregisterRepositoryResponse(boolean acknowledged) {
super(acknowledged);
}
}
}

View File

@ -122,7 +122,12 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
final AtomicBoolean executed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch processedLatch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
@Override
protected Void newResponse(boolean acknowledged) {
return null;
}
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
@ -193,10 +198,10 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
final AtomicBoolean executed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch processedLatch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
protected Void newResponse(boolean acknowledged) {
return null;
}
@Override
@ -263,7 +268,12 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
final AtomicBoolean onFailure = new AtomicBoolean(false);
final AtomicBoolean executed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
@Override
protected Void newResponse(boolean acknowledged) {
return null;
}
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return false;
@ -331,7 +341,12 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
final AtomicBoolean executed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch processedLatch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
@Override
protected Void newResponse(boolean acknowledged) {
return null;
}
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return false;