Added support for acknowledgement from other nodes in open/close index api

The open/close index api now waits for an acknowledgement from all the other nodes before returning its response, till the timeout (configurable, default 10 secs) expires. The returned acknowledged flag reflects whether the cluster state change was acknowledged by all the nodes or the timeout expired before.

Closes #3400
This commit is contained in:
Luca Cavanna 2013-07-27 10:42:02 +02:00
parent e0e6a58357
commit c23c5d2494
9 changed files with 307 additions and 19 deletions

View File

@ -45,7 +45,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest<CloseIndexRequ
}
/**
* Constructs a new delete index request for the specified index.
* Constructs a new close index request for the specified index.
*/
public CloseIndexRequest(String... indices) {
this.indices = indices;
@ -79,7 +79,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest<CloseIndexRequ
}
/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
TimeValue timeout() {
@ -87,7 +87,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest<CloseIndexRequ
}
/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CloseIndexRequest timeout(TimeValue timeout) {
@ -96,7 +96,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest<CloseIndexRequ
}
/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CloseIndexRequest timeout(String timeout) {

View File

@ -45,7 +45,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest<OpenIndexReques
}
/**
* Constructs a new delete index request for the specified index.
* Constructs a new open index request for the specified index.
*/
public OpenIndexRequest(String... indices) {
this.indices = indices;
@ -79,7 +79,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest<OpenIndexReques
}
/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
TimeValue timeout() {
@ -87,7 +87,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest<OpenIndexReques
}
/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public OpenIndexRequest timeout(TimeValue timeout) {
@ -96,7 +96,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest<OpenIndexReques
}
/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public OpenIndexRequest timeout(String timeout) {

View File

@ -20,12 +20,14 @@
package org.elasticsearch.cluster;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import java.util.List;
import java.util.Map;
/**
*
@ -161,4 +163,22 @@ public class ClusterChangedEvent {
public boolean nodesChanged() {
return nodesRemoved() || nodesAdded();
}
}
public boolean indicesStateChanged() {
if (metaDataChanged()) {
ImmutableMap<String,IndexMetaData> indices = state.metaData().indices();
ImmutableMap<String,IndexMetaData> previousIndices = previousState.metaData().indices();
for (Map.Entry<String, IndexMetaData> entry : indices.entrySet()) {
IndexMetaData indexMetaData = entry.getValue();
IndexMetaData previousIndexMetaData = previousIndices.get(entry.getKey());
if (previousIndexMetaData != null
&& indexMetaData.state() != previousIndexMetaData.state()) {
return true;
}
}
}
return false;
}
}

View File

@ -77,5 +77,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(NodeAliasesUpdatedAction.class).asEagerSingleton();
bind(NodeIndicesStateUpdatedAction.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.elasticsearch.cluster.action.index;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class NodeIndicesStateUpdatedAction extends AbstractComponent {
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterService clusterService;
private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
@Inject
public NodeIndicesStateUpdatedAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterService = clusterService;
transportService.registerHandler(NodeIndexStateUpdatedTransportHandler.ACTION, new NodeIndexStateUpdatedTransportHandler());
}
public void add(final Listener listener, TimeValue timeout) {
listeners.add(listener);
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
boolean removed = listeners.remove(listener);
if (removed) {
listener.onTimeout();
}
}
});
}
public void remove(Listener listener) {
listeners.remove(listener);
}
public void nodeIndexStateUpdated(final NodeIndexStateUpdatedResponse response) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerNodeIndexStateUpdated(response);
}
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeIndexStateUpdatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
private void innerNodeIndexStateUpdated(NodeIndexStateUpdatedResponse response) {
for (Listener listener : listeners) {
listener.onIndexStateUpdated(response);
}
}
private class NodeIndexStateUpdatedTransportHandler extends BaseTransportRequestHandler<NodeIndexStateUpdatedResponse> {
static final String ACTION = "cluster/nodeIndexStateUpdated";
@Override
public NodeIndexStateUpdatedResponse newInstance() {
return new NodeIndexStateUpdatedResponse();
}
@Override
public void messageReceived(NodeIndexStateUpdatedResponse response, TransportChannel channel) throws Exception {
innerNodeIndexStateUpdated(response);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
public static interface Listener {
void onIndexStateUpdated(NodeIndexStateUpdatedResponse response);
void onTimeout();
}
public static class NodeIndexStateUpdatedResponse extends TransportRequest {
private String nodeId;
private long version;
NodeIndexStateUpdatedResponse() {
}
public NodeIndexStateUpdatedResponse(String nodeId, long version) {
this.nodeId = nodeId;
this.version = version;
}
public String nodeId() {
return nodeId;
}
public long version() {
return version;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
out.writeLong(version);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
version = in.readLong();
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndicesStateUpdatedAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -45,6 +46,8 @@ import org.elasticsearch.rest.RestStatus;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
@ -57,11 +60,14 @@ public class MetaDataStateIndexService extends AbstractComponent {
private final AllocationService allocationService;
private final NodeIndicesStateUpdatedAction indicesStateUpdatedAction;
@Inject
public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService, NodeIndicesStateUpdatedAction indicesStateUpdatedAction) {
super(settings);
this.clusterService = clusterService;
this.allocationService = allocationService;
this.indicesStateUpdatedAction = indicesStateUpdatedAction;
}
public void closeIndex(final Request request, final Listener listener) {
@ -127,12 +133,19 @@ public class MetaDataStateIndexService extends AbstractComponent {
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder().state(updatedState).routingTable(rtBuilder).build());
return ClusterState.builder().state(updatedState).routingResult(routingResult).build();
ClusterState newClusterState = ClusterState.builder().state(updatedState).routingResult(routingResult).build();
waitForOtherNodes(newClusterState, listener, request.timeout);
return newClusterState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new Response(true));
if (oldState == newState) {
// we didn't do anything, callback
listener.onResponse(new Response(true));
}
}
});
}
@ -192,16 +205,32 @@ public class MetaDataStateIndexService extends AbstractComponent {
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder().state(updatedState).routingTable(rtBuilder).build());
return ClusterState.builder().state(updatedState).routingResult(routingResult).build();
ClusterState newClusterState = ClusterState.builder().state(updatedState).routingResult(routingResult).build();
waitForOtherNodes(newClusterState, listener, request.timeout);
return newClusterState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new Response(true));
if (oldState == newState) {
// we didn't do anything, callback
listener.onResponse(new Response(true));
}
}
});
}
private void waitForOtherNodes(ClusterState updatedState, Listener listener, TimeValue timeout) {
// wait for responses from other nodes if needed
int responseCount = updatedState.nodes().size();
long version = updatedState.version() + 1;
logger.trace("waiting for [{}] notifications with version [{}]", responseCount, version);
indicesStateUpdatedAction.add(new CountDownListener(responseCount, listener, version), timeout);
}
public static interface Listener {
void onResponse(Response response);
@ -242,4 +271,39 @@ public class MetaDataStateIndexService extends AbstractComponent {
return acknowledged;
}
}
private class CountDownListener implements NodeIndicesStateUpdatedAction.Listener {
private final AtomicBoolean notified = new AtomicBoolean();
private final AtomicInteger countDown;
private final Listener listener;
private final long version;
public CountDownListener(int countDown, Listener listener, long version) {
this.countDown = new AtomicInteger(countDown);
this.listener = listener;
this.version = version;
}
@Override
public void onIndexStateUpdated(NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse response) {
if (version <= response.version()) {
logger.trace("Received NodeIndexStateUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId());
if (countDown.decrementAndGet() == 0) {
indicesStateUpdatedAction.remove(this);
if (notified.compareAndSet(false, true)) {
listener.onResponse(new Response(true));
}
}
}
}
@Override
public void onTimeout() {
indicesStateUpdatedAction.remove(this);
if (notified.compareAndSet(false, true)) {
listener.onResponse(new Response(false));
}
}
}
}

View File

@ -86,6 +86,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final NodeMappingCreatedAction nodeMappingCreatedAction;
private final NodeMappingRefreshAction nodeMappingRefreshAction;
private final NodeAliasesUpdatedAction nodeAliasesUpdatedAction;
private final NodeIndicesStateUpdatedAction nodeIndicesStateUpdatedAction;
// a map of mappings type we have seen per index due to cluster state
// we need this so we won't remove types automatically created as part of the indexing process
@ -101,7 +102,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
ShardStateAction shardStateAction,
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingCreatedAction nodeMappingCreatedAction, NodeMappingRefreshAction nodeMappingRefreshAction,
NodeAliasesUpdatedAction nodeAliasesUpdatedAction) {
NodeAliasesUpdatedAction nodeAliasesUpdatedAction, NodeIndicesStateUpdatedAction nodeIndicesStateUpdatedAction) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
@ -113,6 +114,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
this.nodeMappingCreatedAction = nodeMappingCreatedAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.nodeAliasesUpdatedAction = nodeAliasesUpdatedAction;
this.nodeIndicesStateUpdatedAction = nodeIndicesStateUpdatedAction;
}
@Override
@ -167,6 +169,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
applyCleanedIndices(event);
applySettings(event);
sendIndexLifecycleEvents(event);
notifyIndicesStateChanged(event);
}
}
@ -187,6 +190,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private void notifyIndicesStateChanged(final ClusterChangedEvent event) {
//handles open/close index notifications
if (event.indicesStateChanged()) {
nodeIndicesStateUpdatedAction.nodeIndexStateUpdated(new NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse(event.state().nodes().localNodeId(), event.state().version()));
}
}
private void applyCleanedIndices(final ClusterChangedEvent event) {
// handle closed indices, since they are not allocated on a node once they are closed
// so applyDeletedIndices might not take them into account

View File

@ -127,6 +127,10 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase {
return cluster().client();
}
public static Iterable<Client> clients() {
return cluster().clients();
}
public ImmutableSettings.Builder randomSettingsBuilder() {
// TODO RANDOMIZE
return ImmutableSettings.builder();

View File

@ -32,7 +32,6 @@ import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
@ -273,6 +272,21 @@ public class OpenCloseIndexTests extends AbstractSharedClusterTest {
assertIndexIsOpened("test1", "test2");
}
@Test
public void testSimpleCloseOpenAcknowledged() {
createIndex("test1");
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test1").execute().actionGet();
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
assertIndexIsClosedOnAllNodes("test1");
OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test1").execute().actionGet();
assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
assertIndexIsOpenedOnAllNodes("test1");
}
private void assertIndexIsOpened(String... indices) {
checkIndexState(IndexMetaData.State.OPEN, indices);
}
@ -281,12 +295,33 @@ public class OpenCloseIndexTests extends AbstractSharedClusterTest {
checkIndexState(IndexMetaData.State.CLOSE, indices);
}
private void checkIndexState(IndexMetaData.State state, String... indices) {
private void assertIndexIsOpenedOnAllNodes(String... indices) {
checkIndexStateOnAllNodes(IndexMetaData.State.OPEN, indices);
}
private void assertIndexIsClosedOnAllNodes(String... indices) {
checkIndexStateOnAllNodes(IndexMetaData.State.CLOSE, indices);
}
private void checkIndexStateOnAllNodes(IndexMetaData.State state, String... indices) {
//we explicitly check the cluster state on all nodes forcing the local execution
// we want to make sure that acknowledged true means that all the nodes already hold the updated cluster state
for (Client client : clients()) {
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet();
checkIndexState(state, clusterStateResponse, indices);
}
}
private void checkIndexState(IndexMetaData.State expectedState, String... indices) {
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet();
checkIndexState(expectedState, clusterStateResponse, indices);
}
private void checkIndexState(IndexMetaData.State expectedState, ClusterStateResponse clusterState, String... indices) {
for (String index : indices) {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get(index);
IndexMetaData indexMetaData = clusterState.getState().metaData().indices().get(index);
assertThat(indexMetaData, notNullValue());
assertThat(indexMetaData.getState(), equalTo(state));
assertThat(indexMetaData.getState(), equalTo(expectedState));
}
}
}