From 294f09a1d7a603f26ebd554be6c4e74a6f5ed76b Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 11 Jul 2010 20:41:58 +0300 Subject: [PATCH] Shutdown API: Improve behavior when shutting down the whole cluster, closes #250. --- .../org/elasticsearch/action/Actions.java | 30 ++ .../health/TransportClusterHealthAction.java | 2 +- .../node/shutdown/NodesShutdownRequest.java | 48 +++- .../node/shutdown/NodesShutdownResponse.java | 58 ++-- .../TransportNodesShutdownAction.java | 271 ++++++++++++------ .../state/TransportClusterStateAction.java | 2 +- .../alias/TransportIndicesAliasesAction.java | 2 +- .../create/TransportCreateIndexAction.java | 2 +- .../delete/TransportDeleteIndexAction.java | 2 +- .../put/TransportPutMappingAction.java | 2 +- .../TransportMasterNodeOperationAction.java | 23 +- .../nodes/TransportNodesOperationAction.java | 17 +- .../action/index/MappingUpdatedAction.java | 3 +- .../shutdown/RestNodesShutdownAction.java | 11 +- 14 files changed, 309 insertions(+), 164 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/Actions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/Actions.java index 226e8dfa382..fa5179c26d5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/Actions.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/Actions.java @@ -19,6 +19,9 @@ package org.elasticsearch.action; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; + /** * @author kimchy (shay.banon) */ @@ -31,4 +34,31 @@ public class Actions { validationException.addValidationError(error); return validationException; } + + public static boolean isAllNodes(String... nodesIds) { + return nodesIds == null || nodesIds.length == 0 || (nodesIds.length == 1 && nodesIds[0].equals("_all")); + } + + public static String[] buildNodesIds(DiscoveryNodes nodes, String... nodesIds) { + if (isAllNodes(nodesIds)) { + int index = 0; + nodesIds = new String[nodes.size()]; + for (DiscoveryNode node : nodes) { + nodesIds[index++] = node.id(); + } + return nodesIds; + } else { + String[] resolvedNodesIds = new String[nodesIds.length]; + for (int i = 0; i < nodesIds.length; i++) { + if (nodesIds[i].equals("_local")) { + resolvedNodesIds[i] = nodes.localNodeId(); + } else if (nodesIds[i].equals("_master")) { + resolvedNodesIds[i] = nodes.masterNodeId(); + } else { + resolvedNodesIds[i] = nodesIds[i]; + } + } + return resolvedNodesIds; + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 166cae26ade..fe8806ac6b3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -64,7 +64,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc return new ClusterHealthResponse(); } - @Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request) throws ElasticSearchException { + @Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request, ClusterState state) throws ElasticSearchException { int waitFor = 3; if (request.waitForStatus() == null) { waitFor--; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesShutdownRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesShutdownRequest.java index ebe14445627..bf067fbfcad 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesShutdownRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesShutdownRequest.java @@ -19,7 +19,9 @@ package org.elasticsearch.action.admin.cluster.node.shutdown; -import org.elasticsearch.action.support.nodes.NodesOperationRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; @@ -29,23 +31,24 @@ import java.io.IOException; import static org.elasticsearch.common.unit.TimeValue.*; /** - * A request to shutdown one ore more nodes (or the whole cluster). - * * @author kimchy (shay.banon) */ -public class NodesShutdownRequest extends NodesOperationRequest { +public class NodesShutdownRequest extends MasterNodeOperationRequest { + + String[] nodesIds = Strings.EMPTY_ARRAY; TimeValue delay = TimeValue.timeValueSeconds(1); - protected NodesShutdownRequest() { + NodesShutdownRequest() { } - /** - * Shuts down nodes based on the nodes ids specified. If none are passed, all - * nodes will be shutdown. - */ public NodesShutdownRequest(String... nodesIds) { - super(nodesIds); + this.nodesIds = nodesIds; + } + + public NodesShutdownRequest nodesIds(String... nodesIds) { + this.nodesIds = nodesIds; + return this; } /** @@ -56,6 +59,10 @@ public class NodesShutdownRequest extends NodesOperationRequest { return this; } + public TimeValue delay() { + return this.delay; + } + /** * The delay for the shutdown to occur. Defaults to 1s. */ @@ -63,17 +70,32 @@ public class NodesShutdownRequest extends NodesOperationRequest { return delay(TimeValue.parseTimeValue(delay, null)); } - public TimeValue delay() { - return this.delay; + @Override public ActionRequestValidationException validate() { + return null; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); delay = readTimeValue(in); + int size = in.readVInt(); + if (size > 0) { + nodesIds = new String[size]; + for (int i = 0; i < nodesIds.length; i++) { + nodesIds[i] = in.readUTF(); + } + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); delay.writeTo(out); + if (nodesIds == null) { + out.writeVInt(0); + } else { + out.writeVInt(nodesIds.length); + for (String nodeId : nodesIds) { + out.writeUTF(nodeId); + } + } } -} \ No newline at end of file +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesShutdownResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesShutdownResponse.java index b9a78596a08..81e7572ea44 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesShutdownResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesShutdownResponse.java @@ -19,8 +19,7 @@ package org.elasticsearch.action.admin.cluster.node.shutdown; -import org.elasticsearch.action.support.nodes.NodeOperationResponse; -import org.elasticsearch.action.support.nodes.NodesOperationResponse; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; @@ -31,44 +30,49 @@ import java.io.IOException; /** * @author kimchy (shay.banon) */ -public class NodesShutdownResponse extends NodesOperationResponse { +public class NodesShutdownResponse implements ActionResponse { + + private ClusterName clusterName; + + private DiscoveryNode[] nodes; NodesShutdownResponse() { } - public NodesShutdownResponse(ClusterName clusterName, NodeShutdownResponse[] nodes) { - super(clusterName, nodes); + public NodesShutdownResponse(ClusterName clusterName, DiscoveryNode[] nodes) { + this.clusterName = clusterName; + this.nodes = nodes; + } + + public ClusterName clusterName() { + return this.clusterName; + } + + public ClusterName getClusterName() { + return clusterName(); + } + + public DiscoveryNode[] nodes() { + return this.nodes; + } + + public DiscoveryNode[] getNodes() { + return nodes(); } @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodes = new NodeShutdownResponse[in.readVInt()]; + clusterName = ClusterName.readClusterName(in); + nodes = new DiscoveryNode[in.readVInt()]; for (int i = 0; i < nodes.length; i++) { - nodes[i] = NodeShutdownResponse.readNodeShutdownResponse(in); + nodes[i] = DiscoveryNode.readNode(in); } } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + clusterName.writeTo(out); out.writeVInt(nodes.length); - for (NodeShutdownResponse node : nodes) { + for (DiscoveryNode node : nodes) { node.writeTo(out); } } - - public static class NodeShutdownResponse extends NodeOperationResponse { - - NodeShutdownResponse() { - } - - public NodeShutdownResponse(DiscoveryNode node) { - super(node); - } - - public static NodeShutdownResponse readNodeShutdownResponse(StreamInput in) throws IOException { - NodeShutdownResponse res = new NodeShutdownResponse(); - res.readFrom(in); - return res; - } - } -} \ No newline at end of file +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java index 460de3d294f..935914ef336 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java @@ -21,143 +21,236 @@ package org.elasticsearch.action.admin.cluster.node.shutdown; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.action.Actions; import org.elasticsearch.action.TransportActions; -import org.elasticsearch.action.support.nodes.NodeOperationRequest; -import org.elasticsearch.action.support.nodes.TransportNodesOperationAction; +import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; - -import static org.elasticsearch.common.collect.Lists.*; -import static org.elasticsearch.common.unit.TimeValue.*; +import java.util.Set; +import java.util.concurrent.CountDownLatch; /** * @author kimchy (shay.banon) */ -public class TransportNodesShutdownAction extends TransportNodesOperationAction { +public class TransportNodesShutdownAction extends TransportMasterNodeOperationAction { private final Node node; + private final ClusterName clusterName; + private final boolean disabled; - @Inject public TransportNodesShutdownAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, - ClusterService clusterService, TransportService transportService, - Node node) { - super(settings, clusterName, threadPool, clusterService, transportService); + private final TimeValue delay; + + @Inject public TransportNodesShutdownAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + Node node, ClusterName clusterName) { + super(settings, transportService, clusterService, threadPool); this.node = node; - disabled = componentSettings.getAsBoolean("disabled", false); + this.clusterName = clusterName; + this.disabled = componentSettings.getAsBoolean("disabled", false); + this.delay = componentSettings.getAsTime("delay", TimeValue.timeValueMillis(200)); + + this.transportService.registerHandler(NodeShutdownRequestHandler.ACTION, new NodeShutdownRequestHandler()); } @Override protected String transportAction() { return TransportActions.Admin.Cluster.Node.SHUTDOWN; } - @Override protected String transportNodeAction() { - return "/cluster/nodes/shutdown/node"; - } - - @Override protected NodesShutdownResponse newResponse(NodesShutdownRequest nodesShutdownRequest, AtomicReferenceArray responses) { - final List nodeShutdownResponses = newArrayList(); - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodesShutdownResponse.NodeShutdownResponse) { - nodeShutdownResponses.add((NodesShutdownResponse.NodeShutdownResponse) resp); - } - } - return new NodesShutdownResponse(clusterName, nodeShutdownResponses.toArray(new NodesShutdownResponse.NodeShutdownResponse[nodeShutdownResponses.size()])); - } - @Override protected NodesShutdownRequest newRequest() { return new NodesShutdownRequest(); } - @Override protected NodeShutdownRequest newNodeRequest() { - return new NodeShutdownRequest(); + @Override protected NodesShutdownResponse newResponse() { + return new NodesShutdownResponse(); } - @Override protected NodeShutdownRequest newNodeRequest(String nodeId, NodesShutdownRequest request) { - return new NodeShutdownRequest(nodeId, request.delay); + @Override protected void processBeforeDelegationToMaster(NodesShutdownRequest request, ClusterState state) { + String[] nodesIds = request.nodesIds; + if (nodesIds != null) { + for (int i = 0; i < nodesIds.length; i++) { + // replace the _local one, since it looses its meaning when going over to the master... + if ("_local".equals(nodesIds[i])) { + nodesIds[i] = state.nodes().localNodeId(); + } + } + } } - @Override protected NodesShutdownResponse.NodeShutdownResponse newNodeResponse() { - return new NodesShutdownResponse.NodeShutdownResponse(); - } - - @Override protected NodesShutdownResponse.NodeShutdownResponse nodeOperation(final NodeShutdownRequest request) throws ElasticSearchException { + @Override protected NodesShutdownResponse masterOperation(final NodesShutdownRequest request, final ClusterState state) throws ElasticSearchException { if (disabled) { throw new ElasticSearchIllegalStateException("Shutdown is disabled"); } - logger.info("Shutting down in [{}]", request.delay); - Thread t = new Thread(new Runnable() { - @Override public void run() { - try { - Thread.sleep(request.delay.millis()); - } catch (InterruptedException e) { - // ignore - } - boolean shutdownWithWrapper = false; - if (System.getProperty("elasticsearch-service") != null) { + Set nodes = Sets.newHashSet(); + if (Actions.isAllNodes(request.nodesIds)) { + logger.info("[cluster_shutdown]: requested, shutting down in [{}]", request.delay); + nodes.addAll(state.nodes().nodes().values()); + Thread t = new Thread(new Runnable() { + @Override public void run() { try { - Class wrapperManager = settings.getClassLoader().loadClass("org.tanukisoftware.wrapper.WrapperManager"); - logger.info("Initiating requested shutdown (using service)"); - wrapperManager.getMethod("stopAndReturn", int.class).invoke(null, 0); - shutdownWithWrapper = true; - } catch (Throwable e) { - e.printStackTrace(); + Thread.sleep(request.delay.millis()); + } catch (InterruptedException e) { + // ignore + } + // first, stop the cluster service + logger.trace("[cluster_shutdown]: stopping the cluster service so no re-routing will occur"); + clusterService.stop(); + + final CountDownLatch latch = new CountDownLatch(state.nodes().size()); + for (final DiscoveryNode node : state.nodes()) { + if (node.id().equals(state.nodes().masterNodeId())) { + // don't shutdown the master yet... + latch.countDown(); + } else { + logger.trace("[cluster_shutdown]: sending shutdown request to [{}]", node); + transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler() { + @Override public void handleResponse(VoidStreamable response) { + logger.trace("[cluster_shutdown]: received shutdown response from [{}]", node); + latch.countDown(); + } + + @Override public void handleException(RemoteTransportException exp) { + logger.warn("[cluster_shutdown]: received failed shutdown response from [{}]", exp, node); + latch.countDown(); + } + }); + } } - } - if (!shutdownWithWrapper) { - logger.info("Initiating requested shutdown"); try { - node.close(); - } catch (Exception e) { - logger.warn("Failed to shutdown", e); - } finally { - // make sure we initiate the shutdown hooks, so the Bootstrap#main thread will exit - System.exit(0); + latch.await(); + } catch (InterruptedException e) { + // ignore } + logger.info("[cluster_shutdown]: done shutting done all nodes except master, proceeding to master"); + + // now, kill the master + logger.trace("[cluster_shutdown]: shutting down the master [{}]", state.nodes().masterNode()); + transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler() { + @Override public void handleResponse(VoidStreamable response) { + logger.trace("[cluster_shutdown]: received shutdown response from master"); + } + + @Override public void handleException(RemoteTransportException exp) { + logger.warn("[cluster_shutdown]: received failed shutdown response master", exp); + } + }); + } + }); + t.start(); + } else { + final String[] nodesIds = Actions.buildNodesIds(state.nodes(), request.nodesIds); + logger.info("[partial_cluster_shutdown]: requested, shutting down [{}] in [{}]", nodesIds, request.delay); + + for (String nodeId : nodesIds) { + final DiscoveryNode node = state.nodes().get(nodeId); + if (node != null) { + nodes.add(node); } } - }); - t.start(); - return new NodesShutdownResponse.NodeShutdownResponse(clusterService.state().nodes().localNode()); + + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + Thread.sleep(request.delay.millis()); + } catch (InterruptedException e) { + // ignore + } + + final CountDownLatch latch = new CountDownLatch(nodesIds.length); + for (String nodeId : nodesIds) { + final DiscoveryNode node = state.nodes().get(nodeId); + if (node == null) { + logger.warn("[partial_cluster_shutdown]: no node to shutdown for node_id [{}]", nodeId); + latch.countDown(); + continue; + } + + logger.trace("[partial_cluster_shutdown]: sending shutdown request to [{}]", node); + transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler() { + @Override public void handleResponse(VoidStreamable response) { + logger.trace("[partial_cluster_shutdown]: received shutdown response from [{}]", node); + latch.countDown(); + } + + @Override public void handleException(RemoteTransportException exp) { + logger.warn("[partial_cluster_shutdown]: received failed shutdown response from [{}]", exp, node); + latch.countDown(); + } + }); + } + + try { + latch.await(); + } catch (InterruptedException e) { + // ignore + } + + logger.info("[partial_cluster_shutdown]: done shutting down [{}]", nodesIds); + } + }); + t.start(); + } + return new NodesShutdownResponse(clusterName, nodes.toArray(new DiscoveryNode[nodes.size()])); } - @Override protected boolean accumulateExceptions() { - return false; - } + private class NodeShutdownRequestHandler extends BaseTransportRequestHandler { - protected static class NodeShutdownRequest extends NodeOperationRequest { + static final String ACTION = "/cluster/nodes/shutdown/node"; - TimeValue delay; - - private NodeShutdownRequest() { + @Override public VoidStreamable newInstance() { + return VoidStreamable.INSTANCE; } - private NodeShutdownRequest(String nodeId, TimeValue delay) { - super(nodeId); - this.delay = delay; - } + @Override public void messageReceived(VoidStreamable request, TransportChannel channel) throws Exception { + if (disabled) { + throw new ElasticSearchIllegalStateException("Shutdown is disabled"); + } + logger.info("shutting down in [{}]", delay); + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + Thread.sleep(delay.millis()); + } catch (InterruptedException e) { + // ignore + } + boolean shutdownWithWrapper = false; + if (System.getProperty("elasticsearch-service") != null) { + try { + Class wrapperManager = settings.getClassLoader().loadClass("org.tanukisoftware.wrapper.WrapperManager"); + logger.info("initiating requested shutdown (using service)"); + wrapperManager.getMethod("stopAndReturn", int.class).invoke(null, 0); + shutdownWithWrapper = true; + } catch (Throwable e) { + e.printStackTrace(); + } + } + if (!shutdownWithWrapper) { + logger.info("initiating requested shutdown..."); + try { + node.close(); + } catch (Exception e) { + logger.warn("Failed to shutdown", e); + } finally { + // make sure we initiate the shutdown hooks, so the Bootstrap#main thread will exit + System.exit(0); + } + } + } + }); + t.start(); - @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - delay = readTimeValue(in); - } - - @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - delay.writeTo(out); + channel.sendResponse(VoidStreamable.INSTANCE); } } -} \ No newline at end of file +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index b4b9de00717..eae3b7d9bff 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -62,7 +62,7 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct return new ClusterStateResponse(); } - @Override protected ClusterStateResponse masterOperation(ClusterStateRequest request) throws ElasticSearchException { + @Override protected ClusterStateResponse masterOperation(ClusterStateRequest request, ClusterState state) throws ElasticSearchException { ClusterState currentState = clusterService.state(); ClusterState.Builder builder = newClusterStateBuilder(); if (!request.filterNodes()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java index 56f6a21120f..63b7242d8c6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java @@ -63,7 +63,7 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA } } - @Override protected IndicesAliasesResponse masterOperation(IndicesAliasesRequest request) throws ElasticSearchException { + @Override protected IndicesAliasesResponse masterOperation(IndicesAliasesRequest request, ClusterState state) throws ElasticSearchException { MetaDataService.IndicesAliasesResult indicesAliasesResult = metaDataService.indicesAliases(request.aliasActions()); return new IndicesAliasesResponse(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index 2884a550e8e..30fedfa73be 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -62,7 +62,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index()); } - @Override protected CreateIndexResponse masterOperation(CreateIndexRequest request) throws ElasticSearchException { + @Override protected CreateIndexResponse masterOperation(CreateIndexRequest request, ClusterState state) throws ElasticSearchException { String cause = request.cause(); if (cause.length() == 0) { cause = "api"; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java index 174ad24141c..4b25ad214b7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java @@ -62,7 +62,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index()); } - @Override protected DeleteIndexResponse masterOperation(DeleteIndexRequest request) throws ElasticSearchException { + @Override protected DeleteIndexResponse masterOperation(DeleteIndexRequest request, ClusterState state) throws ElasticSearchException { MetaDataService.DeleteIndexResult deleteIndexResult = metaDataService.deleteIndex(request.index(), request.timeout()); return new DeleteIndexResponse(deleteIndexResult.acknowledged()); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index 52e5f82da97..18f0e7aa930 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -68,7 +68,7 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio } } - @Override protected PutMappingResponse masterOperation(PutMappingRequest request) throws ElasticSearchException { + @Override protected PutMappingResponse masterOperation(PutMappingRequest request, ClusterState state) throws ElasticSearchException { ClusterState clusterState = clusterService.state(); // update to concrete indices diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java index 760f7f14a82..abec904b21e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java @@ -34,7 +34,7 @@ import org.elasticsearch.transport.*; /** * A base class for operations that needs to be performed on the master node. * - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public abstract class TransportMasterNodeOperationAction extends BaseAction { @@ -59,20 +59,25 @@ public abstract class TransportMasterNodeOperationAction listener) { - DiscoveryNodes nodes = clusterService.state().nodes(); + final ClusterState clusterState = clusterService.state(); + DiscoveryNodes nodes = clusterState.nodes(); if (nodes.localNodeMaster()) { threadPool.execute(new Runnable() { @Override public void run() { try { - checkBlock(request, clusterService.state()); - Response response = masterOperation(request); + checkBlock(request, clusterState); + Response response = masterOperation(request, clusterState); listener.onResponse(response); } catch (Exception e) { listener.onFailure(e); @@ -83,6 +88,7 @@ public abstract class TransportMasterNodeOperationAction() { @Override public Response newInstance() { return newResponse(); @@ -106,9 +112,10 @@ public abstract class TransportMasterNodeOperationAction() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java index 454e6ee0c6d..731b6406309 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.support.nodes; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.Actions; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.NoSuchNodeException; import org.elasticsearch.action.support.BaseAction; @@ -108,21 +109,7 @@ public abstract class TransportNodesOperationAction(this.nodesIds.length); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 0be21e0d1b5..a90cbcf020a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaDataService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -63,7 +64,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction() { - @Override public void onResponse(NodesShutdownResponse result) { + @Override public void onResponse(NodesShutdownResponse response) { try { XContentBuilder builder = restContentBuilder(request); builder.startObject(); - builder.field("cluster_name", result.clusterName().value()); + builder.field("cluster_name", response.clusterName().value()); builder.startObject("nodes"); - for (NodesShutdownResponse.NodeShutdownResponse nodeInfo : result) { - builder.startObject(nodeInfo.node().id()); - builder.field("name", nodeInfo.node().name()); + for (DiscoveryNode node : response.nodes()) { + builder.startObject(node.id()); + builder.field("name", node.name()); builder.endObject(); } builder.endObject();