From c23c5d2494aafc4416a98e6768a1f3a3022cd011 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Sat, 27 Jul 2013 10:42:02 +0200 Subject: [PATCH] Added support for acknowledgement from other nodes in open/close index api The open/close index api now waits for an acknowledgement from all the other nodes before returning its response, till the timeout (configurable, default 10 secs) expires. The returned acknowledged flag reflects whether the cluster state change was acknowledged by all the nodes or the timeout expired before. Closes #3400 --- .../indices/close/CloseIndexRequest.java | 8 +- .../admin/indices/open/OpenIndexRequest.java | 8 +- .../cluster/ClusterChangedEvent.java | 22 ++- .../elasticsearch/cluster/ClusterModule.java | 1 + .../index/NodeIndicesStateUpdatedAction.java | 154 ++++++++++++++++++ .../metadata/MetaDataStateIndexService.java | 74 ++++++++- .../cluster/IndicesClusterStateService.java | 12 +- .../AbstractSharedClusterTest.java | 4 + .../indices/state/OpenCloseIndexTests.java | 43 ++++- 9 files changed, 307 insertions(+), 19 deletions(-) create mode 100644 src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java index ab524de7a37..aea253427b0 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java @@ -45,7 +45,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest10s. */ TimeValue timeout() { @@ -87,7 +87,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest10s. */ public CloseIndexRequest timeout(TimeValue timeout) { @@ -96,7 +96,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest10s. */ public CloseIndexRequest timeout(String timeout) { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java index ee79644359c..dd9151b124f 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java @@ -45,7 +45,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest10s. */ TimeValue timeout() { @@ -87,7 +87,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest10s. */ public OpenIndexRequest timeout(TimeValue timeout) { @@ -96,7 +96,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest10s. */ public OpenIndexRequest timeout(String timeout) { diff --git a/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index abf7a6af23f..305b539e25b 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -20,12 +20,14 @@ package org.elasticsearch.cluster; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import java.util.List; +import java.util.Map; /** * @@ -161,4 +163,22 @@ public class ClusterChangedEvent { public boolean nodesChanged() { return nodesRemoved() || nodesAdded(); } -} + + public boolean indicesStateChanged() { + if (metaDataChanged()) { + ImmutableMap indices = state.metaData().indices(); + ImmutableMap previousIndices = previousState.metaData().indices(); + + for (Map.Entry entry : indices.entrySet()) { + IndexMetaData indexMetaData = entry.getValue(); + IndexMetaData previousIndexMetaData = previousIndices.get(entry.getKey()); + if (previousIndexMetaData != null + && indexMetaData.state() != previousIndexMetaData.state()) { + return true; + } + } + } + + return false; + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 2adf89e5196..5df5395f96c 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -77,5 +77,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules { bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); bind(NodeAliasesUpdatedAction.class).asEagerSingleton(); + bind(NodeIndicesStateUpdatedAction.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java new file mode 100644 index 00000000000..ad9394fae53 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java @@ -0,0 +1,154 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.cluster.action.index; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class NodeIndicesStateUpdatedAction extends AbstractComponent { + + private final ThreadPool threadPool; + + private final TransportService transportService; + + private final ClusterService clusterService; + + private final List listeners = new CopyOnWriteArrayList(); + + @Inject + public NodeIndicesStateUpdatedAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService) { + super(settings); + this.threadPool = threadPool; + this.transportService = transportService; + this.clusterService = clusterService; + transportService.registerHandler(NodeIndexStateUpdatedTransportHandler.ACTION, new NodeIndexStateUpdatedTransportHandler()); + } + + public void add(final Listener listener, TimeValue timeout) { + listeners.add(listener); + threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() { + @Override + public void run() { + boolean removed = listeners.remove(listener); + if (removed) { + listener.onTimeout(); + } + } + }); + } + + public void remove(Listener listener) { + listeners.remove(listener); + } + + public void nodeIndexStateUpdated(final NodeIndexStateUpdatedResponse response) throws ElasticSearchException { + DiscoveryNodes nodes = clusterService.state().nodes(); + if (nodes.localNodeMaster()) { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + innerNodeIndexStateUpdated(response); + } + }); + } else { + transportService.sendRequest(clusterService.state().nodes().masterNode(), + NodeIndexStateUpdatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME); + } + } + + private void innerNodeIndexStateUpdated(NodeIndexStateUpdatedResponse response) { + for (Listener listener : listeners) { + listener.onIndexStateUpdated(response); + } + } + + private class NodeIndexStateUpdatedTransportHandler extends BaseTransportRequestHandler { + + static final String ACTION = "cluster/nodeIndexStateUpdated"; + + @Override + public NodeIndexStateUpdatedResponse newInstance() { + return new NodeIndexStateUpdatedResponse(); + } + + @Override + public void messageReceived(NodeIndexStateUpdatedResponse response, TransportChannel channel) throws Exception { + innerNodeIndexStateUpdated(response); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + + public static interface Listener { + void onIndexStateUpdated(NodeIndexStateUpdatedResponse response); + void onTimeout(); + } + + public static class NodeIndexStateUpdatedResponse extends TransportRequest { + private String nodeId; + private long version; + + NodeIndexStateUpdatedResponse() { + } + + public NodeIndexStateUpdatedResponse(String nodeId, long version) { + this.nodeId = nodeId; + this.version = version; + } + + public String nodeId() { + return nodeId; + } + + public long version() { + return version; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(nodeId); + out.writeLong(version); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readString(); + version = in.readLong(); + } + } +} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java index 9c9272ec842..4110b8890ef 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; +import org.elasticsearch.cluster.action.index.NodeIndicesStateUpdatedAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -45,6 +46,8 @@ import org.elasticsearch.rest.RestStatus; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * @@ -57,11 +60,14 @@ public class MetaDataStateIndexService extends AbstractComponent { private final AllocationService allocationService; + private final NodeIndicesStateUpdatedAction indicesStateUpdatedAction; + @Inject - public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) { + public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService, NodeIndicesStateUpdatedAction indicesStateUpdatedAction) { super(settings); this.clusterService = clusterService; this.allocationService = allocationService; + this.indicesStateUpdatedAction = indicesStateUpdatedAction; } public void closeIndex(final Request request, final Listener listener) { @@ -127,12 +133,19 @@ public class MetaDataStateIndexService extends AbstractComponent { RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder().state(updatedState).routingTable(rtBuilder).build()); - return ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + ClusterState newClusterState = ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + + waitForOtherNodes(newClusterState, listener, request.timeout); + + return newClusterState; } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new Response(true)); + if (oldState == newState) { + // we didn't do anything, callback + listener.onResponse(new Response(true)); + } } }); } @@ -192,16 +205,32 @@ public class MetaDataStateIndexService extends AbstractComponent { RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder().state(updatedState).routingTable(rtBuilder).build()); - return ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + ClusterState newClusterState = ClusterState.builder().state(updatedState).routingResult(routingResult).build(); + + waitForOtherNodes(newClusterState, listener, request.timeout); + + return newClusterState; + } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new Response(true)); + if (oldState == newState) { + // we didn't do anything, callback + listener.onResponse(new Response(true)); + } } }); } + private void waitForOtherNodes(ClusterState updatedState, Listener listener, TimeValue timeout) { + // wait for responses from other nodes if needed + int responseCount = updatedState.nodes().size(); + long version = updatedState.version() + 1; + logger.trace("waiting for [{}] notifications with version [{}]", responseCount, version); + indicesStateUpdatedAction.add(new CountDownListener(responseCount, listener, version), timeout); + } + public static interface Listener { void onResponse(Response response); @@ -242,4 +271,39 @@ public class MetaDataStateIndexService extends AbstractComponent { return acknowledged; } } + + private class CountDownListener implements NodeIndicesStateUpdatedAction.Listener { + + private final AtomicBoolean notified = new AtomicBoolean(); + private final AtomicInteger countDown; + private final Listener listener; + private final long version; + + public CountDownListener(int countDown, Listener listener, long version) { + this.countDown = new AtomicInteger(countDown); + this.listener = listener; + this.version = version; + } + + @Override + public void onIndexStateUpdated(NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse response) { + if (version <= response.version()) { + logger.trace("Received NodeIndexStateUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId()); + if (countDown.decrementAndGet() == 0) { + indicesStateUpdatedAction.remove(this); + if (notified.compareAndSet(false, true)) { + listener.onResponse(new Response(true)); + } + } + } + } + + @Override + public void onTimeout() { + indicesStateUpdatedAction.remove(this); + if (notified.compareAndSet(false, true)) { + listener.onResponse(new Response(false)); + } + } + } } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 11dd3f0a33f..82654cdf389 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -86,6 +86,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent clients() { + return cluster().clients(); + } + public ImmutableSettings.Builder randomSettingsBuilder() { // TODO RANDOMIZE return ImmutableSettings.builder(); diff --git a/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java b/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java index f2f5cd421b4..f5569aa7f7b 100644 --- a/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java +++ b/src/test/java/org/elasticsearch/test/integration/indices/state/OpenCloseIndexTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.junit.Test; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -273,6 +272,21 @@ public class OpenCloseIndexTests extends AbstractSharedClusterTest { assertIndexIsOpened("test1", "test2"); } + @Test + public void testSimpleCloseOpenAcknowledged() { + createIndex("test1"); + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test1").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsClosedOnAllNodes("test1"); + + OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test1").execute().actionGet(); + assertThat(openIndexResponse.isAcknowledged(), equalTo(true)); + assertIndexIsOpenedOnAllNodes("test1"); + } + private void assertIndexIsOpened(String... indices) { checkIndexState(IndexMetaData.State.OPEN, indices); } @@ -281,12 +295,33 @@ public class OpenCloseIndexTests extends AbstractSharedClusterTest { checkIndexState(IndexMetaData.State.CLOSE, indices); } - private void checkIndexState(IndexMetaData.State state, String... indices) { + private void assertIndexIsOpenedOnAllNodes(String... indices) { + checkIndexStateOnAllNodes(IndexMetaData.State.OPEN, indices); + } + + private void assertIndexIsClosedOnAllNodes(String... indices) { + checkIndexStateOnAllNodes(IndexMetaData.State.CLOSE, indices); + } + + private void checkIndexStateOnAllNodes(IndexMetaData.State state, String... indices) { + //we explicitly check the cluster state on all nodes forcing the local execution + // we want to make sure that acknowledged true means that all the nodes already hold the updated cluster state + for (Client client : clients()) { + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet(); + checkIndexState(state, clusterStateResponse, indices); + } + } + + private void checkIndexState(IndexMetaData.State expectedState, String... indices) { ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet(); + checkIndexState(expectedState, clusterStateResponse, indices); + } + + private void checkIndexState(IndexMetaData.State expectedState, ClusterStateResponse clusterState, String... indices) { for (String index : indices) { - IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get(index); + IndexMetaData indexMetaData = clusterState.getState().metaData().indices().get(index); assertThat(indexMetaData, notNullValue()); - assertThat(indexMetaData.getState(), equalTo(state)); + assertThat(indexMetaData.getState(), equalTo(expectedState)); } } } \ No newline at end of file