Refactored put mapping api to make use of the new recently introduced generic ack mechanism

Note: we were previously waiting for ack only from all nodes that contain shards for the indices that the mapping updatewas applied to. This change introduces a wait for ack from all nodes, consitent with other api as the ack is meant more on the cluster state itself, which is held by all nodes and needs to be updated on all nodes anyway.

Closes #4228
This commit is contained in:
Luca Cavanna 2013-10-29 18:29:55 +01:00
parent 0ec3eaf53d
commit 630641f292
13 changed files with 169 additions and 452 deletions

View File

@ -0,0 +1,65 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.mapping.put;
import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
/**
* Cluster state update request that allows to put a mapping
*/
public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<PutMappingClusterStateUpdateRequest> {
private String type;
private String source;
private boolean ignoreConflicts = false;
PutMappingClusterStateUpdateRequest() {
}
public String type() {
return type;
}
public PutMappingClusterStateUpdateRequest type(String type) {
this.type = type;
return this;
}
public String source() {
return source;
}
public PutMappingClusterStateUpdateRequest source(String source) {
this.source = source;
return this;
}
public boolean ignoreConflicts() {
return ignoreConflicts;
}
public PutMappingClusterStateUpdateRequest ignoreConflicts(boolean ignoreConflicts) {
this.ignoreConflicts = ignoreConflicts;
return this;
}
}

View File

@ -23,21 +23,18 @@ import com.carrotsearch.hppc.ObjectOpenHashSet;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
/**
* Puts mapping definition registered under a specific type into one or more indices. Best created with
@ -51,7 +48,7 @@ import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
* @see org.elasticsearch.client.IndicesAdminClient#putMapping(PutMappingRequest)
* @see PutMappingResponse
*/
public class PutMappingRequest extends MasterNodeOperationRequest<PutMappingRequest> {
public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> {
private static ObjectOpenHashSet<String> RESERVED_FIELDS = ObjectOpenHashSet.from(
"_uid", "_id", "_type", "_source", "_all", "_analyzer", "_boost", "_parent", "_routing", "_index",
@ -64,8 +61,6 @@ public class PutMappingRequest extends MasterNodeOperationRequest<PutMappingRequ
private String source;
private TimeValue timeout = new TimeValue(10, TimeUnit.SECONDS);
private boolean ignoreConflicts = false;
PutMappingRequest() {
@ -206,6 +201,7 @@ public class PutMappingRequest extends MasterNodeOperationRequest<PutMappingRequ
/**
* The mapping source definition.
*/
@SuppressWarnings("unchecked")
public PutMappingRequest source(Map mappingSource) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
@ -224,31 +220,6 @@ public class PutMappingRequest extends MasterNodeOperationRequest<PutMappingRequ
return this;
}
/**
* Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to
* <tt>10s</tt>.
*/
TimeValue timeout() {
return timeout;
}
/**
* Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to
* <tt>10s</tt>.
*/
public PutMappingRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
/**
* Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to
* <tt>10s</tt>.
*/
public PutMappingRequest timeout(String timeout) {
return timeout(TimeValue.parseTimeValue(timeout, null));
}
/**
* If there is already a mapping definition registered against the type, then it will be merged. If there are
* elements that can't be merged are detected, the request will be rejected unless the
@ -274,7 +245,7 @@ public class PutMappingRequest extends MasterNodeOperationRequest<PutMappingRequ
indices = in.readStringArray();
type = in.readOptionalString();
source = in.readString();
timeout = readTimeValue(in);
readTimeout(in);
ignoreConflicts = in.readBoolean();
}
@ -284,7 +255,7 @@ public class PutMappingRequest extends MasterNodeOperationRequest<PutMappingRequ
out.writeStringArrayNullable(indices);
out.writeOptionalString(type);
out.writeString(source);
timeout.writeTo(out);
writeTimeout(out);
out.writeBoolean(ignoreConflicts);
}
}

View File

@ -20,18 +20,17 @@
package org.elasticsearch.action.admin.indices.mapping.put;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.util.Map;
/**
*
* Builder for a put mapping request
*/
public class PutMappingRequestBuilder extends MasterNodeOperationRequestBuilder<PutMappingRequest, PutMappingResponse, PutMappingRequestBuilder> {
public class PutMappingRequestBuilder extends AcknowledgedRequestBuilder<PutMappingRequest, PutMappingResponse, PutMappingRequestBuilder> {
public PutMappingRequestBuilder(IndicesAdminClient indicesClient) {
super((InternalIndicesAdminClient) indicesClient, new PutMappingRequest());
@ -83,24 +82,6 @@ public class PutMappingRequestBuilder extends MasterNodeOperationRequestBuilder<
return this;
}
/**
* Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to
* <tt>10s</tt>.
*/
public PutMappingRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to
* <tt>10s</tt>.
*/
public PutMappingRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
/**
* If there is already a mapping definition registered against the type, then it will be merged. If there are
* elements that can't be merged are detected, the request will be rejected unless the

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.mapping.put;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,35 +28,25 @@ import java.io.IOException;
/**
* The response of put mapping operation.
*/
public class PutMappingResponse extends ActionResponse {
private boolean acknowledged;
public class PutMappingResponse extends AcknowledgedResponse {
PutMappingResponse() {
}
PutMappingResponse(boolean acknowledged) {
this.acknowledged = acknowledged;
}
/**
* Has the put mapping creation been acknowledged by all current cluster nodes within the
* provided {@link PutMappingRequest#timeout(org.elasticsearch.common.unit.TimeValue)}.
*/
public boolean isAcknowledged() {
return acknowledged;
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
writeAcknowledged(out);
}
}

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
@ -81,10 +83,16 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
@Override
protected void masterOperation(final PutMappingRequest request, final ClusterState state, final ActionListener<PutMappingResponse> listener) throws ElasticSearchException {
metaDataMappingService.putMapping(new MetaDataMappingService.PutRequest(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(request.indices()).type(request.type())
.source(request.source()).ignoreConflicts(request.ignoreConflicts());
metaDataMappingService.putMapping(updateRequest, new ClusterStateUpdateListener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
listener.onResponse(new PutMappingResponse(response.acknowledged()));
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new PutMappingResponse(response.isAcknowledged()));
}
@Override

View File

@ -73,7 +73,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
bind(ShardStateAction.class).asEagerSingleton();
bind(NodeIndexCreatedAction.class).asEagerSingleton();
bind(NodeIndexDeletedAction.class).asEagerSingleton();
bind(NodeMappingCreatedAction.class).asEagerSingleton();
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.common.compress.CompressedString;
@ -78,9 +80,9 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
@Override
protected void masterOperation(final MappingUpdatedRequest request, final ClusterState state, final ActionListener<MappingUpdatedResponse> listener) throws ElasticSearchException {
metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), new MetaDataMappingService.Listener() {
metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), new ClusterStateUpdateListener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new MappingUpdatedResponse());
}

View File

@ -1,171 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.index;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*
*/
public class NodeMappingCreatedAction extends AbstractComponent {
private final ThreadPool threadPool;
private final TransportService transportService;
private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
@Inject
public NodeMappingCreatedAction(Settings settings, ThreadPool threadPool, TransportService transportService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
transportService.registerHandler(NodeMappingCreatedTransportHandler.ACTION, new NodeMappingCreatedTransportHandler());
}
public void add(final Listener listener, TimeValue timeout) {
listeners.add(listener);
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
boolean removed = listeners.remove(listener);
if (removed) {
listener.onTimeout();
}
}
});
}
public void remove(Listener listener) {
listeners.remove(listener);
}
public void nodeMappingCreated(final ClusterState state, final NodeMappingCreatedResponse response) throws ElasticSearchException {
logger.debug("Sending mapping created for index {}, type {} (cluster state version: {})", response.index, response.type, response.clusterStateVersion);
if (state.nodes().localNodeMaster()) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerNodeIndexCreated(response);
}
});
} else {
transportService.sendRequest(state.nodes().masterNode(),
NodeMappingCreatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
private void innerNodeIndexCreated(NodeMappingCreatedResponse response) {
for (Listener listener : listeners) {
listener.onNodeMappingCreated(response);
}
}
public static interface Listener {
void onNodeMappingCreated(NodeMappingCreatedResponse response);
void onTimeout();
}
private class NodeMappingCreatedTransportHandler extends BaseTransportRequestHandler<NodeMappingCreatedResponse> {
static final String ACTION = "cluster/nodeMappingCreated";
@Override
public NodeMappingCreatedResponse newInstance() {
return new NodeMappingCreatedResponse();
}
@Override
public void messageReceived(NodeMappingCreatedResponse response, TransportChannel channel) throws Exception {
innerNodeIndexCreated(response);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
public static class NodeMappingCreatedResponse extends TransportRequest {
private String index;
private String type;
private String nodeId;
private long clusterStateVersion;
private NodeMappingCreatedResponse() {
}
public NodeMappingCreatedResponse(String index, String type, String nodeId, long clusterStateVersion) {
this.index = index;
this.type = type;
this.nodeId = nodeId;
this.clusterStateVersion = clusterStateVersion;
}
public String index() {
return index;
}
public String type() {
return type;
}
public String nodeId() {
return nodeId;
}
public long clusterStateVersion() {
return clusterStateVersion;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeString(type);
out.writeString(nodeId);
out.writeVLong(clusterStateVersion);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
type = in.readString();
nodeId = in.readString();
clusterStateVersion = in.readVLong();
}
}
}

View File

@ -23,13 +23,14 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.*;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
@ -38,7 +39,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
@ -57,7 +57,7 @@ import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.mergeFlags;
/**
*
* Service responsible for submitting mapping changes
*/
public class MetaDataMappingService extends AbstractComponent {
@ -65,16 +65,13 @@ public class MetaDataMappingService extends AbstractComponent {
private final IndicesService indicesService;
private final NodeMappingCreatedAction mappingCreatedAction;
private final BlockingQueue<MappingTask> refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue();
@Inject
public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) {
public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService) {
super(settings);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.mappingCreatedAction = mappingCreatedAction;
}
static class MappingTask {
@ -99,9 +96,9 @@ public class MetaDataMappingService extends AbstractComponent {
static class UpdateTask extends MappingTask {
final String type;
final CompressedString mappingSource;
final Listener listener;
final ClusterStateUpdateListener listener;
UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, Listener listener) {
UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, ClusterStateUpdateListener listener) {
super(index, indexUUID);
this.type = type;
this.mappingSource = mappingSource;
@ -247,7 +244,7 @@ public class MetaDataMappingService extends AbstractComponent {
}
for (Object task : tasks) {
if (task instanceof UpdateTask) {
((UpdateTask) task).listener.onResponse(new Response(true));
((UpdateTask) task).listener.onResponse(new ClusterStateUpdateResponse(true));
}
}
}
@ -277,7 +274,7 @@ public class MetaDataMappingService extends AbstractComponent {
});
}
public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final Listener listener) {
public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final ClusterStateUpdateListener listener) {
refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, listener));
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
@ -362,15 +359,33 @@ public class MetaDataMappingService extends AbstractComponent {
});
}
public void putMapping(final PutRequest request, final Listener listener) {
public void putMapping(final PutMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask() {
CountDownListener countDownListener; // used to count ack responses before confirming operation is complete
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return request.masterTimeout;
return request.masterNodeTimeout();
}
@Override
@ -382,17 +397,17 @@ public class MetaDataMappingService extends AbstractComponent {
public ClusterState execute(final ClusterState currentState) throws Exception {
List<String> indicesToClose = Lists.newArrayList();
try {
if (request.indices.length == 0) {
if (request.indices().length == 0) {
throw new IndexMissingException(new Index("_all"));
}
for (String index : request.indices) {
for (String index : request.indices()) {
if (!currentState.metaData().hasIndex(index)) {
throw new IndexMissingException(new Index(index));
}
}
// pre create indices here and add mappings to them so we can merge the mappings here if needed
for (String index : request.indices) {
for (String index : request.indices()) {
if (indicesService.hasIndex(index)) {
continue;
}
@ -404,29 +419,29 @@ public class MetaDataMappingService extends AbstractComponent {
indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.mappings().get(MapperService.DEFAULT_MAPPING).source(), false);
}
// only add the current relevant mapping (if exists)
if (indexMetaData.mappings().containsKey(request.mappingType)) {
indexService.mapperService().merge(request.mappingType, indexMetaData.mappings().get(request.mappingType).source(), false);
if (indexMetaData.mappings().containsKey(request.type())) {
indexService.mapperService().merge(request.type(), indexMetaData.mappings().get(request.type()).source(), false);
}
}
Map<String, DocumentMapper> newMappers = newHashMap();
Map<String, DocumentMapper> existingMappers = newHashMap();
for (String index : request.indices) {
for (String index : request.indices()) {
IndexService indexService = indicesService.indexService(index);
if (indexService != null) {
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
DocumentMapper newMapper;
DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.mappingType);
if (MapperService.DEFAULT_MAPPING.equals(request.mappingType)) {
DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());
if (MapperService.DEFAULT_MAPPING.equals(request.type())) {
// _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default
newMapper = indexService.mapperService().parse(request.mappingType, new CompressedString(request.mappingSource), false);
newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()), false);
} else {
newMapper = indexService.mapperService().parse(request.mappingType, new CompressedString(request.mappingSource));
newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()));
if (existingMapper != null) {
// first, simulate
DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true));
// if we have conflicts, and we are not supposed to ignore them, throw an exception
if (!request.ignoreConflicts && mergeResult.hasConflicts()) {
if (!request.ignoreConflicts() && mergeResult.hasConflicts()) {
throw new MergeMappingException(mergeResult.conflicts());
}
}
@ -442,7 +457,7 @@ public class MetaDataMappingService extends AbstractComponent {
}
}
String mappingType = request.mappingType;
String mappingType = request.type();
if (mappingType == null) {
mappingType = newMappers.values().iterator().next().type();
} else if (!mappingType.equals(newMappers.values().iterator().next().type())) {
@ -489,12 +504,11 @@ public class MetaDataMappingService extends AbstractComponent {
if (mappings.isEmpty()) {
// no changes, return
listener.onResponse(new Response(true));
return currentState;
}
MetaData.Builder builder = MetaData.builder(currentState.metaData());
for (String indexName : request.indices) {
for (String indexName : request.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
if (indexMetaData == null) {
throw new IndexMissingException(new Index(indexName));
@ -505,26 +519,7 @@ public class MetaDataMappingService extends AbstractComponent {
}
}
ClusterState updatedState = ClusterState.builder(currentState).metaData(builder).build();
int counter = 1; // we want to wait on the master node to apply it on its cluster state
// also wait for nodes that actually have the index created on them to apply the mappings internally
for (String index : request.indices) {
IndexRoutingTable indexRoutingTable = updatedState.routingTable().index(index);
if (indexRoutingTable != null) {
counter += indexRoutingTable.numberOfNodesShardsAreAllocatedOn(updatedState.nodes().masterNodeId());
}
}
logger.debug("Expecting {} mapping created responses for other nodes", counter - 1);
// TODO: adding one to the version is based on knowledge on how the parent class will increment the version
// move this to the base class or add another callback before publishing the new cluster state so we
// capture its version.
countDownListener = new CountDownListener(counter, currentState.version() + 1, listener);
mappingCreatedAction.add(countDownListener, request.timeout);
return updatedState;
return ClusterState.builder(currentState).metaData(builder).build();
} finally {
for (String index : indicesToClose) {
indicesService.removeIndex(index, "created for mapping processing");
@ -534,107 +529,8 @@ public class MetaDataMappingService extends AbstractComponent {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (countDownListener != null) {
// the master has applied it on its cluster state
countDownListener.decrementCounter();
}
}
});
}
public static interface Listener {
void onResponse(Response response);
void onFailure(Throwable t);
}
public static class PutRequest {
final String[] indices;
final String mappingType;
final String mappingSource;
boolean ignoreConflicts = false;
TimeValue timeout = TimeValue.timeValueSeconds(10);
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
public PutRequest(String[] indices, String mappingType, String mappingSource) {
this.indices = indices;
this.mappingType = mappingType;
this.mappingSource = mappingSource;
}
public PutRequest ignoreConflicts(boolean ignoreConflicts) {
this.ignoreConflicts = ignoreConflicts;
return this;
}
public PutRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
public PutRequest masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class Response {
private final boolean acknowledged;
public Response(boolean acknowledged) {
this.acknowledged = acknowledged;
}
public boolean acknowledged() {
return acknowledged;
}
}
private class CountDownListener implements NodeMappingCreatedAction.Listener {
private final CountDown countDown;
private final Listener listener;
private final long minClusterStateVersion;
/**
* @param countDown initial counter value
* @param minClusterStateVersion the minimum cluster state version for which accept responses
* @param listener listener to call when counter reaches 0.
*/
public CountDownListener(int countDown, long minClusterStateVersion, Listener listener) {
this.countDown = new CountDown(countDown);
this.listener = listener;
this.minClusterStateVersion = minClusterStateVersion;
}
@Override
public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
if (response.clusterStateVersion() < minClusterStateVersion) {
return;
}
decrementCounter();
}
public void decrementCounter() {
if (countDown.countDown()) {
mappingCreatedAction.remove(this);
listener.onResponse(new Response(true));
}
}
@Override
public void onTimeout() {
if (countDown.fastForward()) {
mappingCreatedAction.remove(this);
listener.onResponse(new Response(false));
}
}
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.AliasMetaData;
@ -89,7 +88,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final ShardStateAction shardStateAction;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final NodeMappingCreatedAction nodeMappingCreatedAction;
private final NodeMappingRefreshAction nodeMappingRefreshAction;
// a map of mappings type we have seen per index due to cluster state
@ -119,7 +117,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
ThreadPool threadPool, RecoveryTarget recoveryTarget,
ShardStateAction shardStateAction,
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingCreatedAction nodeMappingCreatedAction, NodeMappingRefreshAction nodeMappingRefreshAction) {
NodeMappingRefreshAction nodeMappingRefreshAction) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
@ -128,7 +126,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
this.shardStateAction = shardStateAction;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.nodeMappingCreatedAction = nodeMappingCreatedAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
}
@ -357,7 +354,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
MapperService mapperService = indexService.mapperService();
// first, go over and update the _default_ mapping (if exists)
if (indexMetaData.mappings().containsKey(MapperService.DEFAULT_MAPPING)) {
processMapping(event, index, mapperService, MapperService.DEFAULT_MAPPING, indexMetaData.mapping(MapperService.DEFAULT_MAPPING).source());
processMapping(index, mapperService, MapperService.DEFAULT_MAPPING, indexMetaData.mapping(MapperService.DEFAULT_MAPPING).source());
}
// go over and add the relevant mappings (or update them)
@ -368,7 +365,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first
continue;
}
boolean requireRefresh = processMapping(event, index, mapperService, mappingType, mappingSource);
boolean requireRefresh = processMapping(index, mapperService, mappingType, mappingSource);
if (requireRefresh) {
if (typesToRefresh == null) {
typesToRefresh = Lists.newArrayList();
@ -392,7 +389,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private boolean processMapping(ClusterChangedEvent event, String index, MapperService mapperService, String mappingType, CompressedString mappingSource) {
private boolean processMapping(String index, MapperService mapperService, String mappingType, CompressedString mappingSource) {
if (!seenMappings.containsKey(new Tuple<String, String>(index, mappingType))) {
seenMappings.put(new Tuple<String, String>(index, mappingType), true);
}
@ -410,8 +407,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
requiresRefresh = true;
}
nodeMappingCreatedAction.nodeMappingCreated(event.state(),
new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId(), event.state().version()));
} else {
DocumentMapper existingMapper = mapperService.documentMapper(mappingType);
if (!mappingSource.equals(existingMapper.mappingSource())) {
@ -426,8 +421,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// this might happen when upgrading from 0.15 to 0.16
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
}
nodeMappingCreatedAction.nodeMappingCreated(event.state(),
new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId(), event.state().version()));
}
}
} catch (Throwable e) {

View File

@ -19,24 +19,17 @@
package org.elasticsearch.rest.action.admin.indices.mapping.put;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import static org.elasticsearch.client.Requests.putMappingRequest;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
import static org.elasticsearch.rest.RestStatus.OK;
/**
*
@ -59,32 +52,9 @@ public class RestPutMappingAction extends BaseRestHandler {
putMappingRequest.listenerThreaded(false);
putMappingRequest.type(request.param("type"));
putMappingRequest.source(request.content().toUtf8());
putMappingRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
putMappingRequest.timeout(request.paramAsTime("timeout", putMappingRequest.timeout()));
putMappingRequest.ignoreConflicts(request.paramAsBoolean("ignore_conflicts", putMappingRequest.ignoreConflicts()));
putMappingRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putMappingRequest.masterNodeTimeout()));
client.admin().indices().putMapping(putMappingRequest, new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject()
.field("ok", true)
.field("acknowledged", response.isAcknowledged());
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (IOException e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
client.admin().indices().putMapping(putMappingRequest, new AcknowledgedRestResponseActionListener<PutMappingResponse>(request, channel, logger));
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
@ -451,6 +452,27 @@ public class AckTests extends ElasticsearchIntegrationTest {
assertThat(openIndexResponse.isAcknowledged(), equalTo(false));
}
@Test
public void testPutMappingAcknowledgement() {
createIndex("test");
ensureGreen();
assertAcked(client().admin().indices().preparePutMapping("test").setType("test").setSource("field", "type=string,index=not_analyzed"));
for (Client client : clients()) {
assertThat(getLocalClusterState(client).metaData().indices().get("test").mapping("test"), notNullValue());
}
}
@Test
public void testPutMappingNoAcknowledgement() {
createIndex("test");
ensureGreen();
PutMappingResponse putMappingResponse = client().admin().indices().preparePutMapping("test").setType("test").setSource("field", "type=string,index=not_analyzed").setTimeout("0s").get();
assertThat(putMappingResponse.isAcknowledged(), equalTo(false));
}
private static ClusterState getLocalClusterState(Client client) {
return client.admin().cluster().prepareState().setLocal(true).get().getState();
}

View File

@ -75,15 +75,6 @@ public class ElasticsearchAssertions {
assertVersionSerializable(response);
}
public static void assertAcked(PutMappingRequestBuilder builder) {
assertAcked(builder.get());
}
private static void assertAcked(PutMappingResponse response) {
assertThat("Put Mapping failed - not acked", response.isAcknowledged(), equalTo(true));
assertVersionSerializable(response);
}
public static void assertAcked(DeleteIndexRequestBuilder builder) {
assertAcked(builder.get());
}