Added support for node acknowledgements in delete mapping api

Closes #3984
This commit is contained in:
Luca Cavanna 2013-10-26 02:35:59 +02:00
parent 5f1ebf20f7
commit f23a1e23b1
8 changed files with 166 additions and 86 deletions

View File

@ -0,0 +1,65 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.mapping.delete;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
/**
* Cluster state update request that allows to delete a mapping
*/
public class DeleteMappingClusterStateUpdateRequest extends ClusterStateUpdateRequest<DeleteMappingClusterStateUpdateRequest> {
private String[] indices;
private String type;
DeleteMappingClusterStateUpdateRequest() {
}
/**
* Returns the indices the operation needs to be executed on
*/
public String[] indices() {
return indices;
}
/**
* Sets the indices the operation needs to be executed on
*/
public DeleteMappingClusterStateUpdateRequest indices(String[] indices) {
this.indices = indices;
return this;
}
/**
* Returns the type to be removed
*/
public String type() {
return type;
}
/**
* Sets the type to be removed
*/
public DeleteMappingClusterStateUpdateRequest type(String type) {
this.type = type;
return this;
}
}

View File

@ -19,8 +19,9 @@
package org.elasticsearch.action.admin.indices.mapping.delete;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -29,9 +30,9 @@ import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
*
* Represents a request to delete a mapping
*/
public class DeleteMappingRequest extends MasterNodeOperationRequest<DeleteMappingRequest> {
public class DeleteMappingRequest extends AcknowledgedRequest<DeleteMappingRequest> {
private String[] indices;
@ -41,7 +42,7 @@ public class DeleteMappingRequest extends MasterNodeOperationRequest<DeleteMappi
}
/**
* Constructs a new put mapping request against one or more indices. If nothing is set then
* Constructs a new delete mapping request against one or more indices. If nothing is set then
* it will be executed against all indices.
*/
public DeleteMappingRequest(String... indices) {
@ -58,7 +59,7 @@ public class DeleteMappingRequest extends MasterNodeOperationRequest<DeleteMappi
}
/**
* Sets the indices this put mapping operation will execute on.
* Sets the indices this delete mapping operation will execute on.
*/
public DeleteMappingRequest indices(String[] indices) {
this.indices = indices;
@ -66,7 +67,7 @@ public class DeleteMappingRequest extends MasterNodeOperationRequest<DeleteMappi
}
/**
* The indices the mappings will be put.
* The indices the mappings will be removed from.
*/
public String[] indices() {
return indices;
@ -97,6 +98,7 @@ public class DeleteMappingRequest extends MasterNodeOperationRequest<DeleteMappi
if (in.readBoolean()) {
type = in.readString();
}
readTimeout(in, Version.V_0_90_6);
}
@Override
@ -116,5 +118,6 @@ public class DeleteMappingRequest extends MasterNodeOperationRequest<DeleteMappi
out.writeBoolean(true);
out.writeString(type);
}
writeTimeout(out, Version.V_0_90_6);
}
}

View File

@ -20,26 +20,29 @@
package org.elasticsearch.action.admin.indices.mapping.delete;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
/**
*
* Builder for a delete mapping request
*/
public class DeleteMappingRequestBuilder extends MasterNodeOperationRequestBuilder<DeleteMappingRequest, DeleteMappingResponse, DeleteMappingRequestBuilder> {
public class DeleteMappingRequestBuilder extends AcknowledgedRequestBuilder<DeleteMappingRequest, DeleteMappingResponse, DeleteMappingRequestBuilder> {
public DeleteMappingRequestBuilder(IndicesAdminClient indicesClient) {
super((InternalIndicesAdminClient) indicesClient, new DeleteMappingRequest());
}
/**
* Sets the indices the delete mapping will execute on
*/
public DeleteMappingRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}
/**
* The type of the mapping to remove.
* Sets the type of the mapping to remove
*/
public DeleteMappingRequestBuilder setType(String type) {
request.type(type);

View File

@ -19,7 +19,8 @@
package org.elasticsearch.action.admin.indices.mapping.delete;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,19 +29,25 @@ import java.io.IOException;
/**
* The response of remove mapping operation.
*/
public class DeleteMappingResponse extends ActionResponse {
public class DeleteMappingResponse extends AcknowledgedResponse {
DeleteMappingResponse() {
}
DeleteMappingResponse(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in, Version.V_0_90_6);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out, Version.V_0_90_6);
}
}

View File

@ -31,6 +31,8 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
@ -106,25 +108,24 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
listener.onResponse(new DeleteMappingResponse());
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
});
removeMapping();
}
@Override
public void onFailure(Throwable e) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
removeMapping();
}
protected void removeMapping() {
DeleteMappingClusterStateUpdateRequest clusterStateUpdateRequest = new DeleteMappingClusterStateUpdateRequest()
.indices(request.indices()).type(request.type())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());
metaDataMappingService.removeMapping(clusterStateUpdateRequest, new ClusterStateUpdateListener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
listener.onResponse(new DeleteMappingResponse());
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new DeleteMappingResponse(response.isAcknowledged()));
}
@Override

View File

@ -22,13 +22,15 @@ package org.elasticsearch.cluster.metadata;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
@ -293,12 +295,32 @@ public class MetaDataMappingService extends AbstractComponent {
});
}
public void removeMapping(final RemoveRequest request, final Listener listener) {
clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() {
public void removeMapping(final DeleteMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
clusterService.submitStateUpdateTask("remove-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask() {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterTimeout;
return request.masterNodeTimeout();
}
@Override
@ -308,18 +330,18 @@ public class MetaDataMappingService extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) {
if (request.indices.length == 0) {
if (request.indices().length == 0) {
throw new IndexMissingException(new Index("_all"));
}
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
boolean changed = false;
String latestIndexWithout = null;
for (String indexName : request.indices) {
for (String indexName : request.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
if (indexMetaData != null) {
if (indexMetaData.mappings().containsKey(request.mappingType)) {
builder.put(newIndexMetaDataBuilder(indexMetaData).removeMapping(request.mappingType));
if (indexMetaData.mappings().containsKey(request.type())) {
builder.put(newIndexMetaDataBuilder(indexMetaData).removeMapping(request.type()));
changed = true;
} else {
latestIndexWithout = indexMetaData.index();
@ -328,17 +350,17 @@ public class MetaDataMappingService extends AbstractComponent {
}
if (!changed) {
throw new TypeMissingException(new Index(latestIndexWithout), request.mappingType);
throw new TypeMissingException(new Index(latestIndexWithout), request.type());
}
logger.info("[{}] remove_mapping [{}]", request.indices, request.mappingType);
logger.info("[{}] remove_mapping [{}]", request.indices(), request.type());
return ClusterState.builder().state(currentState).metaData(builder).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new Response(true));
}
});
}
@ -526,23 +548,6 @@ public class MetaDataMappingService extends AbstractComponent {
void onFailure(Throwable t);
}
public static class RemoveRequest {
final String[] indices;
final String mappingType;
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
public RemoveRequest(String[] indices, String mappingType) {
this.indices = indices;
this.mappingType = mappingType;
}
public RemoveRequest masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class PutRequest {
final String[] indices;

View File

@ -19,22 +19,16 @@
package org.elasticsearch.rest.action.admin.indices.mapping.delete;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import static org.elasticsearch.client.Requests.deleteMappingRequest;
import static org.elasticsearch.rest.RestRequest.Method.DELETE;
import static org.elasticsearch.rest.RestStatus.OK;
/**
*
@ -53,29 +47,8 @@ public class RestDeleteMappingAction extends BaseRestHandler {
DeleteMappingRequest deleteMappingRequest = deleteMappingRequest(Strings.splitStringByCommaToArray(request.param("index")));
deleteMappingRequest.listenerThreaded(false);
deleteMappingRequest.type(request.param("type"));
deleteMappingRequest.timeout(request.paramAsTime("timeout", deleteMappingRequest.timeout()));
deleteMappingRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteMappingRequest.masterNodeTimeout()));
client.admin().indices().deleteMapping(deleteMappingRequest, new ActionListener<DeleteMappingResponse>() {
@Override
public void onResponse(DeleteMappingResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject()
.field("ok", true);
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (IOException e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
client.admin().indices().deleteMapping(deleteMappingRequest, new AcknowledgedRestResponseActionListener<DeleteMappingResponse>(request, channel, logger));
}
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.cluster.ack;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse;
import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
@ -38,6 +40,7 @@ import java.util.Map;
import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import static org.elasticsearch.test.AbstractIntegrationTest.Scope.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
@ClusterScope(scope = SUITE)
public class AckTests extends AbstractIntegrationTest {
@ -102,4 +105,24 @@ public class AckTests extends AbstractIntegrationTest {
assertThat(getWarmersResponse.warmers().size(), equalTo(0));
}
}
@Test
public void testDeleteMappingAcknowledgement() {
client().admin().indices().prepareCreate("test")
.addMapping("type1", "field1", "type=string").get();
ensureGreen();
client().prepareIndex("test", "type1").setSource("field1", "value1");
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("test").addTypes("type1").get();
assertThat(getMappingsResponse.mappings().get("test").get("type1"), notNullValue());
DeleteMappingResponse deleteMappingResponse = client().admin().indices().prepareDeleteMapping("test").setType("type1").get();
assertThat(deleteMappingResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
getMappingsResponse = client.admin().indices().prepareGetMappings("test").addTypes("type1").get();
assertThat(getMappingsResponse.mappings().size(), equalTo(0));
}
}
}