diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingClusterStateUpdateRequest.java new file mode 100644 index 00000000000..bfee37d4370 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingClusterStateUpdateRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.mapping.delete; + +import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; + +/** + * Cluster state update request that allows to delete a mapping + */ +public class DeleteMappingClusterStateUpdateRequest extends ClusterStateUpdateRequest { + + private String[] indices; + private String type; + + DeleteMappingClusterStateUpdateRequest() { + + } + + /** + * Returns the indices the operation needs to be executed on + */ + public String[] indices() { + return indices; + } + + /** + * Sets the indices the operation needs to be executed on + */ + public DeleteMappingClusterStateUpdateRequest indices(String[] indices) { + this.indices = indices; + return this; + } + + /** + * Returns the type to be removed + */ + public String type() { + return type; + } + + /** + * Sets the type to be removed + */ + public DeleteMappingClusterStateUpdateRequest type(String type) { + this.type = type; + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingRequest.java index ee28b71b3a3..a9af5f48512 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingRequest.java @@ -19,8 +19,9 @@ package org.elasticsearch.action.admin.indices.mapping.delete; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -29,9 +30,9 @@ import java.io.IOException; import static org.elasticsearch.action.ValidateActions.addValidationError; /** - * + * Represents a request to delete a mapping */ -public class DeleteMappingRequest extends MasterNodeOperationRequest { +public class DeleteMappingRequest extends AcknowledgedRequest { private String[] indices; @@ -41,7 +42,7 @@ public class DeleteMappingRequest extends MasterNodeOperationRequest { +public class DeleteMappingRequestBuilder extends AcknowledgedRequestBuilder { public DeleteMappingRequestBuilder(IndicesAdminClient indicesClient) { super((InternalIndicesAdminClient) indicesClient, new DeleteMappingRequest()); } + /** + * Sets the indices the delete mapping will execute on + */ public DeleteMappingRequestBuilder setIndices(String... indices) { request.indices(indices); return this; } /** - * The type of the mapping to remove. + * Sets the type of the mapping to remove */ public DeleteMappingRequestBuilder setType(String type) { request.type(type); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingResponse.java index 72cfe8b35f8..6208de25a9b 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/DeleteMappingResponse.java @@ -19,7 +19,8 @@ package org.elasticsearch.action.admin.indices.mapping.delete; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,19 +29,25 @@ import java.io.IOException; /** * The response of remove mapping operation. */ -public class DeleteMappingResponse extends ActionResponse { +public class DeleteMappingResponse extends AcknowledgedResponse { DeleteMappingResponse() { } + DeleteMappingResponse(boolean acknowledged) { + super(acknowledged); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + readAcknowledged(in, Version.V_0_90_6); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + writeAcknowledged(out, Version.V_0_90_6); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java index ab998092d36..93008f890a0 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java @@ -31,6 +31,8 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MetaDataMappingService; @@ -106,25 +108,24 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener() { @Override public void onResponse(RefreshResponse refreshResponse) { - metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() { - @Override - public void onResponse(MetaDataMappingService.Response response) { - listener.onResponse(new DeleteMappingResponse()); - } - - @Override - public void onFailure(Throwable t) { - listener.onFailure(t); - } - }); + removeMapping(); } @Override public void onFailure(Throwable e) { - metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() { + removeMapping(); + } + + protected void removeMapping() { + DeleteMappingClusterStateUpdateRequest clusterStateUpdateRequest = new DeleteMappingClusterStateUpdateRequest() + .indices(request.indices()).type(request.type()) + .ackTimeout(request.timeout()) + .masterNodeTimeout(request.masterNodeTimeout()); + + metaDataMappingService.removeMapping(clusterStateUpdateRequest, new ClusterStateUpdateListener() { @Override - public void onResponse(MetaDataMappingService.Response response) { - listener.onResponse(new DeleteMappingResponse()); + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new DeleteMappingResponse(response.isAcknowledged())); } @Override diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 9fd5312c8f2..048ab7696b1 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -22,13 +22,15 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; +import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedString; @@ -293,12 +295,32 @@ public class MetaDataMappingService extends AbstractComponent { }); } - public void removeMapping(final RemoveRequest request, final Listener listener) { - clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() { + public void removeMapping(final DeleteMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { + clusterService.submitStateUpdateTask("remove-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } @Override public TimeValue timeout() { - return request.masterTimeout; + return request.masterNodeTimeout(); } @Override @@ -308,18 +330,18 @@ public class MetaDataMappingService extends AbstractComponent { @Override public ClusterState execute(ClusterState currentState) { - if (request.indices.length == 0) { + if (request.indices().length == 0) { throw new IndexMissingException(new Index("_all")); } MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); boolean changed = false; String latestIndexWithout = null; - for (String indexName : request.indices) { + for (String indexName : request.indices()) { IndexMetaData indexMetaData = currentState.metaData().index(indexName); if (indexMetaData != null) { - if (indexMetaData.mappings().containsKey(request.mappingType)) { - builder.put(newIndexMetaDataBuilder(indexMetaData).removeMapping(request.mappingType)); + if (indexMetaData.mappings().containsKey(request.type())) { + builder.put(newIndexMetaDataBuilder(indexMetaData).removeMapping(request.type())); changed = true; } else { latestIndexWithout = indexMetaData.index(); @@ -328,17 +350,17 @@ public class MetaDataMappingService extends AbstractComponent { } if (!changed) { - throw new TypeMissingException(new Index(latestIndexWithout), request.mappingType); + throw new TypeMissingException(new Index(latestIndexWithout), request.type()); } - logger.info("[{}] remove_mapping [{}]", request.indices, request.mappingType); + logger.info("[{}] remove_mapping [{}]", request.indices(), request.type()); return ClusterState.builder().state(currentState).metaData(builder).build(); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new Response(true)); + } }); } @@ -526,23 +548,6 @@ public class MetaDataMappingService extends AbstractComponent { void onFailure(Throwable t); } - public static class RemoveRequest { - - final String[] indices; - final String mappingType; - TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; - - public RemoveRequest(String[] indices, String mappingType) { - this.indices = indices; - this.mappingType = mappingType; - } - - public RemoveRequest masterTimeout(TimeValue masterTimeout) { - this.masterTimeout = masterTimeout; - return this; - } - } - public static class PutRequest { final String[] indices; diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java index 52775c1c3b8..e69f947ed00 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java @@ -19,22 +19,16 @@ package org.elasticsearch.rest.action.admin.indices.mapping.delete; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequest; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.*; -import org.elasticsearch.rest.action.support.RestXContentBuilder; - -import java.io.IOException; import static org.elasticsearch.client.Requests.deleteMappingRequest; import static org.elasticsearch.rest.RestRequest.Method.DELETE; -import static org.elasticsearch.rest.RestStatus.OK; /** * @@ -53,29 +47,8 @@ public class RestDeleteMappingAction extends BaseRestHandler { DeleteMappingRequest deleteMappingRequest = deleteMappingRequest(Strings.splitStringByCommaToArray(request.param("index"))); deleteMappingRequest.listenerThreaded(false); deleteMappingRequest.type(request.param("type")); + deleteMappingRequest.timeout(request.paramAsTime("timeout", deleteMappingRequest.timeout())); deleteMappingRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteMappingRequest.masterNodeTimeout())); - client.admin().indices().deleteMapping(deleteMappingRequest, new ActionListener() { - @Override - public void onResponse(DeleteMappingResponse response) { - try { - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); - builder.startObject() - .field("ok", true); - builder.endObject(); - channel.sendResponse(new XContentRestResponse(request, OK, builder)); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e1) { - logger.error("Failed to send failure response", e1); - } - } - }); + client.admin().indices().deleteMapping(deleteMappingRequest, new AcknowledgedRestResponseActionListener(request, channel, logger)); } } diff --git a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java index 45166d32b0d..36d4525b3a4 100644 --- a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java +++ b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java @@ -21,6 +21,8 @@ package org.elasticsearch.cluster.ack; import com.google.common.collect.ImmutableList; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; @@ -38,6 +40,7 @@ import java.util.Map; import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; import static org.elasticsearch.test.AbstractIntegrationTest.Scope.*; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; @ClusterScope(scope = SUITE) public class AckTests extends AbstractIntegrationTest { @@ -102,4 +105,24 @@ public class AckTests extends AbstractIntegrationTest { assertThat(getWarmersResponse.warmers().size(), equalTo(0)); } } + + @Test + public void testDeleteMappingAcknowledgement() { + client().admin().indices().prepareCreate("test") + .addMapping("type1", "field1", "type=string").get(); + ensureGreen(); + + client().prepareIndex("test", "type1").setSource("field1", "value1"); + + GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("test").addTypes("type1").get(); + assertThat(getMappingsResponse.mappings().get("test").get("type1"), notNullValue()); + + DeleteMappingResponse deleteMappingResponse = client().admin().indices().prepareDeleteMapping("test").setType("type1").get(); + assertThat(deleteMappingResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + getMappingsResponse = client.admin().indices().prepareGetMappings("test").addTypes("type1").get(); + assertThat(getMappingsResponse.mappings().size(), equalTo(0)); + } + } }