From 630641f2927436b2954d79d110614f58400558a6 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 29 Oct 2013 18:29:55 +0100 Subject: [PATCH] Refactored put mapping api to make use of the new recently introduced generic ack mechanism Note: we were previously waiting for ack only from all nodes that contain shards for the indices that the mapping updatewas applied to. This change introduces a wait for ack from all nodes, consitent with other api as the ack is meant more on the cluster state itself, which is held by all nodes and needs to be updated on all nodes anyway. Closes #4228 --- .../PutMappingClusterStateUpdateRequest.java | 65 ++++++ .../mapping/put/PutMappingRequest.java | 39 +--- .../mapping/put/PutMappingRequestBuilder.java | 25 +-- .../mapping/put/PutMappingResponse.java | 20 +- .../put/TransportPutMappingAction.java | 14 +- .../elasticsearch/cluster/ClusterModule.java | 1 - .../action/index/MappingUpdatedAction.java | 6 +- .../index/NodeMappingCreatedAction.java | 171 --------------- .../metadata/MetaDataMappingService.java | 200 +++++------------- .../cluster/IndicesClusterStateService.java | 15 +- .../mapping/put/RestPutMappingAction.java | 34 +-- .../elasticsearch/cluster/ack/AckTests.java | 22 ++ .../hamcrest/ElasticsearchAssertions.java | 9 - 13 files changed, 169 insertions(+), 452 deletions(-) create mode 100644 src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java delete mode 100644 src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java new file mode 100644 index 00000000000..a0d212236f0 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.mapping.put; + +import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest; + +/** + * Cluster state update request that allows to put a mapping + */ +public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { + + private String type; + + private String source; + + private boolean ignoreConflicts = false; + + PutMappingClusterStateUpdateRequest() { + + } + + public String type() { + return type; + } + + public PutMappingClusterStateUpdateRequest type(String type) { + this.type = type; + return this; + } + + public String source() { + return source; + } + + public PutMappingClusterStateUpdateRequest source(String source) { + this.source = source; + return this; + } + + public boolean ignoreConflicts() { + return ignoreConflicts; + } + + public PutMappingClusterStateUpdateRequest ignoreConflicts(boolean ignoreConflicts) { + this.ignoreConflicts = ignoreConflicts; + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java index 9eb3352d999..6c2667ef179 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java @@ -23,21 +23,18 @@ import com.carrotsearch.hppc.ObjectOpenHashSet; import org.elasticsearch.ElasticSearchGenerationException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; /** * Puts mapping definition registered under a specific type into one or more indices. Best created with @@ -51,7 +48,7 @@ import static org.elasticsearch.common.unit.TimeValue.readTimeValue; * @see org.elasticsearch.client.IndicesAdminClient#putMapping(PutMappingRequest) * @see PutMappingResponse */ -public class PutMappingRequest extends MasterNodeOperationRequest { +public class PutMappingRequest extends AcknowledgedRequest { private static ObjectOpenHashSet RESERVED_FIELDS = ObjectOpenHashSet.from( "_uid", "_id", "_type", "_source", "_all", "_analyzer", "_boost", "_parent", "_routing", "_index", @@ -64,8 +61,6 @@ public class PutMappingRequest extends MasterNodeOperationRequest10s. - */ - TimeValue timeout() { - return timeout; - } - - /** - * Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to - * 10s. - */ - public PutMappingRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - /** - * Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to - * 10s. - */ - public PutMappingRequest timeout(String timeout) { - return timeout(TimeValue.parseTimeValue(timeout, null)); - } - /** * If there is already a mapping definition registered against the type, then it will be merged. If there are * elements that can't be merged are detected, the request will be rejected unless the @@ -274,7 +245,7 @@ public class PutMappingRequest extends MasterNodeOperationRequest { +public class PutMappingRequestBuilder extends AcknowledgedRequestBuilder { public PutMappingRequestBuilder(IndicesAdminClient indicesClient) { super((InternalIndicesAdminClient) indicesClient, new PutMappingRequest()); @@ -83,24 +82,6 @@ public class PutMappingRequestBuilder extends MasterNodeOperationRequestBuilder< return this; } - /** - * Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to - * 10s. - */ - public PutMappingRequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return this; - } - - /** - * Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to - * 10s. - */ - public PutMappingRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return this; - } - /** * If there is already a mapping definition registered against the type, then it will be merged. If there are * elements that can't be merged are detected, the request will be rejected unless the diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingResponse.java index 58edc44ab02..33160862ab5 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.mapping.put; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,35 +28,25 @@ import java.io.IOException; /** * The response of put mapping operation. */ -public class PutMappingResponse extends ActionResponse { - - private boolean acknowledged; +public class PutMappingResponse extends AcknowledgedResponse { PutMappingResponse() { } PutMappingResponse(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - /** - * Has the put mapping creation been acknowledged by all current cluster nodes within the - * provided {@link PutMappingRequest#timeout(org.elasticsearch.common.unit.TimeValue)}. - */ - public boolean isAcknowledged() { - return acknowledged; + super(acknowledged); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - acknowledged = in.readBoolean(); + readAcknowledged(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeBoolean(acknowledged); + writeAcknowledged(out); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index e12a4a5a922..270ca5a20d8 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MetaDataMappingService; @@ -81,10 +83,16 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio @Override protected void masterOperation(final PutMappingRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { - metaDataMappingService.putMapping(new MetaDataMappingService.PutRequest(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() { + PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest() + .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) + .indices(request.indices()).type(request.type()) + .source(request.source()).ignoreConflicts(request.ignoreConflicts()); + + metaDataMappingService.putMapping(updateRequest, new ClusterStateUpdateListener() { + @Override - public void onResponse(MetaDataMappingService.Response response) { - listener.onResponse(new PutMappingResponse(response.acknowledged())); + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new PutMappingResponse(response.isAcknowledged())); } @Override diff --git a/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 491deadf79a..5edfa236e04 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -73,7 +73,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules { bind(ShardStateAction.class).asEagerSingleton(); bind(NodeIndexCreatedAction.class).asEagerSingleton(); bind(NodeIndexDeletedAction.class).asEagerSingleton(); - bind(NodeMappingCreatedAction.class).asEagerSingleton(); bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index f8062700e57..f8cad2b88f8 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -28,6 +28,8 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.common.compress.CompressedString; @@ -78,9 +80,9 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction listener) throws ElasticSearchException { - metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), new MetaDataMappingService.Listener() { + metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), new ClusterStateUpdateListener() { @Override - public void onResponse(MetaDataMappingService.Response response) { + public void onResponse(ClusterStateUpdateResponse response) { listener.onResponse(new MappingUpdatedResponse()); } diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java deleted file mode 100644 index d39c6e88ecc..00000000000 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.action.index; - -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * - */ -public class NodeMappingCreatedAction extends AbstractComponent { - - private final ThreadPool threadPool; - private final TransportService transportService; - private final List listeners = new CopyOnWriteArrayList(); - - @Inject - public NodeMappingCreatedAction(Settings settings, ThreadPool threadPool, TransportService transportService) { - super(settings); - this.threadPool = threadPool; - this.transportService = transportService; - transportService.registerHandler(NodeMappingCreatedTransportHandler.ACTION, new NodeMappingCreatedTransportHandler()); - } - - public void add(final Listener listener, TimeValue timeout) { - listeners.add(listener); - threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() { - @Override - public void run() { - boolean removed = listeners.remove(listener); - if (removed) { - listener.onTimeout(); - } - } - }); - } - - public void remove(Listener listener) { - listeners.remove(listener); - } - - public void nodeMappingCreated(final ClusterState state, final NodeMappingCreatedResponse response) throws ElasticSearchException { - logger.debug("Sending mapping created for index {}, type {} (cluster state version: {})", response.index, response.type, response.clusterStateVersion); - if (state.nodes().localNodeMaster()) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - innerNodeIndexCreated(response); - } - }); - } else { - transportService.sendRequest(state.nodes().masterNode(), - NodeMappingCreatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME); - } - } - - private void innerNodeIndexCreated(NodeMappingCreatedResponse response) { - for (Listener listener : listeners) { - listener.onNodeMappingCreated(response); - } - } - - - public static interface Listener { - void onNodeMappingCreated(NodeMappingCreatedResponse response); - - void onTimeout(); - } - - private class NodeMappingCreatedTransportHandler extends BaseTransportRequestHandler { - - static final String ACTION = "cluster/nodeMappingCreated"; - - @Override - public NodeMappingCreatedResponse newInstance() { - return new NodeMappingCreatedResponse(); - } - - @Override - public void messageReceived(NodeMappingCreatedResponse response, TransportChannel channel) throws Exception { - innerNodeIndexCreated(response); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } - - public static class NodeMappingCreatedResponse extends TransportRequest { - - private String index; - private String type; - private String nodeId; - private long clusterStateVersion; - - private NodeMappingCreatedResponse() { - } - - public NodeMappingCreatedResponse(String index, String type, String nodeId, long clusterStateVersion) { - this.index = index; - this.type = type; - this.nodeId = nodeId; - this.clusterStateVersion = clusterStateVersion; - } - - public String index() { - return index; - } - - public String type() { - return type; - } - - public String nodeId() { - return nodeId; - } - - public long clusterStateVersion() { - return clusterStateVersion; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(index); - out.writeString(type); - out.writeString(nodeId); - out.writeVLong(clusterStateVersion); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - index = in.readString(); - type = in.readString(); - nodeId = in.readString(); - clusterStateVersion = in.readVLong(); - } - } -} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 412fd86a8cf..9ca59f86968 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -23,13 +23,14 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; -import org.elasticsearch.cluster.*; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; -import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; @@ -38,7 +39,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; @@ -57,7 +57,7 @@ import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.mergeFlags; /** - * + * Service responsible for submitting mapping changes */ public class MetaDataMappingService extends AbstractComponent { @@ -65,16 +65,13 @@ public class MetaDataMappingService extends AbstractComponent { private final IndicesService indicesService; - private final NodeMappingCreatedAction mappingCreatedAction; - private final BlockingQueue refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue(); @Inject - public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) { + public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService) { super(settings); this.clusterService = clusterService; this.indicesService = indicesService; - this.mappingCreatedAction = mappingCreatedAction; } static class MappingTask { @@ -99,9 +96,9 @@ public class MetaDataMappingService extends AbstractComponent { static class UpdateTask extends MappingTask { final String type; final CompressedString mappingSource; - final Listener listener; + final ClusterStateUpdateListener listener; - UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, Listener listener) { + UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, ClusterStateUpdateListener listener) { super(index, indexUUID); this.type = type; this.mappingSource = mappingSource; @@ -247,7 +244,7 @@ public class MetaDataMappingService extends AbstractComponent { } for (Object task : tasks) { if (task instanceof UpdateTask) { - ((UpdateTask) task).listener.onResponse(new Response(true)); + ((UpdateTask) task).listener.onResponse(new ClusterStateUpdateResponse(true)); } } } @@ -277,7 +274,7 @@ public class MetaDataMappingService extends AbstractComponent { }); } - public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final Listener listener) { + public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final ClusterStateUpdateListener listener) { refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, listener)); clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.HIGH, new ClusterStateUpdateTask() { @Override @@ -362,15 +359,33 @@ public class MetaDataMappingService extends AbstractComponent { }); } - public void putMapping(final PutRequest request, final Listener listener) { + public void putMapping(final PutMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { - clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask() { - CountDownListener countDownListener; // used to count ack responses before confirming operation is complete + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new ClusterStateUpdateResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } @Override public TimeValue timeout() { - return request.masterTimeout; + return request.masterNodeTimeout(); } @Override @@ -382,17 +397,17 @@ public class MetaDataMappingService extends AbstractComponent { public ClusterState execute(final ClusterState currentState) throws Exception { List indicesToClose = Lists.newArrayList(); try { - if (request.indices.length == 0) { + if (request.indices().length == 0) { throw new IndexMissingException(new Index("_all")); } - for (String index : request.indices) { + for (String index : request.indices()) { if (!currentState.metaData().hasIndex(index)) { throw new IndexMissingException(new Index(index)); } } // pre create indices here and add mappings to them so we can merge the mappings here if needed - for (String index : request.indices) { + for (String index : request.indices()) { if (indicesService.hasIndex(index)) { continue; } @@ -404,29 +419,29 @@ public class MetaDataMappingService extends AbstractComponent { indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.mappings().get(MapperService.DEFAULT_MAPPING).source(), false); } // only add the current relevant mapping (if exists) - if (indexMetaData.mappings().containsKey(request.mappingType)) { - indexService.mapperService().merge(request.mappingType, indexMetaData.mappings().get(request.mappingType).source(), false); + if (indexMetaData.mappings().containsKey(request.type())) { + indexService.mapperService().merge(request.type(), indexMetaData.mappings().get(request.type()).source(), false); } } Map newMappers = newHashMap(); Map existingMappers = newHashMap(); - for (String index : request.indices) { + for (String index : request.indices()) { IndexService indexService = indicesService.indexService(index); if (indexService != null) { // try and parse it (no need to add it here) so we can bail early in case of parsing exception DocumentMapper newMapper; - DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.mappingType); - if (MapperService.DEFAULT_MAPPING.equals(request.mappingType)) { + DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type()); + if (MapperService.DEFAULT_MAPPING.equals(request.type())) { // _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default - newMapper = indexService.mapperService().parse(request.mappingType, new CompressedString(request.mappingSource), false); + newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()), false); } else { - newMapper = indexService.mapperService().parse(request.mappingType, new CompressedString(request.mappingSource)); + newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source())); if (existingMapper != null) { // first, simulate DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true)); // if we have conflicts, and we are not supposed to ignore them, throw an exception - if (!request.ignoreConflicts && mergeResult.hasConflicts()) { + if (!request.ignoreConflicts() && mergeResult.hasConflicts()) { throw new MergeMappingException(mergeResult.conflicts()); } } @@ -442,7 +457,7 @@ public class MetaDataMappingService extends AbstractComponent { } } - String mappingType = request.mappingType; + String mappingType = request.type(); if (mappingType == null) { mappingType = newMappers.values().iterator().next().type(); } else if (!mappingType.equals(newMappers.values().iterator().next().type())) { @@ -489,12 +504,11 @@ public class MetaDataMappingService extends AbstractComponent { if (mappings.isEmpty()) { // no changes, return - listener.onResponse(new Response(true)); return currentState; } MetaData.Builder builder = MetaData.builder(currentState.metaData()); - for (String indexName : request.indices) { + for (String indexName : request.indices()) { IndexMetaData indexMetaData = currentState.metaData().index(indexName); if (indexMetaData == null) { throw new IndexMissingException(new Index(indexName)); @@ -505,26 +519,7 @@ public class MetaDataMappingService extends AbstractComponent { } } - ClusterState updatedState = ClusterState.builder(currentState).metaData(builder).build(); - - int counter = 1; // we want to wait on the master node to apply it on its cluster state - // also wait for nodes that actually have the index created on them to apply the mappings internally - for (String index : request.indices) { - IndexRoutingTable indexRoutingTable = updatedState.routingTable().index(index); - if (indexRoutingTable != null) { - counter += indexRoutingTable.numberOfNodesShardsAreAllocatedOn(updatedState.nodes().masterNodeId()); - } - } - - logger.debug("Expecting {} mapping created responses for other nodes", counter - 1); - - // TODO: adding one to the version is based on knowledge on how the parent class will increment the version - // move this to the base class or add another callback before publishing the new cluster state so we - // capture its version. - countDownListener = new CountDownListener(counter, currentState.version() + 1, listener); - mappingCreatedAction.add(countDownListener, request.timeout); - - return updatedState; + return ClusterState.builder(currentState).metaData(builder).build(); } finally { for (String index : indicesToClose) { indicesService.removeIndex(index, "created for mapping processing"); @@ -534,107 +529,8 @@ public class MetaDataMappingService extends AbstractComponent { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (countDownListener != null) { - // the master has applied it on its cluster state - countDownListener.decrementCounter(); - } + } }); } - - public static interface Listener { - - void onResponse(Response response); - - void onFailure(Throwable t); - } - - public static class PutRequest { - - final String[] indices; - - final String mappingType; - - final String mappingSource; - - boolean ignoreConflicts = false; - - TimeValue timeout = TimeValue.timeValueSeconds(10); - TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; - - public PutRequest(String[] indices, String mappingType, String mappingSource) { - this.indices = indices; - this.mappingType = mappingType; - this.mappingSource = mappingSource; - } - - public PutRequest ignoreConflicts(boolean ignoreConflicts) { - this.ignoreConflicts = ignoreConflicts; - return this; - } - - public PutRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - public PutRequest masterTimeout(TimeValue masterTimeout) { - this.masterTimeout = masterTimeout; - return this; - } - } - - public static class Response { - private final boolean acknowledged; - - public Response(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - public boolean acknowledged() { - return acknowledged; - } - } - - private class CountDownListener implements NodeMappingCreatedAction.Listener { - - private final CountDown countDown; - private final Listener listener; - private final long minClusterStateVersion; - - /** - * @param countDown initial counter value - * @param minClusterStateVersion the minimum cluster state version for which accept responses - * @param listener listener to call when counter reaches 0. - */ - public CountDownListener(int countDown, long minClusterStateVersion, Listener listener) { - this.countDown = new CountDown(countDown); - this.listener = listener; - this.minClusterStateVersion = minClusterStateVersion; - } - - @Override - public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { - if (response.clusterStateVersion() < minClusterStateVersion) { - return; - } - decrementCounter(); - - } - - public void decrementCounter() { - if (countDown.countDown()) { - mappingCreatedAction.remove(this); - listener.onResponse(new Response(true)); - } - } - - @Override - public void onTimeout() { - if (countDown.fastForward()) { - mappingCreatedAction.remove(this); - listener.onResponse(new Response(false)); - } - } - } } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index c92cb9120ce..b14c83d277c 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; -import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.AliasMetaData; @@ -89,7 +88,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent(index, mappingType))) { seenMappings.put(new Tuple(index, mappingType), true); } @@ -410,8 +407,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent() { - @Override - public void onResponse(PutMappingResponse response) { - try { - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); - builder.startObject() - .field("ok", true) - .field("acknowledged", response.isAcknowledged()); - builder.endObject(); - channel.sendResponse(new XContentRestResponse(request, OK, builder)); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e1) { - logger.error("Failed to send failure response", e1); - } - } - }); + client.admin().indices().putMapping(putMappingRequest, new AcknowledgedRestResponseActionListener(request, channel, logger)); } } diff --git a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java index a4bbf4ac0c6..516f68e9df0 100644 --- a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java +++ b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; @@ -451,6 +452,27 @@ public class AckTests extends ElasticsearchIntegrationTest { assertThat(openIndexResponse.isAcknowledged(), equalTo(false)); } + @Test + public void testPutMappingAcknowledgement() { + createIndex("test"); + ensureGreen(); + + assertAcked(client().admin().indices().preparePutMapping("test").setType("test").setSource("field", "type=string,index=not_analyzed")); + + for (Client client : clients()) { + assertThat(getLocalClusterState(client).metaData().indices().get("test").mapping("test"), notNullValue()); + } + } + + @Test + public void testPutMappingNoAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutMappingResponse putMappingResponse = client().admin().indices().preparePutMapping("test").setType("test").setSource("field", "type=string,index=not_analyzed").setTimeout("0s").get(); + assertThat(putMappingResponse.isAcknowledged(), equalTo(false)); + } + private static ClusterState getLocalClusterState(Client client) { return client.admin().cluster().prepareState().setLocal(true).get().getState(); } diff --git a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 57fad0dde06..3fa71527c60 100644 --- a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -75,15 +75,6 @@ public class ElasticsearchAssertions { assertVersionSerializable(response); } - public static void assertAcked(PutMappingRequestBuilder builder) { - assertAcked(builder.get()); - } - - private static void assertAcked(PutMappingResponse response) { - assertThat("Put Mapping failed - not acked", response.isAcknowledged(), equalTo(true)); - assertVersionSerializable(response); - } - public static void assertAcked(DeleteIndexRequestBuilder builder) { assertAcked(builder.get()); }