From f16451a4464fe7d0a32d41693fc8c3f517610417 Mon Sep 17 00:00:00 2001 From: javanna Date: Wed, 18 Jun 2014 23:29:13 +0200 Subject: [PATCH] Refactored AckedClusterStateUpdateTask & co. to remove code repetitions in subclasses Made AckedClusterStateUpdateTask an abstract class instead of an interface, which contains the common methods. Also introduced the AckedRequest interface to mark both AcknowledgedRequest & ClusterStateUpdateRequest so that the different ways of updating the cluster state (with or without a MetaData*Service) can share the same code. Removed ClusterStateUpdateListener as we can just use its base class ActionListener instead. Closes #6559 --- .../TransportDeleteRepositoryAction.java | 6 +- .../put/TransportPutRepositoryAction.java | 5 +- .../TransportClusterRerouteAction.java | 33 +------ .../TransportClusterUpdateSettingsAction.java | 57 ++--------- .../alias/TransportIndicesAliasesAction.java | 3 +- .../close/TransportCloseIndexAction.java | 3 +- .../create/TransportCreateIndexAction.java | 3 +- .../delete/TransportDeleteMappingAction.java | 3 +- .../put/TransportPutMappingAction.java | 3 +- .../open/TransportOpenIndexAction.java | 3 +- .../put/TransportUpdateSettingsAction.java | 3 +- .../delete/TransportDeleteWarmerAction.java | 36 +------ .../warmer/put/TransportPutWarmerAction.java | 36 +------ .../support/master/AcknowledgedRequest.java | 8 +- .../cluster/AckedClusterStateUpdateTask.java | 45 ++++++++- ...eUpdateListener.java => AckedRequest.java} | 15 +-- .../ack/ClusterStateUpdateRequest.java | 2 +- .../action/index/MappingUpdatedAction.java | 3 +- .../metadata/MetaDataCreateIndexService.java | 34 ++----- .../metadata/MetaDataIndexAliasesService.java | 44 +-------- .../metadata/MetaDataIndexStateService.java | 83 ++-------------- .../metadata/MetaDataMappingService.java | 88 +++-------------- .../MetaDataUpdateSettingsService.java | 43 ++------- .../repositories/RepositoriesService.java | 96 +++---------------- .../cluster/ClusterServiceTests.java | 27 ++++-- 25 files changed, 170 insertions(+), 512 deletions(-) rename src/main/java/org/elasticsearch/cluster/ack/{ClusterStateUpdateListener.java => AckedRequest.java} (70%) diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java index a61498d94bb..33ddeb9a6dc 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.common.inject.Inject; @@ -76,10 +77,9 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeOperatio repositoriesService.unregisterRepository( new RepositoriesService.UnregisterRepositoryRequest("delete_repository [" + request.name() + "]", request.name()) .masterNodeTimeout(request.masterNodeTimeout()).ackTimeout(request.timeout()), - new ActionListener() { - + new ActionListener() { @Override - public void onResponse(RepositoriesService.UnregisterRepositoryResponse unregisterRepositoryResponse) { + public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) { listener.onResponse(new DeleteRepositoryResponse(unregisterRepositoryResponse.isAcknowledged())); } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java index ed59359001a..a5aa7673819 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.common.inject.Inject; @@ -77,10 +78,10 @@ public class TransportPutRepositoryAction extends TransportMasterNodeOperationAc repositoriesService.registerRepository(new RepositoriesService.RegisterRepositoryRequest("put_repository [" + request.name() + "]", request.name(), request.type()) .settings(request.settings()) .masterNodeTimeout(request.masterNodeTimeout()) - .ackTimeout(request.timeout()), new ActionListener() { + .ackTimeout(request.timeout()), new ActionListener() { @Override - public void onResponse(RepositoriesService.RegisterRepositoryResponse response) { + public void onResponse(ClusterStateUpdateResponse response) { listener.onResponse(new PutRepositoryResponse(response.isAcknowledged())); } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index f3377992a96..143fd92f896 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -25,15 +25,12 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.allocation.RoutingExplanations; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.common.Nullable; +import org.elasticsearch.cluster.routing.allocation.RoutingExplanations; import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -73,19 +70,14 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA @Override protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener listener) throws ElasticsearchException { - clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask(request, listener) { private volatile ClusterState clusterStateToSend; private volatile RoutingExplanations explanations; @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new ClusterRerouteResponse(true, clusterStateToSend, explanations)); + protected ClusterRerouteResponse newResponse(boolean acknowledged) { + return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations); } @Override @@ -93,20 +85,10 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend, new RoutingExplanations())); } - @Override - public TimeValue ackTimeout() { - return request.timeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - @Override public void onFailure(String source, Throwable t) { logger.debug("failed to perform [{}]", t, source); - listener.onFailure(t); + super.onFailure(source, t); } @Override @@ -120,11 +102,6 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA } return newState; } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } }); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 3e8af922776..fb0842ecab5 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -37,7 +37,6 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -87,13 +86,13 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe final ImmutableSettings.Builder transientUpdates = ImmutableSettings.settingsBuilder(); final ImmutableSettings.Builder persistentUpdates = ImmutableSettings.settingsBuilder(); - clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask(request, listener) { private volatile boolean changed = false; @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; + protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) { + return new ClusterUpdateSettingsResponse(acknowledged, transientUpdates.build(), persistentUpdates.build()); } @Override @@ -101,9 +100,8 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe if (changed) { reroute(true); } else { - listener.onResponse(new ClusterUpdateSettingsResponse(true, transientUpdates.build(), persistentUpdates.build())); + super.onAllNodesAcked(t); } - } @Override @@ -111,7 +109,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe if (changed) { reroute(false); } else { - listener.onResponse(new ClusterUpdateSettingsResponse(false, transientUpdates.build(), persistentUpdates.build())); + super.onAckTimeout(); } } @@ -129,7 +127,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe // in the components (e.g. FilterAllocationDecider), so the changes made by the first call aren't visible // to the components until the ClusterStateListener instances have been invoked, but are visible after // the first update task has been completed. - clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { @Override public boolean mustAck(DiscoveryNode discoveryNode) { @@ -138,25 +136,9 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe } @Override - public void onAllNodesAcked(@Nullable Throwable t) { - //we return when the cluster reroute is acked (the acknowledged flag depends on whether the update settings was acknowledged) - listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build())); - } - - @Override - public void onAckTimeout() { - //we return when the cluster reroute ack times out (acknowledged false) - listener.onResponse(new ClusterUpdateSettingsResponse(false, transientUpdates.build(), persistentUpdates.build())); - } - - @Override - public TimeValue ackTimeout() { - return request.timeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); + //we return when the cluster reroute is acked or it times out but the acknowledged flag depends on whether the update settings was acknowledged + protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) { + return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, transientUpdates.build(), persistentUpdates.build()); } @Override @@ -175,27 +157,13 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe } return ClusterState.builder(currentState).routingResult(routingResult).build(); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - } }); } - @Override - public TimeValue ackTimeout() { - return request.timeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - @Override public void onFailure(String source, Throwable t) { logger.debug("failed to perform [{}]", t, source); - listener.onFailure(t); + super.onFailure(source, t); } @Override @@ -252,11 +220,6 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe return builder(currentState).metaData(metaData).blocks(blocks).build(); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } }); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java b/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java index e4b26739438..f800685b6ce 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasA import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -123,7 +122,7 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .actions(finalActions.toArray(new AliasAction[finalActions.size()])); - indexAliasesService.indicesAliases(updateRequest, new ClusterStateUpdateListener() { + indexAliasesService.indicesAliases(updateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { listener.onResponse(new IndicesAliasesResponse(response.isAcknowledged())); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index 77f889e4042..b73bb494639 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -91,7 +90,7 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .indices(request.indices()); - indexStateService.closeIndex(updateRequest, new ClusterStateUpdateListener() { + indexStateService.closeIndex(updateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index bb0250f625c..5a85495aac2 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -87,7 +86,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi .settings(request.settings()).mappings(request.mappings()) .aliases(request.aliases()).customs(request.customs()); - createIndexService.createIndex(updateRequest, new ClusterStateUpdateListener() { + createIndexService.createIndex(updateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java index 6390c225c84..549fc868477 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java @@ -37,7 +37,6 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -179,7 +178,7 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()); - metaDataMappingService.removeMapping(clusterStateUpdateRequest, new ClusterStateUpdateListener() { + metaDataMappingService.removeMapping(clusterStateUpdateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { listener.onResponse(new DeleteMappingResponse(response.isAcknowledged())); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index 3b39c48835d..e89e7de23c3 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -87,7 +86,7 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio .indices(request.indices()).type(request.type()) .source(request.source()).ignoreConflicts(request.ignoreConflicts()); - metaDataMappingService.putMapping(updateRequest, new ClusterStateUpdateListener() { + metaDataMappingService.putMapping(updateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java index e4bcbfba865..9b8b3bddedb 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -91,7 +90,7 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .indices(request.indices()); - indexStateService.openIndex(updateRequest, new ClusterStateUpdateListener() { + indexStateService.openIndex(updateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java index 5d17b5234f8..bd052dbc5c8 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; import org.elasticsearch.common.inject.Inject; @@ -81,7 +80,7 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()); - updateSettingsService.updateSettings(clusterStateUpdateRequest, new ClusterStateUpdateListener() { + updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { listener.onResponse(new UpdateSettingsResponse(response.isAcknowledged())); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java index 5d904705ebb..d9c9b31160e 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java @@ -29,12 +29,9 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.search.warmer.IndexWarmerMissingException; @@ -90,37 +87,17 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct @Override protected void masterOperation(final DeleteWarmerRequest request, final ClusterState state, final ActionListener listener) throws ElasticsearchException { - clusterService.submitStateUpdateTask("delete_warmer [" + Arrays.toString(request.names()) + "]", new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("delete_warmer [" + Arrays.toString(request.names()) + "]", new AckedClusterStateUpdateTask(request, listener) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new DeleteWarmerResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new DeleteWarmerResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.timeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); + protected DeleteWarmerResponse newResponse(boolean acknowledged) { + return new DeleteWarmerResponse(acknowledged); } @Override public void onFailure(String source, Throwable t) { logger.debug("failed to delete warmer [{}] on indices [{}]", t, Arrays.toString(request.names()), request.indices()); - listener.onFailure(t); + super.onFailure(source, t); } @Override @@ -184,11 +161,6 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct return ClusterState.builder(currentState).metaData(mdBuilder).build(); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } }); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java index c55afbf7bf3..28b9bcb6869 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java @@ -31,12 +31,9 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.search.warmer.IndexWarmersMetaData; @@ -98,37 +95,17 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction return; } - clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask(request, listener) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new PutWarmerResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new PutWarmerResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.timeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); + protected PutWarmerResponse newResponse(boolean acknowledged) { + return new PutWarmerResponse(acknowledged); } @Override public void onFailure(String source, Throwable t) { logger.debug("failed to put warmer [{}] on indices [{}]", t, request.name(), request.searchRequest().indices()); - listener.onFailure(t); + super.onFailure(source, t); } @Override @@ -180,11 +157,6 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction return ClusterState.builder(currentState).metaData(mdBuilder).build(); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } }); } diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java index 56ce2b38535..04ccd492ae8 100644 --- a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action.support.master; +import org.elasticsearch.cluster.ack.AckedRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; @@ -31,7 +32,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; * Abstract class that allows to mark action requests that support acknowledgements. * Facilitates consistency across different api. */ -public abstract class AcknowledgedRequest extends MasterNodeOperationRequest { +public abstract class AcknowledgedRequest extends MasterNodeOperationRequest implements AckedRequest { public static final TimeValue DEFAULT_ACK_TIMEOUT = timeValueSeconds(30); @@ -83,4 +84,9 @@ public abstract class AcknowledgedRequest protected void writeTimeout(StreamOutput out) throws IOException { timeout.writeTo(out); } + + @Override + public TimeValue ackTimeout() { + return timeout; + } } diff --git a/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java index b354e4d08fa..7cdee753873 100644 --- a/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.cluster; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ack.AckedRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; @@ -26,30 +28,63 @@ import org.elasticsearch.common.unit.TimeValue; * An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when * all the nodes have acknowledged a cluster state update request */ -public interface AckedClusterStateUpdateTask extends TimeoutClusterStateUpdateTask { +public abstract class AckedClusterStateUpdateTask implements TimeoutClusterStateUpdateTask { + + private final ActionListener listener; + private final AckedRequest request; + + protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener listener) { + this.listener = listener; + this.request = request; + } /** * Called to determine which nodes the acknowledgement is expected from * @param discoveryNode a node * @return true if the node is expected to send ack back, false otherwise */ - boolean mustAck(DiscoveryNode discoveryNode); + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } /** * Called once all the nodes have acknowledged the cluster state update request. Must be * very lightweight execution, since it gets executed on the cluster service thread. * @param t optional error that might have been thrown */ - void onAllNodesAcked(@Nullable Throwable t); + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(newResponse(true)); + } + + protected abstract Response newResponse(boolean acknowledged); /** * Called once the acknowledgement timeout defined by * {@link AckedClusterStateUpdateTask#ackTimeout()} has expired */ - void onAckTimeout(); + public void onAckTimeout() { + listener.onResponse(newResponse(false)); + } + + @Override + public void onFailure(String source, Throwable t) { + listener.onFailure(t); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + + } /** * Acknowledgement timeout, maximum time interval to wait for acknowledgements */ - TimeValue ackTimeout(); + public TimeValue ackTimeout() { + return request.ackTimeout(); + } + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } } diff --git a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java b/src/main/java/org/elasticsearch/cluster/ack/AckedRequest.java similarity index 70% rename from src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java rename to src/main/java/org/elasticsearch/cluster/ack/AckedRequest.java index 1ed19269360..ea835d209b0 100644 --- a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java +++ b/src/main/java/org/elasticsearch/cluster/ack/AckedRequest.java @@ -19,19 +19,20 @@ package org.elasticsearch.cluster.ack; +import org.elasticsearch.common.unit.TimeValue; + /** - * Listener used for cluster state updates processing - * Supports acknowledgement logic + * Identifies a cluster state update request with acknowledgement support */ -public interface ClusterStateUpdateListener { +public interface AckedRequest { /** - * Called when the cluster state update is acknowledged + * Returns the acknowledgement timeout */ - void onResponse(ClusterStateUpdateResponse response); + TimeValue ackTimeout(); /** - * Called when any error is thrown during the cluster state update processing + * Returns the timeout for the request to be completed on the master node */ - void onFailure(Throwable t); + TimeValue masterNodeTimeout(); } diff --git a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java index c15310b538b..5e73ccf252f 100644 --- a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java +++ b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java @@ -25,7 +25,7 @@ import org.elasticsearch.common.unit.TimeValue; * Base class to be used when needing to update the cluster state * Contains the basic fields that are always needed */ -public abstract class ClusterStateUpdateRequest> { +public abstract class ClusterStateUpdateRequest> implements AckedRequest { private TimeValue ackTimeout; private TimeValue masterNodeTimeout; diff --git a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index e702a3a04c1..13d19717f6a 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataMappingService; @@ -148,7 +147,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction listener) throws ElasticsearchException { - metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), request.order, request.nodeId, new ClusterStateUpdateListener() { + metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), request.order, request.nodeId, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { listener.onResponse(new MappingUpdatedResponse()); diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 6e4d3e6060d..e43e56b6072 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -28,16 +28,15 @@ import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -51,7 +50,6 @@ import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; @@ -112,7 +110,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { this.aliasValidator = aliasValidator; } - public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { + public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener listener) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); for (Map.Entry entry : request.settings().getAsMap().entrySet()) { if (!entry.getKey().startsWith("index.")) { @@ -176,40 +174,30 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } - private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener, final Semaphore mdLock) { - clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask() { + private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener listener, final Semaphore mdLock) { + clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); } @Override public void onAllNodesAcked(@Nullable Throwable t) { mdLock.release(); - listener.onResponse(new ClusterStateUpdateResponse(true)); + super.onAllNodesAcked(t); } @Override public void onAckTimeout() { mdLock.release(); - listener.onResponse(new ClusterStateUpdateResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); + super.onAckTimeout(); } @Override public void onFailure(String source, Throwable t) { mdLock.release(); - listener.onFailure(t); + super.onFailure(source, t); } @Override @@ -446,10 +434,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - } }); } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java index 1a28e60d48c..fe660f9083b 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java @@ -22,20 +22,17 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.service.IndexService; @@ -64,37 +61,11 @@ public class MetaDataIndexAliasesService extends AbstractComponent { this.aliasValidator = aliasValidator; } - public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { - clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new AckedClusterStateUpdateTask() { - + public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ActionListener listener) { + clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new ClusterStateUpdateResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new ClusterStateUpdateResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); } @Override @@ -186,11 +157,6 @@ public class MetaDataIndexAliasesService extends AbstractComponent { } } } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } }); } } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 5432f0aee9b..5c556b86246 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -20,28 +20,25 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException; @@ -69,42 +66,16 @@ public class MetaDataIndexStateService extends AbstractComponent { this.allocationService = allocationService; } - public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { + public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ActionListener listener) { if (request.indices() == null || request.indices().length == 0) { throw new ElasticsearchIllegalArgumentException("Index name is required"); } final String indicesAsString = Arrays.toString(request.indices()); - clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask() { - + clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new ClusterStateUpdateResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new ClusterStateUpdateResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); } @Override @@ -152,50 +123,19 @@ public class MetaDataIndexStateService extends AbstractComponent { //no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask return ClusterState.builder(updatedState).routingResult(routingResult).build(); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } }); } - public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { + public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ActionListener listener) { if (request.indices() == null || request.indices().length == 0) { throw new ElasticsearchIllegalArgumentException("Index name is required"); } final String indicesAsString = Arrays.toString(request.indices()); - clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask() { - + clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new ClusterStateUpdateResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new ClusterStateUpdateResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); } @Override @@ -236,11 +176,6 @@ public class MetaDataIndexStateService extends AbstractComponent { //no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask return ClusterState.builder(updatedState).routingResult(routingResult).build(); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } }); } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index de5f058d74f..3d5ae715859 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -22,22 +22,19 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; @@ -102,9 +99,9 @@ public class MetaDataMappingService extends AbstractComponent { final CompressedString mappingSource; final long order; // -1 for unknown final String nodeId; // null fr unknown - final ClusterStateUpdateListener listener; + final ActionListener listener; - UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, long order, String nodeId, ClusterStateUpdateListener listener) { + UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, long order, String nodeId, ActionListener listener) { super(index, indexUUID); this.type = type; this.mappingSource = mappingSource; @@ -365,7 +362,7 @@ public class MetaDataMappingService extends AbstractComponent { }); } - public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final long order, final String nodeId, final ClusterStateUpdateListener listener) { + public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final long order, final String nodeId, final ActionListener listener) { final long insertOrder; synchronized (refreshOrUpdateMutex) { insertOrder = ++refreshOrUpdateInsertOrder; @@ -384,37 +381,11 @@ public class MetaDataMappingService extends AbstractComponent { }); } - public void removeMapping(final DeleteMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { - clusterService.submitStateUpdateTask("remove-mapping [" + Arrays.toString(request.types()) + "]", Priority.HIGH, new AckedClusterStateUpdateTask() { - + public void removeMapping(final DeleteMappingClusterStateUpdateRequest request, final ActionListener listener) { + clusterService.submitStateUpdateTask("remove-mapping [" + Arrays.toString(request.types()) + "]", Priority.HIGH, new AckedClusterStateUpdateTask(request, listener) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new ClusterStateUpdateResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new ClusterStateUpdateResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); } @Override @@ -455,46 +426,16 @@ public class MetaDataMappingService extends AbstractComponent { return ClusterState.builder(currentState).metaData(builder).build(); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } }); } - public void putMapping(final PutMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { + public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener listener) { - clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask(request, listener) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new ClusterStateUpdateResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new ClusterStateUpdateResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); } @Override @@ -627,11 +568,6 @@ public class MetaDataMappingService extends AbstractComponent { } } } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } }); } } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index 8b297b7bddf..8cec52884f8 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -21,19 +21,17 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Sets; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.*; -import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -149,7 +147,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements .ackTimeout(TimeValue.timeValueMillis(0)) //no need to wait for ack here .masterNodeTimeout(TimeValue.timeValueMinutes(10)); - updateSettings(updateRequest, new ClusterStateUpdateListener() { + updateSettings(updateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { for (String index : indices) { @@ -168,7 +166,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } } - public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { + public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener listener) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); for (Map.Entry entry : request.settings().getAsMap().entrySet()) { if (entry.getKey().equals("index")) { @@ -215,36 +213,11 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } final Settings openSettings = updatedSettingsBuilder.build(); - clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new ClusterStateUpdateResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new ClusterStateUpdateResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); } @Override @@ -343,10 +316,6 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements return updatedState; } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - } }); } } diff --git a/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 58c99c590a8..27f9792676b 100644 --- a/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoriesMetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; @@ -37,7 +36,6 @@ import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; @@ -84,10 +82,15 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta * @param request register repository request * @param listener register repository listener */ - public void registerRepository(final RegisterRepositoryRequest request, final ActionListener listener) { + public void registerRepository(final RegisterRepositoryRequest request, final ActionListener listener) { final RepositoryMetaData newRepositoryMetaData = new RepositoryMetaData(request.name, request.type, request.settings); - clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask(request, listener) { + @Override + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); + } + @Override public ClusterState execute(ClusterState currentState) { ensureRepositoryNotInUse(currentState, request.name); @@ -129,38 +132,13 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta @Override public void onFailure(String source, Throwable t) { logger.warn("failed to create repository [{}]", t, request.name); - listener.onFailure(t); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - + super.onFailure(source, t); } @Override public boolean mustAck(DiscoveryNode discoveryNode) { return discoveryNode.masterNode(); } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new RegisterRepositoryResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new RegisterRepositoryResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } }); } @@ -172,8 +150,13 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta * @param request unregister repository request * @param listener unregister repository listener */ - public void unregisterRepository(final UnregisterRepositoryRequest request, final ActionListener listener) { - clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask() { + public void unregisterRepository(final UnregisterRepositoryRequest request, final ActionListener listener) { + clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask(request, listener) { + @Override + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); + } + @Override public ClusterState execute(ClusterState currentState) { ensureRepositoryNotInUse(currentState, request.name); @@ -200,40 +183,11 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta throw new RepositoryMissingException(request.name); } - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - } - @Override public boolean mustAck(DiscoveryNode discoveryNode) { // Since operation occurs only on masters, it's enough that only master-eligible nodes acked return discoveryNode.masterNode(); } - - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - listener.onResponse(new UnregisterRepositoryResponse(true)); - } - - @Override - public void onAckTimeout() { - listener.onResponse(new UnregisterRepositoryResponse(false)); - } - - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } }); } @@ -464,17 +418,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta } } - /** - * Register repository response - */ - public static class RegisterRepositoryResponse extends ClusterStateUpdateResponse { - - RegisterRepositoryResponse(boolean acknowledged) { - super(acknowledged); - } - - } - /** * Unregister repository request */ @@ -496,13 +439,4 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta } } - - /** - * Unregister repository response - */ - public static class UnregisterRepositoryResponse extends ClusterStateUpdateResponse { - UnregisterRepositoryResponse(boolean acknowledged) { - super(acknowledged); - } - } } diff --git a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java index d12d75676bd..ae9bd536921 100644 --- a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java +++ b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java @@ -122,7 +122,12 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest { final AtomicBoolean executed = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch processedLatch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + @Override + protected Void newResponse(boolean acknowledged) { + return null; + } + @Override public boolean mustAck(DiscoveryNode discoveryNode) { return true; @@ -193,10 +198,10 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest { final AtomicBoolean executed = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch processedLatch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; + protected Void newResponse(boolean acknowledged) { + return null; } @Override @@ -263,7 +268,12 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest { final AtomicBoolean onFailure = new AtomicBoolean(false); final AtomicBoolean executed = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + @Override + protected Void newResponse(boolean acknowledged) { + return null; + } + @Override public boolean mustAck(DiscoveryNode discoveryNode) { return false; @@ -331,7 +341,12 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest { final AtomicBoolean executed = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch processedLatch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + @Override + protected Void newResponse(boolean acknowledged) { + return null; + } + @Override public boolean mustAck(DiscoveryNode discoveryNode) { return false;