diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java new file mode 100644 index 00000000000..a0d212236f0 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.mapping.put; + +import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest; + +/** + * Cluster state update request that allows to put a mapping + */ +public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { + + private String type; + + private String source; + + private boolean ignoreConflicts = false; + + PutMappingClusterStateUpdateRequest() { + + } + + public String type() { + return type; + } + + public PutMappingClusterStateUpdateRequest type(String type) { + this.type = type; + return this; + } + + public String source() { + return source; + } + + public PutMappingClusterStateUpdateRequest source(String source) { + this.source = source; + return this; + } + + public boolean ignoreConflicts() { + return ignoreConflicts; + } + + public PutMappingClusterStateUpdateRequest ignoreConflicts(boolean ignoreConflicts) { + this.ignoreConflicts = ignoreConflicts; + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java index 9eb3352d999..6c2667ef179 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java @@ -23,21 +23,18 @@ import com.carrotsearch.hppc.ObjectOpenHashSet; import org.elasticsearch.ElasticSearchGenerationException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; /** * Puts mapping definition registered under a specific type into one or more indices. Best created with @@ -51,7 +48,7 @@ import static org.elasticsearch.common.unit.TimeValue.readTimeValue; * @see org.elasticsearch.client.IndicesAdminClient#putMapping(PutMappingRequest) * @see PutMappingResponse */ -public class PutMappingRequest extends MasterNodeOperationRequest { +public class PutMappingRequest extends AcknowledgedRequest { private static ObjectOpenHashSet RESERVED_FIELDS = ObjectOpenHashSet.from( "_uid", "_id", "_type", "_source", "_all", "_analyzer", "_boost", "_parent", "_routing", "_index", @@ -64,8 +61,6 @@ public class PutMappingRequest extends MasterNodeOperationRequest10s. - */ - TimeValue timeout() { - return timeout; - } - - /** - * Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to - * 10s. - */ - public PutMappingRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - /** - * Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to - * 10s. - */ - public PutMappingRequest timeout(String timeout) { - return timeout(TimeValue.parseTimeValue(timeout, null)); - } - /** * If there is already a mapping definition registered against the type, then it will be merged. If there are * elements that can't be merged are detected, the request will be rejected unless the @@ -274,7 +245,7 @@ public class PutMappingRequest extends MasterNodeOperationRequest { +public class PutMappingRequestBuilder extends AcknowledgedRequestBuilder { public PutMappingRequestBuilder(IndicesAdminClient indicesClient) { super((InternalIndicesAdminClient) indicesClient, new PutMappingRequest()); @@ -83,24 +82,6 @@ public class PutMappingRequestBuilder extends MasterNodeOperationRequestBuilder< return this; } - /** - * Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to - * 10s. - */ - public PutMappingRequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return this; - } - - /** - * Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to - * 10s. - */ - public PutMappingRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return this; - } - /** * If there is already a mapping definition registered against the type, then it will be merged. If there are * elements that can't be merged are detected, the request will be rejected unless the diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingResponse.java index 58edc44ab02..33160862ab5 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.mapping.put; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,35 +28,25 @@ import java.io.IOException; /** * The response of put mapping operation. */ -public class PutMappingResponse extends ActionResponse { - - private boolean acknowledged; +public class PutMappingResponse extends AcknowledgedResponse { PutMappingResponse() { } PutMappingResponse(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - /** - * Has the put mapping creation been acknowledged by all current cluster nodes within the - * provided {@link PutMappingRequest#timeout(org.elasticsearch.common.unit.TimeValue)}. - */ - public boolean isAcknowledged() { - return acknowledged; + super(acknowledged); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - acknowledged = in.readBoolean(); + readAcknowledged(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeBoolean(acknowledged); + writeAcknowledged(out); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index e12a4a5a922..270ca5a20d8 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MetaDataMappingService; @@ -81,10 +83,16 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio @Override protected void masterOperation(final PutMappingRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { - metaDataMappingService.putMapping(new MetaDataMappingService.PutRequest(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() { + PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest() + .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) + .indices(request.indices()).type(request.type()) + .source(request.source()).ignoreConflicts(request.ignoreConflicts()); + + metaDataMappingService.putMapping(updateRequest, new ClusterStateUpdateListener() { + @Override - public void onResponse(MetaDataMappingService.Response response) { - listener.onResponse(new PutMappingResponse(response.acknowledged())); + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new PutMappingResponse(response.isAcknowledged())); } @Override diff --git a/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 491deadf79a..5edfa236e04 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -73,7 +73,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules { bind(ShardStateAction.class).asEagerSingleton(); bind(NodeIndexCreatedAction.class).asEagerSingleton(); bind(NodeIndexDeletedAction.class).asEagerSingleton(); - bind(NodeMappingCreatedAction.class).asEagerSingleton(); bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index f8062700e57..f8cad2b88f8 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -28,6 +28,8 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.common.compress.CompressedString; @@ -78,9 +80,9 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction listener) throws ElasticSearchException { - metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), new MetaDataMappingService.Listener() { + metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), new ClusterStateUpdateListener() { @Override - public void onResponse(MetaDataMappingService.Response response) { + public void onResponse(ClusterStateUpdateResponse response) { listener.onResponse(new MappingUpdatedResponse()); } diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java deleted file mode 100644 index d39c6e88ecc..00000000000 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.action.index; - -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * - */ -public class NodeMappingCreatedAction extends AbstractComponent { - - private final ThreadPool threadPool; - private final TransportService transportService; - private final List listeners = new CopyOnWriteArrayList(); - - @Inject - public NodeMappingCreatedAction(Settings settings, ThreadPool threadPool, TransportService transportService) { - super(settings); - this.threadPool = threadPool; - this.transportService = transportService; - transportService.registerHandler(NodeMappingCreatedTransportHandler.ACTION, new NodeMappingCreatedTransportHandler()); - } - - public void add(final Listener listener, TimeValue timeout) { - listeners.add(listener); - threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() { - @Override - public void run() { - boolean removed = listeners.remove(listener); - if (removed) { - listener.onTimeout(); - } - } - }); - } - - public void remove(Listener listener) { - listeners.remove(listener); - } - - public void nodeMappingCreated(final ClusterState state, final NodeMappingCreatedResponse response) throws ElasticSearchException { - logger.debug("Sending mapping created for index {}, type {} (cluster state version: {})", response.index, response.type, response.clusterStateVersion); - if (state.nodes().localNodeMaster()) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - innerNodeIndexCreated(response); - } - }); - } else { - transportService.sendRequest(state.nodes().masterNode(), - NodeMappingCreatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME); - } - } - - private void innerNodeIndexCreated(NodeMappingCreatedResponse response) { - for (Listener listener : listeners) { - listener.onNodeMappingCreated(response); - } - } - - - public static interface Listener { - void onNodeMappingCreated(NodeMappingCreatedResponse response); - - void onTimeout(); - } - - private class NodeMappingCreatedTransportHandler extends BaseTransportRequestHandler { - - static final String ACTION = "cluster/nodeMappingCreated"; - - @Override - public NodeMappingCreatedResponse newInstance() { - return new NodeMappingCreatedResponse(); - } - - @Override - public void messageReceived(NodeMappingCreatedResponse response, TransportChannel channel) throws Exception { - innerNodeIndexCreated(response); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } - - public static class NodeMappingCreatedResponse extends TransportRequest { - - private String index; - private String type; - private String nodeId; - private long clusterStateVersion; - - private NodeMappingCreatedResponse() { - } - - public NodeMappingCreatedResponse(String index, String type, String nodeId, long clusterStateVersion) { - this.index = index; - this.type = type; - this.nodeId = nodeId; - this.clusterStateVersion = clusterStateVersion; - } - - public String index() { - return index; - } - - public String type() { - return type; - } - - public String nodeId() { - return nodeId; - } - - public long clusterStateVersion() { - return clusterStateVersion; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(index); - out.writeString(type); - out.writeString(nodeId); - out.writeVLong(clusterStateVersion); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - index = in.readString(); - type = in.readString(); - nodeId = in.readString(); - clusterStateVersion = in.readVLong(); - } - } -} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 412fd86a8cf..9ca59f86968 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -23,13 +23,14 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; -import org.elasticsearch.cluster.*; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; -import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; @@ -38,7 +39,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; @@ -57,7 +57,7 @@ import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.mergeFlags; /** - * + * Service responsible for submitting mapping changes */ public class MetaDataMappingService extends AbstractComponent { @@ -65,16 +65,13 @@ public class MetaDataMappingService extends AbstractComponent { private final IndicesService indicesService; - private final NodeMappingCreatedAction mappingCreatedAction; - private final BlockingQueue refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue(); @Inject - public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) { + public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService) { super(settings); this.clusterService = clusterService; this.indicesService = indicesService; - this.mappingCreatedAction = mappingCreatedAction; } static class MappingTask { @@ -99,9 +96,9 @@ public class MetaDataMappingService extends AbstractComponent { static class UpdateTask extends MappingTask { final String type; final CompressedString mappingSource; - final Listener listener; + final ClusterStateUpdateListener listener; - UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, Listener listener) { + UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, ClusterStateUpdateListener listener) { super(index, indexUUID); this.type = type; this.mappingSource = mappingSource; @@ -247,7 +244,7 @@ public class MetaDataMappingService extends AbstractComponent { } for (Object task : tasks) { if (task instanceof UpdateTask) { - ((UpdateTask) task).listener.onResponse(new Response(true)); + ((UpdateTask) task).listener.onResponse(new ClusterStateUpdateResponse(true)); } } } @@ -277,7 +274,7 @@ public class MetaDataMappingService extends AbstractComponent { }); } - public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final Listener listener) { + public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final ClusterStateUpdateListener listener) { refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, listener)); clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.HIGH, new ClusterStateUpdateTask() { @Override @@ -362,15 +359,33 @@ public class MetaDataMappingService extends AbstractComponent { }); } - public void putMapping(final PutRequest request, final Listener listener) { + public void putMapping(final PutMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { - clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask() { - CountDownListener countDownListener; // used to count ack responses before confirming operation is complete + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new ClusterStateUpdateResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } @Override public TimeValue timeout() { - return request.masterTimeout; + return request.masterNodeTimeout(); } @Override @@ -382,17 +397,17 @@ public class MetaDataMappingService extends AbstractComponent { public ClusterState execute(final ClusterState currentState) throws Exception { List indicesToClose = Lists.newArrayList(); try { - if (request.indices.length == 0) { + if (request.indices().length == 0) { throw new IndexMissingException(new Index("_all")); } - for (String index : request.indices) { + for (String index : request.indices()) { if (!currentState.metaData().hasIndex(index)) { throw new IndexMissingException(new Index(index)); } } // pre create indices here and add mappings to them so we can merge the mappings here if needed - for (String index : request.indices) { + for (String index : request.indices()) { if (indicesService.hasIndex(index)) { continue; } @@ -404,29 +419,29 @@ public class MetaDataMappingService extends AbstractComponent { indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.mappings().get(MapperService.DEFAULT_MAPPING).source(), false); } // only add the current relevant mapping (if exists) - if (indexMetaData.mappings().containsKey(request.mappingType)) { - indexService.mapperService().merge(request.mappingType, indexMetaData.mappings().get(request.mappingType).source(), false); + if (indexMetaData.mappings().containsKey(request.type())) { + indexService.mapperService().merge(request.type(), indexMetaData.mappings().get(request.type()).source(), false); } } Map newMappers = newHashMap(); Map existingMappers = newHashMap(); - for (String index : request.indices) { + for (String index : request.indices()) { IndexService indexService = indicesService.indexService(index); if (indexService != null) { // try and parse it (no need to add it here) so we can bail early in case of parsing exception DocumentMapper newMapper; - DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.mappingType); - if (MapperService.DEFAULT_MAPPING.equals(request.mappingType)) { + DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type()); + if (MapperService.DEFAULT_MAPPING.equals(request.type())) { // _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default - newMapper = indexService.mapperService().parse(request.mappingType, new CompressedString(request.mappingSource), false); + newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()), false); } else { - newMapper = indexService.mapperService().parse(request.mappingType, new CompressedString(request.mappingSource)); + newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source())); if (existingMapper != null) { // first, simulate DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true)); // if we have conflicts, and we are not supposed to ignore them, throw an exception - if (!request.ignoreConflicts && mergeResult.hasConflicts()) { + if (!request.ignoreConflicts() && mergeResult.hasConflicts()) { throw new MergeMappingException(mergeResult.conflicts()); } } @@ -442,7 +457,7 @@ public class MetaDataMappingService extends AbstractComponent { } } - String mappingType = request.mappingType; + String mappingType = request.type(); if (mappingType == null) { mappingType = newMappers.values().iterator().next().type(); } else if (!mappingType.equals(newMappers.values().iterator().next().type())) { @@ -489,12 +504,11 @@ public class MetaDataMappingService extends AbstractComponent { if (mappings.isEmpty()) { // no changes, return - listener.onResponse(new Response(true)); return currentState; } MetaData.Builder builder = MetaData.builder(currentState.metaData()); - for (String indexName : request.indices) { + for (String indexName : request.indices()) { IndexMetaData indexMetaData = currentState.metaData().index(indexName); if (indexMetaData == null) { throw new IndexMissingException(new Index(indexName)); @@ -505,26 +519,7 @@ public class MetaDataMappingService extends AbstractComponent { } } - ClusterState updatedState = ClusterState.builder(currentState).metaData(builder).build(); - - int counter = 1; // we want to wait on the master node to apply it on its cluster state - // also wait for nodes that actually have the index created on them to apply the mappings internally - for (String index : request.indices) { - IndexRoutingTable indexRoutingTable = updatedState.routingTable().index(index); - if (indexRoutingTable != null) { - counter += indexRoutingTable.numberOfNodesShardsAreAllocatedOn(updatedState.nodes().masterNodeId()); - } - } - - logger.debug("Expecting {} mapping created responses for other nodes", counter - 1); - - // TODO: adding one to the version is based on knowledge on how the parent class will increment the version - // move this to the base class or add another callback before publishing the new cluster state so we - // capture its version. - countDownListener = new CountDownListener(counter, currentState.version() + 1, listener); - mappingCreatedAction.add(countDownListener, request.timeout); - - return updatedState; + return ClusterState.builder(currentState).metaData(builder).build(); } finally { for (String index : indicesToClose) { indicesService.removeIndex(index, "created for mapping processing"); @@ -534,107 +529,8 @@ public class MetaDataMappingService extends AbstractComponent { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (countDownListener != null) { - // the master has applied it on its cluster state - countDownListener.decrementCounter(); - } + } }); } - - public static interface Listener { - - void onResponse(Response response); - - void onFailure(Throwable t); - } - - public static class PutRequest { - - final String[] indices; - - final String mappingType; - - final String mappingSource; - - boolean ignoreConflicts = false; - - TimeValue timeout = TimeValue.timeValueSeconds(10); - TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; - - public PutRequest(String[] indices, String mappingType, String mappingSource) { - this.indices = indices; - this.mappingType = mappingType; - this.mappingSource = mappingSource; - } - - public PutRequest ignoreConflicts(boolean ignoreConflicts) { - this.ignoreConflicts = ignoreConflicts; - return this; - } - - public PutRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - public PutRequest masterTimeout(TimeValue masterTimeout) { - this.masterTimeout = masterTimeout; - return this; - } - } - - public static class Response { - private final boolean acknowledged; - - public Response(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - public boolean acknowledged() { - return acknowledged; - } - } - - private class CountDownListener implements NodeMappingCreatedAction.Listener { - - private final CountDown countDown; - private final Listener listener; - private final long minClusterStateVersion; - - /** - * @param countDown initial counter value - * @param minClusterStateVersion the minimum cluster state version for which accept responses - * @param listener listener to call when counter reaches 0. - */ - public CountDownListener(int countDown, long minClusterStateVersion, Listener listener) { - this.countDown = new CountDown(countDown); - this.listener = listener; - this.minClusterStateVersion = minClusterStateVersion; - } - - @Override - public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { - if (response.clusterStateVersion() < minClusterStateVersion) { - return; - } - decrementCounter(); - - } - - public void decrementCounter() { - if (countDown.countDown()) { - mappingCreatedAction.remove(this); - listener.onResponse(new Response(true)); - } - } - - @Override - public void onTimeout() { - if (countDown.fastForward()) { - mappingCreatedAction.remove(this); - listener.onResponse(new Response(false)); - } - } - } } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index c92cb9120ce..b14c83d277c 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; -import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.AliasMetaData; @@ -89,7 +88,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent(index, mappingType))) { seenMappings.put(new Tuple(index, mappingType), true); } @@ -410,8 +407,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent() { - @Override - public void onResponse(PutMappingResponse response) { - try { - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); - builder.startObject() - .field("ok", true) - .field("acknowledged", response.isAcknowledged()); - builder.endObject(); - channel.sendResponse(new XContentRestResponse(request, OK, builder)); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e1) { - logger.error("Failed to send failure response", e1); - } - } - }); + client.admin().indices().putMapping(putMappingRequest, new AcknowledgedRestResponseActionListener(request, channel, logger)); } } diff --git a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java index a4bbf4ac0c6..516f68e9df0 100644 --- a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java +++ b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; @@ -451,6 +452,27 @@ public class AckTests extends ElasticsearchIntegrationTest { assertThat(openIndexResponse.isAcknowledged(), equalTo(false)); } + @Test + public void testPutMappingAcknowledgement() { + createIndex("test"); + ensureGreen(); + + assertAcked(client().admin().indices().preparePutMapping("test").setType("test").setSource("field", "type=string,index=not_analyzed")); + + for (Client client : clients()) { + assertThat(getLocalClusterState(client).metaData().indices().get("test").mapping("test"), notNullValue()); + } + } + + @Test + public void testPutMappingNoAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutMappingResponse putMappingResponse = client().admin().indices().preparePutMapping("test").setType("test").setSource("field", "type=string,index=not_analyzed").setTimeout("0s").get(); + assertThat(putMappingResponse.isAcknowledged(), equalTo(false)); + } + private static ClusterState getLocalClusterState(Client client) { return client.admin().cluster().prepareState().setLocal(true).get().getState(); } diff --git a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 57fad0dde06..3fa71527c60 100644 --- a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -75,15 +75,6 @@ public class ElasticsearchAssertions { assertVersionSerializable(response); } - public static void assertAcked(PutMappingRequestBuilder builder) { - assertAcked(builder.get()); - } - - private static void assertAcked(PutMappingResponse response) { - assertThat("Put Mapping failed - not acked", response.isAcknowledged(), equalTo(true)); - assertVersionSerializable(response); - } - public static void assertAcked(DeleteIndexRequestBuilder builder) { assertAcked(builder.get()); }