From 39c606c59a2e7e2a71d067c54316b47650450332 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 29 Oct 2013 11:09:51 +0100 Subject: [PATCH] Refactored open/close index api to make use of the new recently introduced generic ack mechanism Closes #4169 --- .../CloseIndexClusterStateUpdateRequest.java | 42 ++++ .../indices/close/CloseIndexRequest.java | 38 +--- .../close/CloseIndexRequestBuilder.java | 25 +-- .../indices/close/CloseIndexResponse.java | 16 +- .../close/TransportCloseIndexAction.java | 16 +- .../OpenIndexClusterStateUpdateRequest.java | 42 ++++ .../admin/indices/open/OpenIndexRequest.java | 37 +--- .../indices/open/OpenIndexRequestBuilder.java | 25 +-- .../admin/indices/open/OpenIndexResponse.java | 16 +- .../open/TransportOpenIndexAction.java | 16 +- .../cluster/ClusterChangedEvent.java | 19 -- .../elasticsearch/cluster/ClusterModule.java | 1 - .../index/NodeIndicesStateUpdatedAction.java | 150 -------------- .../metadata/MetaDataIndexStateService.java | 190 +++++++----------- .../cluster/IndicesClusterStateService.java | 18 +- .../indices/close/RestCloseIndexAction.java | 41 +--- .../indices/open/RestOpenIndexAction.java | 41 +--- .../elasticsearch/cluster/ack/AckTests.java | 56 ++++++ .../indices/state/OpenCloseIndexTests.java | 46 +---- 19 files changed, 263 insertions(+), 572 deletions(-) create mode 100644 src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java create mode 100644 src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java delete mode 100644 src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java new file mode 100644 index 00000000000..dc8537c6f76 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java @@ -0,0 +1,42 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.admin.indices.close; + +import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; + +/** + * Cluster state update request that allows to close one or more indices + */ +public class CloseIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest { + + private String[] indices; + + CloseIndexClusterStateUpdateRequest() { + + } + + public String[] indices() { + return indices; + } + + public CloseIndexClusterStateUpdateRequest indices(String[] indices) { + this.indices = indices; + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java index aea253427b0..93c0b13fc21 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java @@ -21,24 +21,20 @@ package org.elasticsearch.action.admin.indices.close; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.IgnoreIndices; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; /** * A request to close an index. */ -public class CloseIndexRequest extends MasterNodeOperationRequest { +public class CloseIndexRequest extends AcknowledgedRequest { private String[] indices; - private TimeValue timeout = timeValueSeconds(10); private IgnoreIndices ignoreIndices = IgnoreIndices.DEFAULT; CloseIndexRequest() { @@ -78,32 +74,6 @@ public class CloseIndexRequest extends MasterNodeOperationRequest10s. - */ - TimeValue timeout() { - return timeout; - } - - /** - * Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public CloseIndexRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - /** - * Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public CloseIndexRequest timeout(String timeout) { - return timeout(TimeValue.parseTimeValue(timeout, null)); - } - - /** * Specifies what type of requested indices to ignore. For example indices that don't exist. * @return the desired behaviour regarding indices to ignore @@ -126,7 +96,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest { +public class CloseIndexRequestBuilder extends AcknowledgedRequestBuilder { public CloseIndexRequestBuilder(IndicesAdminClient indicesClient) { super((InternalIndicesAdminClient) indicesClient, new CloseIndexRequest()); @@ -49,24 +48,6 @@ public class CloseIndexRequestBuilder extends MasterNodeOperationRequestBuilder< return this; } - /** - * Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public CloseIndexRequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return this; - } - - /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public CloseIndexRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return this; - } - /** * Specifies what type of requested indices to ignore. For example indices that don't exist. * @param ignoreIndices the desired behaviour regarding indices to ignore diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java index 4b7ca5532c1..685e0cc11cd 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.close; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,30 +28,24 @@ import java.io.IOException; /** * A response for a close index action. */ -public class CloseIndexResponse extends ActionResponse { - - private boolean acknowledged; +public class CloseIndexResponse extends AcknowledgedResponse { CloseIndexResponse() { } CloseIndexResponse(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - public boolean isAcknowledged() { - return acknowledged; + super(acknowledged); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - acknowledged = in.readBoolean(); + readAcknowledged(in, null); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeBoolean(acknowledged); + writeAcknowledged(out, null); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index f625d7ddab5..0f2ecdf2278 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -25,6 +25,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; @@ -34,7 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; /** - * Delete index action. + * Close index action */ public class TransportCloseIndexAction extends TransportMasterNodeOperationAction { @@ -94,10 +96,16 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio @Override protected void masterOperation(final CloseIndexRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { - indexStateService.closeIndex(new MetaDataIndexStateService.Request(request.indices()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexStateService.Listener() { + + CloseIndexClusterStateUpdateRequest updateRequest = new CloseIndexClusterStateUpdateRequest() + .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) + .indices(request.indices()); + + indexStateService.closeIndex(updateRequest, new ClusterStateUpdateListener() { + @Override - public void onResponse(MetaDataIndexStateService.Response response) { - listener.onResponse(new CloseIndexResponse(response.acknowledged())); + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new CloseIndexResponse(response.isAcknowledged())); } @Override diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java new file mode 100644 index 00000000000..3d80627597e --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java @@ -0,0 +1,42 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.admin.indices.open; + +import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; + +/** + * Cluster state update request that allows to open one or more indices + */ +public class OpenIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest { + + private String[] indices; + + OpenIndexClusterStateUpdateRequest() { + + } + + public String[] indices() { + return indices; + } + + public OpenIndexClusterStateUpdateRequest indices(String[] indices) { + this.indices = indices; + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java index dd9151b124f..0b7f579fda1 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java @@ -21,24 +21,20 @@ package org.elasticsearch.action.admin.indices.open; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.IgnoreIndices; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; /** * A request to open an index. */ -public class OpenIndexRequest extends MasterNodeOperationRequest { +public class OpenIndexRequest extends AcknowledgedRequest { private String[] indices; - private TimeValue timeout = timeValueSeconds(10); private IgnoreIndices ignoreIndices = IgnoreIndices.DEFAULT; OpenIndexRequest() { @@ -78,31 +74,6 @@ public class OpenIndexRequest extends MasterNodeOperationRequest10s. - */ - TimeValue timeout() { - return timeout; - } - - /** - * Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public OpenIndexRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - /** - * Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public OpenIndexRequest timeout(String timeout) { - return timeout(TimeValue.parseTimeValue(timeout, null)); - } - /** * Specifies what type of requested indices to ignore. For example indices that don't exist. * @return the current behaviour when it comes to index names @@ -125,7 +96,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest { +public class OpenIndexRequestBuilder extends AcknowledgedRequestBuilder { public OpenIndexRequestBuilder(IndicesAdminClient indicesClient) { super((InternalIndicesAdminClient) indicesClient, new OpenIndexRequest()); @@ -49,24 +48,6 @@ public class OpenIndexRequestBuilder extends MasterNodeOperationRequestBuilder10s. - */ - public OpenIndexRequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return this; - } - - /** - * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public OpenIndexRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return this; - } - /** * Specifies what type of requested indices to ignore. For example indices that don't exist. * @param ignoreIndices the desired behaviour regarding indices to ignore diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java index 097aa765a24..0a8ce844aea 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.open; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,30 +28,24 @@ import java.io.IOException; /** * A response for a open index action. */ -public class OpenIndexResponse extends ActionResponse { - - private boolean acknowledged; +public class OpenIndexResponse extends AcknowledgedResponse { OpenIndexResponse() { } OpenIndexResponse(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - public boolean isAcknowledged() { - return acknowledged; + super(acknowledged); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - acknowledged = in.readBoolean(); + readAcknowledged(in, null); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeBoolean(acknowledged); + writeAcknowledged(out, null); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java index 3d771253a3b..e0de3a05329 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; @@ -33,7 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; /** - * Delete index action. + * Open index action */ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction { @@ -80,10 +82,16 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction @Override protected void masterOperation(final OpenIndexRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { - indexStateService.openIndex(new MetaDataIndexStateService.Request(request.indices()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexStateService.Listener() { + + OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest() + .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) + .indices(request.indices()); + + indexStateService.openIndex(updateRequest, new ClusterStateUpdateListener() { + @Override - public void onResponse(MetaDataIndexStateService.Response response) { - listener.onResponse(new OpenIndexResponse(response.acknowledged())); + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new OpenIndexResponse(response.isAcknowledged())); } @Override diff --git a/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index 737496e723b..248c7dc4606 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import java.util.List; -import java.util.Map; /** * @@ -162,22 +161,4 @@ public class ClusterChangedEvent { public boolean nodesChanged() { return nodesRemoved() || nodesAdded(); } - - public boolean indicesStateChanged() { - if (metaDataChanged()) { - Map indices = state.metaData().indices(); - Map previousIndices = previousState.metaData().indices(); - - for (Map.Entry entry : indices.entrySet()) { - IndexMetaData indexMetaData = entry.getValue(); - IndexMetaData previousIndexMetaData = previousIndices.get(entry.getKey()); - if (previousIndexMetaData != null - && indexMetaData.state() != previousIndexMetaData.state()) { - return true; - } - } - } - - return false; - } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 9863ec0c271..491deadf79a 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -76,7 +76,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules { bind(NodeMappingCreatedAction.class).asEagerSingleton(); bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); - bind(NodeIndicesStateUpdatedAction.class).asEagerSingleton(); bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton(); } diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java deleted file mode 100644 index 5a01f8ef24e..00000000000 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndicesStateUpdatedAction.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.elasticsearch.cluster.action.index; - -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -public class NodeIndicesStateUpdatedAction extends AbstractComponent { - - private final ThreadPool threadPool; - private final TransportService transportService; - private final List listeners = new CopyOnWriteArrayList(); - - @Inject - public NodeIndicesStateUpdatedAction(Settings settings, ThreadPool threadPool, TransportService transportService) { - super(settings); - this.threadPool = threadPool; - this.transportService = transportService; - transportService.registerHandler(NodeIndexStateUpdatedTransportHandler.ACTION, new NodeIndexStateUpdatedTransportHandler()); - } - - public void add(final Listener listener, TimeValue timeout) { - listeners.add(listener); - threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() { - @Override - public void run() { - boolean removed = listeners.remove(listener); - if (removed) { - listener.onTimeout(); - } - } - }); - } - - public void remove(Listener listener) { - listeners.remove(listener); - } - - public void nodeIndexStateUpdated(final ClusterState state, final NodeIndexStateUpdatedResponse response) throws ElasticSearchException { - DiscoveryNodes nodes = state.nodes(); - if (nodes.localNodeMaster()) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - innerNodeIndexStateUpdated(response); - } - }); - } else { - transportService.sendRequest(state.nodes().masterNode(), - NodeIndexStateUpdatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME); - } - } - - private void innerNodeIndexStateUpdated(NodeIndexStateUpdatedResponse response) { - for (Listener listener : listeners) { - listener.onIndexStateUpdated(response); - } - } - - private class NodeIndexStateUpdatedTransportHandler extends BaseTransportRequestHandler { - - static final String ACTION = "cluster/nodeIndexStateUpdated"; - - @Override - public NodeIndexStateUpdatedResponse newInstance() { - return new NodeIndexStateUpdatedResponse(); - } - - @Override - public void messageReceived(NodeIndexStateUpdatedResponse response, TransportChannel channel) throws Exception { - innerNodeIndexStateUpdated(response); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } - - public static interface Listener { - void onIndexStateUpdated(NodeIndexStateUpdatedResponse response); - - void onTimeout(); - } - - public static class NodeIndexStateUpdatedResponse extends TransportRequest { - private String nodeId; - private long version; - - NodeIndexStateUpdatedResponse() { - } - - public NodeIndexStateUpdatedResponse(String nodeId, long version) { - this.nodeId = nodeId; - this.version = version; - } - - public String nodeId() { - return nodeId; - } - - public long version() { - return version; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(nodeId); - out.writeLong(version); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodeId = in.readString(); - version = in.readLong(); - } - } -} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 7fde00def97..765183de3f8 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -20,25 +20,28 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.ElasticSearchIllegalArgumentException; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; -import org.elasticsearch.cluster.action.index.NodeIndicesStateUpdatedAction; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException; @@ -49,7 +52,7 @@ import java.util.Arrays; import java.util.List; /** - * + * Service responsible for submitting open/close index requests */ public class MetaDataIndexStateService extends AbstractComponent { @@ -59,27 +62,44 @@ public class MetaDataIndexStateService extends AbstractComponent { private final AllocationService allocationService; - private final NodeIndicesStateUpdatedAction indicesStateUpdatedAction; - @Inject - public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService, NodeIndicesStateUpdatedAction indicesStateUpdatedAction) { + public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService) { super(settings); this.clusterService = clusterService; this.allocationService = allocationService; - this.indicesStateUpdatedAction = indicesStateUpdatedAction; } - public void closeIndex(final Request request, final Listener listener) { - if (request.indices == null || request.indices.length == 0) { + public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { + if (request.indices() == null || request.indices().length == 0) { throw new ElasticSearchIllegalArgumentException("Index name is required"); } - final String indicesAsString = Arrays.toString(request.indices); - clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new TimeoutClusterStateUpdateTask() { + final String indicesAsString = Arrays.toString(request.indices()); + clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new ClusterStateUpdateResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } @Override public TimeValue timeout() { - return request.masterTimeout; + return request.masterNodeTimeout(); } @Override @@ -90,7 +110,7 @@ public class MetaDataIndexStateService extends AbstractComponent { @Override public ClusterState execute(ClusterState currentState) { List indicesToClose = new ArrayList(); - for (String index : request.indices) { + for (String index : request.indices()) { IndexMetaData indexMetaData = currentState.metaData().index(index); if (indexMetaData == null) { throw new IndexMissingException(new Index(index)); @@ -129,34 +149,48 @@ public class MetaDataIndexStateService extends AbstractComponent { } RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build()); - - ClusterState newClusterState = ClusterState.builder(updatedState).routingResult(routingResult).build(); - - waitForOtherNodes(newClusterState, listener, request.timeout); - - return newClusterState; + //no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask + return ClusterState.builder(updatedState).routingResult(routingResult).build(); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (oldState == newState) { - // we didn't do anything, callback - listener.onResponse(new Response(true)); - } + } }); } - public void openIndex(final Request request, final Listener listener) { - if (request.indices == null || request.indices.length == 0) { + public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { + if (request.indices() == null || request.indices().length == 0) { throw new ElasticSearchIllegalArgumentException("Index name is required"); } - final String indicesAsString = Arrays.toString(request.indices); - clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new TimeoutClusterStateUpdateTask() { + final String indicesAsString = Arrays.toString(request.indices()); + clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new ClusterStateUpdateResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } + @Override public TimeValue timeout() { - return request.masterTimeout; + return request.masterNodeTimeout(); } @Override @@ -167,7 +201,7 @@ public class MetaDataIndexStateService extends AbstractComponent { @Override public ClusterState execute(ClusterState currentState) { List indicesToOpen = new ArrayList(); - for (String index : request.indices) { + for (String index : request.indices()) { IndexMetaData indexMetaData = currentState.metaData().index(index); if (indexMetaData == null) { throw new IndexMissingException(new Index(index)); @@ -199,103 +233,15 @@ public class MetaDataIndexStateService extends AbstractComponent { } RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build()); - - ClusterState newClusterState = ClusterState.builder(updatedState).routingResult(routingResult).build(); - - waitForOtherNodes(newClusterState, listener, request.timeout); - - return newClusterState; - + //no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask + return ClusterState.builder(updatedState).routingResult(routingResult).build(); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (oldState == newState) { - // we didn't do anything, callback - listener.onResponse(new Response(true)); - } + } }); } - private void waitForOtherNodes(ClusterState updatedState, Listener listener, TimeValue timeout) { - // wait for responses from other nodes if needed - int responseCount = updatedState.nodes().size(); - long version = updatedState.version() + 1; - logger.trace("waiting for [{}] notifications with version [{}]", responseCount, version); - indicesStateUpdatedAction.add(new CountDownListener(responseCount, listener, version), timeout); - } - - public static interface Listener { - - void onResponse(Response response); - - void onFailure(Throwable t); - } - - public static class Request { - - final String[] indices; - - TimeValue timeout = TimeValue.timeValueSeconds(10); - TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; - - public Request(String[] indices) { - this.indices = indices; - } - - public Request timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - public Request masterTimeout(TimeValue masterTimeout) { - this.masterTimeout = masterTimeout; - return this; - } - } - - public static class Response { - private final boolean acknowledged; - - public Response(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - public boolean acknowledged() { - return acknowledged; - } - } - - private class CountDownListener implements NodeIndicesStateUpdatedAction.Listener { - private final CountDown countDown; - private final Listener listener; - private final long version; - - public CountDownListener(int count, Listener listener, long version) { - this.countDown = new CountDown(count); - this.listener = listener; - this.version = version; - } - - @Override - public void onIndexStateUpdated(NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse response) { - if (version <= response.version()) { - logger.trace("Received NodeIndexStateUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId()); - if (countDown.countDown()) { - indicesStateUpdatedAction.remove(this); - logger.trace("NodeIndexStateUpdated was acknowledged by all expected nodes, returning"); - listener.onResponse(new Response(true)); - } - } - } - - @Override - public void onTimeout() { - if (countDown.fastForward()) { - indicesStateUpdatedAction.remove(this); - listener.onResponse(new Response(false)); - } - } - } } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 79c6fa8a096..3d9ad984448 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -83,7 +83,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent() { - @Override - public void onResponse(CloseIndexResponse response) { - try { - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); - builder.startObject() - .field(Fields.OK, true) - .field(Fields.ACKNOWLEDGED, response.isAcknowledged()) - .endObject(); - channel.sendResponse(new XContentRestResponse(request, OK, builder)); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e1) { - logger.error("Failed to send failure response", e1); - } - } - }); - } - - static final class Fields { - static final XContentBuilderString OK = new XContentBuilderString("ok"); - static final XContentBuilderString ACKNOWLEDGED = new XContentBuilderString("acknowledged"); + client.admin().indices().close(closeIndexRequest, new AcknowledgedRestResponseActionListener(request, channel, logger)); } } diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/open/RestOpenIndexAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/open/RestOpenIndexAction.java index cab3577fb21..78c75c7e309 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/open/RestOpenIndexAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/open/RestOpenIndexAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.admin.indices.open; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.support.IgnoreIndices; @@ -27,15 +26,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.rest.*; -import org.elasticsearch.rest.action.support.RestXContentBuilder; - -import java.io.IOException; - -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; -import static org.elasticsearch.rest.RestStatus.OK; /** * @@ -53,39 +44,11 @@ public class RestOpenIndexAction extends BaseRestHandler { public void handleRequest(final RestRequest request, final RestChannel channel) { OpenIndexRequest openIndexRequest = new OpenIndexRequest(Strings.splitStringByCommaToArray(request.param("index"))); openIndexRequest.listenerThreaded(false); - openIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); + openIndexRequest.timeout(request.paramAsTime("timeout", openIndexRequest.timeout())); openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout())); if (request.hasParam("ignore_indices")) { openIndexRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices"))); } - client.admin().indices().open(openIndexRequest, new ActionListener() { - @Override - public void onResponse(OpenIndexResponse response) { - try { - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); - builder.startObject() - .field(Fields.OK, true) - .field(Fields.ACKNOWLEDGED, response.isAcknowledged()) - .endObject(); - channel.sendResponse(new XContentRestResponse(request, OK, builder)); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e1) { - logger.error("Failed to send failure response", e1); - } - } - }); - } - - static final class Fields { - static final XContentBuilderString OK = new XContentBuilderString("ok"); - static final XContentBuilderString ACKNOWLEDGED = new XContentBuilderString("acknowledged"); + client.admin().indices().open(openIndexRequest, new AcknowledgedRestResponseActionListener(request, channel, logger)); } } diff --git a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java index 71ae8160660..d6d8578e4fc 100644 --- a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java +++ b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java @@ -26,8 +26,10 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; @@ -35,6 +37,7 @@ import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.settings.ImmutableSettings; @@ -412,4 +415,57 @@ public class AckTests extends ElasticsearchIntegrationTest { IndicesAliasesResponse indicesAliasesResponse = client().admin().indices().prepareAliases().addAlias("test", "alias").setTimeout("0s").get(); assertThat(indicesAliasesResponse.isAcknowledged(), equalTo(false)); } + + public void testCloseIndexAcknowledgement() { + createIndex("test"); + ensureGreen(); + + CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet(); + IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get("test"); + assertThat(indexMetaData.getState(), equalTo(IndexMetaData.State.CLOSE)); + } + } + + @Test + public void testCloseIndexNoAcknowledgement() { + createIndex("test"); + ensureGreen(); + + CloseIndexResponse closeIndexResponse= client().admin().indices().prepareClose("test").setTimeout("0s").get(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(false)); + } + + @Test + public void testOpenIndexAcknowledgement() { + createIndex("test"); + ensureGreen(); + + CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + + OpenIndexResponse openIndexResponse= client().admin().indices().prepareOpen("test").execute().actionGet(); + assertThat(openIndexResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet(); + IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get("test"); + assertThat(indexMetaData.getState(), equalTo(IndexMetaData.State.OPEN)); + } + } + + @Test + public void testOpenIndexNoAcknowledgement() { + createIndex("test"); + ensureGreen(); + + CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test").execute().actionGet(); + assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); + + OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test").setTimeout("0s").get(); + assertThat(openIndexResponse.isAcknowledged(), equalTo(false)); + } } diff --git a/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexTests.java b/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexTests.java index 43c4b596d87..213570dda94 100644 --- a/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexTests.java +++ b/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexTests.java @@ -28,10 +28,8 @@ import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.support.IgnoreIndices; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; import static org.hamcrest.Matchers.equalTo; @@ -253,7 +251,7 @@ public class OpenCloseIndexTests extends ElasticsearchIntegrationTest { assertIndexIsOpened("test1"); } - @Test @TestLogging("cluster.metadata:TRACE") + @Test public void testCloseOpenAliasMultipleIndices() { Client client = client(); createIndex("test1", "test2"); @@ -274,25 +272,6 @@ public class OpenCloseIndexTests extends ElasticsearchIntegrationTest { assertIndexIsOpened("test1", "test2"); } - @Test - public void testSimpleCloseOpenAcknowledged() { - createIndex("test1"); - ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); - assertThat(healthResponse.isTimedOut(), equalTo(false)); - - CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test1").execute().actionGet(); - assertThat(closeIndexResponse.isAcknowledged(), equalTo(true)); - assertIndexIsClosedOnAllNodes("test1"); - - OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test1").execute().actionGet(); - assertThat(openIndexResponse.isAcknowledged(), equalTo(true)); - assertIndexIsOpenedOnAllNodes("test1"); - - //we now set the timeout to 0, which means not wait for acknowledgement from other nodes - closeIndexResponse = client().admin().indices().prepareClose("test1").setTimeout(TimeValue.timeValueMillis(0)).execute().actionGet(); - assertThat(closeIndexResponse.isAcknowledged(), equalTo(false)); - } - private void assertIndexIsOpened(String... indices) { checkIndexState(IndexMetaData.State.OPEN, indices); } @@ -301,31 +280,10 @@ public class OpenCloseIndexTests extends ElasticsearchIntegrationTest { checkIndexState(IndexMetaData.State.CLOSE, indices); } - private void assertIndexIsOpenedOnAllNodes(String... indices) { - checkIndexStateOnAllNodes(IndexMetaData.State.OPEN, indices); - } - - private void assertIndexIsClosedOnAllNodes(String... indices) { - checkIndexStateOnAllNodes(IndexMetaData.State.CLOSE, indices); - } - - private void checkIndexStateOnAllNodes(IndexMetaData.State state, String... indices) { - //we explicitly check the cluster state on all nodes forcing the local execution - // we want to make sure that acknowledged true means that all the nodes already hold the updated cluster state - for (Client client : clients()) { - ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet(); - checkIndexState(state, clusterStateResponse, indices); - } - } - private void checkIndexState(IndexMetaData.State expectedState, String... indices) { ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet(); - checkIndexState(expectedState, clusterStateResponse, indices); - } - - private void checkIndexState(IndexMetaData.State expectedState, ClusterStateResponse clusterState, String... indices) { for (String index : indices) { - IndexMetaData indexMetaData = clusterState.getState().metaData().indices().get(index); + IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get(index); assertThat(indexMetaData, notNullValue()); assertThat(indexMetaData.getState(), equalTo(expectedState)); }