Refactored open/close index api to make use of the new recently introduced generic ack mechanism
Closes #4169
This commit is contained in:
parent
8377f05408
commit
39c606c59a
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.close;
|
||||
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
|
||||
|
||||
/**
|
||||
* Cluster state update request that allows to close one or more indices
|
||||
*/
|
||||
public class CloseIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {
|
||||
|
||||
private String[] indices;
|
||||
|
||||
CloseIndexClusterStateUpdateRequest() {
|
||||
|
||||
}
|
||||
|
||||
public String[] indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
public CloseIndexClusterStateUpdateRequest indices(String[] indices) {
|
||||
this.indices = indices;
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -21,24 +21,20 @@ package org.elasticsearch.action.admin.indices.close;
|
|||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.IgnoreIndices;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
|
||||
/**
|
||||
* A request to close an index.
|
||||
*/
|
||||
public class CloseIndexRequest extends MasterNodeOperationRequest<CloseIndexRequest> {
|
||||
public class CloseIndexRequest extends AcknowledgedRequest<CloseIndexRequest> {
|
||||
|
||||
private String[] indices;
|
||||
private TimeValue timeout = timeValueSeconds(10);
|
||||
private IgnoreIndices ignoreIndices = IgnoreIndices.DEFAULT;
|
||||
|
||||
CloseIndexRequest() {
|
||||
|
@ -78,32 +74,6 @@ public class CloseIndexRequest extends MasterNodeOperationRequest<CloseIndexRequ
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
TimeValue timeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
public CloseIndexRequest timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the index closure to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
public CloseIndexRequest timeout(String timeout) {
|
||||
return timeout(TimeValue.parseTimeValue(timeout, null));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Specifies what type of requested indices to ignore. For example indices that don't exist.
|
||||
* @return the desired behaviour regarding indices to ignore
|
||||
|
@ -126,7 +96,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest<CloseIndexRequ
|
|||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
indices = in.readStringArray();
|
||||
timeout = readTimeValue(in);
|
||||
readTimeout(in, null);
|
||||
ignoreIndices = IgnoreIndices.fromId(in.readByte());
|
||||
}
|
||||
|
||||
|
@ -134,7 +104,7 @@ public class CloseIndexRequest extends MasterNodeOperationRequest<CloseIndexRequ
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeStringArray(indices);
|
||||
timeout.writeTo(out);
|
||||
writeTimeout(out, null);
|
||||
out.writeByte(ignoreIndices.id());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,15 +21,14 @@ package org.elasticsearch.action.admin.indices.close;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.IgnoreIndices;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
/**
|
||||
*
|
||||
* Builder for close index request
|
||||
*/
|
||||
public class CloseIndexRequestBuilder extends MasterNodeOperationRequestBuilder<CloseIndexRequest, CloseIndexResponse, CloseIndexRequestBuilder> {
|
||||
public class CloseIndexRequestBuilder extends AcknowledgedRequestBuilder<CloseIndexRequest, CloseIndexResponse, CloseIndexRequestBuilder> {
|
||||
|
||||
public CloseIndexRequestBuilder(IndicesAdminClient indicesClient) {
|
||||
super((InternalIndicesAdminClient) indicesClient, new CloseIndexRequest());
|
||||
|
@ -49,24 +48,6 @@ public class CloseIndexRequestBuilder extends MasterNodeOperationRequestBuilder<
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
public CloseIndexRequestBuilder setTimeout(TimeValue timeout) {
|
||||
request.timeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
public CloseIndexRequestBuilder setTimeout(String timeout) {
|
||||
request.timeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies what type of requested indices to ignore. For example indices that don't exist.
|
||||
* @param ignoreIndices the desired behaviour regarding indices to ignore
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.close;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
|
@ -28,30 +28,24 @@ import java.io.IOException;
|
|||
/**
|
||||
* A response for a close index action.
|
||||
*/
|
||||
public class CloseIndexResponse extends ActionResponse {
|
||||
|
||||
private boolean acknowledged;
|
||||
public class CloseIndexResponse extends AcknowledgedResponse {
|
||||
|
||||
CloseIndexResponse() {
|
||||
}
|
||||
|
||||
CloseIndexResponse(boolean acknowledged) {
|
||||
this.acknowledged = acknowledged;
|
||||
}
|
||||
|
||||
public boolean isAcknowledged() {
|
||||
return acknowledged;
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
acknowledged = in.readBoolean();
|
||||
readAcknowledged(in, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeBoolean(acknowledged);
|
||||
writeAcknowledged(out, null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
|
||||
|
@ -34,7 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
/**
|
||||
* Delete index action.
|
||||
* Close index action
|
||||
*/
|
||||
public class TransportCloseIndexAction extends TransportMasterNodeOperationAction<CloseIndexRequest, CloseIndexResponse> {
|
||||
|
||||
|
@ -94,10 +96,16 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio
|
|||
|
||||
@Override
|
||||
protected void masterOperation(final CloseIndexRequest request, final ClusterState state, final ActionListener<CloseIndexResponse> listener) throws ElasticSearchException {
|
||||
indexStateService.closeIndex(new MetaDataIndexStateService.Request(request.indices()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexStateService.Listener() {
|
||||
|
||||
CloseIndexClusterStateUpdateRequest updateRequest = new CloseIndexClusterStateUpdateRequest()
|
||||
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
|
||||
.indices(request.indices());
|
||||
|
||||
indexStateService.closeIndex(updateRequest, new ClusterStateUpdateListener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(MetaDataIndexStateService.Response response) {
|
||||
listener.onResponse(new CloseIndexResponse(response.acknowledged()));
|
||||
public void onResponse(ClusterStateUpdateResponse response) {
|
||||
listener.onResponse(new CloseIndexResponse(response.isAcknowledged()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.open;
|
||||
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
|
||||
|
||||
/**
|
||||
* Cluster state update request that allows to open one or more indices
|
||||
*/
|
||||
public class OpenIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest<OpenIndexClusterStateUpdateRequest> {
|
||||
|
||||
private String[] indices;
|
||||
|
||||
OpenIndexClusterStateUpdateRequest() {
|
||||
|
||||
}
|
||||
|
||||
public String[] indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
public OpenIndexClusterStateUpdateRequest indices(String[] indices) {
|
||||
this.indices = indices;
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -21,24 +21,20 @@ package org.elasticsearch.action.admin.indices.open;
|
|||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.IgnoreIndices;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
|
||||
/**
|
||||
* A request to open an index.
|
||||
*/
|
||||
public class OpenIndexRequest extends MasterNodeOperationRequest<OpenIndexRequest> {
|
||||
public class OpenIndexRequest extends AcknowledgedRequest<OpenIndexRequest> {
|
||||
|
||||
private String[] indices;
|
||||
private TimeValue timeout = timeValueSeconds(10);
|
||||
private IgnoreIndices ignoreIndices = IgnoreIndices.DEFAULT;
|
||||
|
||||
OpenIndexRequest() {
|
||||
|
@ -78,31 +74,6 @@ public class OpenIndexRequest extends MasterNodeOperationRequest<OpenIndexReques
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
TimeValue timeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
public OpenIndexRequest timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the index opening to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
public OpenIndexRequest timeout(String timeout) {
|
||||
return timeout(TimeValue.parseTimeValue(timeout, null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies what type of requested indices to ignore. For example indices that don't exist.
|
||||
* @return the current behaviour when it comes to index names
|
||||
|
@ -125,7 +96,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest<OpenIndexReques
|
|||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
indices = in.readStringArray();
|
||||
timeout = readTimeValue(in);
|
||||
readTimeout(in, null);
|
||||
ignoreIndices = IgnoreIndices.fromId(in.readByte());
|
||||
}
|
||||
|
||||
|
@ -133,7 +104,7 @@ public class OpenIndexRequest extends MasterNodeOperationRequest<OpenIndexReques
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeStringArray(indices);
|
||||
timeout.writeTo(out);
|
||||
writeTimeout(out, null);
|
||||
out.writeByte(ignoreIndices.id());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,15 +21,14 @@ package org.elasticsearch.action.admin.indices.open;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.IgnoreIndices;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
/**
|
||||
*
|
||||
* Builder for for open index request
|
||||
*/
|
||||
public class OpenIndexRequestBuilder extends MasterNodeOperationRequestBuilder<OpenIndexRequest, OpenIndexResponse, OpenIndexRequestBuilder> {
|
||||
public class OpenIndexRequestBuilder extends AcknowledgedRequestBuilder<OpenIndexRequest, OpenIndexResponse, OpenIndexRequestBuilder> {
|
||||
|
||||
public OpenIndexRequestBuilder(IndicesAdminClient indicesClient) {
|
||||
super((InternalIndicesAdminClient) indicesClient, new OpenIndexRequest());
|
||||
|
@ -49,24 +48,6 @@ public class OpenIndexRequestBuilder extends MasterNodeOperationRequestBuilder<O
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
public OpenIndexRequestBuilder setTimeout(TimeValue timeout) {
|
||||
request.timeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
*/
|
||||
public OpenIndexRequestBuilder setTimeout(String timeout) {
|
||||
request.timeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies what type of requested indices to ignore. For example indices that don't exist.
|
||||
* @param ignoreIndices the desired behaviour regarding indices to ignore
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.open;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
|
@ -28,30 +28,24 @@ import java.io.IOException;
|
|||
/**
|
||||
* A response for a open index action.
|
||||
*/
|
||||
public class OpenIndexResponse extends ActionResponse {
|
||||
|
||||
private boolean acknowledged;
|
||||
public class OpenIndexResponse extends AcknowledgedResponse {
|
||||
|
||||
OpenIndexResponse() {
|
||||
}
|
||||
|
||||
OpenIndexResponse(boolean acknowledged) {
|
||||
this.acknowledged = acknowledged;
|
||||
}
|
||||
|
||||
public boolean isAcknowledged() {
|
||||
return acknowledged;
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
acknowledged = in.readBoolean();
|
||||
readAcknowledged(in, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeBoolean(acknowledged);
|
||||
writeAcknowledged(out, null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
|
||||
|
@ -33,7 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
/**
|
||||
* Delete index action.
|
||||
* Open index action
|
||||
*/
|
||||
public class TransportOpenIndexAction extends TransportMasterNodeOperationAction<OpenIndexRequest, OpenIndexResponse> {
|
||||
|
||||
|
@ -80,10 +82,16 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction
|
|||
|
||||
@Override
|
||||
protected void masterOperation(final OpenIndexRequest request, final ClusterState state, final ActionListener<OpenIndexResponse> listener) throws ElasticSearchException {
|
||||
indexStateService.openIndex(new MetaDataIndexStateService.Request(request.indices()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexStateService.Listener() {
|
||||
|
||||
OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest()
|
||||
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
|
||||
.indices(request.indices());
|
||||
|
||||
indexStateService.openIndex(updateRequest, new ClusterStateUpdateListener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(MetaDataIndexStateService.Response response) {
|
||||
listener.onResponse(new OpenIndexResponse(response.acknowledged()));
|
||||
public void onResponse(ClusterStateUpdateResponse response) {
|
||||
listener.onResponse(new OpenIndexResponse(response.isAcknowledged()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -162,22 +161,4 @@ public class ClusterChangedEvent {
|
|||
public boolean nodesChanged() {
|
||||
return nodesRemoved() || nodesAdded();
|
||||
}
|
||||
|
||||
public boolean indicesStateChanged() {
|
||||
if (metaDataChanged()) {
|
||||
Map<String,IndexMetaData> indices = state.metaData().indices();
|
||||
Map<String,IndexMetaData> previousIndices = previousState.metaData().indices();
|
||||
|
||||
for (Map.Entry<String, IndexMetaData> entry : indices.entrySet()) {
|
||||
IndexMetaData indexMetaData = entry.getValue();
|
||||
IndexMetaData previousIndexMetaData = previousIndices.get(entry.getKey());
|
||||
if (previousIndexMetaData != null
|
||||
&& indexMetaData.state() != previousIndexMetaData.state()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -76,7 +76,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
|
|||
bind(NodeMappingCreatedAction.class).asEagerSingleton();
|
||||
bind(NodeMappingRefreshAction.class).asEagerSingleton();
|
||||
bind(MappingUpdatedAction.class).asEagerSingleton();
|
||||
bind(NodeIndicesStateUpdatedAction.class).asEagerSingleton();
|
||||
|
||||
bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton();
|
||||
}
|
||||
|
|
|
@ -1,150 +0,0 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.action.index;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
public class NodeIndicesStateUpdatedAction extends AbstractComponent {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final TransportService transportService;
|
||||
private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
|
||||
|
||||
@Inject
|
||||
public NodeIndicesStateUpdatedAction(Settings settings, ThreadPool threadPool, TransportService transportService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.transportService = transportService;
|
||||
transportService.registerHandler(NodeIndexStateUpdatedTransportHandler.ACTION, new NodeIndexStateUpdatedTransportHandler());
|
||||
}
|
||||
|
||||
public void add(final Listener listener, TimeValue timeout) {
|
||||
listeners.add(listener);
|
||||
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
boolean removed = listeners.remove(listener);
|
||||
if (removed) {
|
||||
listener.onTimeout();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void remove(Listener listener) {
|
||||
listeners.remove(listener);
|
||||
}
|
||||
|
||||
public void nodeIndexStateUpdated(final ClusterState state, final NodeIndexStateUpdatedResponse response) throws ElasticSearchException {
|
||||
DiscoveryNodes nodes = state.nodes();
|
||||
if (nodes.localNodeMaster()) {
|
||||
threadPool.generic().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
innerNodeIndexStateUpdated(response);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
transportService.sendRequest(state.nodes().masterNode(),
|
||||
NodeIndexStateUpdatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
}
|
||||
}
|
||||
|
||||
private void innerNodeIndexStateUpdated(NodeIndexStateUpdatedResponse response) {
|
||||
for (Listener listener : listeners) {
|
||||
listener.onIndexStateUpdated(response);
|
||||
}
|
||||
}
|
||||
|
||||
private class NodeIndexStateUpdatedTransportHandler extends BaseTransportRequestHandler<NodeIndexStateUpdatedResponse> {
|
||||
|
||||
static final String ACTION = "cluster/nodeIndexStateUpdated";
|
||||
|
||||
@Override
|
||||
public NodeIndexStateUpdatedResponse newInstance() {
|
||||
return new NodeIndexStateUpdatedResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(NodeIndexStateUpdatedResponse response, TransportChannel channel) throws Exception {
|
||||
innerNodeIndexStateUpdated(response);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}
|
||||
|
||||
public static interface Listener {
|
||||
void onIndexStateUpdated(NodeIndexStateUpdatedResponse response);
|
||||
|
||||
void onTimeout();
|
||||
}
|
||||
|
||||
public static class NodeIndexStateUpdatedResponse extends TransportRequest {
|
||||
private String nodeId;
|
||||
private long version;
|
||||
|
||||
NodeIndexStateUpdatedResponse() {
|
||||
}
|
||||
|
||||
public NodeIndexStateUpdatedResponse(String nodeId, long version) {
|
||||
this.nodeId = nodeId;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public String nodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public long version() {
|
||||
return version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(nodeId);
|
||||
out.writeLong(version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
nodeId = in.readString();
|
||||
version = in.readLong();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,25 +20,28 @@
|
|||
package org.elasticsearch.cluster.metadata;
|
||||
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.action.index.NodeIndicesStateUpdatedAction;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.indices.IndexMissingException;
|
||||
import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
|
||||
|
@ -49,7 +52,7 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
* Service responsible for submitting open/close index requests
|
||||
*/
|
||||
public class MetaDataIndexStateService extends AbstractComponent {
|
||||
|
||||
|
@ -59,27 +62,44 @@ public class MetaDataIndexStateService extends AbstractComponent {
|
|||
|
||||
private final AllocationService allocationService;
|
||||
|
||||
private final NodeIndicesStateUpdatedAction indicesStateUpdatedAction;
|
||||
|
||||
@Inject
|
||||
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService, NodeIndicesStateUpdatedAction indicesStateUpdatedAction) {
|
||||
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.allocationService = allocationService;
|
||||
this.indicesStateUpdatedAction = indicesStateUpdatedAction;
|
||||
}
|
||||
|
||||
public void closeIndex(final Request request, final Listener listener) {
|
||||
if (request.indices == null || request.indices.length == 0) {
|
||||
public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
|
||||
if (request.indices() == null || request.indices().length == 0) {
|
||||
throw new ElasticSearchIllegalArgumentException("Index name is required");
|
||||
}
|
||||
|
||||
final String indicesAsString = Arrays.toString(request.indices);
|
||||
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new TimeoutClusterStateUpdateTask() {
|
||||
final String indicesAsString = Arrays.toString(request.indices());
|
||||
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask() {
|
||||
|
||||
@Override
|
||||
public boolean mustAck(DiscoveryNode discoveryNode) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAllNodesAcked(@Nullable Throwable t) {
|
||||
listener.onResponse(new ClusterStateUpdateResponse(true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAckTimeout() {
|
||||
listener.onResponse(new ClusterStateUpdateResponse(false));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue ackTimeout() {
|
||||
return request.ackTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return request.masterTimeout;
|
||||
return request.masterNodeTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,7 +110,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
|
|||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
List<String> indicesToClose = new ArrayList<String>();
|
||||
for (String index : request.indices) {
|
||||
for (String index : request.indices()) {
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
if (indexMetaData == null) {
|
||||
throw new IndexMissingException(new Index(index));
|
||||
|
@ -129,34 +149,48 @@ public class MetaDataIndexStateService extends AbstractComponent {
|
|||
}
|
||||
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build());
|
||||
|
||||
ClusterState newClusterState = ClusterState.builder(updatedState).routingResult(routingResult).build();
|
||||
|
||||
waitForOtherNodes(newClusterState, listener, request.timeout);
|
||||
|
||||
return newClusterState;
|
||||
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
|
||||
return ClusterState.builder(updatedState).routingResult(routingResult).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (oldState == newState) {
|
||||
// we didn't do anything, callback
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void openIndex(final Request request, final Listener listener) {
|
||||
if (request.indices == null || request.indices.length == 0) {
|
||||
public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
|
||||
if (request.indices() == null || request.indices().length == 0) {
|
||||
throw new ElasticSearchIllegalArgumentException("Index name is required");
|
||||
}
|
||||
|
||||
final String indicesAsString = Arrays.toString(request.indices);
|
||||
clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new TimeoutClusterStateUpdateTask() {
|
||||
final String indicesAsString = Arrays.toString(request.indices());
|
||||
clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask() {
|
||||
|
||||
@Override
|
||||
public boolean mustAck(DiscoveryNode discoveryNode) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAllNodesAcked(@Nullable Throwable t) {
|
||||
listener.onResponse(new ClusterStateUpdateResponse(true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAckTimeout() {
|
||||
listener.onResponse(new ClusterStateUpdateResponse(false));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue ackTimeout() {
|
||||
return request.ackTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return request.masterTimeout;
|
||||
return request.masterNodeTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -167,7 +201,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
|
|||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
List<String> indicesToOpen = new ArrayList<String>();
|
||||
for (String index : request.indices) {
|
||||
for (String index : request.indices()) {
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
if (indexMetaData == null) {
|
||||
throw new IndexMissingException(new Index(index));
|
||||
|
@ -199,103 +233,15 @@ public class MetaDataIndexStateService extends AbstractComponent {
|
|||
}
|
||||
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build());
|
||||
|
||||
ClusterState newClusterState = ClusterState.builder(updatedState).routingResult(routingResult).build();
|
||||
|
||||
waitForOtherNodes(newClusterState, listener, request.timeout);
|
||||
|
||||
return newClusterState;
|
||||
|
||||
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
|
||||
return ClusterState.builder(updatedState).routingResult(routingResult).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (oldState == newState) {
|
||||
// we didn't do anything, callback
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void waitForOtherNodes(ClusterState updatedState, Listener listener, TimeValue timeout) {
|
||||
// wait for responses from other nodes if needed
|
||||
int responseCount = updatedState.nodes().size();
|
||||
long version = updatedState.version() + 1;
|
||||
logger.trace("waiting for [{}] notifications with version [{}]", responseCount, version);
|
||||
indicesStateUpdatedAction.add(new CountDownListener(responseCount, listener, version), timeout);
|
||||
}
|
||||
|
||||
public static interface Listener {
|
||||
|
||||
void onResponse(Response response);
|
||||
|
||||
void onFailure(Throwable t);
|
||||
}
|
||||
|
||||
public static class Request {
|
||||
|
||||
final String[] indices;
|
||||
|
||||
TimeValue timeout = TimeValue.timeValueSeconds(10);
|
||||
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
|
||||
|
||||
public Request(String[] indices) {
|
||||
this.indices = indices;
|
||||
}
|
||||
|
||||
public Request timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Request masterTimeout(TimeValue masterTimeout) {
|
||||
this.masterTimeout = masterTimeout;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Response {
|
||||
private final boolean acknowledged;
|
||||
|
||||
public Response(boolean acknowledged) {
|
||||
this.acknowledged = acknowledged;
|
||||
}
|
||||
|
||||
public boolean acknowledged() {
|
||||
return acknowledged;
|
||||
}
|
||||
}
|
||||
|
||||
private class CountDownListener implements NodeIndicesStateUpdatedAction.Listener {
|
||||
private final CountDown countDown;
|
||||
private final Listener listener;
|
||||
private final long version;
|
||||
|
||||
public CountDownListener(int count, Listener listener, long version) {
|
||||
this.countDown = new CountDown(count);
|
||||
this.listener = listener;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onIndexStateUpdated(NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse response) {
|
||||
if (version <= response.version()) {
|
||||
logger.trace("Received NodeIndexStateUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId());
|
||||
if (countDown.countDown()) {
|
||||
indicesStateUpdatedAction.remove(this);
|
||||
logger.trace("NodeIndexStateUpdated was acknowledged by all expected nodes, returning");
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout() {
|
||||
if (countDown.fastForward()) {
|
||||
indicesStateUpdatedAction.remove(this);
|
||||
listener.onResponse(new Response(false));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,7 +83,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
private final NodeIndexDeletedAction nodeIndexDeletedAction;
|
||||
private final NodeMappingCreatedAction nodeMappingCreatedAction;
|
||||
private final NodeMappingRefreshAction nodeMappingRefreshAction;
|
||||
private final NodeIndicesStateUpdatedAction nodeIndicesStateUpdatedAction;
|
||||
|
||||
// a map of mappings type we have seen per index due to cluster state
|
||||
// we need this so we won't remove types automatically created as part of the indexing process
|
||||
|
@ -112,8 +111,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
ThreadPool threadPool, RecoveryTarget recoveryTarget,
|
||||
ShardStateAction shardStateAction,
|
||||
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
|
||||
NodeMappingCreatedAction nodeMappingCreatedAction, NodeMappingRefreshAction nodeMappingRefreshAction,
|
||||
NodeIndicesStateUpdatedAction nodeIndicesStateUpdatedAction) {
|
||||
NodeMappingCreatedAction nodeMappingCreatedAction, NodeMappingRefreshAction nodeMappingRefreshAction) {
|
||||
super(settings);
|
||||
this.indicesService = indicesService;
|
||||
this.clusterService = clusterService;
|
||||
|
@ -124,7 +122,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
|
||||
this.nodeMappingCreatedAction = nodeMappingCreatedAction;
|
||||
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
|
||||
this.nodeIndicesStateUpdatedAction = nodeIndicesStateUpdatedAction;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -182,7 +179,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
applyCleanedIndices(event);
|
||||
applySettings(event);
|
||||
sendIndexLifecycleEvents(event);
|
||||
notifyIndicesStateChanged(event);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,18 +201,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
}
|
||||
}
|
||||
|
||||
private void notifyIndicesStateChanged(final ClusterChangedEvent event) {
|
||||
//handles open/close index notifications
|
||||
if (event.indicesStateChanged()) {
|
||||
try {
|
||||
nodeIndicesStateUpdatedAction.nodeIndexStateUpdated(event.state(),
|
||||
new NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse(event.state().nodes().localNodeId(), event.state().version()));
|
||||
} catch (Throwable e) {
|
||||
logger.debug("failed to send to master indices state change event", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanMismatchedIndexUUIDs(final ClusterChangedEvent event) {
|
||||
for (IndexService indexService : indicesService) {
|
||||
IndexMetaData indexMetaData = event.state().metaData().index(indexService.index().name());
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.rest.action.admin.indices.close;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
|
||||
import org.elasticsearch.action.support.IgnoreIndices;
|
||||
|
@ -27,15 +26,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestXContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.elasticsearch.rest.RestStatus.OK;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -54,38 +45,10 @@ public class RestCloseIndexAction extends BaseRestHandler {
|
|||
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
closeIndexRequest.listenerThreaded(false);
|
||||
closeIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", closeIndexRequest.masterNodeTimeout()));
|
||||
closeIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
|
||||
closeIndexRequest.timeout(request.paramAsTime("timeout", closeIndexRequest.timeout()));
|
||||
if (request.hasParam("ignore_indices")) {
|
||||
closeIndexRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices")));
|
||||
}
|
||||
client.admin().indices().close(closeIndexRequest, new ActionListener<CloseIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CloseIndexResponse response) {
|
||||
try {
|
||||
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
|
||||
builder.startObject()
|
||||
.field(Fields.OK, true)
|
||||
.field(Fields.ACKNOWLEDGED, response.isAcknowledged())
|
||||
.endObject();
|
||||
channel.sendResponse(new XContentRestResponse(request, OK, builder));
|
||||
} catch (IOException e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
try {
|
||||
channel.sendResponse(new XContentThrowableRestResponse(request, e));
|
||||
} catch (IOException e1) {
|
||||
logger.error("Failed to send failure response", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString OK = new XContentBuilderString("ok");
|
||||
static final XContentBuilderString ACKNOWLEDGED = new XContentBuilderString("acknowledged");
|
||||
client.admin().indices().close(closeIndexRequest, new AcknowledgedRestResponseActionListener<CloseIndexResponse>(request, channel, logger));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.rest.action.admin.indices.open;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
|
||||
import org.elasticsearch.action.support.IgnoreIndices;
|
||||
|
@ -27,15 +26,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestXContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.elasticsearch.rest.RestStatus.OK;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -53,39 +44,11 @@ public class RestOpenIndexAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel) {
|
||||
OpenIndexRequest openIndexRequest = new OpenIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
openIndexRequest.listenerThreaded(false);
|
||||
openIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
|
||||
openIndexRequest.timeout(request.paramAsTime("timeout", openIndexRequest.timeout()));
|
||||
openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout()));
|
||||
if (request.hasParam("ignore_indices")) {
|
||||
openIndexRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices")));
|
||||
}
|
||||
client.admin().indices().open(openIndexRequest, new ActionListener<OpenIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(OpenIndexResponse response) {
|
||||
try {
|
||||
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
|
||||
builder.startObject()
|
||||
.field(Fields.OK, true)
|
||||
.field(Fields.ACKNOWLEDGED, response.isAcknowledged())
|
||||
.endObject();
|
||||
channel.sendResponse(new XContentRestResponse(request, OK, builder));
|
||||
} catch (IOException e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
try {
|
||||
channel.sendResponse(new XContentThrowableRestResponse(request, e));
|
||||
} catch (IOException e1) {
|
||||
logger.error("Failed to send failure response", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString OK = new XContentBuilderString("ok");
|
||||
static final XContentBuilderString ACKNOWLEDGED = new XContentBuilderString("acknowledged");
|
||||
client.admin().indices().open(openIndexRequest, new AcknowledgedRestResponseActionListener<OpenIndexResponse>(request, channel, logger));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,8 +26,10 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
|
|||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse;
|
||||
import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
|
||||
|
@ -35,6 +37,7 @@ import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.*;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
|
@ -412,4 +415,57 @@ public class AckTests extends ElasticsearchIntegrationTest {
|
|||
IndicesAliasesResponse indicesAliasesResponse = client().admin().indices().prepareAliases().addAlias("test", "alias").setTimeout("0s").get();
|
||||
assertThat(indicesAliasesResponse.isAcknowledged(), equalTo(false));
|
||||
}
|
||||
|
||||
public void testCloseIndexAcknowledgement() {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
||||
CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test").execute().actionGet();
|
||||
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
for (Client client : clients()) {
|
||||
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet();
|
||||
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get("test");
|
||||
assertThat(indexMetaData.getState(), equalTo(IndexMetaData.State.CLOSE));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseIndexNoAcknowledgement() {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
||||
CloseIndexResponse closeIndexResponse= client().admin().indices().prepareClose("test").setTimeout("0s").get();
|
||||
assertThat(closeIndexResponse.isAcknowledged(), equalTo(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpenIndexAcknowledgement() {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
||||
CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test").execute().actionGet();
|
||||
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
OpenIndexResponse openIndexResponse= client().admin().indices().prepareOpen("test").execute().actionGet();
|
||||
assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
for (Client client : clients()) {
|
||||
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet();
|
||||
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get("test");
|
||||
assertThat(indexMetaData.getState(), equalTo(IndexMetaData.State.OPEN));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpenIndexNoAcknowledgement() {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
||||
CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test").execute().actionGet();
|
||||
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test").setTimeout("0s").get();
|
||||
assertThat(openIndexResponse.isAcknowledged(), equalTo(false));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,10 +28,8 @@ import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
|
|||
import org.elasticsearch.action.support.IgnoreIndices;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.indices.IndexMissingException;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -253,7 +251,7 @@ public class OpenCloseIndexTests extends ElasticsearchIntegrationTest {
|
|||
assertIndexIsOpened("test1");
|
||||
}
|
||||
|
||||
@Test @TestLogging("cluster.metadata:TRACE")
|
||||
@Test
|
||||
public void testCloseOpenAliasMultipleIndices() {
|
||||
Client client = client();
|
||||
createIndex("test1", "test2");
|
||||
|
@ -274,25 +272,6 @@ public class OpenCloseIndexTests extends ElasticsearchIntegrationTest {
|
|||
assertIndexIsOpened("test1", "test2");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleCloseOpenAcknowledged() {
|
||||
createIndex("test1");
|
||||
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
|
||||
assertThat(healthResponse.isTimedOut(), equalTo(false));
|
||||
|
||||
CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose("test1").execute().actionGet();
|
||||
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
|
||||
assertIndexIsClosedOnAllNodes("test1");
|
||||
|
||||
OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test1").execute().actionGet();
|
||||
assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
|
||||
assertIndexIsOpenedOnAllNodes("test1");
|
||||
|
||||
//we now set the timeout to 0, which means not wait for acknowledgement from other nodes
|
||||
closeIndexResponse = client().admin().indices().prepareClose("test1").setTimeout(TimeValue.timeValueMillis(0)).execute().actionGet();
|
||||
assertThat(closeIndexResponse.isAcknowledged(), equalTo(false));
|
||||
}
|
||||
|
||||
private void assertIndexIsOpened(String... indices) {
|
||||
checkIndexState(IndexMetaData.State.OPEN, indices);
|
||||
}
|
||||
|
@ -301,31 +280,10 @@ public class OpenCloseIndexTests extends ElasticsearchIntegrationTest {
|
|||
checkIndexState(IndexMetaData.State.CLOSE, indices);
|
||||
}
|
||||
|
||||
private void assertIndexIsOpenedOnAllNodes(String... indices) {
|
||||
checkIndexStateOnAllNodes(IndexMetaData.State.OPEN, indices);
|
||||
}
|
||||
|
||||
private void assertIndexIsClosedOnAllNodes(String... indices) {
|
||||
checkIndexStateOnAllNodes(IndexMetaData.State.CLOSE, indices);
|
||||
}
|
||||
|
||||
private void checkIndexStateOnAllNodes(IndexMetaData.State state, String... indices) {
|
||||
//we explicitly check the cluster state on all nodes forcing the local execution
|
||||
// we want to make sure that acknowledged true means that all the nodes already hold the updated cluster state
|
||||
for (Client client : clients()) {
|
||||
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).execute().actionGet();
|
||||
checkIndexState(state, clusterStateResponse, indices);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkIndexState(IndexMetaData.State expectedState, String... indices) {
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet();
|
||||
checkIndexState(expectedState, clusterStateResponse, indices);
|
||||
}
|
||||
|
||||
private void checkIndexState(IndexMetaData.State expectedState, ClusterStateResponse clusterState, String... indices) {
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = clusterState.getState().metaData().indices().get(index);
|
||||
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get(index);
|
||||
assertThat(indexMetaData, notNullValue());
|
||||
assertThat(indexMetaData.getState(), equalTo(expectedState));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue